diff --git a/pkg/disttask/framework/integrationtests/BUILD.bazel b/pkg/disttask/framework/integrationtests/BUILD.bazel index 3c1aaaaa657bb..0ad38d06efa16 100644 --- a/pkg/disttask/framework/integrationtests/BUILD.bazel +++ b/pkg/disttask/framework/integrationtests/BUILD.bazel @@ -17,7 +17,7 @@ go_test( ], flaky = True, race = "off", - shard_count = 23, + shard_count = 22, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/disttask/framework/integrationtests/framework_ha_test.go b/pkg/disttask/framework/integrationtests/framework_ha_test.go index 78db6ba5a0fe8..8d33701a579ee 100644 --- a/pkg/disttask/framework/integrationtests/framework_ha_test.go +++ b/pkg/disttask/framework/integrationtests/framework_ha_test.go @@ -36,7 +36,6 @@ func submitTaskAndCheckSuccessForHA(ctx context.Context, t *testing.T, taskKey s } func TestHANodeRandomShutdown(t *testing.T) { - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown", "return()") c := testutil.NewDXFContextWithRandomNodes(t, 4, 15) registerExampleTask(t, c.MockCtrl, testutil.GetMockHATestSchedulerExt(c.MockCtrl), c.TestContext, nil) @@ -44,18 +43,18 @@ func TestHANodeRandomShutdown(t *testing.T) { keepCount := int(math.Min(float64(c.NodeCount()-1), float64(c.Rand.Intn(10)+1))) nodeNeedDown := c.GetRandNodeIDs(c.NodeCount() - keepCount) t.Logf("started %d nodes, and we keep %d nodes, nodes that need shutdown: %v", c.NodeCount(), keepCount, nodeNeedDown) - taskexecutor.MockTiDBDown = func(execID string, _ *proto.TaskBase) bool { - if _, ok := nodeNeedDown[execID]; ok { - c.AsyncShutdown(execID) - return true - } - return false - } + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown", + func(e taskexecutor.TaskExecutor, execID string, _ *proto.TaskBase) { + if _, ok := nodeNeedDown[execID]; ok { + c.AsyncShutdown(execID) + e.Cancel() + } + }, + ) submitTaskAndCheckSuccessForHA(c.Ctx, t, "😊", c.TestContext) } func TestHARandomShutdownInDifferentStep(t *testing.T) { - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown", "return()") c := testutil.NewDXFContextWithRandomNodes(t, 6, 15) registerExampleTask(t, c.MockCtrl, testutil.GetMockHATestSchedulerExt(c.MockCtrl), c.TestContext, nil) @@ -64,22 +63,23 @@ func TestHARandomShutdownInDifferentStep(t *testing.T) { nodeNeedDownAtStepTwo := c.GetRandNodeIDs(c.NodeCount()/2 - 1) t.Logf("started %d nodes, shutdown nodes at step 1: %v, shutdown nodes at step 2: %v", c.NodeCount(), nodeNeedDownAtStepOne, nodeNeedDownAtStepTwo) - taskexecutor.MockTiDBDown = func(execID string, task *proto.TaskBase) bool { - var targetNodes map[string]struct{} - switch task.Step { - case proto.StepOne: - targetNodes = nodeNeedDownAtStepOne - case proto.StepTwo: - targetNodes = nodeNeedDownAtStepTwo - default: - return false - } - if _, ok := targetNodes[execID]; ok { - c.AsyncShutdown(execID) - return true - } - return false - } + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown", + func(e taskexecutor.TaskExecutor, execID string, task *proto.TaskBase) { + var targetNodes map[string]struct{} + switch task.Step { + case proto.StepOne: + targetNodes = nodeNeedDownAtStepOne + case proto.StepTwo: + targetNodes = nodeNeedDownAtStepTwo + default: + return + } + if _, ok := targetNodes[execID]; ok { + c.AsyncShutdown(execID) + e.Cancel() + } + }, + ) submitTaskAndCheckSuccessForHA(c.Ctx, t, "😊", c.TestContext) } diff --git 
a/pkg/disttask/framework/integrationtests/framework_test.go b/pkg/disttask/framework/integrationtests/framework_test.go
index a9a05eb65060f..9eb26feb2279c 100644
--- a/pkg/disttask/framework/integrationtests/framework_test.go
+++ b/pkg/disttask/framework/integrationtests/framework_test.go
@@ -234,22 +234,6 @@ func TestGC(t *testing.T) {
 	}, 10*time.Second, 500*time.Millisecond)
 }
 
-func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
-	c := testutil.NewTestDXFContext(t, 3, 16, true)
-
-	registerExampleTask(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
-	var counter atomic.Int32
-	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterOnFinishedCalled",
-		func(e *taskexecutor.BaseTaskExecutor) {
-			if counter.Add(1) == 1 {
-				e.CancelRunningSubtask()
-			}
-		},
-	)
-	task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
-	require.Equal(t, proto.TaskStateReverted, task.State)
-}
-
 func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
 	c := testutil.NewTestDXFContext(t, 3, 16, true)
 
@@ -257,8 +241,9 @@ func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
 	t.Run("meet cancel on run subtask", func(t *testing.T) {
 		var counter atomic.Int32
 		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError",
-			func(errP *error) {
+			func(e taskexecutor.TaskExecutor, errP *error) {
 				if counter.Add(1) == 1 {
+					e.CancelRunningSubtask()
 					*errP = taskexecutor.ErrCancelSubtask
 				}
 			},
@@ -270,7 +255,7 @@ func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
 	t.Run("meet some error on run subtask", func(t *testing.T) {
 		var counter atomic.Int32
 		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError",
-			func(errP *error) {
+			func(_ taskexecutor.TaskExecutor, errP *error) {
 				if counter.Add(1) == 1 {
 					*errP = errors.New("MockExecutorRunErr")
 				}
diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel
index 79a2483481d91..031a5b368f0b6 100644
--- a/pkg/disttask/framework/taskexecutor/BUILD.bazel
+++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -19,7 +19,6 @@ go_library(
         "//pkg/disttask/framework/scheduler",
         "//pkg/disttask/framework/storage",
         "//pkg/disttask/framework/taskexecutor/execute",
-        "//pkg/lightning/common",
         "//pkg/lightning/log",
         "//pkg/metrics",
         "//pkg/sessionctx/variable",
@@ -51,7 +50,7 @@ go_test(
    ],
    embed = [":taskexecutor"],
    flaky = True,
-    shard_count = 17,
+    shard_count = 16,
    deps = [
        "//pkg/disttask/framework/mock",
        "//pkg/disttask/framework/mock/execute",
@@ -74,7 +73,5 @@ go_test(
        "@org_golang_google_grpc//status",
        "@org_uber_go_goleak//:goleak",
        "@org_uber_go_mock//gomock",
-        "@org_uber_go_zap//:zap",
-        "@org_uber_go_zap//zaptest/observer",
    ],
)
diff --git a/pkg/disttask/framework/taskexecutor/execute/interface.go b/pkg/disttask/framework/taskexecutor/execute/interface.go
index 75890d2c3ddb0..f3db0c9d4f8b6 100644
--- a/pkg/disttask/framework/taskexecutor/execute/interface.go
+++ b/pkg/disttask/framework/taskexecutor/execute/interface.go
@@ -33,7 +33,9 @@ type StepExecutor interface {
 	StepExecFrameworkInfo
 
 	// Init is used to initialize the environment.
-	// if failed, task executor will retry later.
+	// The task executor retries if the returned error is retryable (see
+	// IsRetryableError in TaskExecutor.Extension); otherwise the framework marks
+	// a random subtask as failed to trigger task failure.
 	Init(context.Context) error
 	// RunSubtask is used to run the subtask.
 	RunSubtask(ctx context.Context, subtask *proto.Subtask) error
@@ -42,9 +44,13 @@ type StepExecutor interface {
 	RealtimeSummary() *SubtaskSummary
 
 	// OnFinished is used to handle the subtask when it is finished.
-	// The subtask meta can be updated in place.
+	// The subtask meta can be updated in place. A subtask is marked 'success'
+	// only when OnFinished returns no error; if it returns an error, the subtask
+	// might be rerun from scratch, so don't put error-prone code in it.
 	OnFinished(ctx context.Context, subtask *proto.Subtask) error
-	// Cleanup is used to clean up the environment.
+	// Cleanup is used to clean up the environment for this step.
+	// The returned error does not affect task/subtask state; it is only logged,
+	// so don't put error-prone code in it.
 	Cleanup(context.Context) error
 }
 
diff --git a/pkg/disttask/framework/taskexecutor/interface.go b/pkg/disttask/framework/taskexecutor/interface.go
index 2ae95087960b5..020896a2a18ee 100644
--- a/pkg/disttask/framework/taskexecutor/interface.go
+++ b/pkg/disttask/framework/taskexecutor/interface.go
@@ -115,9 +115,7 @@ type Extension interface {
 	// the Executor will mark the subtask as failed.
 	IsIdempotent(subtask *proto.Subtask) bool
 	// GetStepExecutor returns the subtask executor for the subtask.
-	// Note:
-	// 1. summary is the summary manager of all subtask of the same type now.
-	// 2. should not retry the error from it.
+	// Note: the returned error is fatal; the framework fails the task directly.
 	GetStepExecutor(task *proto.Task) (execute.StepExecutor, error)
 	// IsRetryableError returns whether the error is transient.
 	// When error is transient, the framework won't mark subtasks as failed,
diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go
index 4f7fc0d2e3024..b3d80e7cc2e75 100644
--- a/pkg/disttask/framework/taskexecutor/task_executor.go
+++ b/pkg/disttask/framework/taskexecutor/task_executor.go
@@ -16,7 +16,6 @@ package taskexecutor
 
 import (
 	"context"
-	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -29,7 +28,6 @@ import (
 	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
 	"github.com/pingcap/tidb/pkg/disttask/framework/storage"
 	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
-	"github.com/pingcap/tidb/pkg/lightning/common"
 	llog "github.com/pingcap/tidb/pkg/lightning/log"
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/backoff"
@@ -52,14 +50,9 @@ var (
 var (
 	// ErrCancelSubtask is the cancel cause when cancelling subtasks.
 	ErrCancelSubtask = errors.New("cancel subtasks")
-	// ErrFinishSubtask is the cancel cause when TaskExecutor successfully processed subtasks.
-	ErrFinishSubtask = errors.New("finish subtasks")
 	// ErrNonIdempotentSubtask means the subtask is left in running state and is not idempotent,
 	// so cannot be run again.
 	ErrNonIdempotentSubtask = errors.New("subtask in running state and is not idempotent")
-
-	// MockTiDBDown is used to mock TiDB node down, return true if it's chosen.
-	MockTiDBDown func(execID string, task *proto.TaskBase) bool
 )
 
 // BaseTaskExecutor is the base implementation of TaskExecutor.
@@ -79,9 +72,6 @@ type BaseTaskExecutor struct {
 	mu struct {
 		sync.RWMutex
-		err error
-		// handled indicates whether the error has been updated to one of the subtask.
-		handled bool
 		// runtimeCancel is used to cancel the Run/Rollback when error occurs.
 		runtimeCancel context.CancelCauseFunc
 	}
@@ -143,7 +133,13 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
 			continue
 		}
 		if !e.IsIdempotent(st) {
-			e.updateSubtaskStateAndErrorImpl(ctx, st.ExecID, st.ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask)
+			if err := e.updateSubtaskStateAndErrorImpl(ctx, st.ExecID, st.ID,
+				proto.SubtaskStateFailed, ErrNonIdempotentSubtask); err != nil {
+				e.logger.Error("failed to update subtask to 'failed' state", zap.Error(err))
+				continue
+			}
+			// if a subtask fails, the scheduler will notice it and start reverting
+			// the task, so we can return directly.
 			return
 		}
 		extraRunningSubtasks = append(extraRunningSubtasks, &st.SubtaskBase)
@@ -236,58 +232,33 @@ func (e *BaseTaskExecutor) Run(resource *proto.StepResource) {
 			checkInterval, noSubtaskCheckCnt = SubtaskCheckInterval, 0
 		err = e.RunStep(resource)
 		if err != nil {
-			e.logger.Error("failed to handle task", zap.Error(err))
+			e.logger.Error("run task step failed", zap.Error(err))
 		}
 	}
 }
 
 // RunStep start to fetch and run all subtasks for the step of task on the node.
 // return if there's no subtask to run.
-func (e *BaseTaskExecutor) RunStep(resource *proto.StepResource) (err error) {
+func (e *BaseTaskExecutor) RunStep(resource *proto.StepResource) (resErr error) {
 	defer func() {
 		if r := recover(); r != nil {
-			e.logger.Error("BaseTaskExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
+			e.logger.Error("run step panicked", zap.Any("recover", r), zap.Stack("stack"))
 			err4Panic := errors.Errorf("%v", r)
-			err1 := e.updateSubtask(err4Panic)
-			if err == nil {
-				err = err1
-			}
+			taskBase := e.taskBase.Load()
+			e.failOneSubtask(e.ctx, taskBase.ID, err4Panic)
+			resErr = err4Panic
 		}
 	}()
-	err = e.runStep(resource)
-	if e.mu.handled {
-		return err
-	}
-	if err == nil {
-		// may have error in
-		// 1. defer function in run(ctx, task)
-		// 2.
cancel ctx - // TODO: refine onError/getError - if e.getError() != nil { - err = e.getError() - } else if e.ctx.Err() != nil { - err = e.ctx.Err() - } else { - return nil - } - } - - return e.updateSubtask(err) -} - -func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) { runStepCtx, runStepCancel := context.WithCancelCause(e.ctx) e.registerRunStepCancelFunc(runStepCancel) defer func() { - runStepCancel(ErrFinishSubtask) + runStepCancel(nil) e.unregisterRunStepCancelFunc() }() - e.resetError() taskBase := e.taskBase.Load() task, err := e.taskTable.GetTaskByID(e.ctx, taskBase.ID) if err != nil { - e.onError(errors.Trace(err)) - return e.getError() + return errors.Trace(err) } stepLogger := llog.BeginTask(e.logger.With( zap.String("step", proto.Step2Str(task.Type, task.Step)), @@ -302,31 +273,37 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) stepExecutor, err := e.GetStepExecutor(task) if err != nil { - e.onError(errors.Trace(err)) - return e.getError() + e.logger.Info("failed to get step executor", zap.Error(err)) + e.failOneSubtask(runStepCtx, task.ID, err) + return errors.Trace(err) } execute.SetFrameworkInfo(stepExecutor, resource) if err := stepExecutor.Init(runStepCtx); err != nil { - e.onError(errors.Trace(err)) - return e.getError() + if e.IsRetryableError(err) { + e.logger.Info("meet retryable err when init step executor", zap.Error(err)) + } else { + e.logger.Info("failed to init step executor", zap.Error(err)) + e.failOneSubtask(runStepCtx, task.ID, err) + } + return errors.Trace(err) } defer func() { err := stepExecutor.Cleanup(runStepCtx) if err != nil { e.logger.Error("cleanup subtask exec env failed", zap.Error(err)) - e.onError(errors.Trace(err)) + // Cleanup is not a critical path of running subtask, so no need to + // affect state of subtasks. there might be no subtask to change even + // we want to if all subtasks are finished. } }() for { - // check if any error occurs. - if err := e.getError(); err != nil { - break - } - if runStepCtx.Err() != nil { - break + select { + case <-runStepCtx.Done(): + return runStepCtx.Err() + default: } subtask, err := e.taskTable.GetFirstSubtaskInStates(runStepCtx, e.id, task.ID, task.Step, @@ -336,17 +313,18 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) continue } if subtask == nil { - break + return nil } if subtask.State == proto.SubtaskStateRunning { if !e.IsIdempotent(subtask) { e.logger.Info("subtask in running state and is not idempotent, fail it", zap.Int64("subtask-id", subtask.ID)) - e.onError(ErrNonIdempotentSubtask) - e.updateSubtaskStateAndErrorImpl(runStepCtx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask) - e.markErrorHandled() - break + if err := e.updateSubtaskStateAndErrorImpl(runStepCtx, subtask.ExecID, subtask.ID, + proto.SubtaskStateFailed, ErrNonIdempotentSubtask); err != nil { + return err + } + return ErrNonIdempotentSubtask } e.logger.Info("subtask in running state and is idempotent", zap.Int64("subtask-id", subtask.ID)) @@ -354,13 +332,11 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) // subtask.State == proto.SubtaskStatePending err := e.startSubtask(runStepCtx, subtask.ID) if err != nil { - e.logger.Warn("startSubtask meets error", zap.Error(err)) // should ignore ErrSubtaskNotFound // since it only means that the subtask not owned by current task executor. 
- if err == storage.ErrSubtaskNotFound { - continue + if err != storage.ErrSubtaskNotFound { + e.logger.Warn("start subtask meets error", zap.Error(err)) } - e.onError(errors.Trace(err)) continue } } @@ -369,9 +345,10 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) runStepCancel(nil) }) - e.runSubtask(runStepCtx, stepExecutor, subtask) + if err := e.runSubtask(runStepCtx, stepExecutor, subtask); err != nil { + return err + } } - return e.getError() } func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor) bool { @@ -379,8 +356,11 @@ func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor) return ok && stepExecutor.RealtimeSummary() != nil } -func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.StepExecutor, subtask *proto.Subtask) { - err := func() error { +func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.StepExecutor, + subtask *proto.Subtask) error { + logger := e.logger.With(zap.Int64("subtaskID", subtask.ID)) + logTask := llog.BeginTask(logger, "run subtask") + subtaskErr := func() error { e.currSubtaskID.Store(subtask.ID) var wg util.WaitGroupWrapper @@ -400,48 +380,26 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute. }() return stepExecutor.RunSubtask(ctx, subtask) }() - failpoint.InjectCall("changeRunSubtaskError", &err) - - if err != nil { - e.onError(errors.Trace(err)) - } + failpoint.InjectCall("changeRunSubtaskError", e, &subtaskErr) + logTask.End2(zap.InfoLevel, subtaskErr) - handled := e.markSubTaskCanceledOrFailed(ctx, subtask) - if handled { - return - } - - failpoint.Inject("mockTiDBShutdown", func() { - if MockTiDBDown(e.id, e.GetTaskBase()) { - failpoint.Return() - } - }) - - failpoint.InjectCall("beforeCallOnSubtaskFinished", subtask) - e.onSubtaskFinished(ctx, stepExecutor, subtask) -} + failpoint.InjectCall("mockTiDBShutdown", e, e.id, e.GetTaskBase()) -func (e *BaseTaskExecutor) onSubtaskFinished(ctx context.Context, executor execute.StepExecutor, subtask *proto.Subtask) { - if err := e.getError(); err == nil { - if err = executor.OnFinished(ctx, subtask); err != nil { - e.onError(errors.Trace(err)) + if subtaskErr != nil { + if err := e.markSubTaskCanceledOrFailed(ctx, subtask, subtaskErr); err != nil { + logger.Error("failed to handle subtask error", zap.Error(err)) } - failpoint.InjectCall("afterOnFinishedCalled", e) - } - - handled := e.markSubTaskCanceledOrFailed(ctx, subtask) - if handled { - return + return subtaskErr } - e.finishSubtask(ctx, subtask) - - handled = e.markSubTaskCanceledOrFailed(ctx, subtask) - if handled { - return + failpoint.InjectCall("beforeCallOnSubtaskFinished", subtask) + if err := stepExecutor.OnFinished(ctx, subtask); err != nil { + logger.Info("OnFinished failed", zap.Error(err)) + return errors.Trace(err) } - + err := e.finishSubtask(ctx, subtask) failpoint.InjectCall("syncAfterSubtaskFinish") + return err } // GetTaskBase implements TaskExecutor.GetTaskBase. 
@@ -495,50 +453,8 @@ func (e *BaseTaskExecutor) cancelRunStepWith(cause error) { } } -func (e *BaseTaskExecutor) onError(err error) { - if err == nil { - return - } - - if errors.HasStack(err) { - e.logger.Error("onError", zap.Error(err), zap.Stack("stack"), - zap.String("error stack", fmt.Sprintf("%+v", err))) - } else { - e.logger.Error("onError", zap.Error(err), zap.Stack("stack")) - } - e.mu.Lock() - defer e.mu.Unlock() - - if e.mu.err == nil { - e.mu.err = err - e.logger.Error("taskExecutor met first error", zap.Error(err)) - } - - if e.mu.runtimeCancel != nil { - e.mu.runtimeCancel(err) - } -} - -func (e *BaseTaskExecutor) markErrorHandled() { - e.mu.Lock() - defer e.mu.Unlock() - e.mu.handled = true -} - -func (e *BaseTaskExecutor) getError() error { - e.mu.RLock() - defer e.mu.RUnlock() - return e.mu.err -} - -func (e *BaseTaskExecutor) resetError() { - e.mu.Lock() - defer e.mu.Unlock() - e.mu.err = nil - e.mu.handled = false -} - -func (e *BaseTaskExecutor) updateSubtaskStateAndErrorImpl(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, subTaskErr error) { +func (e *BaseTaskExecutor) updateSubtaskStateAndErrorImpl(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, subTaskErr error) error { + start := time.Now() // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval) err := handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, e.logger, @@ -547,17 +463,21 @@ func (e *BaseTaskExecutor) updateSubtaskStateAndErrorImpl(ctx context.Context, e }, ) if err != nil { - e.onError(errors.Trace(err)) + e.logger.Error("failed to update subtask state", zap.Int64("subtaskID", subtaskID), + zap.Stringer("targetState", state), zap.NamedError("subtaskErr", subTaskErr), + zap.Duration("takes", time.Since(start)), zap.Error(err)) } + return err } // startSubtask try to change the state of the subtask to running. // If the subtask is not owned by the task executor, // the update will fail and task executor should not run the subtask. 
func (e *BaseTaskExecutor) startSubtask(ctx context.Context, subtaskID int64) error { + start := time.Now() // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval) - return handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, e.logger, + err := handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, e.logger, func(ctx context.Context) (bool, error) { err := e.taskTable.StartSubtask(ctx, subtaskID, e.id) if err == storage.ErrSubtaskNotFound { @@ -567,9 +487,16 @@ func (e *BaseTaskExecutor) startSubtask(ctx context.Context, subtaskID int64) er return true, err }, ) + if err != nil && err != storage.ErrSubtaskNotFound { + e.logger.Error("failed to start subtask", zap.Int64("subtaskID", subtaskID), + zap.Duration("takes", time.Since(start)), zap.Error(err)) + } + return err } -func (e *BaseTaskExecutor) finishSubtask(ctx context.Context, subtask *proto.Subtask) { +func (e *BaseTaskExecutor) finishSubtask(ctx context.Context, subtask *proto.Subtask) error { + start := time.Now() + // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval) err := handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, e.logger, func(ctx context.Context) (bool, error) { @@ -577,79 +504,46 @@ func (e *BaseTaskExecutor) finishSubtask(ctx context.Context, subtask *proto.Sub }, ) if err != nil { - e.onError(errors.Trace(err)) + e.logger.Error("failed to finish subtask", zap.Int64("subtaskID", subtask.ID), + zap.Duration("takes", time.Since(start)), zap.Error(err)) } + return err } // markSubTaskCanceledOrFailed check the error type and decide the subtasks' state. // 1. Only cancel subtasks when meet ErrCancelSubtask. // 2. Only fail subtasks when meet non retryable error. // 3. When meet other errors, don't change subtasks' state. 
-func (e *BaseTaskExecutor) markSubTaskCanceledOrFailed(ctx context.Context, subtask *proto.Subtask) bool {
-	if err := e.getError(); err != nil {
-		err := errors.Cause(err)
-		if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask {
-			e.logger.Warn("subtask canceled", zap.Error(err))
-			e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateCanceled, nil)
-		} else if e.IsRetryableError(err) {
-			e.logger.Warn("meet retryable error", zap.Error(err))
-		} else if common.IsContextCanceledError(err) {
-			e.logger.Info("meet context canceled for gracefully shutdown", zap.Error(err))
-		} else {
-			e.logger.Warn("subtask failed", zap.Error(err))
-			e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, err)
+func (e *BaseTaskExecutor) markSubTaskCanceledOrFailed(ctx context.Context, subtask *proto.Subtask, stErr error) error {
+	if ctx.Err() != nil {
+		if context.Cause(ctx) == ErrCancelSubtask {
+			e.logger.Warn("subtask canceled")
+			return e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateCanceled, nil)
 		}
-		e.markErrorHandled()
-		return true
-	}
-	return false
-}
-func (e *BaseTaskExecutor) failSubtaskWithRetry(ctx context.Context, taskID int64, err error) error {
-	backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval)
-	err1 := handle.RunWithRetry(e.ctx, scheduler.RetrySQLTimes, backoffer, e.logger,
-		func(_ context.Context) (bool, error) {
-			return true, e.taskTable.FailSubtask(ctx, e.id, taskID, err)
-		},
-	)
-	if err1 == nil {
-		e.logger.Info("failed one subtask succeed", zap.NamedError("subtask-err", err))
+		e.logger.Info("meet context canceled for gracefully shutdown")
+	} else if e.IsRetryableError(stErr) {
+		e.logger.Warn("meet retryable error", zap.Error(stErr))
+	} else {
+		e.logger.Warn("subtask failed", zap.Error(stErr))
+		return e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, stErr)
 	}
-	return err1
+	return nil
 }
-func (e *BaseTaskExecutor) cancelSubtaskWithRetry(ctx context.Context, taskID int64, err error) error {
-	e.logger.Warn("subtask canceled", zap.NamedError("subtask-cancel", err))
+// failOneSubtask fails one (random) subtask on a fatal error, to notify the
+// scheduler to revert the task. The internal error is not returned: there is
+// little we can do if even handling a fatal error fails.
+func (e *BaseTaskExecutor) failOneSubtask(ctx context.Context, taskID int64, subtaskErr error) {
+	start := time.Now()
 	backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval)
-	err1 := handle.RunWithRetry(e.ctx, scheduler.RetrySQLTimes, backoffer, e.logger,
+	err1 := handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, e.logger,
 		func(_ context.Context) (bool, error) {
-			return true, e.taskTable.CancelSubtask(ctx, e.id, taskID)
+			return true, e.taskTable.FailSubtask(ctx, e.id, taskID, subtaskErr)
 		},
 	)
-	if err1 == nil {
-		e.logger.Info("canceled one subtask succeed", zap.NamedError("subtask-cancel", err))
+	if err1 != nil {
+		e.logger.Error("fail one subtask failed", zap.NamedError("subtaskErr", subtaskErr),
+			zap.Duration("takes", time.Since(start)), zap.Error(err1))
 	}
-	return err1
-}
-
-// updateSubtask check the error type and decide the subtasks' state.
-// 1. Only cancel subtasks when meet ErrCancelSubtask.
-// 2. Only fail subtasks when meet non retryable error.
-// 3. When meet other errors, don't change subtasks' state.
-// Handled errors should not happen during subtasks execution. -// Only handle errors before subtasks execution and after subtasks execution. -func (e *BaseTaskExecutor) updateSubtask(err error) error { - task := e.taskBase.Load() - err = errors.Cause(err) - // TODO this branch is unreachable now, remove it when we refactor error handling. - if e.ctx.Err() != nil && context.Cause(e.ctx) == ErrCancelSubtask { - return e.cancelSubtaskWithRetry(e.ctx, task.ID, ErrCancelSubtask) - } else if e.IsRetryableError(err) { - e.logger.Warn("meet retryable error", zap.Error(err)) - } else if common.IsContextCanceledError(err) { - e.logger.Info("meet context canceled for gracefully shutdown", zap.Error(err)) - } else { - return e.failSubtaskWithRetry(e.ctx, task.ID, err) - } - return nil } diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 132f798e64b60..d06d1e3bb795a 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -23,12 +23,11 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/mock" mockexecute "github.com/pingcap/tidb/pkg/disttask/framework/mock/execute" "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -60,14 +59,12 @@ func TestTaskExecutorRun(t *testing.T) { // 1. no taskExecutor constructor taskExecutorRegisterErr := errors.Errorf("constructor of taskExecutor for key not found") - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, taskExecutorRegisterErr) taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable) taskExecutor.Extension = mockExtension - err := taskExecutor.runStep(nil) + mockSubtaskTable.EXPECT().FailSubtask(gomock.Any(), taskExecutor.id, task1.ID, taskExecutorRegisterErr).Return(nil) + err := taskExecutor.RunStep(nil) require.EqualError(t, err, taskExecutorRegisterErr.Error()) - mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - err = taskExecutor.RunStep(nil) - require.NoError(t, err) require.True(t, ctrl.Satisfied()) // 2. 
init subtask exec env failed @@ -75,7 +72,8 @@ func TestTaskExecutorRun(t *testing.T) { initErr := errors.New("init error") mockStepExecutor.EXPECT().Init(gomock.Any()).Return(initErr) - err = taskExecutor.runStep(nil) + mockSubtaskTable.EXPECT().FailSubtask(gomock.Any(), taskExecutor.id, task1.ID, initErr).Return(nil) + err = taskExecutor.RunStep(nil) require.EqualError(t, err, initErr.Error()) require.True(t, ctrl.Satisfied()) @@ -175,56 +173,51 @@ func TestTaskExecutorRun(t *testing.T) { unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(ErrCancelSubtask) + mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { + taskExecutor.CancelRunningSubtask() + return ErrCancelSubtask + }) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", task1.ID, proto.SubtaskStateCanceled, gomock.Any()).Return(nil) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = taskExecutor.RunStep(nil) require.EqualError(t, err, ErrCancelSubtask.Error()) require.True(t, ctrl.Satisfied()) - // 7. RunSubtask return context.Canceled + // 7. RunSubtask return context.Canceled, for graceful shutdown mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(context.Canceled) + mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { + // we cancel the runStep context here to make sure the task executor + // can still be used later. in real world case we should cancel the task + // executor for graceful shutdown + taskExecutor.cancelRunStepWith(nil) + return context.Canceled + }) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = taskExecutor.RunStep(nil) require.EqualError(t, err, context.Canceled.Error()) require.True(t, ctrl.Satisfied()) - // 8. grpc cancel + // 8. grpc cancel, for graceful shutdown mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) grpcErr := status.Error(codes.Canceled, "test cancel") - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(grpcErr) + mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { + // same as previous case + taskExecutor.cancelRunStepWith(nil) + return grpcErr + }) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = taskExecutor.RunStep(nil) require.EqualError(t, err, grpcErr.Error()) require.True(t, ctrl.Satisfied()) - // 9. 
annotate grpc cancel - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - grpcErr = status.Error(codes.Canceled, "test cancel") - annotatedError := errors.Annotatef( - grpcErr, - " %s", - "test annotate", - ) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(annotatedError) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.EqualError(t, err, annotatedError.Error()) - require.True(t, ctrl.Satisfied()) - // 10. subtask owned by other executor mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, @@ -299,7 +292,6 @@ func TestTaskExecutor(t *testing.T) { mockSubtaskTable := mock.NewMockTaskTable(ctrl) mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl) mockExtension := mock.NewMockExtension(ctrl) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", taskID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() // mock for checkBalanceSubtask @@ -321,12 +313,13 @@ func TestTaskExecutor(t *testing.T) { unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), taskID, "id").Return(nil) mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", subtasks[0].ID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err := taskExecutor.runStep(nil) + err := taskExecutor.RunStep(nil) require.EqualError(t, err, runSubtaskErr.Error()) require.True(t, ctrl.Satisfied()) - // 2. run one subtask, then task moved to history(ErrTaskNotFound). + // 2. run one subtask, then no subtask anymore, show exit RunStep loop. 
mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, @@ -338,7 +331,7 @@ func TestTaskExecutor(t *testing.T) { mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(nil, nil) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.runStep(nil) + err = taskExecutor.RunStep(nil) require.NoError(t, err) require.True(t, ctrl.Satisfied()) } @@ -363,7 +356,6 @@ func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) { task.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) // mock for runStep mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) @@ -374,7 +366,7 @@ func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) { return ctx.Err() }) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - require.ErrorIs(t, taskExecutor.runStep(nil), context.Canceled) + require.ErrorIs(t, taskExecutor.RunStep(nil), context.Canceled) require.True(t, ctrl.Satisfied()) } @@ -389,52 +381,82 @@ func TestCheckBalanceSubtask(t *testing.T) { taskExecutor := NewBaseTaskExecutor(ctx, "tidb1", task, mockSubtaskTable) taskExecutor.Extension = mockExtension - // context canceled - canceledCtx, cancel := context.WithCancel(ctx) - cancel() - taskExecutor.checkBalanceSubtask(canceledCtx) - bak := checkBalanceSubtaskInterval + retryIntBak := scheduler.RetrySQLInterval t.Cleanup(func() { checkBalanceSubtaskInterval = bak + scheduler.RetrySQLInterval = retryIntBak }) checkBalanceSubtaskInterval = 100 * time.Millisecond + scheduler.RetrySQLInterval = time.Millisecond - // subtask scheduled away - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", - task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error")) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", - task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) - runCtx, cancelCause := context.WithCancelCause(ctx) - taskExecutor.registerRunStepCancelFunc(cancelCause) - require.NoError(t, runCtx.Err()) - taskExecutor.checkBalanceSubtask(ctx) - require.ErrorIs(t, runCtx.Err(), context.Canceled) - require.True(t, ctrl.Satisfied()) + t.Run("context cancelled", func(t *testing.T) { + // context canceled + canceledCtx, cancel := context.WithCancel(ctx) + cancel() + taskExecutor.checkBalanceSubtask(canceledCtx) + }) - subtasks := []*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: 1, ExecID: "tidb1"}}} - // in-idempotent running subtask - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", - task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1", - subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil) - mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) - taskExecutor.checkBalanceSubtask(ctx) - require.True(t, ctrl.Satisfied()) + t.Run("subtask scheduled away", func(t 
*testing.T) { + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error")) + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) + runCtx, cancelCause := context.WithCancelCause(ctx) + taskExecutor.registerRunStepCancelFunc(cancelCause) + require.NoError(t, runCtx.Err()) + taskExecutor.checkBalanceSubtask(ctx) + require.ErrorIs(t, runCtx.Err(), context.Canceled) + require.True(t, ctrl.Satisfied()) + }) - // current running subtask is skipped - require.Zero(t, taskExecutor.currSubtaskID.Load()) - taskExecutor.currSubtaskID.Store(1) - subtasks = []*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: 1, ExecID: "tidb1"}}, {SubtaskBase: proto.SubtaskBase{ID: 2, ExecID: "tidb1"}}} - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", - task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) - mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true) - mockSubtaskTable.EXPECT().RunningSubtasksBack2Pending(gomock.Any(), []*proto.SubtaskBase{{ID: 2, ExecID: "tidb1"}}).Return(nil) - // used to break the loop - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", - task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil) - taskExecutor.checkBalanceSubtask(ctx) - require.True(t, ctrl.Satisfied()) + t.Run("non-idempotent running subtask", func(t *testing.T) { + subtasks := []*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: 1, ExecID: "tidb1"}}} + // in-idempotent running subtask + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) + mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1", + subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil) + taskExecutor.checkBalanceSubtask(ctx) + require.True(t, ctrl.Satisfied()) + + // if we failed to change state of non-idempotent subtask, will retry + retryCntBak := scheduler.RetrySQLTimes + t.Cleanup(func() { + scheduler.RetrySQLTimes = retryCntBak + }) + scheduler.RetrySQLTimes = 1 + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) + mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1", + subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask). 
+ Return(errors.New("some error")) + // retry part + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) + mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1", + subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil) + taskExecutor.checkBalanceSubtask(ctx) + require.True(t, ctrl.Satisfied()) + }) + + t.Run("current running subtask is skipped", func(t *testing.T) { + require.Zero(t, taskExecutor.currSubtaskID.Load()) + taskExecutor.currSubtaskID.Store(1) + subtasks := []*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: 1, ExecID: "tidb1"}}, {SubtaskBase: proto.SubtaskBase{ID: 2, ExecID: "tidb1"}}} + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) + mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true) + mockSubtaskTable.EXPECT().RunningSubtasksBack2Pending(gomock.Any(), []*proto.SubtaskBase{{ID: 2, ExecID: "tidb1"}}).Return(nil) + // used to break the loop + mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", + task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil) + taskExecutor.checkBalanceSubtask(ctx) + require.True(t, ctrl.Satisfied()) + }) } func TestExecutorErrHandling(t *testing.T) { @@ -451,29 +473,13 @@ func TestExecutorErrHandling(t *testing.T) { taskExecutor := NewBaseTaskExecutor(ctx, "id", task, mockSubtaskTable) taskExecutor.Extension = mockExtension - // GetStepExecutor meet retryable error. - getSubtaskExecutorErr := errors.New("get executor err") - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, getSubtaskExecutorErr) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - - // GetStepExecutor meet non retryable error. - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, getSubtaskExecutorErr) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), getSubtaskExecutorErr) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - // Init meet retryable error. initErr := errors.New("executor init err") mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) - require.NoError(t, taskExecutor.RunStep(nil)) + require.ErrorIs(t, taskExecutor.RunStep(nil), initErr) require.True(t, ctrl.Satisfied()) // Init meet non retryable error. 
@@ -481,11 +487,11 @@ func TestExecutorErrHandling(t *testing.T) { mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), initErr) - require.NoError(t, taskExecutor.RunStep(nil)) + mockSubtaskTable.EXPECT().FailSubtask(gomock.Any(), taskExecutor.id, gomock.Any(), initErr) + require.ErrorIs(t, taskExecutor.RunStep(nil), initErr) require.True(t, ctrl.Satisfied()) - // Cleanup meet retryable error. + // Cleanup meet error. cleanupErr := errors.New("cleanup err") mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) @@ -500,26 +506,6 @@ func TestExecutorErrHandling(t *testing.T) { mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(nil, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(cleanupErr) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - - // Cleanup meet non retryable error. - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task.ID, "id").Return(nil) - mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(cleanupErr) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), cleanupErr) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) @@ -548,50 +534,3 @@ func TestInject(t *testing.T) { got := e.GetResource() require.Equal(t, r, got) } - -func throwError() error { - return errors.New("mock error") -} - -func callOnError(taskExecutor *BaseTaskExecutor) { - taskExecutor.onError(throwError()) -} - -func throwErrorNoTrace() error { - return errors.NewNoStackError("mock error") -} - -func callOnErrorNoTrace(taskExecutor *BaseTaskExecutor) { - taskExecutor.onError(throwErrorNoTrace()) -} - -func TestExecutorOnErrorLog(t *testing.T) { - taskExecutor := &BaseTaskExecutor{} - - observedZapCore, observedLogs := observer.New(zap.ErrorLevel) - observedLogger := zap.New(observedZapCore) - taskExecutor.logger = observedLogger - - { - callOnError(taskExecutor) - require.GreaterOrEqual(t, observedLogs.Len(), 1) - errLog := observedLogs.TakeAll()[0] - contextMap := errLog.ContextMap() - 
require.Contains(t, contextMap, "error stack") - errStack := contextMap["error stack"] - require.IsType(t, "", errStack) - errStackStr := errStack.(string) - require.Regexpf(t, `mock error[\n\t ]*`+ - `github\.com/pingcap/tidb/pkg/disttask/framework/taskexecutor\.throwError`, - errStackStr, - "got err stack: %s", errStackStr) - } - - { - callOnErrorNoTrace(taskExecutor) - require.GreaterOrEqual(t, observedLogs.Len(), 1) - errLog := observedLogs.TakeAll()[0] - contextMap := errLog.ContextMap() - require.NotContains(t, contextMap, "error stack") - } -} diff --git a/pkg/lightning/log/log.go b/pkg/lightning/log/log.go index ab72aacc0be94..fc41e480ee356 100644 --- a/pkg/lightning/log/log.go +++ b/pkg/lightning/log/log.go @@ -280,6 +280,25 @@ func (task *Task) End(level zapcore.Level, err error, extraFields ...zap.Field) return elapsed } +// End2 is similar to End except we don't check cancel, and we print full error. +func (task *Task) End2(level zapcore.Level, err error, extraFields ...zap.Field) time.Duration { + elapsed := time.Since(task.since) + var verb string + errField := zap.Skip() + adjustedLevel := task.level + verb = " completed" + if err != nil { + adjustedLevel = level + verb = " failed" + extraFields = nil + errField = zap.Error(err) + } + if ce := task.WithOptions(zap.AddCallerSkip(1)).Check(adjustedLevel, task.name+verb); ce != nil { + ce.Write(append(extraFields, zap.Duration("takeTime", elapsed), errField)...) + } + return elapsed +} + type ctxKeyType struct{} var ctxKey ctxKeyType diff --git a/tests/realtikvtest/addindextest1/BUILD.bazel b/tests/realtikvtest/addindextest1/BUILD.bazel index de6f27ec5ac04..8aa3af30880ab 100644 --- a/tests/realtikvtest/addindextest1/BUILD.bazel +++ b/tests/realtikvtest/addindextest1/BUILD.bazel @@ -12,6 +12,7 @@ go_test( "//pkg/config", "//pkg/ddl/ingest", "//pkg/disttask/framework/storage", + "//pkg/disttask/framework/taskexecutor", "//pkg/errno", "//pkg/meta/model", "//pkg/sessionctx/variable", diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index cf3e988b91b06..703254a223e06 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -96,11 +97,13 @@ func TestAddIndexDistBasic(t *testing.T) { tk.MustExec("admin check index t1 idx;") var counter atomic.Int32 - testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError", func(errP *error) { - if counter.Add(1) == 1 { - *errP = context.Canceled - } - }) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError", + func(e taskexecutor.TaskExecutor, errP *error) { + if counter.Add(1) == 1 { + *errP = context.Canceled + } + }, + ) tk.MustExec("alter table t1 add index idx1(a);") tk.MustExec("admin check index t1 idx1;") testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError")
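Not part of the diff above: a minimal sketch of a StepExecutor that follows the clarified contract documented in execute/interface.go (the package name, example type, and trivial method bodies are illustrative assumptions, not framework code).

package example

import (
	"context"

	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
)

// exampleStepExecutor is an illustrative implementation, not code from this PR.
type exampleStepExecutor struct {
	// the framework injects step resources via execute.SetFrameworkInfo.
	execute.StepExecFrameworkInfo
}

// Init may fail transiently: if the task's Extension.IsRetryableError reports
// true for the returned error, the task executor retries RunStep later;
// otherwise one subtask is failed and the scheduler reverts the whole task.
func (*exampleStepExecutor) Init(ctx context.Context) error {
	return ctx.Err()
}

// RunSubtask does the actual work. Returning ErrCancelSubtask after
// CancelRunningSubtask was called marks the subtask canceled; any other
// non-retryable error marks it failed.
func (*exampleStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
	_ = subtask // process the subtask here
	return ctx.Err()
}

// RealtimeSummary returning nil means there is no live summary to collect.
func (*exampleStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
	return nil
}

// OnFinished should stay cheap and error-free: the subtask is only marked
// successful when it returns nil, otherwise the subtask may be rerun.
func (*exampleStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
	_ = subtask // the subtask meta can be updated in place here
	return nil
}

// Cleanup is best effort: its error is only logged and never changes the
// state of the task or its subtasks.
func (*exampleStepExecutor) Cleanup(context.Context) error {
	return nil
}

The point the new doc comments make is that only RunSubtask and OnFinished results decide subtask state; Init errors either retry or fail the task, and Cleanup errors are logged only.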