Skip to content

Commit

Permalink
lightning: init checkpoint db after precheck (#31033)
Browse files Browse the repository at this point in the history
close #30772
  • Loading branch information
glorv authored Dec 30, 2021
1 parent eeedd81 commit a06e9f2
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 19 deletions.
35 changes: 17 additions & 18 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ func (rc *Controller) Run(ctx context.Context) error {
rc.setGlobalVariables,
rc.restoreSchema,
rc.preCheckRequirements,
rc.initCheckpoint,
rc.restoreTables,
rc.fullCompact,
rc.cleanCheckpoints,
Expand Down Expand Up @@ -774,14 +775,20 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
}
rc.dbInfos = dbInfos

if rc.tidbGlue.OwnsSQLExecutor() {
if err = rc.DataCheck(ctx); err != nil {
return errors.Trace(err)
}
sysVars := ObtainImportantVariables(ctx, rc.tidbGlue.GetSQLExecutor(), !rc.isTiDBBackend())
// override by manually set vars
for k, v := range rc.cfg.TiDB.Vars {
sysVars[k] = v
}
rc.sysVars = sysVars

return nil
}

// initCheckpoint initializes all tables' checkpoint data
func (rc *Controller) initCheckpoint(ctx context.Context) error {
// Load new checkpoints
err = rc.checkpointsDB.Initialize(ctx, rc.cfg, dbInfos)
err := rc.checkpointsDB.Initialize(ctx, rc.cfg, rc.dbInfos)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -793,20 +800,8 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
rc.checkpointsWg.Add(1) // checkpointsWg will be done in `rc.listenCheckpointUpdates`
go rc.listenCheckpointUpdates()

sysVars := ObtainImportantVariables(ctx, rc.tidbGlue.GetSQLExecutor(), !rc.isTiDBBackend())
// override by manually set vars
for k, v := range rc.cfg.TiDB.Vars {
sysVars[k] = v
}
rc.sysVars = sysVars

// Estimate the number of chunks for progress reporting
err = rc.estimateChunkCountIntoMetrics(ctx)
if err != nil {
return errors.Trace(err)
}

return nil
return rc.estimateChunkCountIntoMetrics(ctx)
}

// verifyCheckpoint check whether previous task checkpoint is compatible with task config
Expand Down Expand Up @@ -1871,6 +1866,10 @@ func (rc *Controller) isTiDBBackend() bool {
// 4. Lightning configuration
// before restore tables start.
func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err := rc.DataCheck(ctx); err != nil {
return errors.Trace(err)
}

if rc.cfg.App.CheckRequirements {
if err := rc.ClusterIsAvailable(ctx); err != nil {
return errors.Trace(err)
Expand Down
57 changes: 57 additions & 0 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,63 @@ func (s *restoreSuite) TestDiskQuotaLock(c *C) {
}
}

// failMetaMgrBuilder mocks meta manager init failure
type failMetaMgrBuilder struct {
metaMgrBuilder
}

func (b failMetaMgrBuilder) Init(context.Context) error {
return errors.New("mock init meta failure")
}

type panicCheckpointDB struct {
checkpoints.DB
}

func (cp panicCheckpointDB) Initialize(context.Context, *config.Config, map[string]*checkpoints.TidbDBInfo) error {
panic("should not reach here")
}

func (s *restoreSuite) TestPreCheckFailed(c *C) {
cfg := config.NewConfig()
cfg.TikvImporter.Backend = config.BackendTiDB
cfg.App.CheckRequirements = false

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
g := glue.NewExternalTiDBGlue(db, mysql.ModeNone)

ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
checkpointsDB: panicCheckpointDB{},
metaMgrBuilder: failMetaMgrBuilder{},
checkTemplate: NewSimpleTemplate(),
tidbGlue: g,
}

mock.ExpectBegin()
mock.ExpectQuery("SHOW VARIABLES WHERE Variable_name IN .*").
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("tidb_row_format_version", "2"))
mock.ExpectCommit()
// precheck failed, will not do init checkpoint.
err = ctl.Run(context.Background())
c.Assert(err, ErrorMatches, ".*mock init meta failure")
c.Assert(mock.ExpectationsWereMet(), IsNil)

mock.ExpectBegin()
mock.ExpectQuery("SHOW VARIABLES WHERE Variable_name IN .*").
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("tidb_row_format_version", "2"))
mock.ExpectCommit()
ctl.saveCpCh = make(chan saveCp)
// precheck failed, will not do init checkpoint.
err1 := ctl.Run(context.Background())
c.Assert(err1.Error(), Equals, err.Error())
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

var _ = Suite(&tableRestoreSuite{})

type tableRestoreSuiteBase struct {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) {
}

for k, v := range vars {
q := fmt.Sprintf("SET SESSION %s = %s;", k, v)
q := fmt.Sprintf("SET SESSION %s = '%s';", k, v)
if _, err1 := db.ExecContext(ctx, q); err1 != nil {
log.L().Warn("set session variable failed, will skip this query", zap.String("query", q),
zap.Error(err1))
Expand Down

0 comments on commit a06e9f2

Please sign in to comment.