Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
br: Ignore ddl jobs with empty query or blacklist type when exec rest…
Browse files Browse the repository at this point in the history
…ore (#1480)
  • Loading branch information
WangLe1321 committed Dec 14, 2022
1 parent 3ac0ac1 commit 1a12817
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
53 changes: 53 additions & 0 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ type DB struct {
se glue.Session
}

// UniqueTableName identifies a unique table
type UniqueTableName struct {
DB string
Table string
}

// DDLJobFilterRule judges whether a ddl job should be ignored
type DDLJobFilterRule func(ddlJob *model.Job) bool

var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{
model.ActionSetTiFlashReplica: {},
model.ActionUpdateTiFlashReplicaStatus: {},
model.ActionLockTable: {},
model.ActionUnlockTable: {},
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
se, err := g.CreateSession(store)
Expand Down Expand Up @@ -67,6 +83,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

if ddlJob.Query == "" {
log.Warn("query of ddl job is empty, ignore it",
zap.Stringer("type", ddlJob.Type),
zap.String("db", ddlJob.SchemaName))
return nil
}

if tableInfo != nil {
switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDBSQL)
Expand Down Expand Up @@ -265,6 +288,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [
return ddlJobs
}

// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}

if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}

return
}

// DDLJobBlockListRule rule for filter ddl job with type in block list.
func DDLJobBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
Expand All @@ -275,3 +323,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
}
return
}

func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool {
_, ok := actions[action]
return ok
}
71 changes: 71 additions & 0 deletions pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,74 @@ func (s *testRestoreSchemaSuite) TestFilterDDLJobsV2(c *C) {
}
c.Assert(len(ddlJobs), Equals, 7)
}

func (s *testRestoreSchemaSuite) TestDB_ExecDDL(c *C) {
ctx := context.Background()
ddlJobs := []*model.Job{
{
Type: model.ActionAddIndex,
Query: "CREATE DATABASE IF NOT EXISTS test_db;",
BinlogInfo: &model.HistoryInfo{},
},
{
Type: model.ActionAddIndex,
Query: "",
BinlogInfo: &model.HistoryInfo{},
},
}

db, err := restore.NewDB(gluetidb.New(), s.mock.Storage)
c.Assert(err, IsNil)

for _, ddlJob := range ddlJobs {
err = db.ExecDDL(ctx, ddlJob)
c.Assert(err, IsNil)
}
}

func (s *testRestoreSchemaSuite) TestFilterDDLJobByRules(c *C) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}

expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}

ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

c.Assert(len(ddlJobs), Equals, len(expectedDDLTypes))
for i, ddlJob := range ddlJobs {
c.Assert(ddlJob.Type, Equals, expectedDDLTypes[i])
}
}
1 change: 1 addition & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
newTS = restoreTS
}
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

err = client.PreCheckTableTiFlashReplica(ctx, tables)
if err != nil {
Expand Down

0 comments on commit 1a12817

Please sign in to comment.