Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support ALTER PLACEMENT POLICY ... #27733

Merged
merged 10 commits into from
Sep 5, 2021
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type DDL interface {
AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
Expand Down
44 changes: 40 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6211,10 +6211,10 @@ func (d *ddl) AlterTablePartitionAttributes(ctx sessionctx.Context, ident ast.Id
return errors.Trace(err)
}

func buildPolicyInfo(stmt *ast.CreatePlacementPolicyStmt) (*placementpolicy.PolicyInfo, error) {
func buildPolicyInfo(name model.CIStr, options []*ast.PlacementOption) (*placementpolicy.PolicyInfo, error) {
policyInfo := &placementpolicy.PolicyInfo{}
policyInfo.Name = stmt.PolicyName
for _, opt := range stmt.PlacementOptions {
policyInfo.Name = name
for _, opt := range options {
switch opt.Tp {
case ast.PlacementOptionPrimaryRegion:
policyInfo.PrimaryRegion = opt.StrValue
Expand Down Expand Up @@ -6259,11 +6259,16 @@ func (d *ddl) CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlac
return err
}
// Auto fill the policyID when it is inserted.
policyInfo, err := buildPolicyInfo(stmt)
policyInfo, err := buildPolicyInfo(stmt.PolicyName, stmt.PlacementOptions)
if err != nil {
return errors.Trace(err)
}

err = checkPolicyValidation(policyInfo)
if err != nil {
return err
}

job := &model.Job{
SchemaName: policyInfo.Name.L,
Type: model.ActionCreatePlacementPolicy,
Expand Down Expand Up @@ -6300,3 +6305,34 @@ func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacemen
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) (err error) {
policyName := stmt.PolicyName
is := d.GetInfoSchemaWithInterceptor(ctx)
// Check policy existence.
policy, ok := is.PolicyByName(policyName)
if !ok {
return infoschema.ErrPlacementPolicyNotExists.GenWithStackByArgs(policyName)
}

newPolicyInfo, err := buildPolicyInfo(policy.Name, stmt.PlacementOptions)
if err != nil {
return errors.Trace(err)
}

err = checkPolicyValidation(newPolicyInfo)
if err != nil {
return err
}

job := &model.Job{
SchemaID: policy.ID,
SchemaName: policy.Name.L,
Type: model.ActionAlterPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newPolicyInfo},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onCreatePlacementPolicy(d, t, job)
case model.ActionDropPlacementPolicy:
ver, err = onDropPlacementPolicy(t, job)
case model.ActionAlterPlacementPolicy:
ver, err = onAlterPlacementPolicy(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
36 changes: 36 additions & 0 deletions ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,39 @@ func onDropPlacementPolicy(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
return ver, errors.Trace(err)
}

func onAlterPlacementPolicy(t *meta.Meta, job *model.Job) (ver int64, _ error) {
alterPolicy := &placementpolicy.PolicyInfo{}
if err := job.DecodeArgs(alterPolicy); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

oldPolicy, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

newPolicyInfo := *oldPolicy
newPolicyInfo.PlacementSettings = alterPolicy.PlacementSettings

err = checkPolicyValidation(&newPolicyInfo)
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = t.UpdatePolicy(&newPolicyInfo)
if err != nil {
return ver, errors.Trace(err)
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, nil)
return ver, nil
}
170 changes: 121 additions & 49 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test

import (
"context"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -140,57 +141,128 @@ func (s *testDBSuite6) TestConstraintCompatibility(c *C) {
tk.MustExec("use test")
tk.MustExec("drop placement policy if exists x")

// Dict is not allowed for common constraint.
_, err := tk.Exec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1]\" " +
"CONSTRAINTS=\"{'+disk=ssd':2}\"")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]invalid label constraints format: should be [constraint1, ...] (error yaml: unmarshal errors:\n line 1: cannot unmarshal !!map into []string)")

// Special constraints may be incompatible with itself.
_, err = tk.Exec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1, +zone=cn-west-2]\"")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]conflicting label constraints: '+zone=cn-west-2' and '+zone=cn-west-1'")
cases := []struct {
settings string
success bool
errmsg string
}{
// Dict is not allowed for common constraint.
{
settings: "PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1]\" " +
"CONSTRAINTS=\"{'+disk=ssd':2}\"",
errmsg: "invalid label constraints format: should be [constraint1, ...] (error yaml: unmarshal errors:\n line 1: cannot unmarshal !!map into []string)",
},
// Special constraints may be incompatible with itself.
{
settings: "PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1, +zone=cn-west-2]\"",
errmsg: "conflicting label constraints: '+zone=cn-west-2' and '+zone=cn-west-1'",
},
{
settings: "PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1, -zone=cn-west-1]\"",
errmsg: "conflicting label constraints: '-zone=cn-west-1' and '+zone=cn-west-1'",
},
{
settings: "PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1, +zone=cn-west-1]\"",
success: true,
},
// Special constraints may be incompatible with common constraint.
{
settings: "PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+zone=cn-east-2]\"",
errmsg: "conflicting label constraints: '+zone=cn-east-2' and '+zone=cn-east-1'",
},
{
settings: "PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd,-zone=cn-east-1]\"",
errmsg: "conflicting label constraints: '-zone=cn-east-1' and '+zone=cn-east-1'",
},
}

_, err = tk.Exec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1, -zone=cn-west-1]\"")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]conflicting label constraints: '-zone=cn-west-1' and '+zone=cn-west-1'")
// test for create
for _, ca := range cases {
sql := fmt.Sprintf("%s %s", "create placement policy x", ca.settings)
if ca.success {
tk.MustExec(sql)
tk.MustExec("drop placement policy if exists x")
} else {
err := tk.ExecToErr(sql)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ca.errmsg)
}
}

