Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: Consider partition table's placement for br #33139

Merged
merged 6 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func BuildBackupRangeAndSchema(
if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
tableInfo.PlacementPolicyRef = nil
tableInfo.ClearPlacement()
}

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
Expand Down Expand Up @@ -478,7 +478,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.PlacementPolicyRef = nil
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
Expand Down
14 changes: 6 additions & 8 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,21 +459,19 @@ func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error {
log.Info("skip create database", zap.Stringer("database", db.Name))
return nil
}

if !rc.supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("database", db.Name))
db.PlacementPolicyRef = nil
}
if db.PlacementPolicyRef != nil && rc.policyMap != nil {
if policy, ok := rc.policyMap.Load(db.PlacementPolicyRef.Name.L); ok {
err := rc.db.CreatePlacementPolicy(ctx, policy.(*model.PolicyInfo))
if err != nil {
return errors.Trace(err)
}
// delete policy in cache after restore succeed
rc.policyMap.Delete(db.PlacementPolicyRef.Name.L)

if db.PlacementPolicyRef != nil {
if err := rc.db.ensurePlacementPolicy(ctx, db.PlacementPolicyRef.Name, rc.policyMap); err != nil {
return errors.Trace(err)
}
}

return rc.db.CreateDatabase(ctx, db)
}

Expand Down
74 changes: 50 additions & 24 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,13 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
m := map[string][]*model.TableInfo{}
for _, table := range tables {
m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info)
if table.Info.PlacementPolicyRef != nil && policyMap != nil {
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.PlacementPolicyRef = nil
} else if p, exists := policyMap.Load(table.Info.PlacementPolicyRef.Name.L); exists {
err := db.CreatePlacementPolicy(ctx, p.(*model.PolicyInfo))
if err != nil {
return errors.Trace(err)
}
// delete policy in cache after restore table succeed.
policyMap.Delete(table.Info.PlacementPolicyRef.Name.L)
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.ClearPlacement()
} else {
if err := db.ensureTablePlacementPolicies(ctx, table.Info, policyMap); err != nil {
return errors.Trace(err)
}
}
}
Expand All @@ -295,20 +290,16 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
// CreateTable executes a CREATE TABLE SQL.
func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error {
if table.Info.PlacementPolicyRef != nil && policyMap != nil {
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.PlacementPolicyRef = nil
} else if p, exists := policyMap.Load(table.Info.PlacementPolicyRef.Name.L); exists {
err := db.CreatePlacementPolicy(ctx, p.(*model.PolicyInfo))
if err != nil {
return errors.Trace(err)
}
// delete policy in cache after restore table succeed.
policyMap.Delete(table.Info.PlacementPolicyRef.Name.L)
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.ClearPlacement()
} else {
if err := db.ensureTablePlacementPolicies(ctx, table.Info, policyMap); err != nil {
return errors.Trace(err)
}
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info)
if err != nil {
log.Error("create table failed",
Expand All @@ -331,6 +322,41 @@ func (db *DB) Close() {
db.se.Close()
}

func (db *DB) ensurePlacementPolicy(ctx context.Context, policyName model.CIStr, policies *sync.Map) error {
if policies == nil {
return nil
}

if policy, ok := policies.LoadAndDelete(policyName.L); ok {
return db.CreatePlacementPolicy(ctx, policy.(*model.PolicyInfo))
}

// This means policy already created
return nil
}

func (db *DB) ensureTablePlacementPolicies(ctx context.Context, tableInfo *model.TableInfo, policies *sync.Map) error {
if tableInfo.PlacementPolicyRef != nil {
if err := db.ensurePlacementPolicy(ctx, tableInfo.PlacementPolicyRef.Name, policies); err != nil {
return err
}
}

if tableInfo.Partition != nil {
for _, def := range tableInfo.Partition.Definitions {
if def.PlacementPolicyRef == nil {
continue
}

if err := db.ensurePlacementPolicy(ctx, def.PlacementPolicyRef.Name, policies); err != nil {
return err
}
}
}

return nil
}

// FilterDDLJobs filters ddl jobs.
func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job) {
// Sort the ddl jobs by schema version in descending order.
Expand Down
49 changes: 36 additions & 13 deletions br/tests/br_tidb_placement_policy/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ rm -rf $PROGRESS_FILE

run_sql "create schema $DB;"
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"

# generate 30 tables with 1 row content with policy fivereplicas;.
i=1
while [ $i -le $TABLES_COUNT ]; do
run_sql "create table $DB.sbtest$i(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest$i(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE)"
run_sql "insert into $DB.sbtest$i values ($i, $i, '$i', '$i');"
i=$(($i+1))
done
Expand All @@ -48,20 +49,28 @@ run_br backup full --log-file $BACKUPMETAV1_LOG -s "local://$TEST_DIR/$DB" --pd
# clear data and policy fore restore.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

# restore with tidb-placement-policy
echo "restore with tidb-placement start..."
run_br restore db --db $DB -s "local://$TEST_DIR/${DB}v2" --pd $PD_ADDR

policy_name=$(run_sql "use $DB; show placement;" | grep "POLICY" | awk '{print $2}')
if [ "$policy_name" -ne "fivereplicas" ];then
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY fivereplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY tworeplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

# clear data and policy for restore.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

# restore without tidb-placement-policy
echo "restore without tidb-placement start..."
Expand All @@ -79,16 +88,18 @@ run_sql "DROP DATABASE $DB;"
echo "test backup db can ignore placement policy"
run_sql "create schema $DB;"
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"

# generate one table with one row content with policy fivereplicas;.
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE);"
run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');"

run_br backup db --db $DB -s "local://$TEST_DIR/${DB}_db" --pd $PD_ADDR

# clear data and policy for restore.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

# restore should success and no policy have been restored.
run_br restore db --db $DB -s "local://$TEST_DIR/${DB}_db" --pd $PD_ADDR
Expand All @@ -104,12 +115,13 @@ run_sql "DROP DATABASE $DB;"

echo "test only restore related placement policy..."
run_sql "create schema $DB;"
# we have two policies
# we have three policies
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"
run_sql "create placement policy foureplicas followers=3;"

# generate one table with one row content with policy fivereplicas;.
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE);"
run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');"

# backup table and policies
Expand All @@ -119,36 +131,46 @@ run_br backup full -s "local://$TEST_DIR/${DB}_related" --pd $PD_ADDR
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"
run_sql "DROP PLACEMENT POLICY foureplicas;"

# restore table
run_br restore table --db $DB --table sbtest -s "local://$TEST_DIR/${DB}_related" --pd $PD_ADDR

# verify only one policy has been restored
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l)
if [ "$policy_count" -ne "1" ];then
if [ "$policy_count" -ne "2" ];then
echo "TEST: [$TEST_NAME] failed! due to policy should be ignore"
exit 1
fi

