Skip to content

Commit

Permalink
ddl: add sync bundles logic for creating and droping (#28037)
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Sep 23, 2021
1 parent ca7ba5a commit d14566c
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 39 deletions.
7 changes: 2 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2874,8 +2874,7 @@ func (s *testSerialDBSuite) TestCreateTableWithSpecialComment(c *C) {
"/*T![placement] PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"CONSTRAINTS=\"[+disk=ssd]\" " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" */",
"CONSTRAINTS=\"[+disk=ssd]\" */",
)
tk.MustQuery(`show create table t`).Check(testutil.RowsWithSep("|",
"t CREATE TABLE `t` (\n"+
Expand All @@ -2884,8 +2883,7 @@ func (s *testSerialDBSuite) TestCreateTableWithSpecialComment(c *C) {
"/*T![placement] PRIMARY_REGION=\"cn-east-1\" "+
"REGIONS=\"cn-east-1, cn-east-2\" "+
"FOLLOWERS=2 "+
"CONSTRAINTS=\"[+disk=ssd]\" "+
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" */",
"CONSTRAINTS=\"[+disk=ssd]\" */",
))

// case for policy
Expand All @@ -2894,7 +2892,6 @@ func (s *testSerialDBSuite) TestCreateTableWithSpecialComment(c *C) {
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd]\" ")
tk.MustExec("create table t(a int)" +
"/*T![placement] PLACEMENT POLICY=`x` */")
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.ActionCreateView:
ver, err = onCreateView(d, t, job)
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
ver, err = onDropTableOrView(t, job)
ver, err = onDropTableOrView(d, t, job)
case model.ActionDropTablePartition:
ver, err = w.onDropTablePartition(d, t, job)
case model.ActionTruncateTablePartition:
Expand Down Expand Up @@ -831,7 +831,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.ActionDropPlacementPolicy:
ver, err = onDropPlacementPolicy(d, t, job)
case model.ActionAlterPlacementPolicy:
ver, err = onAlterPlacementPolicy(t, job)
ver, err = onAlterPlacementPolicy(d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
5 changes: 3 additions & 2 deletions ddl/placement/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,9 @@ func (b *Bundle) Tidy() error {
// Reset resets the bundle ID and keyrange of all rules.
func (b *Bundle) Reset(newID int64) *Bundle {
b.ID = GroupID(newID)
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(newID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(newID+1)))
// Involve all the table level objects.
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID+1)))
for _, rule := range b.Rules {
rule.GroupID = b.ID
rule.StartKeyHex = startKey
Expand Down
4 changes: 2 additions & 2 deletions ddl/placement/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,10 +983,10 @@ func (s *testBundleSuite) TestReset(c *C) {
c.Assert(bundle.Rules, HasLen, 1)
c.Assert(bundle.Rules[0].GroupID, Equals, bundle.ID)

startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(3)))
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(3)))
c.Assert(bundle.Rules[0].StartKeyHex, Equals, startKey)

endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(4)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(4)))
c.Assert(bundle.Rules[0].EndKeyHex, Equals, endKey)
}

Expand Down
88 changes: 83 additions & 5 deletions ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package ddl

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
)
Expand Down Expand Up @@ -194,10 +196,6 @@ func onDropPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if err != nil {
return ver, errors.Trace(err)
}
// TODO: Reset all the policy reference, (modify meta & notify pd)
// If any partitions currently use this policy, they will be converted to the policy used by the table
// they belong to. If any databases use this policy, they will be converted to the default placement_policy policy.

// Finish this job. By now policy don't consider the binlog sync.
job.FinishDBJob(model.JobStateDone, model.StateNone, ver, nil)
default:
Expand All @@ -206,7 +204,7 @@ func onDropPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
return ver, errors.Trace(err)
}

