Skip to content

Commit

Permalink
cherry pick pingcap#34214 to release-5.4
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
sleepymole authored and ti-srebot committed Apr 26, 2022
1 parent 11dbb91 commit 8893272
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
32 changes: 27 additions & 5 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,15 +1035,18 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

type singleMgrBuilder struct{}
type singleMgrBuilder struct {
taskID int64
}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
pd: pd,
pd: pd,
taskID: b.taskID,
}
}

Expand All @@ -1052,15 +1055,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
pd *pdutil.PdController
taskID int64
initialized bool
sourceBytes uint64
clusterAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
m.sourceBytes = uint64(source)
m.initialized = true
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
_, err := action(nil)
newTasks, err := action([]taskMeta{
{
taskID: m.taskID,
status: taskMetaStatusInitial,
sourceBytes: m.sourceBytes,
clusterAvail: m.clusterAvail,
},
})
for _, t := range newTasks {
if m.taskID == t.taskID {
m.sourceBytes = t.sourceBytes
m.clusterAvail = t.clusterAvail
}
}
return err
}

Expand All @@ -1069,7 +1091,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
return m.initialized, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
Expand Down
30 changes: 30 additions & 0 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"context"
"database/sql/driver"
"sort"
<<<<<<< HEAD
=======
"testing"
"time"
>>>>>>> 59566fad3... lightning: maintain task meta in singleTaskMetaMgr (#34214)

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
Expand Down Expand Up @@ -323,3 +328,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) {
c.Assert(err, IsNil)

}

func TestSingleTaskMetaMgr(t *testing.T) {
metaBuilder := singleMgrBuilder{
taskID: time.Now().UnixNano(),
}
metaMgr := metaBuilder.TaskMetaMgr(nil)

ok, err := metaMgr.CheckTaskExist(context.Background())
require.NoError(t, err)
require.False(t, ok)

err = metaMgr.InitTask(context.Background(), 1<<30)
require.NoError(t, err)

ok, err = metaMgr.CheckTaskExist(context.Background())
require.NoError(t, err)
require.True(t, ok)

err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
require.Len(t, tasks, 1)
require.Equal(t, uint64(1<<30), tasks[0].sourceBytes)
return nil, nil
})
require.NoError(t, err)
}
18 changes: 16 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser(
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
metaBuilder = singleMgrBuilder{}
metaBuilder = singleMgrBuilder{
taskID: cfg.TaskID,
}
default:
metaBuilder = noopMetaMgrBuilder{}
}
Expand Down Expand Up @@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err = rc.taskMgr.InitTask(ctx, source); err != nil {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
}
if rc.cfg.App.CheckRequirements {
needCheck := true
if rc.cfg.Checkpoint.Enable {
taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx)
if err != nil {
return common.ErrReadCheckpoint.Wrap(err).GenWithStack("get task checkpoint failed")
}
// If task checkpoint is initialized, it means check has been performed before.
// We don't need and shouldn't check again, because lightning may have already imported some data.
needCheck = taskCheckpoints == nil
}
if needCheck {
err = rc.localResource(source)
if err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 8893272

Please sign in to comment.