From 3a3ba31a919ae079f8ff609fbfb979b6fff6608e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Mar 2022 19:00:55 +0800 Subject: [PATCH 1/2] task_checker(dm): refact and expose an auto resume function Signed-off-by: lance6716 --- dm/dm/worker/task_checker.go | 198 ++++++++++++++++-------------- dm/dm/worker/task_checker_test.go | 86 +++++++------ 2 files changed, 145 insertions(+), 139 deletions(-) diff --git a/dm/dm/worker/task_checker.go b/dm/dm/worker/task_checker.go index 2cacdc8e9a0..32640b67f37 100644 --- a/dm/dm/worker/task_checker.go +++ b/dm/dm/worker/task_checker.go @@ -104,33 +104,13 @@ type TaskStatusChecker interface { // NewTaskStatusChecker is a TaskStatusChecker initializer. var NewTaskStatusChecker = NewRealTaskStatusChecker -type backoffController struct { - // task name -> backoff counter - backoffs map[string]*backoff.Backoff - - // task name -> task latest paused time that checker observes - latestPausedTime map[string]time.Time - - // task name -> task latest block time, block means task paused with un-resumable error - latestBlockTime map[string]time.Time - - // task name -> the latest auto resume time - latestResumeTime map[string]time.Time - - latestRelayPausedTime time.Time - latestRelayBlockTime time.Time - latestRelayResumeTime time.Time - relayBackoff *backoff.Backoff -} - -// newBackoffController returns a new backoffController instance. -func newBackoffController() *backoffController { - return &backoffController{ - backoffs: make(map[string]*backoff.Backoff), - latestPausedTime: make(map[string]time.Time), - latestBlockTime: make(map[string]time.Time), - latestResumeTime: make(map[string]time.Time), - } +// AutoResumeTimes contains some Time and Backoff that are related to auto resume. +// This structure is exposed for DM as library. +type AutoResumeTimes struct { + Backoff *backoff.Backoff + LatestPausedTime time.Time + LatestBlockTime time.Time + LatestResumeTime time.Time } // realTaskStatusChecker is not thread-safe. @@ -145,16 +125,18 @@ type realTaskStatusChecker struct { cfg config.CheckerConfig l log.Logger w *SourceWorker - bc *backoffController + + subtaskTimes map[string]*AutoResumeTimes + relayTimes *AutoResumeTimes } // NewRealTaskStatusChecker creates a new realTaskStatusChecker instance. func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker { tsc := &realTaskStatusChecker{ - cfg: cfg, - l: log.With(zap.String("component", "task checker")), - w: w, - bc: newBackoffController(), + cfg: cfg, + l: log.With(zap.String("component", "task checker")), + w: w, + subtaskTimes: map[string]*AutoResumeTimes{}, } tsc.closed.Store(true) return tsc @@ -254,7 +236,19 @@ func isResumableError(err *pb.ProcessError) bool { return true } -func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, duration time.Duration) ResumeStrategy { +// UpdateResumeStrategy updates times and returns ResumeStrategy for a subtask. +// When ResumeDispatch and the subtask is successfully resumed, LatestResumeTime +// and backoff should be updated. +// This function is exposed for DM as library. +func UpdateResumeStrategy( + stStatus *pb.SubTaskStatus, + times *AutoResumeTimes, + backoffRollback time.Duration, +) (strategy ResumeStrategy) { + defer func() { + updateTimes(strategy, times, backoffRollback) + }() + // task that is not paused or paused manually, just ignore it if stStatus == nil || stStatus.Stage != pb.Stage_Paused || stStatus.Result == nil || stStatus.Result.IsCanceled { return ResumeIgnore @@ -265,21 +259,29 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, pErr := processErr if !isResumableError(processErr) { failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) { - tsc.l.Info("error is not resumable", zap.Stringer("error", pErr)) + log.L().Info("error is not resumable", zap.Stringer("error", pErr)) }) return ResumeNoSense } } // auto resume interval does not exceed backoff duration, skip this paused task - if time.Since(tsc.bc.latestResumeTime[stStatus.Name]) < duration { + if time.Since(times.LatestResumeTime) < times.Backoff.Current() { return ResumeSkip } return ResumeDispatch } -func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy { +func updateRelayResumeStrategy( + relayStatus *pb.RelayStatus, + times *AutoResumeTimes, + backoffRollback time.Duration, +) (strategy ResumeStrategy) { + defer func() { + updateTimes(strategy, times, backoffRollback) + }() + // relay that is not paused or paused manually, just ignore it if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled { return ResumeIgnore @@ -291,52 +293,68 @@ func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelaySt } } - if time.Since(tsc.bc.latestRelayResumeTime) < duration { + if time.Since(times.LatestResumeTime) < times.Backoff.Current() { return ResumeSkip } return ResumeDispatch } -func (tsc *realTaskStatusChecker) checkRelayStatus() { - relayStatus := tsc.w.relayHolder.Status(nil) - if tsc.bc.relayBackoff == nil { - tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) - tsc.bc.latestRelayPausedTime = time.Now() - tsc.bc.latestRelayResumeTime = time.Now() - } - rbf := tsc.bc.relayBackoff - duration := rbf.Current() - strategy := tsc.getRelayResumeStrategy(relayStatus, duration) +func updateTimes( + strategy ResumeStrategy, + times *AutoResumeTimes, + backoffRollback time.Duration, +) { switch strategy { case ResumeIgnore: - if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration { - rbf.Rollback() + if time.Since(times.LatestPausedTime) > backoffRollback { + times.Backoff.Rollback() // after each rollback, reset this timer - tsc.bc.latestRelayPausedTime = time.Now() + times.LatestPausedTime = time.Now() } case ResumeNoSense: // this strategy doesn't forward or rollback backoff - tsc.bc.latestRelayPausedTime = time.Now() - blockTime := tsc.bc.latestRelayBlockTime - if !blockTime.IsZero() { - tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime))) - } else { - tsc.bc.latestRelayBlockTime = time.Now() - tsc.l.Warn("relay can't auto resume") + times.LatestPausedTime = time.Now() + if times.LatestBlockTime.IsZero() { + times.LatestBlockTime = time.Now() + } + case ResumeSkip, ResumeDispatch: + times.LatestPausedTime = time.Now() + } +} + +func (tsc *realTaskStatusChecker) checkRelayStatus() { + relayStatus := tsc.w.relayHolder.Status(nil) + if tsc.relayTimes == nil { + bf, _ := backoff.NewBackoff( + tsc.cfg.BackoffFactor, + tsc.cfg.BackoffJitter, + tsc.cfg.BackoffMin.Duration, + tsc.cfg.BackoffMax.Duration) + tsc.relayTimes = &AutoResumeTimes{ + Backoff: bf, + LatestResumeTime: time.Now(), + LatestPausedTime: time.Now(), } + } + + strategy := updateRelayResumeStrategy(relayStatus, tsc.relayTimes, tsc.cfg.BackoffRollback.Duration) + switch strategy { + case ResumeNoSense: + tsc.l.Warn("relay can't auto resume", + zap.Duration("paused duration", time.Since(tsc.relayTimes.LatestBlockTime))) case ResumeSkip: - tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration)) - tsc.bc.latestRelayPausedTime = time.Now() + tsc.l.Warn("backoff skip auto resume relay", + zap.Time("latestResumeTime", tsc.relayTimes.LatestResumeTime), + zap.Duration("duration", tsc.relayTimes.Backoff.Current())) case ResumeDispatch: - tsc.bc.latestRelayPausedTime = time.Now() err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay) if err != nil { tsc.l.Error("dispatch auto resume relay failed", zap.Error(err)) } else { tsc.l.Info("dispatch auto resume relay") - tsc.bc.latestRelayResumeTime = time.Now() - rbf.BoundaryForward() + tsc.relayTimes.LatestResumeTime = time.Now() + tsc.relayTimes.Backoff.BoundaryForward() } } } @@ -346,57 +364,49 @@ func (tsc *realTaskStatusChecker) checkTaskStatus() { defer func() { // cleanup outdated tasks - for taskName := range tsc.bc.backoffs { + for taskName := range tsc.subtaskTimes { _, ok := allSubTaskStatus[taskName] if !ok { tsc.l.Debug("remove task from checker", zap.String("task", taskName)) - delete(tsc.bc.backoffs, taskName) - delete(tsc.bc.latestPausedTime, taskName) - delete(tsc.bc.latestBlockTime, taskName) - delete(tsc.bc.latestResumeTime, taskName) + delete(tsc.subtaskTimes, taskName) } } }() for taskName, stStatus := range allSubTaskStatus { - bf, ok := tsc.bc.backoffs[taskName] + times, ok := tsc.subtaskTimes[taskName] if !ok { - bf, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) - tsc.bc.backoffs[taskName] = bf - tsc.bc.latestPausedTime[taskName] = time.Now() - tsc.bc.latestResumeTime[taskName] = time.Now() + bf, _ := backoff.NewBackoff( + tsc.cfg.BackoffFactor, + tsc.cfg.BackoffJitter, + tsc.cfg.BackoffMin.Duration, + tsc.cfg.BackoffMax.Duration) + times = &AutoResumeTimes{ + Backoff: bf, + LatestPausedTime: time.Now(), + LatestResumeTime: time.Now(), + } + tsc.subtaskTimes[taskName] = times } - duration := bf.Current() - strategy := tsc.getResumeStrategy(stStatus, duration) + strategy := UpdateResumeStrategy(stStatus, times, tsc.cfg.BackoffRollback.Duration) switch strategy { - case ResumeIgnore: - if time.Since(tsc.bc.latestPausedTime[taskName]) > tsc.cfg.BackoffRollback.Duration { - bf.Rollback() - // after each rollback, reset this timer - tsc.bc.latestPausedTime[taskName] = time.Now() - } case ResumeNoSense: - // this strategy doesn't forward or rollback backoff - tsc.bc.latestPausedTime[taskName] = time.Now() - blockTime, ok := tsc.bc.latestBlockTime[taskName] - if ok { - tsc.l.Warn("task can't auto resume", zap.String("task", taskName), zap.Duration("paused duration", time.Since(blockTime))) - } else { - tsc.bc.latestBlockTime[taskName] = time.Now() - tsc.l.Warn("task can't auto resume", zap.String("task", taskName)) - } + tsc.l.Warn("task can't auto resume", + zap.String("task", taskName), + zap.Duration("paused duration", time.Since(times.LatestBlockTime))) case ResumeSkip: - tsc.l.Warn("backoff skip auto resume task", zap.String("task", taskName), zap.Time("latestResumeTime", tsc.bc.latestResumeTime[taskName]), zap.Duration("duration", duration)) - tsc.bc.latestPausedTime[taskName] = time.Now() + tsc.l.Warn("backoff skip auto resume task", + zap.String("task", taskName), + zap.Time("latestResumeTime", times.LatestResumeTime), + zap.Duration("duration", times.Backoff.Current())) case ResumeDispatch: - tsc.bc.latestPausedTime[taskName] = time.Now() err := tsc.w.OperateSubTask(taskName, pb.TaskOp_AutoResume) if err != nil { tsc.l.Error("dispatch auto resume task failed", zap.String("task", taskName), zap.Error(err)) } else { tsc.l.Info("dispatch auto resume task", zap.String("task", taskName)) - tsc.bc.latestResumeTime[taskName] = time.Now() - bf.BoundaryForward() + times.LatestResumeTime = time.Now() + times.Backoff.BoundaryForward() } } } diff --git a/dm/dm/worker/task_checker_test.go b/dm/dm/worker/task_checker_test.go index eac07e08b5a..d1284d5f3b8 100644 --- a/dm/dm/worker/task_checker_test.go +++ b/dm/dm/worker/task_checker_test.go @@ -18,13 +18,13 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" tmysql "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/pb" "github.com/pingcap/tiflow/dm/dm/unit" + "github.com/pingcap/tiflow/dm/pkg/backoff" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -34,20 +34,10 @@ var _ = check.Suite(&testTaskCheckerSuite{}) type testTaskCheckerSuite struct{} var ( - unsupporteModifyColumnError = unit.NewProcessError(terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState})) - unknownProcessError = unit.NewProcessError(errors.New("error mesage")) + unsupportedModifyColumnError = unit.NewProcessError(terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState})) + unknownProcessError = unit.NewProcessError(errors.New("error message")) ) -func (s *testTaskCheckerSuite) SetUpSuite(c *check.C) { - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/worker/MockGetSourceCfgFromETCD", `return(true)`), check.IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/worker/SkipRefreshFromETCDInUT", `return()`), check.IsNil) -} - -func (s *testTaskCheckerSuite) TearDownSuite(c *check.C) { - c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/worker/MockGetSourceCfgFromETCD"), check.IsNil) - c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/worker/SkipRefreshFromETCDInUT"), check.IsNil) -} - func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { c.Assert(ResumeSkip.String(), check.Equals, resumeStrategy2Str[ResumeSkip]) c.Assert(ResumeStrategy(10000).String(), check.Equals, "unsupported resume strategy: 10000") @@ -65,7 +55,7 @@ func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Running}, now, time.Duration(0), 1 * time.Millisecond, ResumeIgnore}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused}, now, time.Duration(0), 1 * time.Millisecond, ResumeIgnore}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: true}}, now, time.Duration(0), 1 * time.Millisecond, ResumeIgnore}, - {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false, Errors: []*pb.ProcessError{unsupporteModifyColumnError}}}, now, time.Duration(0), 1 * time.Millisecond, ResumeNoSense}, + {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false, Errors: []*pb.ProcessError{unsupportedModifyColumnError}}}, now, time.Duration(0), 1 * time.Millisecond, ResumeNoSense}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false}}, now, time.Duration(0), 1 * time.Second, ResumeSkip}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false}}, now, -2 * time.Millisecond, 1 * time.Millisecond, ResumeDispatch}, } @@ -81,8 +71,16 @@ func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { for _, tc := range testCases { rtsc, ok := tsc.(*realTaskStatusChecker) c.Assert(ok, check.IsTrue) - rtsc.bc.latestResumeTime[taskName] = tc.latestResumeFn(tc.addition) - strategy := rtsc.getResumeStrategy(tc.status, tc.duration) + bf, _ := backoff.NewBackoff( + 1, + false, + tc.duration, + tc.duration) + rtsc.subtaskTimes[taskName] = &AutoResumeTimes{ + Backoff: bf, + LatestResumeTime: tc.latestResumeFn(tc.addition), + } + strategy := UpdateResumeStrategy(tc.status, rtsc.subtaskTimes[taskName], config.DefaultBackoffRollback) c.Assert(strategy, check.Equals, tc.expected) } } @@ -125,8 +123,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { c.Assert(st.cfg.Adjust(false), check.IsNil) rtsc.w.subTaskHolder.recordSubTask(st) rtsc.check() - bf, ok := rtsc.bc.backoffs[taskName] - c.Assert(ok, check.IsTrue) + bf := rtsc.subtaskTimes[taskName].Backoff // test resume with paused task st.stage = pb.Stage_Paused @@ -152,14 +149,14 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { // test no sense strategy st.result = &pb.ProcessResult{ IsCanceled: false, - Errors: []*pb.ProcessError{unsupporteModifyColumnError}, + Errors: []*pb.ProcessError{unsupportedModifyColumnError}, } rtsc.check() - c.Assert(latestPausedTime.Before(rtsc.bc.latestPausedTime[taskName]), check.IsTrue) - latestBlockTime = rtsc.bc.latestBlockTime[taskName] + c.Assert(latestPausedTime.Before(rtsc.subtaskTimes[taskName].LatestPausedTime), check.IsTrue) + latestBlockTime = rtsc.subtaskTimes[taskName].LatestBlockTime time.Sleep(200 * time.Millisecond) rtsc.check() - c.Assert(rtsc.bc.latestBlockTime[taskName], check.Equals, latestBlockTime) + c.Assert(rtsc.subtaskTimes[taskName].LatestBlockTime, check.Equals, latestBlockTime) c.Assert(bf.Current(), check.Equals, current) // test resume skip strategy @@ -182,8 +179,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { } rtsc.w.subTaskHolder.recordSubTask(st) rtsc.check() - bf, ok = rtsc.bc.backoffs[taskName] - c.Assert(ok, check.IsTrue) + bf = rtsc.subtaskTimes[taskName].Backoff st.stage = pb.Stage_Paused st.result = &pb.ProcessResult{ @@ -191,14 +187,14 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { Errors: []*pb.ProcessError{unknownProcessError}, } rtsc.check() - latestResumeTime = rtsc.bc.latestResumeTime[taskName] - latestPausedTime = rtsc.bc.latestPausedTime[taskName] + latestResumeTime = rtsc.subtaskTimes[taskName].LatestResumeTime + latestPausedTime = rtsc.subtaskTimes[taskName].LatestPausedTime c.Assert(bf.Current(), check.Equals, 10*time.Second) for i := 0; i < 10; i++ { rtsc.check() - c.Assert(latestResumeTime, check.Equals, rtsc.bc.latestResumeTime[taskName]) - c.Assert(latestPausedTime.Before(rtsc.bc.latestPausedTime[taskName]), check.IsTrue) - latestPausedTime = rtsc.bc.latestPausedTime[taskName] + c.Assert(latestResumeTime, check.Equals, rtsc.subtaskTimes[taskName].LatestResumeTime) + c.Assert(latestPausedTime.Before(rtsc.subtaskTimes[taskName].LatestPausedTime), check.IsTrue) + latestPausedTime = rtsc.subtaskTimes[taskName].LatestPausedTime } } @@ -246,10 +242,10 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { } rtsc.w.subTaskHolder.recordSubTask(st2) rtsc.check() - c.Assert(len(rtsc.bc.backoffs), check.Equals, 2) - c.Assert(len(rtsc.bc.latestPausedTime), check.Equals, 2) - c.Assert(len(rtsc.bc.latestResumeTime), check.Equals, 2) - c.Assert(len(rtsc.bc.latestBlockTime), check.Equals, 0) + c.Assert(len(rtsc.subtaskTimes), check.Equals, 2) + for _, times := range rtsc.subtaskTimes { + c.Assert(times.LatestBlockTime.IsZero(), check.IsTrue) + } // test backoff strategies of different tasks do not affect each other st1 = &SubTask{ @@ -257,7 +253,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { stage: pb.Stage_Paused, result: &pb.ProcessResult{ IsCanceled: false, - Errors: []*pb.ProcessError{unsupporteModifyColumnError}, + Errors: []*pb.ProcessError{unsupportedModifyColumnError}, }, l: log.With(zap.String("subtask", task1)), } @@ -275,26 +271,26 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { c.Assert(st2.cfg.Adjust(false), check.IsNil) rtsc.w.subTaskHolder.recordSubTask(st2) - task1LatestResumeTime = rtsc.bc.latestResumeTime[task1] - task2LatestResumeTime = rtsc.bc.latestResumeTime[task2] + task1LatestResumeTime = rtsc.subtaskTimes[task1].LatestResumeTime + task2LatestResumeTime = rtsc.subtaskTimes[task2].LatestResumeTime for i := 0; i < 10; i++ { time.Sleep(backoffMin) rtsc.check() - c.Assert(task1LatestResumeTime, check.Equals, rtsc.bc.latestResumeTime[task1]) - c.Assert(task2LatestResumeTime.Before(rtsc.bc.latestResumeTime[task2]), check.IsTrue) - c.Assert(len(rtsc.bc.latestBlockTime), check.Equals, 1) - task2LatestResumeTime = rtsc.bc.latestResumeTime[task2] + c.Assert(task1LatestResumeTime, check.Equals, rtsc.subtaskTimes[task1].LatestResumeTime) + c.Assert(task2LatestResumeTime.Before(rtsc.subtaskTimes[task2].LatestResumeTime), check.IsTrue) + c.Assert(rtsc.subtaskTimes[task1].LatestBlockTime.IsZero(), check.IsFalse) + c.Assert(rtsc.subtaskTimes[task2].LatestBlockTime.IsZero(), check.IsTrue) + + task2LatestResumeTime = rtsc.subtaskTimes[task2].LatestResumeTime } // test task information cleanup in task status checker rtsc.w.subTaskHolder.removeSubTask(task1) time.Sleep(backoffMin) rtsc.check() - c.Assert(task2LatestResumeTime.Before(rtsc.bc.latestResumeTime[task2]), check.IsTrue) - c.Assert(len(rtsc.bc.backoffs), check.Equals, 1) - c.Assert(len(rtsc.bc.latestPausedTime), check.Equals, 1) - c.Assert(len(rtsc.bc.latestResumeTime), check.Equals, 1) - c.Assert(len(rtsc.bc.latestBlockTime), check.Equals, 0) + c.Assert(task2LatestResumeTime.Before(rtsc.subtaskTimes[task2].LatestResumeTime), check.IsTrue) + c.Assert(len(rtsc.subtaskTimes), check.Equals, 1) + c.Assert(rtsc.subtaskTimes[task2].LatestBlockTime.IsZero(), check.IsTrue) } func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) { From 7dd99f86ca891c4faf943f449592b9191d39868e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 24 Mar 2022 18:59:10 +0800 Subject: [PATCH 2/2] address comment Signed-off-by: lance6716 --- dm/dm/worker/task_checker.go | 94 +++++++++++++++---------------- dm/dm/worker/task_checker_test.go | 48 ++++++++-------- 2 files changed, 68 insertions(+), 74 deletions(-) diff --git a/dm/dm/worker/task_checker.go b/dm/dm/worker/task_checker.go index 32640b67f37..9b4bfcfa6e2 100644 --- a/dm/dm/worker/task_checker.go +++ b/dm/dm/worker/task_checker.go @@ -104,9 +104,9 @@ type TaskStatusChecker interface { // NewTaskStatusChecker is a TaskStatusChecker initializer. var NewTaskStatusChecker = NewRealTaskStatusChecker -// AutoResumeTimes contains some Time and Backoff that are related to auto resume. +// AutoResumeInfo contains some Time and Backoff that are related to auto resume. // This structure is exposed for DM as library. -type AutoResumeTimes struct { +type AutoResumeInfo struct { Backoff *backoff.Backoff LatestPausedTime time.Time LatestBlockTime time.Time @@ -126,17 +126,17 @@ type realTaskStatusChecker struct { l log.Logger w *SourceWorker - subtaskTimes map[string]*AutoResumeTimes - relayTimes *AutoResumeTimes + subtaskAutoResume map[string]*AutoResumeInfo + relayAutoResume *AutoResumeInfo } // NewRealTaskStatusChecker creates a new realTaskStatusChecker instance. func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker { tsc := &realTaskStatusChecker{ - cfg: cfg, - l: log.With(zap.String("component", "task checker")), - w: w, - subtaskTimes: map[string]*AutoResumeTimes{}, + cfg: cfg, + l: log.With(zap.String("component", "task checker")), + w: w, + subtaskAutoResume: map[string]*AutoResumeInfo{}, } tsc.closed.Store(true) return tsc @@ -236,17 +236,16 @@ func isResumableError(err *pb.ProcessError) bool { return true } -// UpdateResumeStrategy updates times and returns ResumeStrategy for a subtask. -// When ResumeDispatch and the subtask is successfully resumed, LatestResumeTime -// and backoff should be updated. +// CheckResumeSubtask updates info and returns ResumeStrategy for a subtask. +// When ResumeDispatch and the subtask is successfully resumed at caller, +// LatestResumeTime and backoff should be updated. // This function is exposed for DM as library. -func UpdateResumeStrategy( +func (i *AutoResumeInfo) CheckResumeSubtask( stStatus *pb.SubTaskStatus, - times *AutoResumeTimes, backoffRollback time.Duration, ) (strategy ResumeStrategy) { defer func() { - updateTimes(strategy, times, backoffRollback) + i.update(strategy, backoffRollback) }() // task that is not paused or paused manually, just ignore it @@ -266,20 +265,19 @@ func UpdateResumeStrategy( } // auto resume interval does not exceed backoff duration, skip this paused task - if time.Since(times.LatestResumeTime) < times.Backoff.Current() { + if time.Since(i.LatestResumeTime) < i.Backoff.Current() { return ResumeSkip } return ResumeDispatch } -func updateRelayResumeStrategy( +func (i *AutoResumeInfo) checkResumeRelay( relayStatus *pb.RelayStatus, - times *AutoResumeTimes, backoffRollback time.Duration, ) (strategy ResumeStrategy) { defer func() { - updateTimes(strategy, times, backoffRollback) + i.update(strategy, backoffRollback) }() // relay that is not paused or paused manually, just ignore it @@ -293,68 +291,64 @@ func updateRelayResumeStrategy( } } - if time.Since(times.LatestResumeTime) < times.Backoff.Current() { + if time.Since(i.LatestResumeTime) < i.Backoff.Current() { return ResumeSkip } return ResumeDispatch } -func updateTimes( - strategy ResumeStrategy, - times *AutoResumeTimes, - backoffRollback time.Duration, -) { +func (i *AutoResumeInfo) update(strategy ResumeStrategy, backoffRollback time.Duration) { switch strategy { case ResumeIgnore: - if time.Since(times.LatestPausedTime) > backoffRollback { - times.Backoff.Rollback() + if time.Since(i.LatestPausedTime) > backoffRollback { + i.Backoff.Rollback() // after each rollback, reset this timer - times.LatestPausedTime = time.Now() + i.LatestPausedTime = time.Now() } case ResumeNoSense: // this strategy doesn't forward or rollback backoff - times.LatestPausedTime = time.Now() - if times.LatestBlockTime.IsZero() { - times.LatestBlockTime = time.Now() + i.LatestPausedTime = time.Now() + if i.LatestBlockTime.IsZero() { + i.LatestBlockTime = time.Now() } case ResumeSkip, ResumeDispatch: - times.LatestPausedTime = time.Now() + i.LatestPausedTime = time.Now() } } func (tsc *realTaskStatusChecker) checkRelayStatus() { relayStatus := tsc.w.relayHolder.Status(nil) - if tsc.relayTimes == nil { + if tsc.relayAutoResume == nil { bf, _ := backoff.NewBackoff( tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) - tsc.relayTimes = &AutoResumeTimes{ + tsc.relayAutoResume = &AutoResumeInfo{ Backoff: bf, LatestResumeTime: time.Now(), LatestPausedTime: time.Now(), } } - strategy := updateRelayResumeStrategy(relayStatus, tsc.relayTimes, tsc.cfg.BackoffRollback.Duration) + strategy := tsc.relayAutoResume.checkResumeRelay(relayStatus, tsc.cfg.BackoffRollback.Duration) switch strategy { case ResumeNoSense: tsc.l.Warn("relay can't auto resume", - zap.Duration("paused duration", time.Since(tsc.relayTimes.LatestBlockTime))) + zap.Duration("paused duration", time.Since(tsc.relayAutoResume.LatestBlockTime))) case ResumeSkip: tsc.l.Warn("backoff skip auto resume relay", - zap.Time("latestResumeTime", tsc.relayTimes.LatestResumeTime), - zap.Duration("duration", tsc.relayTimes.Backoff.Current())) + zap.Time("latestResumeTime", tsc.relayAutoResume.LatestResumeTime), + zap.Duration("duration", tsc.relayAutoResume.Backoff.Current())) case ResumeDispatch: err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay) if err != nil { tsc.l.Error("dispatch auto resume relay failed", zap.Error(err)) } else { tsc.l.Info("dispatch auto resume relay") - tsc.relayTimes.LatestResumeTime = time.Now() - tsc.relayTimes.Backoff.BoundaryForward() + tsc.relayAutoResume.LatestResumeTime = time.Now() + tsc.relayAutoResume.Backoff.BoundaryForward() } } } @@ -364,49 +358,49 @@ func (tsc *realTaskStatusChecker) checkTaskStatus() { defer func() { // cleanup outdated tasks - for taskName := range tsc.subtaskTimes { + for taskName := range tsc.subtaskAutoResume { _, ok := allSubTaskStatus[taskName] if !ok { tsc.l.Debug("remove task from checker", zap.String("task", taskName)) - delete(tsc.subtaskTimes, taskName) + delete(tsc.subtaskAutoResume, taskName) } } }() for taskName, stStatus := range allSubTaskStatus { - times, ok := tsc.subtaskTimes[taskName] + info, ok := tsc.subtaskAutoResume[taskName] if !ok { bf, _ := backoff.NewBackoff( tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) - times = &AutoResumeTimes{ + info = &AutoResumeInfo{ Backoff: bf, LatestPausedTime: time.Now(), LatestResumeTime: time.Now(), } - tsc.subtaskTimes[taskName] = times + tsc.subtaskAutoResume[taskName] = info } - strategy := UpdateResumeStrategy(stStatus, times, tsc.cfg.BackoffRollback.Duration) + strategy := info.CheckResumeSubtask(stStatus, tsc.cfg.BackoffRollback.Duration) switch strategy { case ResumeNoSense: tsc.l.Warn("task can't auto resume", zap.String("task", taskName), - zap.Duration("paused duration", time.Since(times.LatestBlockTime))) + zap.Duration("paused duration", time.Since(info.LatestBlockTime))) case ResumeSkip: tsc.l.Warn("backoff skip auto resume task", zap.String("task", taskName), - zap.Time("latestResumeTime", times.LatestResumeTime), - zap.Duration("duration", times.Backoff.Current())) + zap.Time("latestResumeTime", info.LatestResumeTime), + zap.Duration("duration", info.Backoff.Current())) case ResumeDispatch: err := tsc.w.OperateSubTask(taskName, pb.TaskOp_AutoResume) if err != nil { tsc.l.Error("dispatch auto resume task failed", zap.String("task", taskName), zap.Error(err)) } else { tsc.l.Info("dispatch auto resume task", zap.String("task", taskName)) - times.LatestResumeTime = time.Now() - times.Backoff.BoundaryForward() + info.LatestResumeTime = time.Now() + info.Backoff.BoundaryForward() } } } diff --git a/dm/dm/worker/task_checker_test.go b/dm/dm/worker/task_checker_test.go index d1284d5f3b8..963f2f6e881 100644 --- a/dm/dm/worker/task_checker_test.go +++ b/dm/dm/worker/task_checker_test.go @@ -76,11 +76,11 @@ func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { false, tc.duration, tc.duration) - rtsc.subtaskTimes[taskName] = &AutoResumeTimes{ + rtsc.subtaskAutoResume[taskName] = &AutoResumeInfo{ Backoff: bf, LatestResumeTime: tc.latestResumeFn(tc.addition), } - strategy := UpdateResumeStrategy(tc.status, rtsc.subtaskTimes[taskName], config.DefaultBackoffRollback) + strategy := rtsc.subtaskAutoResume[taskName].CheckResumeSubtask(tc.status, config.DefaultBackoffRollback) c.Assert(strategy, check.Equals, tc.expected) } } @@ -123,7 +123,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { c.Assert(st.cfg.Adjust(false), check.IsNil) rtsc.w.subTaskHolder.recordSubTask(st) rtsc.check() - bf := rtsc.subtaskTimes[taskName].Backoff + bf := rtsc.subtaskAutoResume[taskName].Backoff // test resume with paused task st.stage = pb.Stage_Paused @@ -152,11 +152,11 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { Errors: []*pb.ProcessError{unsupportedModifyColumnError}, } rtsc.check() - c.Assert(latestPausedTime.Before(rtsc.subtaskTimes[taskName].LatestPausedTime), check.IsTrue) - latestBlockTime = rtsc.subtaskTimes[taskName].LatestBlockTime + c.Assert(latestPausedTime.Before(rtsc.subtaskAutoResume[taskName].LatestPausedTime), check.IsTrue) + latestBlockTime = rtsc.subtaskAutoResume[taskName].LatestBlockTime time.Sleep(200 * time.Millisecond) rtsc.check() - c.Assert(rtsc.subtaskTimes[taskName].LatestBlockTime, check.Equals, latestBlockTime) + c.Assert(rtsc.subtaskAutoResume[taskName].LatestBlockTime, check.Equals, latestBlockTime) c.Assert(bf.Current(), check.Equals, current) // test resume skip strategy @@ -179,7 +179,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { } rtsc.w.subTaskHolder.recordSubTask(st) rtsc.check() - bf = rtsc.subtaskTimes[taskName].Backoff + bf = rtsc.subtaskAutoResume[taskName].Backoff st.stage = pb.Stage_Paused st.result = &pb.ProcessResult{ @@ -187,14 +187,14 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { Errors: []*pb.ProcessError{unknownProcessError}, } rtsc.check() - latestResumeTime = rtsc.subtaskTimes[taskName].LatestResumeTime - latestPausedTime = rtsc.subtaskTimes[taskName].LatestPausedTime + latestResumeTime = rtsc.subtaskAutoResume[taskName].LatestResumeTime + latestPausedTime = rtsc.subtaskAutoResume[taskName].LatestPausedTime c.Assert(bf.Current(), check.Equals, 10*time.Second) for i := 0; i < 10; i++ { rtsc.check() - c.Assert(latestResumeTime, check.Equals, rtsc.subtaskTimes[taskName].LatestResumeTime) - c.Assert(latestPausedTime.Before(rtsc.subtaskTimes[taskName].LatestPausedTime), check.IsTrue) - latestPausedTime = rtsc.subtaskTimes[taskName].LatestPausedTime + c.Assert(latestResumeTime, check.Equals, rtsc.subtaskAutoResume[taskName].LatestResumeTime) + c.Assert(latestPausedTime.Before(rtsc.subtaskAutoResume[taskName].LatestPausedTime), check.IsTrue) + latestPausedTime = rtsc.subtaskAutoResume[taskName].LatestPausedTime } } @@ -242,8 +242,8 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { } rtsc.w.subTaskHolder.recordSubTask(st2) rtsc.check() - c.Assert(len(rtsc.subtaskTimes), check.Equals, 2) - for _, times := range rtsc.subtaskTimes { + c.Assert(len(rtsc.subtaskAutoResume), check.Equals, 2) + for _, times := range rtsc.subtaskAutoResume { c.Assert(times.LatestBlockTime.IsZero(), check.IsTrue) } @@ -271,26 +271,26 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { c.Assert(st2.cfg.Adjust(false), check.IsNil) rtsc.w.subTaskHolder.recordSubTask(st2) - task1LatestResumeTime = rtsc.subtaskTimes[task1].LatestResumeTime - task2LatestResumeTime = rtsc.subtaskTimes[task2].LatestResumeTime + task1LatestResumeTime = rtsc.subtaskAutoResume[task1].LatestResumeTime + task2LatestResumeTime = rtsc.subtaskAutoResume[task2].LatestResumeTime for i := 0; i < 10; i++ { time.Sleep(backoffMin) rtsc.check() - c.Assert(task1LatestResumeTime, check.Equals, rtsc.subtaskTimes[task1].LatestResumeTime) - c.Assert(task2LatestResumeTime.Before(rtsc.subtaskTimes[task2].LatestResumeTime), check.IsTrue) - c.Assert(rtsc.subtaskTimes[task1].LatestBlockTime.IsZero(), check.IsFalse) - c.Assert(rtsc.subtaskTimes[task2].LatestBlockTime.IsZero(), check.IsTrue) + c.Assert(task1LatestResumeTime, check.Equals, rtsc.subtaskAutoResume[task1].LatestResumeTime) + c.Assert(task2LatestResumeTime.Before(rtsc.subtaskAutoResume[task2].LatestResumeTime), check.IsTrue) + c.Assert(rtsc.subtaskAutoResume[task1].LatestBlockTime.IsZero(), check.IsFalse) + c.Assert(rtsc.subtaskAutoResume[task2].LatestBlockTime.IsZero(), check.IsTrue) - task2LatestResumeTime = rtsc.subtaskTimes[task2].LatestResumeTime + task2LatestResumeTime = rtsc.subtaskAutoResume[task2].LatestResumeTime } // test task information cleanup in task status checker rtsc.w.subTaskHolder.removeSubTask(task1) time.Sleep(backoffMin) rtsc.check() - c.Assert(task2LatestResumeTime.Before(rtsc.subtaskTimes[task2].LatestResumeTime), check.IsTrue) - c.Assert(len(rtsc.subtaskTimes), check.Equals, 1) - c.Assert(rtsc.subtaskTimes[task2].LatestBlockTime.IsZero(), check.IsTrue) + c.Assert(task2LatestResumeTime.Before(rtsc.subtaskAutoResume[task2].LatestResumeTime), check.IsTrue) + c.Assert(len(rtsc.subtaskAutoResume), check.Equals, 1) + c.Assert(rtsc.subtaskAutoResume[task2].LatestBlockTime.IsZero(), check.IsTrue) } func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) {