From 88932727f0eafe41af95466e4ab5a84f502b3410 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 26 Apr 2022 11:24:51 +0800 Subject: [PATCH] cherry pick #34214 to release-5.4 Signed-off-by: ti-srebot --- br/pkg/lightning/restore/meta_manager.go | 32 ++++++++++++++++--- br/pkg/lightning/restore/meta_manager_test.go | 30 +++++++++++++++++ br/pkg/lightning/restore/restore.go | 18 +++++++++-- 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index a8b43167c83ea..d3e1712679a55 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -1035,7 +1035,9 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { return nil } -type singleMgrBuilder struct{} +type singleMgrBuilder struct { + taskID int64 +} func (b singleMgrBuilder) Init(context.Context) error { return nil @@ -1043,7 +1045,8 @@ func (b singleMgrBuilder) Init(context.Context) error { func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { return &singleTaskMetaMgr{ - pd: pd, + pd: pd, + taskID: b.taskID, } } @@ -1052,15 +1055,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { } type singleTaskMetaMgr struct { - pd *pdutil.PdController + pd *pdutil.PdController + taskID int64 + initialized bool + sourceBytes uint64 + clusterAvail uint64 } func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error { + m.sourceBytes = uint64(source) + m.initialized = true return nil } func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { - _, err := action(nil) + newTasks, err := action([]taskMeta{ + { + taskID: m.taskID, + status: taskMetaStatusInitial, + sourceBytes: m.sourceBytes, + clusterAvail: m.clusterAvail, + }, + }) + for _, t := range newTasks { + if m.taskID == t.taskID { + m.sourceBytes = t.sourceBytes + m.clusterAvail = t.clusterAvail + } + } return err } @@ -1069,7 +1091,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut } func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { - return true, nil + return m.initialized, nil } func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) { diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 31a3c35569c67..7f9ec05d9ee33 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -6,6 +6,11 @@ import ( "context" "database/sql/driver" "sort" +<<<<<<< HEAD +======= + "testing" + "time" +>>>>>>> 59566fad3... lightning: maintain task meta in singleTaskMetaMgr (#34214) "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" @@ -323,3 +328,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) { c.Assert(err, IsNil) } + +func TestSingleTaskMetaMgr(t *testing.T) { + metaBuilder := singleMgrBuilder{ + taskID: time.Now().UnixNano(), + } + metaMgr := metaBuilder.TaskMetaMgr(nil) + + ok, err := metaMgr.CheckTaskExist(context.Background()) + require.NoError(t, err) + require.False(t, ok) + + err = metaMgr.InitTask(context.Background(), 1<<30) + require.NoError(t, err) + + ok, err = metaMgr.CheckTaskExist(context.Background()) + require.NoError(t, err) + require.True(t, ok) + + err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) { + require.Len(t, tasks, 1) + require.Equal(t, uint64(1<<30), tasks[0].sourceBytes) + return nil, nil + }) + require.NoError(t, err) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index f8bcebbfcd0e0..ffc5388617b41 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser( needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff, } case isSSTImport: - metaBuilder = singleMgrBuilder{} + metaBuilder = singleMgrBuilder{ + taskID: cfg.TaskID, + } default: metaBuilder = noopMetaMgrBuilder{} } @@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err = rc.taskMgr.InitTask(ctx, source); err != nil { return errors.Trace(err) } - if rc.cfg.App.CheckRequirements { + } + if rc.cfg.App.CheckRequirements { + needCheck := true + if rc.cfg.Checkpoint.Enable { + taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx) + if err != nil { + return common.ErrReadCheckpoint.Wrap(err).GenWithStack("get task checkpoint failed") + } + // If task checkpoint is initialized, it means check has been performed before. + // We don't need and shouldn't check again, because lightning may have already imported some data. + needCheck = taskCheckpoints == nil + } + if needCheck { err = rc.localResource(source) if err != nil { return errors.Trace(err)