From 80a89fc476bb474a8ad07315c9970f17bb0e5ae4 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Tue, 26 Apr 2022 22:42:52 +0800 Subject: [PATCH] lightning: maintain task meta in singleTaskMetaMgr (#34214) (#34226) close pingcap/tidb#34213 --- br/pkg/lightning/restore/meta_manager.go | 32 ++++++++++++++++--- br/pkg/lightning/restore/meta_manager_test.go | 26 +++++++++++++++ br/pkg/lightning/restore/restore.go | 18 +++++++++-- 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index a8b43167c83ea..d3e1712679a55 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -1035,7 +1035,9 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { return nil } -type singleMgrBuilder struct{} +type singleMgrBuilder struct { + taskID int64 +} func (b singleMgrBuilder) Init(context.Context) error { return nil @@ -1043,7 +1045,8 @@ func (b singleMgrBuilder) Init(context.Context) error { func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { return &singleTaskMetaMgr{ - pd: pd, + pd: pd, + taskID: b.taskID, } } @@ -1052,15 +1055,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { } type singleTaskMetaMgr struct { - pd *pdutil.PdController + pd *pdutil.PdController + taskID int64 + initialized bool + sourceBytes uint64 + clusterAvail uint64 } func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error { + m.sourceBytes = uint64(source) + m.initialized = true return nil } func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { - _, err := action(nil) + newTasks, err := action([]taskMeta{ + { + taskID: m.taskID, + status: taskMetaStatusInitial, + sourceBytes: m.sourceBytes, + clusterAvail: m.clusterAvail, + }, + }) + for _, t := range newTasks { + if m.taskID == t.taskID { + m.sourceBytes = t.sourceBytes + m.clusterAvail = t.clusterAvail + } + } return err } @@ -1069,7 +1091,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut } func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { - return true, nil + return m.initialized, nil } func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) { diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 31a3c35569c67..28c3e74b91215 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -6,6 +6,7 @@ import ( "context" "database/sql/driver" "sort" + "time" "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" @@ -323,3 +324,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) { c.Assert(err, IsNil) } + +func (s *taskMetaMgrSuite) TestSingleTaskMetaMgr(c *C) { + metaBuilder := singleMgrBuilder{ + taskID: time.Now().UnixNano(), + } + metaMgr := metaBuilder.TaskMetaMgr(nil) + + ok, err := metaMgr.CheckTaskExist(context.Background()) + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + + err = metaMgr.InitTask(context.Background(), 1<<30) + c.Assert(err, IsNil) + + ok, err = metaMgr.CheckTaskExist(context.Background()) + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + + err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) { + c.Assert(len(tasks), Equals, 1) + c.Assert(tasks[0].sourceBytes, Equals, uint64(1<<30)) + return nil, nil + }) + c.Assert(err, IsNil) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index f8bcebbfcd0e0..907d00d979a95 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser( needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff, } case isSSTImport: - metaBuilder = singleMgrBuilder{} + metaBuilder = singleMgrBuilder{ + taskID: cfg.TaskID, + } default: metaBuilder = noopMetaMgrBuilder{} } @@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err = rc.taskMgr.InitTask(ctx, source); err != nil { return errors.Trace(err) } - if rc.cfg.App.CheckRequirements { + } + if rc.cfg.App.CheckRequirements { + needCheck := true + if rc.cfg.Checkpoint.Enable { + taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx) + if err != nil { + return errors.Trace(err) + } + // If task checkpoint is initialized, it means check has been performed before. + // We don't need and shouldn't check again, because lightning may have already imported some data. + needCheck = taskCheckpoints == nil + } + if needCheck { err = rc.localResource(source) if err != nil { return errors.Trace(err)