
lightning: check and restore pd scheduler even if our task failed #1336

Merged · 10 commits merged on Aug 6, 2021
81 changes: 60 additions & 21 deletions pkg/lightning/restore/meta_manager.go
@@ -462,7 +462,10 @@ func (m *dbTableMetaMgr) FinishTable(ctx context.Context) error {
type taskMetaMgr interface {
InitTask(ctx context.Context) error
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
CheckAndFinishRestore(ctx context.Context) (bool, error)
// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster back to normal state and whether to clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to normal state (restore schedulers, switch TiKV back to normal mode);
// the second boolean indicates whether to clean up the metadata in TiDB.
CheckAndFinishRestore(ctx context.Context, finished bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error)
Cleanup(ctx context.Context) error
CleanupAllMetas(ctx context.Context) error
}
@@ -485,6 +488,11 @@ const (
taskMetaStatusSwitchBack
)

const (
taskStateNormal int = iota
taskStateExited
)

func (m taskMetaStatus) String() string {
switch m {
case taskMetaStatusInitial:
@@ -525,9 +533,13 @@ func (m *dbTaskMetaMgr) InitTask(ctx context.Context) error {
DB: m.session,
Logger: log.L(),
}
// avoid override existing metadata if the meta is already inserted.
stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, status) values (?, ?)`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String())

err := exec.Transact(ctx, "check and init task status", func(ctx context.Context, tx *sql.Tx) error {
// avoid override existing metadata if the meta is already inserted.
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status) values (?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName)
Collaborator: the comment feels outdated.

_, err := tx.ExecContext(ctx, stmt, m.taskID, taskMetaStatusInitial.String(), taskStateNormal)
return errors.Trace(err)
})
return errors.Trace(err)
}
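The change from INSERT IGNORE to INSERT ... ON DUPLICATE KEY UPDATE is what lets an exited task re-register itself: a brand-new task_id inserts a fresh row (state takes its DEFAULT 0, i.e. normal), while re-running a task that previously exited keeps its stored status and pd_cfgs but resets state back to normal. A minimal illustrative sketch of that upsert, not the PR's code; the "initialized" literal stands in for taskMetaStatusInitial.String():

import (
	"context"
	"database/sql"
	"fmt"
)

// initTaskRow registers a task in the task-meta table (illustrative sketch only).
func initTaskRow(ctx context.Context, tx *sql.Tx, tableName string, taskID int64) error {
	const taskStateNormal = 0
	// New task_id: insert (task_id, status) and let `state` take its default (normal).
	// Existing task_id: only `state` is rewritten back to normal; status and pd_cfgs are preserved.
	stmt := fmt.Sprintf(
		"INSERT INTO %s (task_id, status) VALUES (?, ?) ON DUPLICATE KEY UPDATE state = ?",
		tableName,
	)
	_, err := tx.ExecContext(ctx, stmt, taskID, "initialized" /* placeholder for taskMetaStatusInitial.String() */, taskStateNormal)
	return err
}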

@@ -551,7 +563,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
paused := false
var pausedCfg storedCfgs
err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName)
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
@@ -566,10 +578,11 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
taskID int64
cfg string
statusValue string
state int
)
var cfgStr string
for rows.Next() {
if err = rows.Scan(&taskID, &cfg, &statusValue); err != nil {
if err = rows.Scan(&taskID, &cfg, &statusValue, &state); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
@@ -643,10 +656,13 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) {
// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster back to normal state and whether to clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to normal state (restore schedulers, switch TiKV back to normal mode);
// the second boolean indicates whether to clean up the metadata in TiDB.
func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool) (bool, bool, error) {
conn, err := m.session.Conn(ctx)
if err != nil {
return false, errors.Trace(err)
return false, false, errors.Trace(err)
}
defer conn.Close()
exec := &common.SQLWithRetry{
@@ -655,12 +671,13 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error)
}
err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';")
if err != nil {
return false, errors.Annotate(err, "enable pessimistic transaction failed")
return false, false, errors.Annotate(err, "enable pessimistic transaction failed")
}

switchBack := true
allFinished := finished
err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, status from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName)
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
@@ -674,10 +691,12 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error)
var (
taskID int64
statusValue string
state int
)
newStatus := taskMetaStatusSwitchBack

taskStatus := taskMetaStatusInitial
for rows.Next() {
if err = rows.Scan(&taskID, &statusValue); err != nil {
if err = rows.Scan(&taskID, &statusValue, &state); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
@@ -686,27 +705,47 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error)
}

if taskID == m.taskID {
taskStatus = status
continue
}

if status < taskMetaStatusSwitchSkipped {
newStatus = taskMetaStatusSwitchSkipped
switchBack = false
break
allFinished = false
// check whether another task is still running
if state == taskStateNormal {
log.L().Info("unfinished task found", zap.Int64("task_id", taskID),
zap.Stringer("status", status))
switchBack = false
}
}
}
if err = rows.Close(); err != nil {
return errors.Trace(err)
}
closed = true

query = fmt.Sprintf("update %s set status = ? where task_id = ?", m.tableName)
_, err = tx.ExecContext(ctx, query, newStatus.String(), m.taskID)
if taskStatus < taskMetaStatusSwitchSkipped {
newStatus := taskMetaStatusSwitchBack
newState := taskStateNormal
if !finished {
newStatus = taskStatus
newState = taskStateExited
} else if !allFinished {
newStatus = taskMetaStatusSwitchSkipped
}

return errors.Trace(err)
query = fmt.Sprintf("update %s set status = ?, state = ? where task_id = ?", m.tableName)
if _, err = tx.ExecContext(ctx, query, newStatus.String(), newState, m.taskID); err != nil {
return errors.Trace(err)
}
}

return nil
})
log.L().Info("check all task finish status", zap.Bool("task_finished", finished),
zap.Bool("all_finished", allFinished), zap.Bool("switch_back", switchBack))

return switchBack, err
return switchBack, allFinished, err
}
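Because the decision logic above is interleaved with row scanning and SQL plumbing, it may be easier to read condensed into a pure function. The following is an illustrative rewrite only, not part of the PR; it reuses the package's existing status/state constants, and otherTask is a hypothetical helper type:

// otherTask is a hypothetical helper holding the stored status/state of another task.
type otherTask struct {
	status taskMetaStatus
	state  int
}

// decideFinish condenses CheckAndFinishRestore's decision logic (illustration only).
func decideFinish(finished bool, cur taskMetaStatus, others []otherTask) (switchBack, allFinished, doUpdate bool, newStatus taskMetaStatus, newState int) {
	switchBack = true
	allFinished = finished
	for _, t := range others {
		if t.status < taskMetaStatusSwitchSkipped {
			allFinished = false
			// a task that already exited no longer blocks restoring the schedulers
			if t.state == taskStateNormal {
				switchBack = false
			}
		}
	}
	// The row is only rewritten while our own status is still "before switch-back".
	if doUpdate = cur < taskMetaStatusSwitchSkipped; !doUpdate {
		return
	}
	newStatus, newState = taskMetaStatusSwitchBack, taskStateNormal
	switch {
	case !finished:
		// keep the stored status, but record that this task exited early
		newStatus, newState = cur, taskStateExited
	case !allFinished:
		newStatus = taskMetaStatusSwitchSkipped
	}
	return
}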

func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error {
@@ -773,8 +812,8 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) {
return false, nil
func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) {
return false, true, nil
}

func (m noopTaskMetaMgr) Cleanup(ctx context.Context) error {
26 changes: 16 additions & 10 deletions pkg/lightning/restore/restore.go
@@ -105,6 +105,7 @@ const (
task_id BIGINT(20) UNSIGNED NOT NULL,
pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '',
status VARCHAR(32) NOT NULL,
state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish',
Collaborator: should the taskMetaTableName be changed?

Otherwise, if Lightning was used before, this CREATE TABLE IF NOT EXISTS means the task_meta table with the old structure will be reused.

Collaborator (author): Not sure. But since this feature is not GA yet, if Lightning exits before finishing, the current flow still may not recover except by manually dropping the meta schema. We also don't recommend changing the Lightning binary during a single import task.

PRIMARY KEY (task_id)
);`
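Relating to the review comment above: a cluster that already has the pre-PR task_meta table lacks the `state` column, so the new SELECT/UPDATE statements that reference it would fail; the author's suggested remedy is to drop the meta schema manually. Purely to illustrate the schema difference, a hypothetical in-place migration (not shipped by this PR; tableName is a placeholder) could look like:

import (
	"context"
	"database/sql"
	"fmt"
)

// addStateColumn adds the `state` column introduced by this PR to an existing
// task-meta table. Hypothetical helper, not part of the PR.
func addStateColumn(ctx context.Context, db *sql.DB, tableName string) error {
	ddl := fmt.Sprintf(
		"ALTER TABLE %s ADD COLUMN state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish'",
		tableName,
	)
	_, err := db.ExecContext(ctx, ddl)
	return err
}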

@@ -1195,6 +1196,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
// we do not do switch back automatically
cleanupFunc := func() {}
switchBack := false
taskFinished := false
if rc.cfg.TikvImporter.Backend == config.BackendLocal {
// disable some pd schedulers
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
@@ -1215,7 +1217,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if restoreFn != nil {
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, err := mgr.CheckAndFinishRestore(restoreCtx)
needSwitchBack, needCleanup, err := mgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
Expand All @@ -1225,19 +1227,22 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}

logTask.Info("add back PD leader&region schedulers")
// clean up task metas
if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil {
logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr))
}
// cleanup table meta and schema db if needed.
cleanupFunc = func() {
if e := mgr.CleanupAllMetas(restoreCtx); err != nil {
logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e))
if needCleanup {
logTask.Info("cleanup task metas")
if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil {
logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr))
}
// cleanup table meta and schema db if needed.
cleanupFunc = func() {
if e := mgr.CleanupAllMetas(restoreCtx); err != nil {
logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e))
}
}
}
}

logTask.Info("add back PD leader&region schedulers")
}

pdController.Close()
@@ -1435,6 +1440,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
// finishSchedulers()
// cancelFunc(switchBack)
// finishFuncCalled = true
Comment on lines 1437 to 1439

Member: Need clean up?

Collaborator (author): We originally wanted to restore the PD schedulers and switch TiKV back to normal mode right after the data import finished, so the cluster could rebalance during checksum and analyze. In our tests, however, that rebalancing had a non-trivial impact on checksum and analyze, so we need to investigate further before deciding whether we can still do this. I think we can keep these lines until we reach a clear conclusion.

taskFinished = true
Collaborator: What is it used for?

Collaborator (author, @glorv, Jul 13, 2021): If the current task exits before Lightning finishes (for example, it hit an error or was terminated by the user), we should not clean up the task/table meta tables even if all the other Lightning instances have finished.

Collaborator: Oh, I see. finishSchedulers will be called before this method ends... Hmm, it seems difficult to understand.
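To make the ordering the reviewer found confusing explicit: taskFinished is declared near the top of restoreTables and captured by the cleanup closure, and it is only flipped to true after the import work; any early return leaves it false, so the deferred cleanup reports the task as exited instead of finished. The sketch below is a simplified illustration of that interaction, not the PR's exact structure (the PR routes this through a finishSchedulers closure rather than a plain defer):

// restoreTablesSketch shows how the deferred cleanup consumes taskFinished
// together with CheckAndFinishRestore's two return values (illustration only).
func restoreTablesSketch(ctx context.Context, mgr taskMetaMgr, restoreFn pdutil.UndoFunc) error {
	taskFinished := false
	defer func() {
		// Runs on every exit path; use a fresh context so cleanup still works
		// even if ctx has already been canceled.
		restoreCtx := context.Background()
		switchBack, cleanupMeta, err := mgr.CheckAndFinishRestore(restoreCtx, taskFinished)
		if err != nil {
			return
		}
		if switchBack && restoreFn != nil {
			_ = restoreFn(restoreCtx) // restore PD schedulers / switch TiKV back to normal
		}
		if cleanupMeta {
			_ = mgr.Cleanup(restoreCtx) // drop the task meta only when every task truly finished
		}
	}()

	// ... import all tables; any error here returns early with taskFinished still false ...

	taskFinished = true
	return nil
}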


close(postProcessTaskChan)
// otherwise, we should run all tasks in the post-process task chan