# which is fivereplicas...
policy_name=$(run_sql "use $DB; show placement;" | grep "POLICY" | awk '{print $2}')
if [ "$policy_name" = "fivereplicas" ];then
# which have fivereplicas...
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | grep "fivereplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

# which have tworeplicas...
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | grep "tworeplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

# clear data and policies for next case.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

echo "test restore all placement policies..."
run_sql "create schema $DB;"
# we have two policies
# we have three policies
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"
run_sql "create placement policy foureplicas followers=3;"

# generate one table with one row content with policy fivereplicas;.
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE);"
run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');"

# backup table and policies
Expand All @@ -158,13 +180,14 @@ run_br backup full -s "local://$TEST_DIR/${DB}_all" --pd $PD_ADDR
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"
run_sql "DROP PLACEMENT POLICY foureplicas;"

# restore table
run_br restore full -f "$DB.sbtest" -s "local://$TEST_DIR/${DB}_all" --pd $PD_ADDR

# verify all policies have been restored even we only restore one table during tableFilter.
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l)
if [ "$policy_count" -ne "2" ];then
if [ "$policy_count" -ne "3" ];then
echo "TEST: [$TEST_NAME] failed! due to policy should be ignore"
exit 1
fi
11 changes: 11 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,17 @@ func (t *TableInfo) IsLocked() bool {
return t.Lock != nil && len(t.Lock.Sessions) > 0
}

// ClearPlacement clears all table and partitions' placement settings
func (t *TableInfo) ClearPlacement() {
t.PlacementPolicyRef = nil
if t.Partition != nil {
for i := range t.Partition.Definitions {
def := &t.Partition.Definitions[i]
def.PlacementPolicyRef = nil
}
}
}

// NewExtraHandleColInfo mocks a column info for extra handle column.
func NewExtraHandleColInfo() *ColumnInfo {
colInfo := &ColumnInfo{
Expand Down