func onAlterPlacementPolicy(t *meta.Meta, job *model.Job) (ver int64, _ error) {
func onAlterPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
alterPolicy := &model.PolicyInfo{}
if err := job.DecodeArgs(alterPolicy); err != nil {
job.State = model.JobStateCancelled
Expand All @@ -232,6 +230,53 @@ func onAlterPlacementPolicy(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

dbIDs, tblIDs, partIDs, err := getPlacementPolicyDependedObjectsIDs(t, oldPolicy)
if err != nil {
return ver, errors.Trace(err)
}
if len(dbIDs)+len(tblIDs)+len(partIDs) != 0 {
// build bundle from new placement policy.
bundle, err := placement.NewBundleFromOptions(newPolicyInfo.PlacementSettings)
if err != nil {
return ver, errors.Trace(err)
}
err = bundle.Tidy()
if err != nil {
return ver, errors.Trace(err)
}
// Do the http request only when the rules is existed.
bundles := make([]*placement.Bundle, 0, len(tblIDs)+len(partIDs))
// Reset bundle for tables.
for _, id := range tblIDs {
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(id))
if len(bundle.Rules) == 0 {
bundle.Index = 0
bundle.Override = false
} else {
bundle.Index = placement.RuleIndexTable
bundle.Override = true
}
}
// Reset bundle for partitions.
for _, id := range partIDs {
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(id))
if len(bundle.Rules) == 0 {
bundle.Index = 0
bundle.Override = false
} else {
bundle.Index = placement.RuleIndexPartition
bundle.Override = true
}
}
err = infosync.PutRuleBundles(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -269,6 +314,39 @@ func checkPlacementPolicyNotInUseFromInfoSchema(is infoschema.InfoSchema, policy
return nil
}

func getPlacementPolicyDependedObjectsIDs(t *meta.Meta, policy *model.PolicyInfo) (dbIDs, tblIDs, partIDs []int64, err error) {
schemas, err := t.ListDatabases()
if err != nil {
return nil, nil, nil, err
}
// DB ids don't have to set the bundle themselves, but to check the dependency.
dbIDs = make([]int64, 0, len(schemas))
tblIDs = make([]int64, 0, len(schemas))
partIDs = make([]int64, 0, len(schemas))
for _, dbInfo := range schemas {
if dbInfo.PlacementPolicyRef != nil && dbInfo.PlacementPolicyRef.ID == policy.ID {
dbIDs = append(dbIDs, dbInfo.ID)
}
tables, err := t.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, nil, err
}
for _, tblInfo := range tables {
if ref := tblInfo.PlacementPolicyRef; ref != nil && ref.ID == policy.ID {
tblIDs = append(tblIDs, tblInfo.ID)
}
if tblInfo.Partition != nil {
for _, part := range tblInfo.Partition.Definitions {
if part.PlacementPolicyRef != nil && part.PlacementPolicyRef.ID == part.ID {
partIDs = append(partIDs, part.ID)
}
}
}
}
}
return dbIDs, tblIDs, partIDs, nil
}

func checkPlacementPolicyNotInUseFromMeta(t *meta.Meta, policy *model.PolicyInfo) error {
schemas, err := t.ListDatabases()
if err != nil {
Expand Down
119 changes: 113 additions & 6 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) {
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd]\"")

tbl := testGetTableByName(c, tk.Se, "test", "t")
Expand All @@ -297,7 +296,7 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) {
c.Assert(policySetting.PrimaryRegion, Equals, "cn-east-1")
c.Assert(policySetting.Regions, Equals, "cn-east-1, cn-east-2")
c.Assert(policySetting.Followers, Equals, uint64(2))
c.Assert(policySetting.FollowerConstraints, Equals, "[+zone=cn-east-1]")
c.Assert(policySetting.FollowerConstraints, Equals, "")
c.Assert(policySetting.Voters, Equals, uint64(0))
c.Assert(policySetting.VoterConstraints, Equals, "")
c.Assert(policySetting.Learners, Equals, uint64(0))
Expand All @@ -313,7 +312,6 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) {
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd]\" " +
"PLACEMENT POLICY=\"x\"")
c.Assert(err, NotNil)
Expand All @@ -326,7 +324,6 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) {
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd]\" ")
tk.MustExec("create table t(a int)" +
"PLACEMENT POLICY=\"x\"")
Expand All @@ -352,7 +349,6 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) {
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"FOLLOWER_CONSTRAINTS=\"[+zone=cn-east-1]\" " +
"CONSTRAINTS=\"[+disk=ssd]\" ")

