Skip to content

Commit

Permalink
br: Ignore ddl jobs with empty query or blacklist type when exec rest…
Browse files Browse the repository at this point in the history
…ore (#33384)

close #33322
  • Loading branch information
WangLe1321 authored Mar 28, 2022
1 parent 4ff3ec2 commit 11db011
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 0 deletions.
46 changes: 46 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ type UniqueTableName struct {
Table string
}

type DDLJobFilterRule func(ddlJob *model.Job) bool

var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{
model.ActionSetTiFlashReplica: {},
model.ActionUpdateTiFlashReplicaStatus: {},
model.ActionLockTable: {},
model.ActionUnlockTable: {},
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error) {
se, err := g.CreateSession(store)
Expand Down Expand Up @@ -90,6 +99,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

if ddlJob.Query == "" {
log.Warn("query of ddl job is empty, ignore it",
zap.Stringer("type", ddlJob.Type),
zap.String("db", ddlJob.SchemaName))
return nil
}

if tableInfo != nil {
switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDBSQL)
Expand Down Expand Up @@ -409,6 +425,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [
return ddlJobs
}

// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}

if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}

return
}

// DDLJobBlockListRule rule for filter ddl job with type in block list.
func DDLJobBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
Expand All @@ -419,3 +460,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
}
return
}

func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool {
_, ok := actions[action]
return ok
}
75 changes: 75 additions & 0 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -293,3 +294,77 @@ func TestFilterDDLJobsV2(t *testing.T) {
}
require.Equal(t, 7, len(ddlJobs))
}

func TestDB_ExecDDL(t *testing.T) {
s, clean := createRestoreSchemaSuite(t)
defer clean()

ctx := context.Background()
ddlJobs := []*model.Job{
{
Type: model.ActionAddIndex,
Query: "CREATE DATABASE IF NOT EXISTS test_db;",
BinlogInfo: &model.HistoryInfo{},
},
{
Type: model.ActionAddIndex,
Query: "",
BinlogInfo: &model.HistoryInfo{},
},
}

db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT")
require.NoError(t, err)

for _, ddlJob := range ddlJobs {
err = db.ExecDDL(ctx, ddlJob)
assert.NoError(t, err)
}
}

func TestFilterDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}

expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}

ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

require.Equal(t, len(expectedDDLTypes), len(ddlJobs))
for i, ddlJob := range ddlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
}
}
1 change: 1 addition & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
newTS = restoreTS
}
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

err = client.PreCheckTableTiFlashReplica(ctx, tables)
if err != nil {
Expand Down

0 comments on commit 11db011

Please sign in to comment.