_, err = tk.Exec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1,cn-east-2\" " +
"LEARNERS=1 " +
"LEARNER_CONSTRAINTS=\"[+zone=cn-west-1, +zone=cn-west-1]\"")
c.Assert(err, IsNil)
// test for alter
tk.MustExec("create placement policy x regions=\"cn-east1,cn-east\"")
for _, ca := range cases {
sql := fmt.Sprintf("%s %s", "alter placement policy x", ca.settings)
if ca.success {
tk.MustExec(sql)
tk.MustExec("alter placement policy x regions=\"cn-east1,cn-east\"")
} else {
err := tk.ExecToErr(sql)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ca.errmsg)
tk.MustQuery("show placement where target='POLICY x'").Check(testkit.Rows("POLICY x REGIONS=\"cn-east1,cn-east\" SCHEDULED"))
}
}
tk.MustExec("drop placement policy x")
}

// Special constraints may be incompatible with common constraint.
func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop placement policy if exists x")
_, err = tk.Exec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+zone=cn-east-2]\"")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]conflicting label constraints: '+zone=cn-east-2' and '+zone=cn-east-1'")

_, err = tk.Exec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd,-zone=cn-east-1]\"")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]conflicting label constraints: '-zone=cn-east-1' and '+zone=cn-east-1'")
tk.MustExec("create placement policy x primary_region=\"cn-east-1\" regions=\"cn-east1,cn-east\"")
defer tk.MustExec("drop placement policy if exists x")

// test for normal cases
tk.MustExec("alter placement policy x REGIONS=\"bj,sh\"")
tk.MustQuery("show placement where target='POLICY x'").Check(testkit.Rows("POLICY x REGIONS=\"bj,sh\" SCHEDULED"))

tk.MustExec("alter placement policy x " +
"PRIMARY_REGION=\"bj\" " +
"REGIONS=\"sh\" " +
"SCHEDULE=\"EVEN\"")
tk.MustQuery("show placement where target='POLICY x'").Check(testkit.Rows("POLICY x PRIMARY_REGION=\"bj\" REGIONS=\"sh\" SCHEDULE=\"EVEN\" SCHEDULED"))

