From aca389cd88666ae35255cae9b22c8113c14f2557 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 14 Apr 2023 17:58:55 +0800 Subject: [PATCH 01/20] basic implementation Signed-off-by: hillium --- errno/errcode.go | 1 + errno/errname.go | 1 + executor/brie.go | 162 ++++++++++++++---- planner/core/planbuilder.go | 32 +++- .../brietest/backup_restore_test.go | 7 +- tests/realtikvtest/brietest/brie_test.go | 76 ++++++++ util/dbterror/exeerrors/errors.go | 1 + 7 files changed, 244 insertions(+), 36 deletions(-) create mode 100644 tests/realtikvtest/brietest/brie_test.go diff --git a/errno/errcode.go b/errno/errcode.go index 9e5bd187875b6..f5ffa35da5d35 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1062,6 +1062,7 @@ const ( ErrLoadDataJobNotFound = 8170 ErrLoadDataInvalidOperation = 8171 ErrLoadDataLocalUnsupportedOption = 8172 + ErrBRJobNotFound = 8173 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index c338fbe733de9..da85527e00f82 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1095,6 +1095,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrBRIERestoreFailed: mysql.Message("Restore failed: %s", nil), ErrBRIEImportFailed: mysql.Message("Import failed: %s", nil), ErrBRIEExportFailed: mysql.Message("Export failed: %s", nil), + ErrBRJobNotFound: mysql.Message("BRIE Job %d not found", nil), ErrInvalidTableSample: mysql.Message("Invalid TABLESAMPLE: %s", nil), diff --git a/executor/brie.go b/executor/brie.go index 6eef77750967d..9bb6178813beb 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -24,8 +24,10 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/task" @@ -52,6 +54,7 @@ import ( filter "github.com/pingcap/tidb/util/table-filter" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "go.uber.org/zap" ) const clearInterval = 10 * time.Minute @@ -67,7 +70,7 @@ type brieTaskProgress struct { // this field is atomically updated outside of the lock below. current int64 - // lock is the mutex protected the two fields below. + // lock is the mutex protected the three fields below. lock syncutil.Mutex // cmd is the name of the step the BRIE task is currently performing. cmd string @@ -94,11 +97,17 @@ func (p *brieTaskProgress) GetCurrent() int64 { // Close implements glue.Progress func (p *brieTaskProgress) Close() { p.lock.Lock() + current := atomic.LoadInt64(&p.current) + if current < p.total { + p.cmd = fmt.Sprintf("%s Cacneled", p.cmd) + } atomic.StoreInt64(&p.current, p.total) p.lock.Unlock() } type brieTaskInfo struct { + id uint64 + query string queueTime types.Time execTime types.Time finishTime types.Time @@ -149,10 +158,19 @@ func (bq *brieQueue) registerTask( taskID := atomic.AddUint64(&bq.nextID, 1) bq.tasks.Store(taskID, item) + info.id = taskID return taskCtx, taskID } +// query task queries a task from the queue. +func (bq *brieQueue) queryTask(taskID uint64) (*brieTaskInfo, bool) { + if item, ok := bq.tasks.Load(taskID); ok { + return item.(*brieQueueItem).info, true + } + return nil, false +} + // acquireTask prepares to execute a BRIE task. Only one BRIE task can be // executed at a time, and this function blocks until the task is ready. // @@ -176,12 +194,16 @@ func (bq *brieQueue) releaseTask() { <-bq.workerCh } -func (bq *brieQueue) cancelTask(taskID uint64) { +func (bq *brieQueue) cancelTask(taskID uint64) bool { item, ok := bq.tasks.Load(taskID) if !ok { - return + return false } - item.(*brieQueueItem).cancel() + i := item.(*brieQueueItem) + i.cancel() + i.progress.Close() + log.Info("BRIE job canceled.", zap.Uint64("ID", i.info.id)) + return true } func (bq *brieQueue) clearTask(sc *stmtctx.StatementContext) { @@ -223,8 +245,23 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } if s.Kind == ast.BRIEKindShowBackupMeta { - e.fillByShowMetadata(s) - return e + return execOnce(&showMetaExec{ + showConfig: buildShowMetadataConfigFrom(s), + }) + } + + if s.Kind == ast.BRIEKindShowQuery { + return execOnce(&showQueryExec{ + baseExecutor: newBaseExecutor(b.ctx, schema, 0), + targetID: uint64(s.JobID), + }) + } + + if s.Kind == ast.BRIEKindCancelJob { + return &cancelJobExec{ + baseExecutor: newBaseExecutor(b.ctx, schema, 0), + targetID: uint64(s.JobID), + } } tidbCfg := config.GetGlobalConfig() @@ -308,6 +345,12 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) // is expected to be performed insensitive. cfg.TableFilter = filter.CaseInsensitive(cfg.TableFilter) + query, ok := b.ctx.Value(sessionctx.QueryString).(string) + if !ok { + query = "N/A" + } + e.info.query = query + switch s.Kind { case ast.BRIEKindBackup: e.backupCfg = &task.BackupConfig{Config: cfg} @@ -354,6 +397,67 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) return e } +// oneshotExecutor warps a executor, making its `Next` would only be called once. +type oneshotExecutor struct { + Executor + finished bool +} + +func (o *oneshotExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if o.finished { + req.Reset() + return nil + } + + if err := o.Executor.Next(ctx, req); err != nil { + return err + } + o.finished = true + return nil +} + +func execOnce(ex Executor) Executor { + return &oneshotExecutor{Executor: ex} +} + +type showQueryExec struct { + baseExecutor + + targetID uint64 +} + +func (s *showQueryExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + + tsk, ok := globalBRIEQueue.queryTask(s.targetID) + if !ok { + return nil + } + + req.AppendString(0, tsk.query) + return nil +} + +type cancelJobExec struct { + baseExecutor + + targetID uint64 +} + +func (s cancelJobExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if !globalBRIEQueue.cancelTask(s.targetID) { + s.ctx.GetSessionVars().StmtCtx.AppendWarning(exeerrors.ErrLoadDataJobNotFound.FastGenByArgs(s.targetID)) + } + return nil +} + +type showMetaExec struct { + baseExecutor + + showConfig show.Config +} + // BRIEExec represents an executor for BRIE statements (BACKUP, RESTORE, etc) type BRIEExec struct { baseExecutor @@ -364,23 +468,23 @@ type BRIEExec struct { info *brieTaskInfo } -func (e *BRIEExec) fillByShowMetadata(s *ast.BRIEStmt) { +func buildShowMetadataConfigFrom(s *ast.BRIEStmt) show.Config { if s.Kind != ast.BRIEKindShowBackupMeta { panic(fmt.Sprintf("precondition failed: `fillByShowMetadata` should always called by a ast.BRIEKindShowBackupMeta, but it is %s.", s.Kind)) } store := s.Storage - e.showConfig = &show.Config{ + cfg := show.Config{ Storage: store, Cipher: backuppb.CipherInfo{ CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, }, } - e.info.kind = ast.BRIEKindShowBackupMeta + return cfg } -func (e *BRIEExec) runShowMetadata(ctx context.Context, req *chunk.Chunk) error { - exe, err := show.CreateExec(ctx, *e.showConfig) +func (e *showMetaExec) Next(ctx context.Context, req *chunk.Chunk) error { + exe, err := show.CreateExec(ctx, e.showConfig) if err != nil { return errors.Annotate(err, "failed to create show exec") } @@ -404,24 +508,17 @@ func (e *BRIEExec) runShowMetadata(ctx context.Context, req *chunk.Chunk) error } req.AppendTime(5, types.NewTime(types.FromGoTime(endTime), mysql.TypeDatetime, 0)) } - e.info = nil return nil } // Next implements the Executor Next interface. func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() if e.info == nil { return nil } - if e.info.kind == ast.BRIEKindShowBackupMeta { - // This should be able to execute without the queue. - // NOTE: maybe extract the procedure of executing task in queue - // into a function, make it more tidy. - return e.runShowMetadata(ctx, req) - } - bq := globalBRIEQueue bq.clearTask(e.ctx.GetSessionVars().StmtCtx) @@ -429,7 +526,13 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error { e.info.queueTime = types.CurrentTime(mysql.TypeDatetime) taskCtx, taskID := bq.registerTask(ctx, e.info) defer bq.cancelTask(taskID) - + failpoint.Inject("block-on-brie", func() { + log.Warn("You shall not pass, nya. :3") + <-taskCtx.Done() + if taskCtx.Err() != nil { + failpoint.Return(taskCtx.Err()) + } + }) // manually monitor the Killed status... go func() { ticker := time.NewTicker(3 * time.Second) @@ -502,17 +605,18 @@ func (e *ShowExec) fetchShowBRIE(kind ast.BRIEKind) error { item.progress.lock.Lock() defer item.progress.lock.Unlock() current := atomic.LoadInt64(&item.progress.current) - e.result.AppendString(0, item.info.storage) - e.result.AppendString(1, item.progress.cmd) - e.result.AppendFloat64(2, 100.0*float64(current)/float64(item.progress.total)) - e.result.AppendTime(3, item.info.queueTime) - e.result.AppendTime(4, item.info.execTime) - e.result.AppendTime(5, item.info.finishTime) - e.result.AppendUint64(6, item.info.connID) + e.result.AppendUint64(0, item.info.id) + e.result.AppendString(1, item.info.storage) + e.result.AppendString(2, item.progress.cmd) + e.result.AppendFloat64(3, 100.0*float64(current)/float64(item.progress.total)) + e.result.AppendTime(4, item.info.queueTime) + e.result.AppendTime(5, item.info.execTime) + e.result.AppendTime(6, item.info.finishTime) + e.result.AppendUint64(7, item.info.connID) if len(item.info.message) > 0 { - e.result.AppendString(7, item.info.message) + e.result.AppendString(8, item.info.message) } else { - e.result.AppendNull(7) + e.result.AppendNull(8) } } return true diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 8b04681350575..e114d918d154e 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3116,16 +3116,22 @@ func buildShowBackupMetaSchema() (*expression.Schema, types.NameSlice) { schema := newColumnsWithNames(len(names)) for i := range names { fLen, _ := mysql.GetDefaultFieldLengthAndDecimal(ftypes[i]) + if ftypes[i] == mysql.TypeVarchar { + // the default varchar value is `5`, which might be too short for us. + fLen = 255 + } schema.Append(buildColumnWithName("", names[i], ftypes[i], fLen)) } return schema.col2Schema(), schema.names } -func buildBRIESchema(kind ast.BRIEKind) (*expression.Schema, types.NameSlice) { - if kind == ast.BRIEKindShowBackupMeta { - return buildShowBackupMetaSchema() - } +func buildShowBackupQuerySchema() (*expression.Schema, types.NameSlice) { + schema := newColumnsWithNames(1) + schema.Append(buildColumnWithName("", "Query", mysql.TypeVarchar, 4096)) + return schema.col2Schema(), schema.names +} +func buildBackupRestoreSchema(kind ast.BRIEKind) (*expression.Schema, types.NameSlice) { longlongSize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong) datetimeSize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeDatetime) @@ -3141,6 +3147,20 @@ func buildBRIESchema(kind ast.BRIEKind) (*expression.Schema, types.NameSlice) { return schema.col2Schema(), schema.names } +func buildBRIESchema(kind ast.BRIEKind) (*expression.Schema, types.NameSlice) { + switch kind { + case ast.BRIEKindShowBackupMeta: + return buildShowBackupMetaSchema() + case ast.BRIEKindShowQuery: + return buildShowBackupQuerySchema() + case ast.BRIEKindBackup, ast.BRIEKindRestore: + return buildBackupRestoreSchema(kind) + default: + s := newColumnsWithNames(0) + return s.col2Schema(), s.names + } +} + func buildCalibrateResourceSchema() (*expression.Schema, types.NameSlice) { longlongSize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong) schema := newColumnsWithNames(1) @@ -5240,8 +5260,8 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp names = []string{"Supported_builtin_functions"} ftypes = []byte{mysql.TypeVarchar} case ast.ShowBackups, ast.ShowRestores: - names = []string{"Destination", "State", "Progress", "Queue_time", "Execution_time", "Finish_time", "Connection", "Message"} - ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDouble, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeVarchar} + names = []string{"Id", "Destination", "State", "Progress", "Queue_time", "Execution_time", "Finish_time", "Connection", "Message"} + ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDouble, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeVarchar} case ast.ShowPlacementLabels: names = []string{"Key", "Values"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeJSON} diff --git a/tests/realtikvtest/brietest/backup_restore_test.go b/tests/realtikvtest/brietest/backup_restore_test.go index fe77aede45f21..d159d96db5f36 100644 --- a/tests/realtikvtest/brietest/backup_restore_test.go +++ b/tests/realtikvtest/brietest/backup_restore_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestBackupAndRestore(t *testing.T) { +func initTestKit(t *testing.T) *testkit.TestKit { if !*realtikvtest.WithRealTiKV { t.Skip("only run BR SQL integration test with tikv store") } @@ -38,6 +38,11 @@ func TestBackupAndRestore(t *testing.T) { config.StoreGlobalConfig(cfg) tk := testkit.NewTestKit(t, store) + return tk +} + +func TestBackupAndRestore(t *testing.T) { + tk := initTestKit(t) tk.MustExec("create database if not exists br") tk.MustExec("use br") tk.MustExec("create table t1(v int)") diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go new file mode 100644 index 0000000000000..d17fada9f9bc8 --- /dev/null +++ b/tests/realtikvtest/brietest/brie_test.go @@ -0,0 +1,76 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package brietest + +import ( + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func makeTempDirForBackup(t *testing.T) string { + d, err := os.MkdirTemp(os.TempDir(), "briesql-*") + require.NoError(t, err) + t.Cleanup(func() { + os.RemoveAll(d) + }) + return d +} + +func TestShowBackupQuery(t *testing.T) { + tk := initTestKit(t) + tmp := makeTempDirForBackup(t) + sqlTmp := strings.ReplaceAll(tmp, "'", "''") + + log.SetLevel(zapcore.ErrorLevel) + tk.MustExec("use test;") + tk.MustExec("create table foo(pk int primary key auto_increment, v varchar(255));") + tk.MustExec("insert into foo(v) values " + strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",") + ";") + backupQuery := fmt.Sprintf("backup database * to 'local://%s';", sqlTmp) + _ = tk.MustQuery(backupQuery) + // NOTE: we assume a auto-increamental ID here. + // once we implement other ID allocation, we may have to change this case. + res := tk.MustQuery("show br job query 1;") + res.CheckContain(backupQuery) + + tk.MustExec("drop table foo;") + restoreQuery := fmt.Sprintf("restore table test.foo from 'local://%s';", sqlTmp) + tk.MustQuery(restoreQuery) + res = tk.MustQuery("show br job query 2;") + res.CheckContain(restoreQuery) +} + +func TestCancel(t *testing.T) { + tk := initTestKit(t) + tk.MustExec("use test;") + failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") + + req := require.New(t) + ch := make(chan struct{}) + go func() { + err := tk.QueryToErr("backup database * to 'noop://'") + req.Error(err) + close(ch) + }() + + check := func() bool { + wb := tk.Session().GetSessionVars().StmtCtx.WarningCount() + tk.MustExec("cancel br job 1;") + wa := tk.Session().GetSessionVars().StmtCtx.WarningCount() + return wb == wa + } + req.Eventually(check, 5 * time.Second, 1 * time.Second) + + select { + case <-ch: + case <-time.After(5 * time.Second): + req.FailNow("the backup job doesn't be canceled") + } +} diff --git a/util/dbterror/exeerrors/errors.go b/util/dbterror/exeerrors/errors.go index 40221aed99d6b..f8f780c1f9f7a 100644 --- a/util/dbterror/exeerrors/errors.go +++ b/util/dbterror/exeerrors/errors.go @@ -62,6 +62,7 @@ var ( ErrBRIERestoreFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIERestoreFailed) ErrBRIEImportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEImportFailed) ErrBRIEExportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEExportFailed) + ErrBRJobNotFound = dbterror.ClassExecutor.NewStd(mysql.ErrBRJobNotFound) ErrCTEMaxRecursionDepth = dbterror.ClassExecutor.NewStd(mysql.ErrCTEMaxRecursionDepth) ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem) ErrPluginIsNotLoaded = dbterror.ClassExecutor.NewStd(mysql.ErrPluginIsNotLoaded) From 0a7fdc35a1b0e5e3521f2dfeac8755bb36e46c63 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 14 Apr 2023 18:07:08 +0800 Subject: [PATCH 02/20] make format && make bazel_prepare Signed-off-by: hillium --- errno/errname.go | 2 +- planner/core/planbuilder.go | 22 +++++++++---------- tests/realtikvtest/brietest/BUILD.bazel | 4 ++++ .../brietest/backup_restore_test.go | 2 +- tests/realtikvtest/brietest/brie_test.go | 20 ++++++++--------- util/dbterror/exeerrors/errors.go | 2 +- 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/errno/errname.go b/errno/errname.go index da85527e00f82..fe44b5b434d78 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1095,7 +1095,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrBRIERestoreFailed: mysql.Message("Restore failed: %s", nil), ErrBRIEImportFailed: mysql.Message("Import failed: %s", nil), ErrBRIEExportFailed: mysql.Message("Export failed: %s", nil), - ErrBRJobNotFound: mysql.Message("BRIE Job %d not found", nil), + ErrBRJobNotFound: mysql.Message("BRIE Job %d not found", nil), ErrInvalidTableSample: mysql.Message("Invalid TABLESAMPLE: %s", nil), diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index e114d918d154e..51efd0cfa2097 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3148,17 +3148,17 @@ func buildBackupRestoreSchema(kind ast.BRIEKind) (*expression.Schema, types.Name } func buildBRIESchema(kind ast.BRIEKind) (*expression.Schema, types.NameSlice) { - switch kind { - case ast.BRIEKindShowBackupMeta: - return buildShowBackupMetaSchema() - case ast.BRIEKindShowQuery: - return buildShowBackupQuerySchema() - case ast.BRIEKindBackup, ast.BRIEKindRestore: - return buildBackupRestoreSchema(kind) - default: - s := newColumnsWithNames(0) - return s.col2Schema(), s.names - } + switch kind { + case ast.BRIEKindShowBackupMeta: + return buildShowBackupMetaSchema() + case ast.BRIEKindShowQuery: + return buildShowBackupQuerySchema() + case ast.BRIEKindBackup, ast.BRIEKindRestore: + return buildBackupRestoreSchema(kind) + default: + s := newColumnsWithNames(0) + return s.col2Schema(), s.names + } } func buildCalibrateResourceSchema() (*expression.Schema, types.NameSlice) { diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index c3118c4d7a88a..d4545619a3330 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -6,6 +6,7 @@ go_test( srcs = [ "backup_restore_test.go", "binlog_test.go", + "brie_test.go", "main_test.go", ], flaky = True, @@ -18,9 +19,12 @@ go_test( "//testkit", "//testkit/testsetup", "//tests/realtikvtest", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_log//:log", "@com_github_pingcap_tipb//go-binlog", "@com_github_stretchr_testify//require", "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//zapcore", ], ) diff --git a/tests/realtikvtest/brietest/backup_restore_test.go b/tests/realtikvtest/brietest/backup_restore_test.go index d159d96db5f36..2fa0ba6bebfd5 100644 --- a/tests/realtikvtest/brietest/backup_restore_test.go +++ b/tests/realtikvtest/brietest/backup_restore_test.go @@ -38,7 +38,7 @@ func initTestKit(t *testing.T) *testkit.TestKit { config.StoreGlobalConfig(cfg) tk := testkit.NewTestKit(t, store) - return tk + return tk } func TestBackupAndRestore(t *testing.T) { diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index d17fada9f9bc8..a392663563101 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -61,16 +61,16 @@ func TestCancel(t *testing.T) { }() check := func() bool { - wb := tk.Session().GetSessionVars().StmtCtx.WarningCount() - tk.MustExec("cancel br job 1;") - wa := tk.Session().GetSessionVars().StmtCtx.WarningCount() - return wb == wa + wb := tk.Session().GetSessionVars().StmtCtx.WarningCount() + tk.MustExec("cancel br job 1;") + wa := tk.Session().GetSessionVars().StmtCtx.WarningCount() + return wb == wa } - req.Eventually(check, 5 * time.Second, 1 * time.Second) + req.Eventually(check, 5*time.Second, 1*time.Second) - select { - case <-ch: - case <-time.After(5 * time.Second): - req.FailNow("the backup job doesn't be canceled") - } + select { + case <-ch: + case <-time.After(5 * time.Second): + req.FailNow("the backup job doesn't be canceled") + } } diff --git a/util/dbterror/exeerrors/errors.go b/util/dbterror/exeerrors/errors.go index f8f780c1f9f7a..40339d67845ed 100644 --- a/util/dbterror/exeerrors/errors.go +++ b/util/dbterror/exeerrors/errors.go @@ -62,7 +62,7 @@ var ( ErrBRIERestoreFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIERestoreFailed) ErrBRIEImportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEImportFailed) ErrBRIEExportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEExportFailed) - ErrBRJobNotFound = dbterror.ClassExecutor.NewStd(mysql.ErrBRJobNotFound) + ErrBRJobNotFound = dbterror.ClassExecutor.NewStd(mysql.ErrBRJobNotFound) ErrCTEMaxRecursionDepth = dbterror.ClassExecutor.NewStd(mysql.ErrCTEMaxRecursionDepth) ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem) ErrPluginIsNotLoaded = dbterror.ClassExecutor.NewStd(mysql.ErrPluginIsNotLoaded) From 1c5bc851a5962ba385bd2e47c5b1fe51e9c97ef0 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 17 Apr 2023 10:08:07 +0800 Subject: [PATCH 03/20] make clippy happy Signed-off-by: hillium --- executor/brie.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/brie.go b/executor/brie.go index 9bb6178813beb..b79ea6378384e 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -513,7 +513,6 @@ func (e *showMetaExec) Next(ctx context.Context, req *chunk.Chunk) error { // Next implements the Executor Next interface. func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error { - req.Reset() if e.info == nil { return nil From 32125ab93669bf13f787858cee6b048e21371c0e Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 17 Apr 2023 11:27:11 +0800 Subject: [PATCH 04/20] redact the access key (along with endpoints, consistency?) Signed-off-by: hillium --- executor/brie.go | 19 ++++++++++---- tests/realtikvtest/brietest/brie_test.go | 32 ++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/executor/brie.go b/executor/brie.go index b79ea6378384e..13498d3b34a89 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -345,11 +346,10 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) // is expected to be performed insensitive. cfg.TableFilter = filter.CaseInsensitive(cfg.TableFilter) - query, ok := b.ctx.Value(sessionctx.QueryString).(string) - if !ok { - query = "N/A" - } - e.info.query = query + // We cannot directly use the query string, or the secret may be print. + // NOTE: the ownership of `s.Storage` is taken here. + s.Storage = e.info.storage + e.info.query = restoreQuery(s) switch s.Kind { case ast.BRIEKindBackup: @@ -760,3 +760,12 @@ func (gs *tidbGlueSession) UseOneShotSession(store kv.Storage, closeDomain bool, // in SQL backup. we don't need to close domain. return fn(gs) } + +func restoreQuery(stmt *ast.BRIEStmt) string { + out := bytes.NewBuffer(nil) + rc := format.NewRestoreCtx(format.RestoreNameBackQuotes|format.RestoreStringSingleQuotes, out) + if err := stmt.Restore(rc); err != nil { + return "N/A" + } + return out.String() +} diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index a392663563101..ba894f187dbb5 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -47,6 +47,38 @@ func TestShowBackupQuery(t *testing.T) { res.CheckContain(restoreQuery) } +func TestShowBackupQueryRedact(t *testing.T) { + tk := initTestKit(t) + + failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") + ch := make(chan any) + go func() { + err := tk.QueryToErr("backup database * to 's3://nonexist/real?endpoint=http://127.0.0.1&access-key=notleaked&secret-access-key=notleaked'") + require.Error(t, err) + close(ch) + }() + + check := func() bool { + res := tk.MustQuery("show br job query 1;") + rs := res.Rows() + if len(rs) == 0 { + return false + } + theItem := rs[0][0].(string) + if strings.Contains(theItem, "secret-access-key") { + t.Fatalf("The secret key not redacted: %q", theItem) + } + fmt.Println(theItem) + res.CheckContain("BACKUP DATABASE * TO 's3://nonexist/real'") + return true + } + require.Eventually(t, check, 5*time.Second, 1*time.Second) + tk.MustExec("cancel br job 1;") + // Make sure the background job returns. + // So `goleak` would be happy. + <-ch +} + func TestCancel(t *testing.T) { tk := initTestKit(t) tk.MustExec("use test;") From 7448ab621e2d0d8d284e4fa28869ce5dbe0208fa Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 17 Apr 2023 13:59:41 +0800 Subject: [PATCH 05/20] fix typo Signed-off-by: hillium --- executor/brie.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/brie.go b/executor/brie.go index 13498d3b34a89..1ba4373eb184e 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -71,7 +71,7 @@ type brieTaskProgress struct { // this field is atomically updated outside of the lock below. current int64 - // lock is the mutex protected the three fields below. + // lock is the mutex protected the two fields below. lock syncutil.Mutex // cmd is the name of the step the BRIE task is currently performing. cmd string From 66957a6a1f0a7d3ac4749f61a84288ab34bcae81 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 18 Apr 2023 16:34:11 +0800 Subject: [PATCH 06/20] make err Signed-off-by: hillium --- errors.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/errors.toml b/errors.toml index e7db1e4db82ef..3810c63b72d7f 100644 --- a/errors.toml +++ b/errors.toml @@ -1756,6 +1756,11 @@ error = ''' Unsupported option for LOAD DATA LOCAL INFILE: %s ''' +["executor:8173"] +error = ''' +BRIE Job %d not found +''' + ["executor:8212"] error = ''' Failed to split region ranges: %s From e6351bcbef12973162aa51400675beb3ddd11395 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 18 Apr 2023 16:48:47 +0800 Subject: [PATCH 07/20] make a detailed license Signed-off-by: hillium --- tests/realtikvtest/brietest/brie_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index ba894f187dbb5..6cce4aba3b43d 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -1,4 +1,16 @@ -// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package brietest From 9db27b560f963fc74e789d04b4485f0b4cf9d65c Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 18 Apr 2023 17:09:39 +0800 Subject: [PATCH 08/20] remove thrilling spaces Signed-off-by: hillium --- tests/realtikvtest/brietest/brie_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index 6cce4aba3b43d..84e78374e1902 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 PingCAP, Inc. +// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 2cb905823ec32a15fe2d5d53c137bec897cddab5 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 18 Apr 2023 21:28:24 +0800 Subject: [PATCH 09/20] fix tests Signed-off-by: hillium --- executor/brie.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/brie.go b/executor/brie.go index 1ba4373eb184e..a9dd987cdfb54 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -247,7 +247,8 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) if s.Kind == ast.BRIEKindShowBackupMeta { return execOnce(&showMetaExec{ - showConfig: buildShowMetadataConfigFrom(s), + baseExecutor: newBaseExecutor(b.ctx, schema, 0), + showConfig: buildShowMetadataConfigFrom(s), }) } From 084ce649cdd0103e06945c637b9630f5b1825e60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Wed, 19 Apr 2023 15:03:17 +0800 Subject: [PATCH 10/20] Update planner/core/planbuilder.go Co-authored-by: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> --- planner/core/planbuilder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 51efd0cfa2097..c85cbc7d98cac 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3117,7 +3117,7 @@ func buildShowBackupMetaSchema() (*expression.Schema, types.NameSlice) { for i := range names { fLen, _ := mysql.GetDefaultFieldLengthAndDecimal(ftypes[i]) if ftypes[i] == mysql.TypeVarchar { - // the default varchar value is `5`, which might be too short for us. + // the default varchar length is `5`, which might be too short for us. fLen = 255 } schema.Append(buildColumnWithName("", names[i], ftypes[i], fLen)) From af8d4deb3c4dbe690e6cbe2fd2410b722c59d13c Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 27 Apr 2023 14:42:49 +0800 Subject: [PATCH 11/20] address comments Signed-off-by: hillium --- errno/errcode.go | 2 +- executor/brie.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/errno/errcode.go b/errno/errcode.go index d369a4dc71968..8aed6b3e84197 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1063,7 +1063,7 @@ const ( ErrLoadDataInvalidOperation = 8171 ErrLoadDataLocalUnsupportedOption = 8172 ErrLoadDataPreCheckFailed = 8173 - ErrBRJobNotFound = 8174 + ErrBRJobNotFound = 8174 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/executor/brie.go b/executor/brie.go index a9dd987cdfb54..fa604dec79b1e 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -238,13 +238,6 @@ func (b *executorBuilder) parseTSString(ts string) (uint64, error) { } func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Executor { - e := &BRIEExec{ - baseExecutor: newBaseExecutor(b.ctx, schema, 0), - info: &brieTaskInfo{ - kind: s.Kind, - }, - } - if s.Kind == ast.BRIEKindShowBackupMeta { return execOnce(&showMetaExec{ baseExecutor: newBaseExecutor(b.ctx, schema, 0), @@ -266,6 +259,13 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } } + e := &BRIEExec{ + baseExecutor: newBaseExecutor(b.ctx, schema, 0), + info: &brieTaskInfo{ + kind: s.Kind, + }, + } + tidbCfg := config.GetGlobalConfig() cfg := task.Config{ TLS: task.TLSConfig{ @@ -398,7 +398,7 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) return e } -// oneshotExecutor warps a executor, making its `Next` would only be called once. +// oneshotExecutor wraps a executor, making its `Next` would only be called once. type oneshotExecutor struct { Executor finished bool From d01b260b9e8e01d57f0c474ad0167dc97bdac1c3 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 11:49:19 +0800 Subject: [PATCH 12/20] try fix tests Signed-off-by: hillium --- executor/brie.go | 9 +++++++++ tests/realtikvtest/brietest/brie_test.go | 9 +++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/executor/brie.go b/executor/brie.go index fa604dec79b1e..6bfffd8c2a835 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -142,6 +142,15 @@ var globalBRIEQueue = &brieQueue{ workerCh: make(chan struct{}, 1), } +// ResetGlobalBRIEQueueForTest resets the ID allocation for the global BRIE queue. +// In some of our test cases, we rely on the ID is allocated from 1. +// When batch executing test cases, the assumation may be broken and make the cases fail. +func ResetGlobalBRIEQueueForTest() { + globalBRIEQueue = &brieQueue { + workerCh: make(chan struct{}, 1), + } +} + // registerTask registers a BRIE task in the queue. func (bq *brieQueue) registerTask( ctx context.Context, diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index 84e78374e1902..f6fe2cc1eecd5 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/executor" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" ) @@ -38,6 +39,7 @@ func makeTempDirForBackup(t *testing.T) string { func TestShowBackupQuery(t *testing.T) { tk := initTestKit(t) + executor.ResetGlobalBRIEQueueForTest() tmp := makeTempDirForBackup(t) sqlTmp := strings.ReplaceAll(tmp, "'", "''") @@ -45,15 +47,16 @@ func TestShowBackupQuery(t *testing.T) { tk.MustExec("use test;") tk.MustExec("create table foo(pk int primary key auto_increment, v varchar(255));") tk.MustExec("insert into foo(v) values " + strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",") + ";") - backupQuery := fmt.Sprintf("backup database * to 'local://%s';", sqlTmp) + backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s'", sqlTmp) _ = tk.MustQuery(backupQuery) // NOTE: we assume a auto-increamental ID here. // once we implement other ID allocation, we may have to change this case. res := tk.MustQuery("show br job query 1;") + fmt.Println(res.Rows()); res.CheckContain(backupQuery) tk.MustExec("drop table foo;") - restoreQuery := fmt.Sprintf("restore table test.foo from 'local://%s';", sqlTmp) + restoreQuery := fmt.Sprintf("RESTORE TABLE `test`.`foo` FROM 'local://%s'", sqlTmp) tk.MustQuery(restoreQuery) res = tk.MustQuery("show br job query 2;") res.CheckContain(restoreQuery) @@ -62,6 +65,7 @@ func TestShowBackupQuery(t *testing.T) { func TestShowBackupQueryRedact(t *testing.T) { tk := initTestKit(t) + executor.ResetGlobalBRIEQueueForTest() failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") ch := make(chan any) go func() { @@ -93,6 +97,7 @@ func TestShowBackupQueryRedact(t *testing.T) { func TestCancel(t *testing.T) { tk := initTestKit(t) + executor.ResetGlobalBRIEQueueForTest() tk.MustExec("use test;") failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") From f7f2eb9892f5017ef708543b6e79e4959bd6b326 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 12:38:51 +0800 Subject: [PATCH 13/20] make fmt Signed-off-by: hillium --- executor/brie.go | 6 +++--- tests/realtikvtest/brietest/brie_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/executor/brie.go b/executor/brie.go index 6bfffd8c2a835..478ccbab46eb4 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -146,9 +146,9 @@ var globalBRIEQueue = &brieQueue{ // In some of our test cases, we rely on the ID is allocated from 1. // When batch executing test cases, the assumation may be broken and make the cases fail. func ResetGlobalBRIEQueueForTest() { - globalBRIEQueue = &brieQueue { - workerCh: make(chan struct{}, 1), - } + globalBRIEQueue = &brieQueue{ + workerCh: make(chan struct{}, 1), + } } // registerTask registers a BRIE task in the queue. diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index f6fe2cc1eecd5..9044b6f4ef5b9 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -39,7 +39,7 @@ func makeTempDirForBackup(t *testing.T) string { func TestShowBackupQuery(t *testing.T) { tk := initTestKit(t) - executor.ResetGlobalBRIEQueueForTest() + executor.ResetGlobalBRIEQueueForTest() tmp := makeTempDirForBackup(t) sqlTmp := strings.ReplaceAll(tmp, "'", "''") @@ -52,7 +52,7 @@ func TestShowBackupQuery(t *testing.T) { // NOTE: we assume a auto-increamental ID here. // once we implement other ID allocation, we may have to change this case. res := tk.MustQuery("show br job query 1;") - fmt.Println(res.Rows()); + fmt.Println(res.Rows()) res.CheckContain(backupQuery) tk.MustExec("drop table foo;") @@ -65,7 +65,7 @@ func TestShowBackupQuery(t *testing.T) { func TestShowBackupQueryRedact(t *testing.T) { tk := initTestKit(t) - executor.ResetGlobalBRIEQueueForTest() + executor.ResetGlobalBRIEQueueForTest() failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") ch := make(chan any) go func() { @@ -97,7 +97,7 @@ func TestShowBackupQueryRedact(t *testing.T) { func TestCancel(t *testing.T) { tk := initTestKit(t) - executor.ResetGlobalBRIEQueueForTest() + executor.ResetGlobalBRIEQueueForTest() tk.MustExec("use test;") failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") From eaeb7b9571640c8056dd725ebe48c6eb9ca38b17 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 12:46:58 +0800 Subject: [PATCH 14/20] make bazel_prepare Signed-off-by: hillium --- tests/realtikvtest/brietest/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index d4545619a3330..56b4b8e176cf0 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -13,6 +13,7 @@ go_test( race = "on", deps = [ "//config", + "//executor", "//parser/mysql", "//sessionctx/binloginfo", "//store/mockstore/mockcopr", From 234b24e5543e3bd9f1e42b7d219f8fef4f16f138 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 13:15:25 +0800 Subject: [PATCH 15/20] try fix race Signed-off-by: hillium --- br/pkg/task/backup.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 157ea661206ae..e297b522c2fb1 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -10,6 +10,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/docker/go-units" @@ -602,11 +603,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig ctx, cmdName, int64(len(ranges)), !cfg.LogProgress) } - progressCount := 0 + progressCount := uint64(0) progressCallBack := func(callBackUnit backup.ProgressUnit) { if unit == callBackUnit { updateCh.Inc() - progressCount++ + atomic.AddUint64(&progressCount, 1) failpoint.Inject("progress-call-back", func(v failpoint.Value) { log.Info("failpoint progress-call-back injected") if fileName, ok := v.(string); ok { @@ -614,7 +615,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if osErr != nil { log.Warn("failed to create file", zap.Error(osErr)) } - msg := []byte(fmt.Sprintf("%s:%d\n", unit, progressCount)) + msg := []byte(fmt.Sprintf("%s:%d\n", unit, atomic.LoadUint64(&progressCount))) _, err = f.Write(msg) if err != nil { log.Warn("failed to write data to file", zap.Error(err)) From 47c4cf0263b0d1a403550986d575e1262a31137a Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 14:03:09 +0800 Subject: [PATCH 16/20] fix race Signed-off-by: hillium --- tests/realtikvtest/brietest/brie_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index 9044b6f4ef5b9..7d6b929dc655d 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" ) @@ -69,6 +70,7 @@ func TestShowBackupQueryRedact(t *testing.T) { failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") ch := make(chan any) go func() { + tk := testkit.NewTestKit(t, tk.Session().GetStore()) err := tk.QueryToErr("backup database * to 's3://nonexist/real?endpoint=http://127.0.0.1&access-key=notleaked&secret-access-key=notleaked'") require.Error(t, err) close(ch) @@ -104,6 +106,7 @@ func TestCancel(t *testing.T) { req := require.New(t) ch := make(chan struct{}) go func() { + tk := testkit.NewTestKit(t, tk.Session().GetStore()) err := tk.QueryToErr("backup database * to 'noop://'") req.Error(err) close(ch) From 166f68f2a9820a324434a869cb2f0db0e9c037bc Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 14:03:28 +0800 Subject: [PATCH 17/20] make fmt Signed-off-by: hillium --- tests/realtikvtest/brietest/brie_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index 7d6b929dc655d..cb7f14f101553 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -70,7 +70,7 @@ func TestShowBackupQueryRedact(t *testing.T) { failpoint.Enable("github.com/pingcap/tidb/executor/block-on-brie", "return") ch := make(chan any) go func() { - tk := testkit.NewTestKit(t, tk.Session().GetStore()) + tk := testkit.NewTestKit(t, tk.Session().GetStore()) err := tk.QueryToErr("backup database * to 's3://nonexist/real?endpoint=http://127.0.0.1&access-key=notleaked&secret-access-key=notleaked'") require.Error(t, err) close(ch) @@ -106,7 +106,7 @@ func TestCancel(t *testing.T) { req := require.New(t) ch := make(chan struct{}) go func() { - tk := testkit.NewTestKit(t, tk.Session().GetStore()) + tk := testkit.NewTestKit(t, tk.Session().GetStore()) err := tk.QueryToErr("backup database * to 'noop://'") req.Error(err) close(ch) From 3da80d7a9fbbf7cb872263ce15061a7c25d1d347 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 17:08:09 +0800 Subject: [PATCH 18/20] implementated basic pause-gc Signed-off-by: hillium --- br/cmd/br/BUILD.bazel | 2 ++ br/cmd/br/main.go | 1 + br/cmd/br/operator.go | 47 +++++++++++++++++++++++++++ br/pkg/pdutil/pd.go | 2 +- br/pkg/task/common.go | 8 +++++ br/pkg/task/operator/BUILD.bazel | 19 +++++++++++ br/pkg/task/operator/cmd.go | 56 ++++++++++++++++++++++++++++++++ br/pkg/task/operator/config.go | 30 +++++++++++++++++ br/pkg/utils/misc.go | 9 +++++ 9 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 br/cmd/br/operator.go create mode 100644 br/pkg/task/operator/BUILD.bazel create mode 100644 br/pkg/task/operator/cmd.go create mode 100644 br/pkg/task/operator/config.go diff --git a/br/cmd/br/BUILD.bazel b/br/cmd/br/BUILD.bazel index a95909dc84f03..1c20553569792 100644 --- a/br/cmd/br/BUILD.bazel +++ b/br/cmd/br/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "cmd.go", "debug.go", "main.go", + "operator.go", "restore.go", "stream.go", ], @@ -26,6 +27,7 @@ go_library( "//br/pkg/streamhelper/config", "//br/pkg/summary", "//br/pkg/task", + "//br/pkg/task/operator", "//br/pkg/trace", "//br/pkg/utils", "//br/pkg/version/build", diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index 29944fa9e2691..d70d9425e0653 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -49,6 +49,7 @@ func main() { NewBackupCommand(), NewRestoreCommand(), NewStreamCommand(), + newOpeartorCommand(), ) // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go new file mode 100644 index 0000000000000..7967921c375cf --- /dev/null +++ b/br/cmd/br/operator.go @@ -0,0 +1,47 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package main + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/task/operator" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/version/build" + "github.com/spf13/cobra" +) + +func newOpeartorCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "operator ", + Short: "utilities for operators like tidb-operator.", + PersistentPreRunE: func(c *cobra.Command, args []string) error { + if err := Init(c); err != nil { + return errors.Trace(err) + } + build.LogInfo(build.BR) + utils.LogEnvVariables() + task.LogArguments(c) + return nil + }, + Hidden: true, + } + cmd.AddCommand(newPauseGcCommand()) + return cmd +} + +func newPauseGcCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "pause-gc", + Short: "pause gc to the ts until the program exits.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := operator.PauseGcConfig{} + cfg.ParseFromFlags(cmd.Flags()) + ctx := GetDefaultContext() + return operator.PauseGC(ctx, &cfg) + }, + } + operator.DefineFlagsForPauseGcConfig(cmd.Flags()) + return cmd +} diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 26677fc89dd87..d4e0d26ce9c4c 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -257,7 +257,7 @@ func NewPdController( } if failure != nil { return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, - "pd address (%s) not available, please check network", pdAddrs) + "pd address (%s) not available, error is %s, please check network", pdAddrs, failure) } version := parseVersion(versionBytes) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 3e1fa25d72840..54dc131b2e9d4 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -138,6 +138,14 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) { return tlsConfig, nil } +func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption { + securityOption := pd.SecurityOption{} + securityOption.CAPath = tls.CA + securityOption.CertPath = tls.Cert + securityOption.KeyPath = tls.Key + return securityOption +} + // ParseFromFlags parses the TLS config from the flag set. func (tls *TLSConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) { tls.CA, tls.Cert, tls.Key, err = ParseTLSTripleFromFlags(flags) diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel new file mode 100644 index 0000000000000..c2f1575403aec --- /dev/null +++ b/br/pkg/task/operator/BUILD.bazel @@ -0,0 +1,19 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "operator", + srcs = [ + "cmd.go", + "config.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/task/operator", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/pdutil", + "//br/pkg/task", + "//br/pkg/utils", + "@com_github_pingcap_log//:log", + "@com_github_spf13_pflag//:pflag", + "@org_uber_go_zap//:zap", + ], +) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go new file mode 100644 index 0000000000000..b191124d5a69a --- /dev/null +++ b/br/pkg/task/operator/cmd.go @@ -0,0 +1,56 @@ +package operator + +import ( + "context" + "crypto/tls" + "strings" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/utils" + "go.uber.org/zap" +) + +func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { + pdAddrs := strings.Join(cfg.PD, ",") + var tc *tls.Config + if cfg.TLS.IsEnabled() { + var err error + tc, err = cfg.TLS.ToTLSConfig() + if err != nil { + return nil, err + } + } + mgr, err := pdutil.NewPdController(ctx, pdAddrs, tc, cfg.TLS.ToPDSecurityOption()) + if err != nil { + return nil, err + } + return mgr, nil +} + +func PauseGC(ctx context.Context, cfg *PauseGcConfig) error { + mgr, err := dialPD(ctx, &cfg.Config) + if err != nil { + return err + } + sp := utils.BRServiceSafePoint{ + ID: utils.MakeSafePointID(), + TTL: int64(cfg.TTL.Seconds()), + BackupTS: cfg.SafePoint, + } + if sp.BackupTS == 0 { + rts, err := mgr.GetMinResolvedTS(ctx) + if err != nil { + return err + } + log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) + sp.BackupTS = rts + } + err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) + if err != nil { + return err + } + <-ctx.Done() + return nil +} diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go new file mode 100644 index 0000000000000..68762160ca5f5 --- /dev/null +++ b/br/pkg/task/operator/config.go @@ -0,0 +1,30 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package operator + +import ( + "time" + + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/spf13/pflag" +) + +type PauseGcConfig struct { + task.Config + + SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + TTL time.Duration `json:"ttl" yaml:"ttl"` +} + +func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { + _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.") + _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") +} + +func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) { + cfg.Config.ParseFromFlags(flags) + + cfg.SafePoint = utils.Must(flags.GetUint64("safepoint")) + cfg.TTL = utils.Must(flags.GetDuration("ttl")) +} diff --git a/br/pkg/utils/misc.go b/br/pkg/utils/misc.go index ab62a7b7db534..b98caa2a53ea0 100644 --- a/br/pkg/utils/misc.go +++ b/br/pkg/utils/misc.go @@ -103,3 +103,12 @@ func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts . } return connection, nil } + +// Must checks whether a two-value returned function returns a success result. +// If it was successed, return the success value, otherwise panic. +func Must[T any](t T, err error) T { + if err != nil { + panic(errors.Annotate(err, "must assertion failed")) + } + return t +} From 2dc141dfa897af5ddd812351ebc0ca6ea4820e2a Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 17:39:29 +0800 Subject: [PATCH 19/20] make clippy happy Signed-off-by: hillium --- br/cmd/br/operator.go | 4 +++- br/pkg/task/common.go | 1 + br/pkg/task/operator/cmd.go | 1 + br/pkg/task/operator/config.go | 23 +++++++++++++++++------ br/pkg/utils/misc.go | 9 --------- 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index 7967921c375cf..cfd48d85ae393 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -37,7 +37,9 @@ func newPauseGcCommand() *cobra.Command { Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { cfg := operator.PauseGcConfig{} - cfg.ParseFromFlags(cmd.Flags()) + if err := cfg.ParseFromFlags(cmd.Flags()); err != nil { + return err + } ctx := GetDefaultContext() return operator.PauseGC(ctx, &cfg) }, diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 54dc131b2e9d4..98b0810e8261e 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -138,6 +138,7 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) { return tlsConfig, nil } +// Convert the TLS config to the PD security option. func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption { securityOption := pd.SecurityOption{} securityOption.CAPath = tls.CA diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index b191124d5a69a..961b919073c9a 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -29,6 +29,7 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) return mgr, nil } +// PauseGC blocks the current goroutine and pause the GC safepoint by the config. func PauseGC(ctx context.Context, cfg *PauseGcConfig) error { mgr, err := dialPD(ctx, &cfg.Config) if err != nil { diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 68762160ca5f5..eb7e12a49af56 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -6,7 +6,6 @@ import ( "time" "github.com/pingcap/tidb/br/pkg/task" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/spf13/pflag" ) @@ -22,9 +21,21 @@ func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") } -func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) { - cfg.Config.ParseFromFlags(flags) - - cfg.SafePoint = utils.Must(flags.GetUint64("safepoint")) - cfg.TTL = utils.Must(flags.GetDuration("ttl")) +// ParseFromFlags fills the config via the flags. +func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error { + if err := cfg.Config.ParseFromFlags(flags); err != nil { + return err + } + + var err error + cfg.SafePoint, err = flags.GetUint64("safepoint") + if err != nil { + return err + } + cfg.TTL, err = flags.GetDuration("ttl") + if err != nil { + return err + } + + return nil } diff --git a/br/pkg/utils/misc.go b/br/pkg/utils/misc.go index b98caa2a53ea0..ab62a7b7db534 100644 --- a/br/pkg/utils/misc.go +++ b/br/pkg/utils/misc.go @@ -103,12 +103,3 @@ func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts . } return connection, nil } - -// Must checks whether a two-value returned function returns a success result. -// If it was successed, return the success value, otherwise panic. -func Must[T any](t T, err error) T { - if err != nil { - panic(errors.Annotate(err, "must assertion failed")) - } - return t -} From 09e57e9e891b936b6baaaa388de7f33ee077c111 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 18:15:10 +0800 Subject: [PATCH 20/20] fix test Signed-off-by: hillium --- executor/brie_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/executor/brie_test.go b/executor/brie_test.go index 73838be3a5a45..05c3224275182 100644 --- a/executor/brie_test.go +++ b/executor/brie_test.go @@ -17,6 +17,7 @@ package executor import ( "context" "fmt" + "strconv" "strings" "testing" "time" @@ -42,7 +43,8 @@ func TestGlueGetVersion(t *testing.T) { } func brieTaskInfoToResult(info *brieTaskInfo) string { - arr := make([]string, 0, 8) + arr := make([]string, 0, 9) + arr = append(arr, strconv.Itoa(int(info.id))) arr = append(arr, info.storage) arr = append(arr, "Wait") arr = append(arr, "0") @@ -67,6 +69,7 @@ func fetchShowBRIEResult(t *testing.T, e *ShowExec, brieColTypes []*types.FieldT func TestFetchShowBRIE(t *testing.T) { sctx := mock.NewContext() sctx.GetSessionVars().User = &auth.UserIdentity{Username: "test"} + ResetGlobalBRIEQueueForTest() ctx := context.Background() // Compose schema. @@ -102,9 +105,9 @@ func TestFetchShowBRIE(t *testing.T) { storage: "noop://test/backup1", message: "killed", } - info1Res := brieTaskInfoToResult(info1) globalBRIEQueue.registerTask(ctx, info1) + info1Res := brieTaskInfoToResult(info1) require.Equal(t, info1Res, fetchShowBRIEResult(t, e, brieColTypes)) // Query again, this info should already have been cleaned @@ -112,6 +115,7 @@ func TestFetchShowBRIE(t *testing.T) { // Register this task again, we should be able to fetch this info globalBRIEQueue.registerTask(ctx, info1) + info1Res = brieTaskInfoToResult(info1) require.Equal(t, info1Res, fetchShowBRIEResult(t, e, brieColTypes)) // Query again, we should be able to fetch this info again, because we have cleared in last clearInterval @@ -121,6 +125,7 @@ func TestFetchShowBRIE(t *testing.T) { globalBRIEQueue.lastClearTime = time.Now().Add(-clearInterval - time.Second) currTime := types.CurrentTime(tp) info2 := &brieTaskInfo{ + id: 2, kind: ast.BRIEKindBackup, connID: e.ctx.GetSessionVars().ConnectionID, queueTime: currTime, @@ -129,8 +134,8 @@ func TestFetchShowBRIE(t *testing.T) { storage: "noop://test/backup2", message: "", } - info2Res := brieTaskInfoToResult(info2) globalBRIEQueue.registerTask(ctx, info2) + info2Res := brieTaskInfoToResult(info2) globalBRIEQueue.clearTask(e.ctx.GetSessionVars().StmtCtx) require.Equal(t, info2Res, fetchShowBRIEResult(t, e, brieColTypes)) }