Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task_checker(dm): refactor and expose an auto resume function #5004

Merged
merged 5 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 98 additions & 94 deletions dm/dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,33 +104,13 @@ type TaskStatusChecker interface {
// NewTaskStatusChecker is a TaskStatusChecker initializer.
var NewTaskStatusChecker = NewRealTaskStatusChecker

type backoffController struct {
// task name -> backoff counter
backoffs map[string]*backoff.Backoff

// task name -> task latest paused time that checker observes
latestPausedTime map[string]time.Time

// task name -> task latest block time, block means task paused with un-resumable error
latestBlockTime map[string]time.Time

// task name -> the latest auto resume time
latestResumeTime map[string]time.Time

latestRelayPausedTime time.Time
latestRelayBlockTime time.Time
latestRelayResumeTime time.Time
relayBackoff *backoff.Backoff
}

// newBackoffController returns a new backoffController instance.
func newBackoffController() *backoffController {
return &backoffController{
backoffs: make(map[string]*backoff.Backoff),
latestPausedTime: make(map[string]time.Time),
latestBlockTime: make(map[string]time.Time),
latestResumeTime: make(map[string]time.Time),
}
// AutoResumeInfo contains some Time and Backoff that are related to auto resume.
// This structure is exposed for DM as library.
type AutoResumeInfo struct {
Backoff *backoff.Backoff
LatestPausedTime time.Time
LatestBlockTime time.Time
LatestResumeTime time.Time
}

// realTaskStatusChecker is not thread-safe.
Expand All @@ -145,16 +125,18 @@ type realTaskStatusChecker struct {
cfg config.CheckerConfig
l log.Logger
w *SourceWorker
bc *backoffController

subtaskAutoResume map[string]*AutoResumeInfo
relayAutoResume *AutoResumeInfo
}

// NewRealTaskStatusChecker creates a new realTaskStatusChecker instance.
func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker {
tsc := &realTaskStatusChecker{
cfg: cfg,
l: log.With(zap.String("component", "task checker")),
w: w,
bc: newBackoffController(),
cfg: cfg,
l: log.With(zap.String("component", "task checker")),
w: w,
subtaskAutoResume: map[string]*AutoResumeInfo{},
}
tsc.closed.Store(true)
return tsc
Expand Down Expand Up @@ -254,7 +236,18 @@ func isResumableError(err *pb.ProcessError) bool {
return true
}

func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, duration time.Duration) ResumeStrategy {
// CheckResumeSubtask updates info and returns ResumeStrategy for a subtask.
// When ResumeDispatch and the subtask is successfully resumed at caller,
// LatestResumeTime and backoff should be updated.
// This function is exposed for DM as library.
func (i *AutoResumeInfo) CheckResumeSubtask(
stStatus *pb.SubTaskStatus,
backoffRollback time.Duration,
) (strategy ResumeStrategy) {
defer func() {
i.update(strategy, backoffRollback)
}()

// task that is not paused or paused manually, just ignore it
if stStatus == nil || stStatus.Stage != pb.Stage_Paused || stStatus.Result == nil || stStatus.Result.IsCanceled {
return ResumeIgnore
Expand All @@ -265,21 +258,28 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus,
pErr := processErr
if !isResumableError(processErr) {
failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) {
tsc.l.Info("error is not resumable", zap.Stringer("error", pErr))
log.L().Info("error is not resumable", zap.Stringer("error", pErr))
})
return ResumeNoSense
}
}

// auto resume interval does not exceed backoff duration, skip this paused task
if time.Since(tsc.bc.latestResumeTime[stStatus.Name]) < duration {
if time.Since(i.LatestResumeTime) < i.Backoff.Current() {
return ResumeSkip
}

return ResumeDispatch
}

func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy {
func (i *AutoResumeInfo) checkResumeRelay(
relayStatus *pb.RelayStatus,
backoffRollback time.Duration,
) (strategy ResumeStrategy) {
defer func() {
i.update(strategy, backoffRollback)
}()

// relay that is not paused or paused manually, just ignore it
if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled {
return ResumeIgnore
Expand All @@ -291,52 +291,64 @@ func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelaySt
}
}

if time.Since(tsc.bc.latestRelayResumeTime) < duration {
if time.Since(i.LatestResumeTime) < i.Backoff.Current() {
return ResumeSkip
}

return ResumeDispatch
}

func (tsc *realTaskStatusChecker) checkRelayStatus() {
relayStatus := tsc.w.relayHolder.Status(nil)
if tsc.bc.relayBackoff == nil {
tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration)
tsc.bc.latestRelayPausedTime = time.Now()
tsc.bc.latestRelayResumeTime = time.Now()
}
rbf := tsc.bc.relayBackoff
duration := rbf.Current()
strategy := tsc.getRelayResumeStrategy(relayStatus, duration)
func (i *AutoResumeInfo) update(strategy ResumeStrategy, backoffRollback time.Duration) {
switch strategy {
case ResumeIgnore:
if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration {
rbf.Rollback()
if time.Since(i.LatestPausedTime) > backoffRollback {
i.Backoff.Rollback()
// after each rollback, reset this timer
tsc.bc.latestRelayPausedTime = time.Now()
i.LatestPausedTime = time.Now()
}
case ResumeNoSense:
// this strategy doesn't forward or rollback backoff
tsc.bc.latestRelayPausedTime = time.Now()
blockTime := tsc.bc.latestRelayBlockTime
if !blockTime.IsZero() {
tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime)))
} else {
tsc.bc.latestRelayBlockTime = time.Now()
tsc.l.Warn("relay can't auto resume")
i.LatestPausedTime = time.Now()
if i.LatestBlockTime.IsZero() {
i.LatestBlockTime = time.Now()
}
case ResumeSkip, ResumeDispatch:
i.LatestPausedTime = time.Now()
}
}

func (tsc *realTaskStatusChecker) checkRelayStatus() {
relayStatus := tsc.w.relayHolder.Status(nil)
if tsc.relayAutoResume == nil {
bf, _ := backoff.NewBackoff(
tsc.cfg.BackoffFactor,
tsc.cfg.BackoffJitter,
tsc.cfg.BackoffMin.Duration,
tsc.cfg.BackoffMax.Duration)
tsc.relayAutoResume = &AutoResumeInfo{
Backoff: bf,
LatestResumeTime: time.Now(),
LatestPausedTime: time.Now(),
}
}

strategy := tsc.relayAutoResume.checkResumeRelay(relayStatus, tsc.cfg.BackoffRollback.Duration)
switch strategy {
case ResumeNoSense:
tsc.l.Warn("relay can't auto resume",
zap.Duration("paused duration", time.Since(tsc.relayAutoResume.LatestBlockTime)))
case ResumeSkip:
tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration))
tsc.bc.latestRelayPausedTime = time.Now()
tsc.l.Warn("backoff skip auto resume relay",
zap.Time("latestResumeTime", tsc.relayAutoResume.LatestResumeTime),
zap.Duration("duration", tsc.relayAutoResume.Backoff.Current()))
case ResumeDispatch:
tsc.bc.latestRelayPausedTime = time.Now()
err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay)
if err != nil {
tsc.l.Error("dispatch auto resume relay failed", zap.Error(err))
} else {
tsc.l.Info("dispatch auto resume relay")
tsc.bc.latestRelayResumeTime = time.Now()
rbf.BoundaryForward()
tsc.relayAutoResume.LatestResumeTime = time.Now()
tsc.relayAutoResume.Backoff.BoundaryForward()
}
}
}
Expand All @@ -346,57 +358,49 @@ func (tsc *realTaskStatusChecker) checkTaskStatus() {

defer func() {
// cleanup outdated tasks
for taskName := range tsc.bc.backoffs {
for taskName := range tsc.subtaskAutoResume {
_, ok := allSubTaskStatus[taskName]
if !ok {
tsc.l.Debug("remove task from checker", zap.String("task", taskName))
delete(tsc.bc.backoffs, taskName)
delete(tsc.bc.latestPausedTime, taskName)
delete(tsc.bc.latestBlockTime, taskName)
delete(tsc.bc.latestResumeTime, taskName)
delete(tsc.subtaskAutoResume, taskName)
}
}
}()

for taskName, stStatus := range allSubTaskStatus {
bf, ok := tsc.bc.backoffs[taskName]
info, ok := tsc.subtaskAutoResume[taskName]
if !ok {
bf, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration)
tsc.bc.backoffs[taskName] = bf
tsc.bc.latestPausedTime[taskName] = time.Now()
tsc.bc.latestResumeTime[taskName] = time.Now()
bf, _ := backoff.NewBackoff(
tsc.cfg.BackoffFactor,
tsc.cfg.BackoffJitter,
tsc.cfg.BackoffMin.Duration,
tsc.cfg.BackoffMax.Duration)
info = &AutoResumeInfo{
Backoff: bf,
LatestPausedTime: time.Now(),
LatestResumeTime: time.Now(),
}
tsc.subtaskAutoResume[taskName] = info
}
duration := bf.Current()
strategy := tsc.getResumeStrategy(stStatus, duration)
strategy := info.CheckResumeSubtask(stStatus, tsc.cfg.BackoffRollback.Duration)
switch strategy {
case ResumeIgnore:
if time.Since(tsc.bc.latestPausedTime[taskName]) > tsc.cfg.BackoffRollback.Duration {
bf.Rollback()
// after each rollback, reset this timer
tsc.bc.latestPausedTime[taskName] = time.Now()
}
case ResumeNoSense:
// this strategy doesn't forward or rollback backoff
tsc.bc.latestPausedTime[taskName] = time.Now()
blockTime, ok := tsc.bc.latestBlockTime[taskName]
if ok {
tsc.l.Warn("task can't auto resume", zap.String("task", taskName), zap.Duration("paused duration", time.Since(blockTime)))
} else {
tsc.bc.latestBlockTime[taskName] = time.Now()
tsc.l.Warn("task can't auto resume", zap.String("task", taskName))
}
tsc.l.Warn("task can't auto resume",
zap.String("task", taskName),
zap.Duration("paused duration", time.Since(info.LatestBlockTime)))
case ResumeSkip:
tsc.l.Warn("backoff skip auto resume task", zap.String("task", taskName), zap.Time("latestResumeTime", tsc.bc.latestResumeTime[taskName]), zap.Duration("duration", duration))
tsc.bc.latestPausedTime[taskName] = time.Now()
tsc.l.Warn("backoff skip auto resume task",
zap.String("task", taskName),
zap.Time("latestResumeTime", info.LatestResumeTime),
zap.Duration("duration", info.Backoff.Current()))
case ResumeDispatch:
tsc.bc.latestPausedTime[taskName] = time.Now()
err := tsc.w.OperateSubTask(taskName, pb.TaskOp_AutoResume)
if err != nil {
tsc.l.Error("dispatch auto resume task failed", zap.String("task", taskName), zap.Error(err))
} else {
tsc.l.Info("dispatch auto resume task", zap.String("task", taskName))
tsc.bc.latestResumeTime[taskName] = time.Now()
bf.BoundaryForward()
info.LatestResumeTime = time.Now()
info.Backoff.BoundaryForward()
}
}
}
Expand Down
Loading