Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: support the ddl(exchange partition) when PiTR #37050

Merged
merged 9 commits into from
Aug 25, 2022
68 changes: 50 additions & 18 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type DBReplace struct {

type SchemasReplace struct {
DbMap map[OldID]*DBReplace
RewriteTS uint64
TableFilter filter.Filter
globalTableIdMap map[OldID]NewID
RewriteTS uint64 // used to rewrite commit ts in meta kv.
TableFilter filter.Filter // used to filter schema/table
genGenGlobalID func(ctx context.Context) (int64, error)
genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error)
insertDeleteRangeForTable func(jobID int64, tableIDs []int64)
Expand Down Expand Up @@ -92,8 +93,19 @@ func NewSchemasReplace(
insertDeleteRangeForTable func(jobID int64, tableIDs []int64),
insertDeleteRangeForIndex func(jobID int64, elementID *int64, tableID int64, indexIDs []int64),
) *SchemasReplace {
globalTableIdMap := make(map[OldID]NewID)
for _, dr := range dbMap {
for tblID, tr := range dr.TableMap {
globalTableIdMap[tblID] = tr.NewTableID
for oldpID, newpID := range tr.PartitionMap {
globalTableIdMap[oldpID] = newpID
}
}
}

return &SchemasReplace{
DbMap: dbMap,
globalTableIdMap: globalTableIdMap,
RewriteTS: restoreTS,
TableFilter: tableFilter,
genGenGlobalID: genID,
Expand Down Expand Up @@ -199,6 +211,11 @@ func (sr *SchemasReplace) rewriteKeyForTable(
parseField func([]byte) (tableID int64, err error),
encodeField func(tableID int64) []byte,
) ([]byte, bool, error) {
var (
err error
newID int64
exist bool
)
rawMetaKey, err := ParseTxnMetaKeyFrom(key)
if err != nil {
return nil, false, errors.Trace(err)
Expand All @@ -216,7 +233,7 @@ func (sr *SchemasReplace) rewriteKeyForTable(

dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -226,9 +243,13 @@ func (sr *SchemasReplace) rewriteKeyForTable(

tableReplace, exist := dbReplace.TableMap[tableID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableID] = newID
}
tableReplace = NewTableReplace(nil, newID)
dbReplace.TableMap[tableID] = tableReplace
Expand All @@ -243,15 +264,20 @@ func (sr *SchemasReplace) rewriteKeyForTable(
}

func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bool, error) {
var tableInfo model.TableInfo
var (
tableInfo model.TableInfo
err error
newID int64
exist bool
)
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, false, errors.Trace(err)
}

// update table ID
dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -261,10 +287,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo

tableReplace, exist := dbReplace.TableMap[tableInfo.ID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableInfo.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.TODO())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableInfo.ID] = newID
}

tableReplace = NewTableReplace(&tableInfo, newID)
dbReplace.TableMap[tableInfo.ID] = tableReplace
} else {
Expand All @@ -287,12 +318,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo
partitions := newTableInfo.GetPartitionInfo()
if partitions != nil {
for i, tbl := range partitions.Definitions {
newID, exist := tableReplace.PartitionMap[tbl.ID]
newID, exist = tableReplace.PartitionMap[tbl.ID]
if !exist {
var err error
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tbl.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tbl.ID] = newID
}
tableReplace.PartitionMap[tbl.ID] = newID
}
Expand Down Expand Up @@ -470,8 +504,6 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
return err
}
}
case model.ActionExchangeTablePartition:
return errors.Errorf("restore of ddl `exchange-table-partition` is not supported")
}
}
return nil
Expand Down
110 changes: 101 additions & 9 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@ func mockGenGenGlobalID(ctx context.Context) (int64, error) {
return increaseID, nil
}

func ProduceValue(tableName string, dbID int64) ([]byte, error) {
tableInfo := model.TableInfo{
ID: dbID,
Name: model.NewCIStr(tableName),
}

return json.Marshal(tableInfo)
}

func MockEmptySchemasReplace(midr *mockInsertDeleteRange) *SchemasReplace {
dbMap := make(map[OldID]*DBReplace)
if midr == nil {
Expand Down Expand Up @@ -211,6 +202,107 @@ func TestRewriteValueForPartitionTable(t *testing.T) {
require.Equal(t, tableInfo.Partition.Definitions[1].ID, newID2)
}

func TestRewriteValueForExchangePartition(t *testing.T) {
var (
dbID1 int64 = 100
tableID1 int64 = 101
pt1ID int64 = 102
pt2ID int64 = 103
tableName1 = "t1"
pt1Name = "pt1"
pt2Name = "pt2"

dbID2 int64 = 105
tableID2 int64 = 106
tableName2 = "t2"
tableInfo model.TableInfo
)

// construct the partition table t1
pt1 := model.PartitionDefinition{
ID: pt1ID,
Name: model.NewCIStr(pt1Name),
}
pt2 := model.PartitionDefinition{
ID: pt2ID,
Name: model.NewCIStr(pt2Name),
}

pi := model.PartitionInfo{
Enable: true,
Definitions: make([]model.PartitionDefinition, 0),
}
pi.Definitions = append(pi.Definitions, pt1, pt2)
t1 := model.TableInfo{
ID: tableID1,
Name: model.NewCIStr(tableName1),
Partition: &pi,
}
db1 := model.DBInfo{
ID: dbID1,
}

// construct the no partition table t2
t2 := model.TableInfo{
ID: tableID2,
Name: model.NewCIStr(tableName2),
}
db2 := model.DBInfo{
ID: dbID2,
}

// construct the SchemaReplace
dbMap := make(map[OldID]*DBReplace)
dbMap[dbID1] = NewDBReplace(&db1, dbID1+100)
dbMap[dbID1].TableMap[tableID1] = NewTableReplace(&t1, tableID1+100)
dbMap[dbID1].TableMap[tableID1].PartitionMap[pt1ID] = pt1ID + 100
dbMap[dbID1].TableMap[tableID1].PartitionMap[pt2ID] = pt2ID + 100

dbMap[dbID2] = NewDBReplace(&db2, dbID2+100)
dbMap[dbID2].TableMap[tableID2] = NewTableReplace(&t2, tableID2+100)

sr := NewSchemasReplace(
dbMap,
0,
filter.All(),
mockGenGenGlobalID,
nil,
nil,
nil,
)
require.Equal(t, len(sr.globalTableIdMap), 4)

//exchange parition, t1 parition0 with the t2
t1Copy := t1.Clone()
t1Copy.Partition = t1.Partition.Clone()
t2Copy := t2.Clone()

t1Copy.Partition.Definitions[0].ID = tableID2
t2Copy.ID = pt1ID

// rewrite partition table
value, err := json.Marshal(&t1Copy)
require.Nil(t, err)
value, needRewrite, err := sr.rewriteTableInfo(value, dbID1)
require.Nil(t, err)
require.True(t, needRewrite)
err = json.Unmarshal(value, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.ID, tableID1+100)
require.Equal(t, tableInfo.Partition.Definitions[0].ID, tableID2+100)
require.Equal(t, tableInfo.Partition.Definitions[1].ID, pt2ID+100)

// rewrite no partition table
value, err = json.Marshal(&t2Copy)
require.Nil(t, err)
value, needRewrite, err = sr.rewriteTableInfo(value, dbID2)
require.Nil(t, err)
require.True(t, needRewrite)
err = json.Unmarshal(value, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.ID, pt1ID+100)
}

// db:70->80 -
// | - t0:71->81 -
// | | - p0:72->82
Expand Down