Skip to content

Commit

Permalink
Merge branch 'release-5.3' into release-5.3-b860500988e0
Browse files Browse the repository at this point in the history
  • Loading branch information
zyguan authored Jun 16, 2022
2 parents 1becb88 + bae61dc commit 1827286
Show file tree
Hide file tree
Showing 25 changed files with 712 additions and 138 deletions.
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Session interface {
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
Close()
GetGlobalVariable(name string) (string, error)
}

// Progress is an interface recording the current execution progress.
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ func (gs *tidbSession) Close() {
gs.se.Close()
}

// GetGlobalVariables implements glue.Session.
func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}

// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
Expand Down
46 changes: 46 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ type UniqueTableName struct {
Table string
}

type DDLJobFilterRule func(ddlJob *model.Job) bool

var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{
model.ActionSetTiFlashReplica: {},
model.ActionUpdateTiFlashReplicaStatus: {},
model.ActionLockTable: {},
model.ActionUnlockTable: {},
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
se, err := g.CreateSession(store)
Expand Down Expand Up @@ -71,6 +80,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

if ddlJob.Query == "" {
log.Warn("query of ddl job is empty, ignore it",
zap.Stringer("type", ddlJob.Type),
zap.String("db", ddlJob.SchemaName))
return nil
}

if tableInfo != nil {
switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDBSQL)
Expand Down Expand Up @@ -280,6 +296,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [
return ddlJobs
}

// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}

if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}

return
}

// DDLJobBlockListRule rule for filter ddl job with type in block list.
func DDLJobBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
Expand All @@ -290,3 +331,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
}
return
}

func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool {
_, ok := actions[action]
return ok
}
71 changes: 71 additions & 0 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,74 @@ func (s *testRestoreSchemaSuite) TestFilterDDLJobsV2(c *C) {
}
c.Assert(len(ddlJobs), Equals, 7)
}

func (s *testRestoreSchemaSuite) TestDB_ExecDDL(c *C) {
ctx := context.Background()
ddlJobs := []*model.Job{
{
Type: model.ActionAddIndex,
Query: "CREATE DATABASE IF NOT EXISTS test_db;",
BinlogInfo: &model.HistoryInfo{},
},
{
Type: model.ActionAddIndex,
Query: "",
BinlogInfo: &model.HistoryInfo{},
},
}

db, err := restore.NewDB(gluetidb.New(), s.mock.Storage)
c.Assert(err, IsNil)

for _, ddlJob := range ddlJobs {
err = db.ExecDDL(ctx, ddlJob)
c.Assert(err, IsNil)
}
}

func (s *testRestoreSchemaSuite) TestFilterDDLJobByRules(c *C) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}

expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}

ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

c.Assert(len(ddlJobs), Equals, len(expectedDDLTypes))
for i, ddlJob := range ddlJobs {
c.Assert(ddlJob.Type, Equals, expectedDDLTypes[i])
}
}
11 changes: 11 additions & 0 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
statsHandle = mgr.GetDomain().StatsHandle()
}

se, err := g.CreateSession(mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled)
if err != nil {
return errors.Trace(err)
}
log.Info("get newCollationEnable for check during restore", zap.String("newCollationEnable", newCollationEnable))

client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -339,6 +349,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
m.ClusterId = req.ClusterId
m.ClusterVersion = clusterVersion
m.BrVersion = brVersion
m.NewCollationsEnabled = newCollationEnable
})

// nothing to backup
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
crypterAES128KeyLen = 16
crypterAES192KeyLen = 24
crypterAES256KeyLen = 32

tidbNewCollationEnabled = "new_collation_enabled"
)

// TLSConfig is the common configuration for TLS connection.
Expand Down
43 changes: 43 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package task

import (
"context"
"strings"
"time"

"github.com/opentracing/opentracing-go"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/spf13/pflag"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -212,6 +214,42 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error {
return nil
}

func CheckNewCollationEnable(
backupNewCollationEnable string,
g glue.Glue,
storage kv.Storage,
CheckRequirements bool,
) error {
if backupNewCollationEnable == "" {
if CheckRequirements {
return errors.Annotatef(berrors.ErrUnknown,
"NewCollactionEnable not found in backupmeta. "+
"if you ensure the NewCollactionEnable config of backup cluster is as same as restore cluster, "+
"use --check-requirements=false to skip")
} else {
log.Warn("no NewCollactionEnable in backup")
return nil
}
}

se, err := g.CreateSession(storage)
if err != nil {
return errors.Trace(err)
}

newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled)
if err != nil {
return errors.Trace(err)
}

if !strings.EqualFold(backupNewCollationEnable, newCollationEnable) {
return errors.Annotatef(berrors.ErrUnknown,
"newCollationEnable not match, upstream:%v, downstream: %v",
backupNewCollationEnable, newCollationEnable)
}
return nil
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.adjustRestoreConfig()
Expand Down Expand Up @@ -278,6 +316,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(versionErr)
}
}
if err = CheckNewCollationEnable(backupMeta.GetNewCollationsEnabled(), g, mgr.GetStorage(), cfg.CheckRequirements); err != nil {
return errors.Trace(err)
}

reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.InitBackupMeta(c, backupMeta, u, s, reader); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -322,6 +364,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
newTS = restoreTS
}
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

err = client.PreCheckTableTiFlashReplica(ctx, tables)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func CheckClusterVersion(ctx context.Context, client pd.Client, checker VerCheck
if err := checkTiFlashVersion(s); err != nil {
return errors.Trace(err)
}
continue
}

tikvVersionString := removeVAndHash(s.Version)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# config of tidb

# Schema lease duration
# There are lot of ddl in the tests, setting this
# to 360s to test whether BR is gracefully shutdown.
lease = "360s"

new_collations_enabled_on_first_bootstrap = false

[security]
ssl-ca = "/tmp/backup_restore_test/certs/ca.pem"
ssl-cert = "/tmp/backup_restore_test/certs/tidb.pem"
ssl-key = "/tmp/backup_restore_test/certs/tidb.key"
cluster-ssl-ca = "/tmp/backup_restore_test/certs/ca.pem"
cluster-ssl-cert = "/tmp/backup_restore_test/certs/tidb.pem"
cluster-ssl-key = "/tmp/backup_restore_test/certs/tidb.key"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# config of tidb

# Schema lease duration
# There are lot of ddl in the tests, setting this
# to 360s to test whether BR is gracefully shutdown.
lease = "360s"

new_collations_enabled_on_first_bootstrap = true

[security]
ssl-ca = "/tmp/backup_restore_test/certs/ca.pem"
ssl-cert = "/tmp/backup_restore_test/certs/tidb.pem"
ssl-key = "/tmp/backup_restore_test/certs/tidb.key"
cluster-ssl-ca = "/tmp/backup_restore_test/certs/ca.pem"
cluster-ssl-cert = "/tmp/backup_restore_test/certs/tidb.pem"
cluster-ssl-key = "/tmp/backup_restore_test/certs/tidb.key"
Loading

0 comments on commit 1827286

Please sign in to comment.