tk.MustExec("alter placement policy x " +
"LEADER_CONSTRAINTS=\"[+region=us-east-1]\" " +
"FOLLOWER_CONSTRAINTS=\"[+region=us-east-2]\" " +
"FOLLOWERS=3")
tk.MustQuery("show placement where target='POLICY x'").Check(
testkit.Rows("POLICY x LEADER_CONSTRAINTS=\"[+region=us-east-1]\" FOLLOWERS=3 FOLLOWER_CONSTRAINTS=\"[+region=us-east-2]\" SCHEDULED"),
)

tk.MustExec("alter placement policy x " +
"VOTER_CONSTRAINTS=\"[+region=bj]\" " +
"LEARNER_CONSTRAINTS=\"[+region=sh]\" " +
"CONSTRAINTS=\"[+disk=ssd]\"" +
"VOTERS=5 " +
"LEARNERS=3")
tk.MustQuery("show placement where target='POLICY x'").Check(
testkit.Rows("POLICY x CONSTRAINTS=\"[+disk=ssd]\" VOTERS=5 VOTER_CONSTRAINTS=\"[+region=bj]\" LEARNERS=3 LEARNER_CONSTRAINTS=\"[+region=sh]\" SCHEDULED"),
)

// test alter not exist policies
tk.MustExec("drop placement policy x")
tk.MustGetErrCode("alter placement policy x REGIONS=\"bj,sh\"", mysql.ErrPlacementPolicyNotExists)
tk.MustGetErrCode("alter placement policy x2 REGIONS=\"bj,sh\"", mysql.ErrPlacementPolicyNotExists)
}
7 changes: 7 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeCreatePlacementPolicy(x)
case *ast.DropPlacementPolicyStmt:
err = e.executeDropPlacementPolicy(x)
case *ast.AlterPlacementPolicyStmt:
err = e.executeAlterPlacementPolicy(x)
}
if err != nil {
// If the owner return ErrTableNotExists error when running this DDL, it may be caused by schema changed,
Expand Down Expand Up @@ -911,6 +913,11 @@ func (e *DDLExec) executeAlterSequence(s *ast.AlterSequenceStmt) error {
func (e *DDLExec) executeCreatePlacementPolicy(s *ast.CreatePlacementPolicyStmt) error {
return domain.GetDomain(e.ctx).DDL().CreatePlacementPolicy(e.ctx, s)
}

func (e *DDLExec) executeDropPlacementPolicy(s *ast.DropPlacementPolicyStmt) error {
return domain.GetDomain(e.ctx).DDL().DropPlacementPolicy(e.ctx, s)
}

func (e *DDLExec) executeAlterPlacementPolicy(s *ast.AlterPlacementPolicyStmt) error {
return domain.GetDomain(e.ctx).DDL().AlterPlacementPolicy(e.ctx, s)
}
19 changes: 19 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
return nil, b.applyCreatePolicy(m, diff)
case model.ActionDropPlacementPolicy:
return b.applyDropPolicy(diff.SchemaID), nil
case model.ActionAlterPlacementPolicy:
return b.applyAlterPolicy(m, diff)
}
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
if !ok {
Expand Down Expand Up @@ -260,6 +262,23 @@ func (b *Builder) applyCreatePolicy(m *meta.Meta, diff *model.SchemaDiff) error
return nil
}

func (b *Builder) applyAlterPolicy(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
po, err := m.GetPolicy(diff.SchemaID)
if err != nil {
return nil, errors.Trace(err)
}

if po == nil {
return nil, ErrPlacementPolicyExists.GenWithStackByArgs(
fmt.Sprintf("(Policy ID %d)", diff.SchemaID),
)
}

b.is.policyMap[po.Name.L] = po
// TODO: return the policy related table ids
return []int64{}, nil
}

func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error {
di, err := m.GetDatabase(diff.SchemaID)
if err != nil {
Expand Down