Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

br: Ignore ddl jobs with empty query or blacklist type when exec restore #1480

Merged
merged 2 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ type DB struct {
se glue.Session
}

// UniqueTableName identifies a unique table
type UniqueTableName struct {
DB string
Table string
}

// DDLJobFilterRule judges whether a ddl job should be ignored
type DDLJobFilterRule func(ddlJob *model.Job) bool

var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{
model.ActionSetTiFlashReplica: {},
model.ActionUpdateTiFlashReplicaStatus: {},
model.ActionLockTable: {},
model.ActionUnlockTable: {},
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
se, err := g.CreateSession(store)
Expand Down Expand Up @@ -67,6 +83,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

if ddlJob.Query == "" {
log.Warn("query of ddl job is empty, ignore it",
zap.Stringer("type", ddlJob.Type),
zap.String("db", ddlJob.SchemaName))
return nil
}

if tableInfo != nil {
switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDBSQL)
Expand Down Expand Up @@ -265,6 +288,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [
return ddlJobs
}

// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}

if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}

return
}

// DDLJobBlockListRule rule for filter ddl job with type in block list.
func DDLJobBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
Expand All @@ -275,3 +323,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
}
return
}

func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool {
_, ok := actions[action]
return ok
}
71 changes: 71 additions & 0 deletions pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,74 @@ func (s *testRestoreSchemaSuite) TestFilterDDLJobsV2(c *C) {
}
c.Assert(len(ddlJobs), Equals, 7)
}

func (s *testRestoreSchemaSuite) TestDB_ExecDDL(c *C) {
ctx := context.Background()
ddlJobs := []*model.Job{
{
Type: model.ActionAddIndex,
Query: "CREATE DATABASE IF NOT EXISTS test_db;",
BinlogInfo: &model.HistoryInfo{},
},
{
Type: model.ActionAddIndex,
Query: "",
BinlogInfo: &model.HistoryInfo{},
},
}

db, err := restore.NewDB(gluetidb.New(), s.mock.Storage)
c.Assert(err, IsNil)

for _, ddlJob := range ddlJobs {
err = db.ExecDDL(ctx, ddlJob)
c.Assert(err, IsNil)
}
}

func (s *testRestoreSchemaSuite) TestFilterDDLJobByRules(c *C) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}

expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}

ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

c.Assert(len(ddlJobs), Equals, len(expectedDDLTypes))
for i, ddlJob := range ddlJobs {
c.Assert(ddlJob.Type, Equals, expectedDDLTypes[i])
}
}
1 change: 1 addition & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
newTS = restoreTS
}
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

err = client.PreCheckTableTiFlashReplica(ctx, tables)
if err != nil {
Expand Down