tbl = testGetTableByName(c, tk.Se, "test", "t")
Expand All @@ -363,7 +359,7 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) {
c.Assert(policySetting.PrimaryRegion, Equals, "cn-east-1")
c.Assert(policySetting.Regions, Equals, "cn-east-1, cn-east-2")
c.Assert(policySetting.Followers, Equals, uint64(2))
c.Assert(policySetting.FollowerConstraints, Equals, "[+zone=cn-east-1]")
c.Assert(policySetting.FollowerConstraints, Equals, "")
c.Assert(policySetting.Voters, Equals, uint64(0))
c.Assert(policySetting.VoterConstraints, Equals, "")
c.Assert(policySetting.Learners, Equals, uint64(0))
Expand Down Expand Up @@ -427,3 +423,114 @@ func (s *testDBSuite6) TestDropPlacementPolicyInUse(c *C) {
c.Assert(err.Error(), Equals, fmt.Sprintf("[ddl:8241]Placement policy '%s' is still in use", policyName))
}
}

func testGetPolicyByName(c *C, ctx sessionctx.Context, name string, mustExist bool) *model.PolicyInfo {
dom := domain.GetDomain(ctx)
// Make sure the table schema is the new schema.
err := dom.Reload()
c.Assert(err, IsNil)
po, ok := dom.InfoSchema().PolicyByName(model.NewCIStr(name))
if mustExist {
c.Assert(ok, Equals, true)
}
return po
}

func testGetPolicyDependency(storage kv.Storage, name string) []int64 {
ids := make([]int64, 0, 32)
err1 := kv.RunInNewTxn(context.Background(), storage, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
dbs, err := t.ListDatabases()
if err != nil {
return err
}
for _, db := range dbs {
tbls, err := t.ListTables(db.ID)
if err != nil {
return err
}
for _, tbl := range tbls {
if tbl.PlacementPolicyRef != nil && tbl.PlacementPolicyRef.Name.L == name {
ids = append(ids, tbl.ID)
}
}
}
return nil
})
if err1 != nil {
return []int64{}
}
return ids
}

func (s *testDBSuite6) TestPolicyCacheAndPolicyDependencyCache(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop placement policy if exists x")

// Test policy cache.
tk.MustExec("create placement policy x primary_region=\"r1\" regions=\"r1,r2\" schedule=\"EVEN\";")
po := testGetPolicyByName(c, tk.Se, "x", true)
c.Assert(po, NotNil)
tk.MustQuery("show placement").Check(testkit.Rows("POLICY x PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" SCHEDULE=\"EVEN\" SCHEDULED"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int) placement policy \"x\"")
tbl := testGetTableByName(c, tk.Se, "test", "t")

// Test policy dependency cache.
dependencies := testGetPolicyDependency(s.store, "x")
c.Assert(dependencies, NotNil)
c.Assert(len(dependencies), Equals, 1)
c.Assert(dependencies[0], Equals, tbl.Meta().ID)

tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2 (a int) placement policy \"x\"")
tbl2 := testGetTableByName(c, tk.Se, "test", "t2")

dependencies = testGetPolicyDependency(s.store, "x")
c.Assert(dependencies, NotNil)
c.Assert(len(dependencies), Equals, 2)
in := func() bool {
for _, one := range dependencies {
if one == tbl2.Meta().ID {
return true
}
}
return false
}
c.Assert(in(), Equals, true)

// Test drop policy can't succeed cause there are still some table depend on them.
_, err := tk.Exec("drop placement policy x")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:8241]Placement policy 'x' is still in use")

// Drop depended table t firstly.
tk.MustExec("drop table if exists t")
dependencies = testGetPolicyDependency(s.store, "x")
c.Assert(dependencies, NotNil)
c.Assert(len(dependencies), Equals, 1)
c.Assert(dependencies[0], Equals, tbl2.Meta().ID)

_, err = tk.Exec("drop placement policy x")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:8241]Placement policy 'x' is still in use")

// Drop depended table t2 secondly.
tk.MustExec("drop table if exists t2")
dependencies = testGetPolicyDependency(s.store, "x")
c.Assert(dependencies, NotNil)
c.Assert(len(dependencies), Equals, 0)

po = testGetPolicyByName(c, tk.Se, "x", true)
c.Assert(po, NotNil)

tk.MustExec("drop placement policy x")

po = testGetPolicyByName(c, tk.Se, "x", false)
c.Assert(po, IsNil)
dependencies = testGetPolicyDependency(s.store, "x")
c.Assert(dependencies, NotNil)
c.Assert(len(dependencies), Equals, 0)
}
Loading

0 comments on commit d14566c

Please sign in to comment.