From dff057bd40eb699499ba65ec07b645d430968937 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 2 Dec 2024 12:06:51 +0800 Subject: [PATCH 1/4] change refresh task remove unused change change --- .../framework/mock/execute/execute_mock.go | 14 + .../framework/mock/task_executor_mock.go | 20 - pkg/disttask/framework/proto/step.go | 9 +- pkg/disttask/framework/proto/type.go | 2 +- pkg/disttask/framework/storage/task_table.go | 16 - .../taskexecutor/execute/interface.go | 15 +- .../framework/taskexecutor/interface.go | 1 - .../framework/taskexecutor/manager_test.go | 7 +- .../framework/taskexecutor/task_executor.go | 266 +++-- .../taskexecutor/task_executor_test.go | 929 ++++++++++-------- 10 files changed, 696 insertions(+), 583 deletions(-) diff --git a/pkg/disttask/framework/mock/execute/execute_mock.go b/pkg/disttask/framework/mock/execute/execute_mock.go index edd5af519dad1..196d26463393f 100644 --- a/pkg/disttask/framework/mock/execute/execute_mock.go +++ b/pkg/disttask/framework/mock/execute/execute_mock.go @@ -75,6 +75,20 @@ func (mr *MockStepExecutorMockRecorder) GetResource() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResource", reflect.TypeOf((*MockStepExecutor)(nil).GetResource)) } +// GetStep mocks base method. +func (m *MockStepExecutor) GetStep() proto.Step { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetStep") + ret0, _ := ret[0].(proto.Step) + return ret0 +} + +// GetStep indicates an expected call of GetStep. +func (mr *MockStepExecutorMockRecorder) GetStep() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStep", reflect.TypeOf((*MockStepExecutor)(nil).GetStep)) +} + // Init mocks base method. func (m *MockStepExecutor) Init(arg0 context.Context) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/mock/task_executor_mock.go b/pkg/disttask/framework/mock/task_executor_mock.go index 2ecd10ab0472a..f5e3a1bfc1ccb 100644 --- a/pkg/disttask/framework/mock/task_executor_mock.go +++ b/pkg/disttask/framework/mock/task_executor_mock.go @@ -194,26 +194,6 @@ func (mr *MockTaskTableMockRecorder) GetTasksInStates(arg0 any, arg1 ...any) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTasksInStates", reflect.TypeOf((*MockTaskTable)(nil).GetTasksInStates), varargs...) } -// HasSubtasksInStates mocks base method. -func (m *MockTaskTable) HasSubtasksInStates(arg0 context.Context, arg1 string, arg2 int64, arg3 proto.Step, arg4 ...proto.SubtaskState) (bool, error) { - m.ctrl.T.Helper() - varargs := []any{arg0, arg1, arg2, arg3} - for _, a := range arg4 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "HasSubtasksInStates", varargs...) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// HasSubtasksInStates indicates an expected call of HasSubtasksInStates. -func (mr *MockTaskTableMockRecorder) HasSubtasksInStates(arg0, arg1, arg2, arg3 any, arg4 ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{arg0, arg1, arg2, arg3}, arg4...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasSubtasksInStates", reflect.TypeOf((*MockTaskTable)(nil).HasSubtasksInStates), varargs...) -} - // InitMeta mocks base method. func (m *MockTaskTable) InitMeta(arg0 context.Context, arg1, arg2 string) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/proto/step.go b/pkg/disttask/framework/proto/step.go index d045f71423f99..8f41c686fc2dc 100644 --- a/pkg/disttask/framework/proto/step.go +++ b/pkg/disttask/framework/proto/step.go @@ -47,10 +47,11 @@ func Step2Str(t TaskType, s Step) string { return fmt.Sprintf("unknown type %s", t) } -// Steps of example task type, they can either have 1 or 2 steps. +// Steps of example task type. const ( - StepOne Step = 1 - StepTwo Step = 2 + StepOne Step = 1 + StepTwo Step = 2 + StepThree Step = 3 ) func exampleStep2Str(s Step) string { @@ -59,6 +60,8 @@ func exampleStep2Str(s Step) string { return "one" case StepTwo: return "two" + case StepThree: + return "three" default: return fmt.Sprintf("unknown step %d", s) } diff --git a/pkg/disttask/framework/proto/type.go b/pkg/disttask/framework/proto/type.go index b3283dc74947b..c8137da4db34d 100644 --- a/pkg/disttask/framework/proto/type.go +++ b/pkg/disttask/framework/proto/type.go @@ -15,7 +15,7 @@ package proto const ( - // TaskTypeExample is TaskType of Example. + // TaskTypeExample is TaskType of Example, it's for test. TaskTypeExample TaskType = "Example" // ImportInto is TaskType of ImportInto. ImportInto TaskType = "ImportInto" diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 28d6703024fbe..38a601a31a6d1 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -567,22 +567,6 @@ func (mgr *TaskManager) GetSubtaskErrors(ctx context.Context, taskID int64) ([]e return subTaskErrors, nil } -// HasSubtasksInStates checks if there are subtasks in the states. -func (mgr *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (bool, error) { - args := []any{tidbID, taskID, step} - for _, state := range states { - args = append(args, state) - } - rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select 1 from mysql.tidb_background_subtask - where exec_id = %? and task_key = %? and step = %? - and state in (`+strings.Repeat("%?,", len(states)-1)+"%?) limit 1", args...) - if err != nil { - return false, err - } - - return len(rs) > 0, nil -} - // UpdateSubtasksExecIDs update subtasks' execID. func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error { // skip the update process. diff --git a/pkg/disttask/framework/taskexecutor/execute/interface.go b/pkg/disttask/framework/taskexecutor/execute/interface.go index f3db0c9d4f8b6..7a88a76131640 100644 --- a/pkg/disttask/framework/taskexecutor/execute/interface.go +++ b/pkg/disttask/framework/taskexecutor/execute/interface.go @@ -47,6 +47,7 @@ type StepExecutor interface { // The subtask meta can be updated in place. only when OnFinished returns no // err, a subtask can be marked as 'success', if it returns error, the subtask // might be completely rerun, so don't put code that's prone to error in it. + // TODO merge with RunSubtask, seems no need to have a separate API. OnFinished(ctx context.Context, subtask *proto.Subtask) error // Cleanup is used to clean up the environment for this step. // the returned error will not affect task/subtask state, it's only logged, @@ -70,6 +71,8 @@ type StepExecFrameworkInfo interface { // interfaces, the implementation of other interface must embed // StepExecFrameworkInfo. restricted() + // GetStep returns the step. + GetStep() proto.Step // GetResource returns the expected resource of this step executor. GetResource() *proto.StepResource } @@ -77,21 +80,29 @@ type StepExecFrameworkInfo interface { var stepExecFrameworkInfoName = reflect.TypeFor[StepExecFrameworkInfo]().Name() type frameworkInfo struct { + step proto.Step resource *proto.StepResource } func (*frameworkInfo) restricted() {} +func (f *frameworkInfo) GetStep() proto.Step { + return f.step +} + func (f *frameworkInfo) GetResource() *proto.StepResource { return f.resource } // SetFrameworkInfo sets the framework info for the StepExecutor. -func SetFrameworkInfo(exec StepExecutor, resource *proto.StepResource) { +func SetFrameworkInfo(exec StepExecutor, step proto.Step, resource *proto.StepResource) { if exec == nil { return } - toInject := &frameworkInfo{resource: resource} + toInject := &frameworkInfo{ + step: step, + resource: resource, + } // use reflection to set the framework info e := reflect.ValueOf(exec) if e.Kind() == reflect.Ptr || e.Kind() == reflect.Interface { diff --git a/pkg/disttask/framework/taskexecutor/interface.go b/pkg/disttask/framework/taskexecutor/interface.go index 020896a2a18ee..ace8c63405b35 100644 --- a/pkg/disttask/framework/taskexecutor/interface.go +++ b/pkg/disttask/framework/taskexecutor/interface.go @@ -53,7 +53,6 @@ type TaskTable interface { // PauseSubtasks update subtasks state to paused. PauseSubtasks(ctx context.Context, execID string, taskID int64) error - HasSubtasksInStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (bool, error) // RunningSubtasksBack2Pending update the state of subtask which belongs to this // node from running to pending. // see subtask state machine for more detail. diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go index 8f74e7a730a4c..1bbaa2915d5a9 100644 --- a/pkg/disttask/framework/taskexecutor/manager_test.go +++ b/pkg/disttask/framework/taskexecutor/manager_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/mock" "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" @@ -464,11 +463,7 @@ func TestManagerInitMeta(t *testing.T) { require.NoError(t, m.InitMeta()) require.True(t, ctrl.Satisfied()) - bak := scheduler.RetrySQLTimes - t.Cleanup(func() { - scheduler.RetrySQLTimes = bak - }) - scheduler.RetrySQLTimes = 1 + reduceRetrySQLTimes(t, 1) mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")) require.ErrorContains(t, m.InitMeta(), "mock err") require.True(t, ctrl.Satisfied()) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index b3d80e7cc2e75..39a5a64d669e4 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -58,10 +58,8 @@ var ( // BaseTaskExecutor is the base implementation of TaskExecutor. type BaseTaskExecutor struct { // id, it's the same as server id now, i.e. host:port. - id string - // we only store task base here to reduce overhead of refreshing it. - // task meta is loaded when we do execute subtasks, see GetStepExecutor. - taskBase atomic.Pointer[proto.TaskBase] + id string + task atomic.Pointer[proto.Task] taskTable TaskTable logger *zap.Logger ctx context.Context @@ -75,6 +73,10 @@ type BaseTaskExecutor struct { // runtimeCancel is used to cancel the Run/Rollback when error occurs. runtimeCancel context.CancelCauseFunc } + + stepExec execute.StepExecutor + stepCtx context.Context + stepLogger *llog.Task } // NewBaseTaskExecutor creates a new BaseTaskExecutor. @@ -94,7 +96,7 @@ func NewBaseTaskExecutor(ctx context.Context, id string, task *proto.Task, taskT cancel: cancelFunc, logger: logger, } - taskExecutorImpl.taskBase.Store(&task.TaskBase) + taskExecutorImpl.task.Store(task) return taskExecutorImpl } @@ -113,7 +115,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { case <-ticker.C: } - task := e.taskBase.Load() + task := e.task.Load() subtasks, err := e.taskTable.GetSubtasksByExecIDAndStepAndStates(ctx, e.id, task.ID, task.Step, proto.SubtaskStateRunning) if err != nil { @@ -192,34 +194,47 @@ func (e *BaseTaskExecutor) Ctx() context.Context { // Run implements the TaskExecutor interface. func (e *BaseTaskExecutor) Run(resource *proto.StepResource) { - var err error + defer func() { + if e.stepExec != nil { + e.cleanStepExecutor() + } + }() // task executor occupies resources, if there's no subtask to run for 10s, // we release the resources so that other tasks can use them. // 300ms + 600ms + 1.2s + 2s * 4 = 10.1s backoffer := backoff.NewExponential(SubtaskCheckInterval, 2, MaxSubtaskCheckInterval) checkInterval, noSubtaskCheckCnt := SubtaskCheckInterval, 0 + skipBackoff := false for { - select { - case <-e.ctx.Done(): + if e.ctx.Err() != nil { return - case <-time.After(checkInterval): } - if err = e.refreshTask(); err != nil { + if !skipBackoff { + select { + case <-e.ctx.Done(): + return + case <-time.After(checkInterval): + } + } + skipBackoff = false + if err := e.refreshTask(); err != nil { if errors.Cause(err) == storage.ErrTaskNotFound { return } e.logger.Error("refresh task failed", zap.Error(err)) continue } - task := e.taskBase.Load() + task := e.task.Load() if task.State != proto.TaskStateRunning { return } - if exist, err := e.taskTable.HasSubtasksInStates(e.ctx, e.id, task.ID, task.Step, - unfinishedSubtaskStates...); err != nil { - e.logger.Error("check whether there are subtasks to run failed", zap.Error(err)) + + subtask, err := e.taskTable.GetFirstSubtaskInStates(e.ctx, e.id, task.ID, task.Step, + proto.SubtaskStatePending, proto.SubtaskStateRunning) + if err != nil { + e.logger.Warn("get first subtask meets error", zap.Error(err)) continue - } else if !exist { + } else if subtask == nil { if noSubtaskCheckCnt >= maxChecksWhenNoSubtask { e.logger.Info("no subtask to run for a while, exit") break @@ -230,155 +245,134 @@ func (e *BaseTaskExecutor) Run(resource *proto.StepResource) { } // reset it when we get a subtask checkInterval, noSubtaskCheckCnt = SubtaskCheckInterval, 0 - err = e.RunStep(resource) + + if e.stepExec != nil && e.stepExec.GetStep() != subtask.Step { + e.cleanStepExecutor() + } + if e.stepExec == nil { + if err2 := e.createStepExecutor(resource); err2 != nil { + e.logger.Error("create step executor failed", + zap.String("step", proto.Step2Str(task.Type, task.Step)), zap.Error(err2)) + continue + } + } + err = e.runSubtask(subtask) if err != nil { - e.logger.Error("run task step failed", zap.Error(err)) + // task executor keeps running its subtasks even though some subtask + // might have failed, we rely on scheduler to detect the error, and + // notify task executor or manager to cancel. + e.logger.Error("run subtask failed", zap.Error(err)) + } else { + // if we run a subtask successfully, we will try to run next subtask + // immediately for once. + skipBackoff = true } } } -// RunStep start to fetch and run all subtasks for the step of task on the node. -// return if there's no subtask to run. -func (e *BaseTaskExecutor) RunStep(resource *proto.StepResource) (resErr error) { - defer func() { - if r := recover(); r != nil { - e.logger.Error("run step panicked", zap.Any("recover", r), zap.Stack("stack")) - err4Panic := errors.Errorf("%v", r) - taskBase := e.taskBase.Load() - e.failOneSubtask(e.ctx, taskBase.ID, err4Panic) - resErr = err4Panic - } - }() - runStepCtx, runStepCancel := context.WithCancelCause(e.ctx) - e.registerRunStepCancelFunc(runStepCancel) - defer func() { - runStepCancel(nil) - e.unregisterRunStepCancelFunc() - }() - taskBase := e.taskBase.Load() - task, err := e.taskTable.GetTaskByID(e.ctx, taskBase.ID) - if err != nil { - return errors.Trace(err) - } - stepLogger := llog.BeginTask(e.logger.With( - zap.String("step", proto.Step2Str(task.Type, task.Step)), - zap.Float64("mem-limit-percent", gctuner.GlobalMemoryLimitTuner.GetPercentage()), - zap.String("server-mem-limit", memory.ServerMemoryLimitOriginText.Load()), - zap.Stringer("resource", resource), - ), "execute task step") - // log as info level, subtask might be cancelled, let caller check it. - defer func() { - stepLogger.End(zap.InfoLevel, resErr) - }() +func (e *BaseTaskExecutor) createStepExecutor(resource *proto.StepResource) error { + task := e.task.Load() stepExecutor, err := e.GetStepExecutor(task) if err != nil { e.logger.Info("failed to get step executor", zap.Error(err)) - e.failOneSubtask(runStepCtx, task.ID, err) + e.failOneSubtask(e.ctx, task.ID, err) return errors.Trace(err) } - execute.SetFrameworkInfo(stepExecutor, resource) + execute.SetFrameworkInfo(stepExecutor, task.Step, resource) - if err := stepExecutor.Init(runStepCtx); err != nil { + if err := stepExecutor.Init(e.ctx); err != nil { if e.IsRetryableError(err) { e.logger.Info("meet retryable err when init step executor", zap.Error(err)) } else { e.logger.Info("failed to init step executor", zap.Error(err)) - e.failOneSubtask(runStepCtx, task.ID, err) + e.failOneSubtask(e.ctx, task.ID, err) } return errors.Trace(err) } - defer func() { - err := stepExecutor.Cleanup(runStepCtx) - if err != nil { - e.logger.Error("cleanup subtask exec env failed", zap.Error(err)) - // Cleanup is not a critical path of running subtask, so no need to - // affect state of subtasks. there might be no subtask to change even - // we want to if all subtasks are finished. - } - }() + stepLogger := llog.BeginTask(e.logger.With( + zap.String("step", proto.Step2Str(task.Type, task.Step)), + zap.Float64("mem-limit-percent", gctuner.GlobalMemoryLimitTuner.GetPercentage()), + zap.String("server-mem-limit", memory.ServerMemoryLimitOriginText.Load()), + zap.Stringer("resource", resource), + ), "execute task step") - for { - select { - case <-runStepCtx.Done(): - return runStepCtx.Err() - default: - } + runStepCtx, runStepCancel := context.WithCancelCause(e.ctx) + e.stepExec = stepExecutor + e.stepCtx = runStepCtx + e.stepLogger = stepLogger - subtask, err := e.taskTable.GetFirstSubtaskInStates(runStepCtx, e.id, task.ID, task.Step, - proto.SubtaskStatePending, proto.SubtaskStateRunning) - if err != nil { - e.logger.Warn("GetFirstSubtaskInStates meets error", zap.Error(err)) - continue - } - if subtask == nil { - return nil - } + e.mu.Lock() + defer e.mu.Unlock() + e.mu.runtimeCancel = runStepCancel - if subtask.State == proto.SubtaskStateRunning { - if !e.IsIdempotent(subtask) { - e.logger.Info("subtask in running state and is not idempotent, fail it", - zap.Int64("subtask-id", subtask.ID)) - if err := e.updateSubtaskStateAndErrorImpl(runStepCtx, subtask.ExecID, subtask.ID, - proto.SubtaskStateFailed, ErrNonIdempotentSubtask); err != nil { - return err - } - return ErrNonIdempotentSubtask - } - e.logger.Info("subtask in running state and is idempotent", + return nil +} + +func (e *BaseTaskExecutor) cleanStepExecutor() { + if err2 := e.stepExec.Cleanup(e.ctx); err2 != nil { + e.logger.Error("cleanup subtask exec env failed", zap.Error(err2)) + // Cleanup is not a critical path of running subtask, so no need to + // affect state of subtasks. there might be no subtask to change even + // we want to if all subtasks are finished. + } + e.stepExec = nil + e.stepLogger.End(zap.InfoLevel, nil) + + e.mu.Lock() + defer e.mu.Unlock() + e.mu.runtimeCancel(nil) + e.mu.runtimeCancel = nil +} + +func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) { + if subtask.State == proto.SubtaskStateRunning { + if !e.IsIdempotent(subtask) { + e.logger.Info("subtask in running state and is not idempotent, fail it", zap.Int64("subtask-id", subtask.ID)) - } else { - // subtask.State == proto.SubtaskStatePending - err := e.startSubtask(runStepCtx, subtask.ID) - if err != nil { - // should ignore ErrSubtaskNotFound - // since it only means that the subtask not owned by current task executor. - if err != storage.ErrSubtaskNotFound { - e.logger.Warn("start subtask meets error", zap.Error(err)) - } - continue + if err := e.updateSubtaskStateAndErrorImpl(e.stepCtx, subtask.ExecID, subtask.ID, + proto.SubtaskStateFailed, ErrNonIdempotentSubtask); err != nil { + return err } + return ErrNonIdempotentSubtask } - - failpoint.Inject("cancelBeforeRunSubtask", func() { - runStepCancel(nil) - }) - - if err := e.runSubtask(runStepCtx, stepExecutor, subtask); err != nil { - return err + e.logger.Info("subtask in running state and is idempotent", + zap.Int64("subtask-id", subtask.ID)) + } else { + // subtask.State == proto.SubtaskStatePending + err := e.startSubtask(e.stepCtx, subtask.ID) + if err != nil { + // should ignore ErrSubtaskNotFound + // since it only means that the subtask not owned by current task executor. + if err != storage.ErrSubtaskNotFound { + e.logger.Warn("start subtask meets error", zap.Error(err)) + } + return errors.Trace(err) } } -} - -func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor) bool { - _, ok := e.taskTable.(*storage.TaskManager) - return ok && stepExecutor.RealtimeSummary() != nil -} -func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.StepExecutor, - subtask *proto.Subtask) error { - logger := e.logger.With(zap.Int64("subtaskID", subtask.ID)) + logger := e.logger.With(zap.Int64("subtaskID", subtask.ID), zap.String("step", proto.Step2Str(subtask.Type, subtask.Step))) logTask := llog.BeginTask(logger, "run subtask") subtaskErr := func() error { e.currSubtaskID.Store(subtask.ID) var wg util.WaitGroupWrapper - checkCtx, checkCancel := context.WithCancel(ctx) + checkCtx, checkCancel := context.WithCancel(e.stepCtx) wg.RunWithLog(func() { e.checkBalanceSubtask(checkCtx) }) - if e.hasRealtimeSummary(stepExecutor) { + if e.hasRealtimeSummary(e.stepExec) { wg.RunWithLog(func() { - e.updateSubtaskSummaryLoop(checkCtx, ctx, stepExecutor) + e.updateSubtaskSummaryLoop(checkCtx, e.stepCtx, e.stepExec) }) } defer func() { checkCancel() wg.Wait() }() - return stepExecutor.RunSubtask(ctx, subtask) + return e.stepExec.RunSubtask(e.stepCtx, subtask) }() failpoint.InjectCall("changeRunSubtaskError", e, &subtaskErr) logTask.End2(zap.InfoLevel, subtaskErr) @@ -386,25 +380,31 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute. failpoint.InjectCall("mockTiDBShutdown", e, e.id, e.GetTaskBase()) if subtaskErr != nil { - if err := e.markSubTaskCanceledOrFailed(ctx, subtask, subtaskErr); err != nil { + if err := e.markSubTaskCanceledOrFailed(e.stepCtx, subtask, subtaskErr); err != nil { logger.Error("failed to handle subtask error", zap.Error(err)) } return subtaskErr } failpoint.InjectCall("beforeCallOnSubtaskFinished", subtask) - if err := stepExecutor.OnFinished(ctx, subtask); err != nil { + if err := e.stepExec.OnFinished(e.stepCtx, subtask); err != nil { logger.Info("OnFinished failed", zap.Error(err)) return errors.Trace(err) } - err := e.finishSubtask(ctx, subtask) + err := e.finishSubtask(e.stepCtx, subtask) failpoint.InjectCall("syncAfterSubtaskFinish") return err } +func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor) bool { + _, ok := e.taskTable.(*storage.TaskManager) + return ok && stepExecutor.RealtimeSummary() != nil +} + // GetTaskBase implements TaskExecutor.GetTaskBase. func (e *BaseTaskExecutor) GetTaskBase() *proto.TaskBase { - return e.taskBase.Load() + task := e.task.Load() + return &task.TaskBase } // CancelRunningSubtask implements TaskExecutor.CancelRunningSubtask. @@ -424,27 +424,15 @@ func (e *BaseTaskExecutor) Close() { // refreshTask fetch task state from tidb_global_task table. func (e *BaseTaskExecutor) refreshTask() error { - task := e.GetTaskBase() - newTaskBase, err := e.taskTable.GetTaskBaseByID(e.ctx, task.ID) + oldTask := e.task.Load() + newTask, err := e.taskTable.GetTaskByID(e.ctx, oldTask.ID) if err != nil { return err } - e.taskBase.Store(newTaskBase) + e.task.Store(newTask) return nil } -func (e *BaseTaskExecutor) registerRunStepCancelFunc(cancel context.CancelCauseFunc) { - e.mu.Lock() - defer e.mu.Unlock() - e.mu.runtimeCancel = cancel -} - -func (e *BaseTaskExecutor) unregisterRunStepCancelFunc() { - e.mu.Lock() - defer e.mu.Unlock() - e.mu.runtimeCancel = nil -} - func (e *BaseTaskExecutor) cancelRunStepWith(cause error) { e.mu.Lock() defer e.mu.Unlock() diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index d06d1e3bb795a..ac3d9d43c8e50 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -28,8 +28,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var ( @@ -38,336 +36,549 @@ var ( } ) -func TestTaskExecutorRun(t *testing.T) { - var tp proto.TaskType = "test_task_executor_run" - var concurrency = 10 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockSubtaskTable := mock.NewMockTaskTable(ctrl) - mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl) - mockExtension := mock.NewMockExtension(ctrl) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() - - task1 := &proto.Task{TaskBase: proto.TaskBase{State: proto.TaskStateRunning, Step: proto.StepOne, Type: tp, ID: 1, Concurrency: concurrency}} - // mock for checkBalanceSubtask - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", - task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: 1}}}, nil).AnyTimes() - // mock GetTaskByID at beginning of runStep - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task1.ID).Return(task1, nil).AnyTimes() - - // 1. no taskExecutor constructor - taskExecutorRegisterErr := errors.Errorf("constructor of taskExecutor for key not found") - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, taskExecutorRegisterErr) - taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable) - taskExecutor.Extension = mockExtension - mockSubtaskTable.EXPECT().FailSubtask(gomock.Any(), taskExecutor.id, task1.ID, taskExecutorRegisterErr).Return(nil) - err := taskExecutor.RunStep(nil) - require.EqualError(t, err, taskExecutorRegisterErr.Error()) - require.True(t, ctrl.Satisfied()) - - // 2. init subtask exec env failed - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() - - initErr := errors.New("init error") - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(initErr) - mockSubtaskTable.EXPECT().FailSubtask(gomock.Any(), taskExecutor.id, task1.ID, initErr).Return(nil) - err = taskExecutor.RunStep(nil) - require.EqualError(t, err, initErr.Error()) - require.True(t, ctrl.Satisfied()) - - // 3. run subtask failed - runSubtaskErr := errors.New("run subtask error") - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", task1.ID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - - err = taskExecutor.RunStep(nil) - require.EqualError(t, err, runSubtaskErr.Error()) - require.True(t, ctrl.Satisfied()) - - // 4. run subtask success - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.NoError(t, err) - require.True(t, ctrl.Satisfied()) - - // 5. run subtask one by one - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - // first round of the run loop - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), int64(1), "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil) - // second round of the run loop - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 2, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), int64(2), "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(2), gomock.Any()).Return(nil) - // third round of the run loop - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.NoError(t, err) - require.True(t, ctrl.Satisfied()) - - // run previous left subtask in running state again, but the subtask is not - // idempotent, so fail it. - subtaskID := int64(2) - theSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ID: subtaskID, Type: tp, Step: proto.StepOne, State: proto.SubtaskStateRunning, ExecID: "id"}} - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(theSubtask, nil) - mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", subtaskID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.ErrorContains(t, err, "subtask in running state and is not idempotent") - require.True(t, ctrl.Satisfied()) - - // run previous left subtask in running state again, but the subtask idempotent, - // run it again. - theSubtask = &proto.Subtask{SubtaskBase: proto.SubtaskBase{ID: subtaskID, Type: tp, Step: proto.StepOne, State: proto.SubtaskStateRunning, ExecID: "id"}} - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - // first round of the run loop - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(theSubtask, nil) - mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil) - // second round of the run loop - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.NoError(t, err) - require.True(t, ctrl.Satisfied()) - - // 6. cancel - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { - taskExecutor.CancelRunningSubtask() - return ErrCancelSubtask - }) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", task1.ID, proto.SubtaskStateCanceled, gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.EqualError(t, err, ErrCancelSubtask.Error()) - require.True(t, ctrl.Satisfied()) - - // 7. RunSubtask return context.Canceled, for graceful shutdown - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { - // we cancel the runStep context here to make sure the task executor - // can still be used later. in real world case we should cancel the task - // executor for graceful shutdown - taskExecutor.cancelRunStepWith(nil) - return context.Canceled - }) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.EqualError(t, err, context.Canceled.Error()) - require.True(t, ctrl.Satisfied()) - - // 8. grpc cancel, for graceful shutdown - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(nil) - grpcErr := status.Error(codes.Canceled, "test cancel") - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { - // same as previous case - taskExecutor.cancelRunStepWith(nil) - return grpcErr +func reduceRetrySQLTimes(t *testing.T, target int) { + retryCntBak := scheduler.RetrySQLTimes + t.Cleanup(func() { + scheduler.RetrySQLTimes = retryCntBak }) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.EqualError(t, err, grpcErr.Error()) - require.True(t, ctrl.Satisfied()) - - // 10. subtask owned by other executor - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task1.ID, "id").Return(storage.ErrSubtaskNotFound) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.NoError(t, err) - require.True(t, ctrl.Satisfied()) - - // task not found when Run - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(nil, storage.ErrTaskNotFound) - taskExecutor.Run(nil) - require.True(t, ctrl.Satisfied()) - // task Succeed inside Run - task1.State = proto.TaskStateSucceed - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(&task1.TaskBase, nil) - taskExecutor.Run(nil) - require.True(t, ctrl.Satisfied()) - - task1.State = proto.TaskStateRunning - ReduceCheckInterval(t) + scheduler.RetrySQLTimes = target +} - // GetTaskBaseByID error, should continue - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(nil, errors.New("mock err")) - // HasSubtasksInStates error, should continue - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(&task1.TaskBase, nil) - mockSubtaskTable.EXPECT().HasSubtasksInStates(gomock.Any(), "id", task1.ID, task1.Step, - unfinishedNormalSubtaskStates...).Return(false, errors.New("failed to check")) - // no subtask to run, should exit the loop after some time. - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(&task1.TaskBase, nil).Times(8) - mockSubtaskTable.EXPECT().HasSubtasksInStates(gomock.Any(), "id", task1.ID, task1.Step, - unfinishedNormalSubtaskStates...).Return(false, nil).Times(8) - taskExecutor.Run(nil) - require.True(t, ctrl.Satisfied()) - - // no-subtask check counter should be reset after a subtask is run. - // loop 4 times without subtask, then 1 time with subtask. - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(&task1.TaskBase, nil).Times(4) - mockSubtaskTable.EXPECT().HasSubtasksInStates(gomock.Any(), "id", task1.ID, task1.Step, - unfinishedNormalSubtaskStates...).Return(false, nil).Times(4) - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(&task1.TaskBase, nil) - mockSubtaskTable.EXPECT().HasSubtasksInStates(gomock.Any(), "id", task1.ID, task1.Step, - unfinishedNormalSubtaskStates...).Return(true, nil) - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - // should loop for another 8 times - mockSubtaskTable.EXPECT().GetTaskBaseByID(gomock.Any(), task1.ID).Return(&task1.TaskBase, nil).Times(8) - mockSubtaskTable.EXPECT().HasSubtasksInStates(gomock.Any(), "id", task1.ID, task1.Step, - unfinishedNormalSubtaskStates...).Return(false, nil).Times(8) - taskExecutor.Run(nil) - require.True(t, ctrl.Satisfied()) - - taskExecutor.Cancel() - taskExecutor.Run(nil) - require.True(t, ctrl.Satisfied()) +type taskExecutorRunEnv struct { + ctrl *gomock.Controller + taskTable *mock.MockTaskTable + stepExecutor *mockexecute.MockStepExecutor + taskExecExt *mock.MockExtension + taskExecutor *BaseTaskExecutor + task1 *proto.Task + succeedTask1 *proto.Task + revertingTask1 *proto.Task + pendingSubtask1 *proto.Subtask + runningSubtask2 *proto.Subtask } -func TestTaskExecutor(t *testing.T) { - var tp proto.TaskType = "test_task_executor" - var taskID int64 = 1 - var concurrency = 10 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func newTaskExecutorRunEnv(t *testing.T) *taskExecutorRunEnv { ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockSubtaskTable := mock.NewMockTaskTable(ctrl) - mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl) - mockExtension := mock.NewMockExtension(ctrl) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() - // mock for checkBalanceSubtask - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", - taskID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: 1}}}, nil).AnyTimes() - - task := &proto.Task{TaskBase: proto.TaskBase{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}} - taskExecutor := NewBaseTaskExecutor(ctx, "id", task, mockSubtaskTable) - taskExecutor.Extension = mockExtension + taskTable := mock.NewMockTaskTable(ctrl) + stepExecutor := mockexecute.NewMockStepExecutor(ctrl) + taskExecExt := mock.NewMockExtension(ctrl) + + task1 := proto.Task{TaskBase: proto.TaskBase{State: proto.TaskStateRunning, Step: proto.StepOne, + Type: proto.TaskTypeExample, ID: 1, Concurrency: 10}} + taskExecutor := NewBaseTaskExecutor(context.Background(), "id", &task1, taskTable) + taskExecutor.Extension = taskExecExt - // 1. run failed. - runSubtaskErr := errors.New("run subtask error") - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - subtasks := []*proto.Subtask{ - {SubtaskBase: proto.SubtaskBase{ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, + t.Cleanup(func() { + ctrl.Finish() + }) + ReduceCheckInterval(t) + + succeedTask1 := task1 + succeedTask1.State = proto.TaskStateSucceed + revertingTask1 := task1 + revertingTask1.State = proto.TaskStateReverting + return &taskExecutorRunEnv{ + ctrl: ctrl, + taskTable: taskTable, + stepExecutor: stepExecutor, + taskExecExt: taskExecExt, + taskExecutor: taskExecutor, + task1: &task1, + succeedTask1: &succeedTask1, + revertingTask1: &revertingTask1, + pendingSubtask1: &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: 1, Type: task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, + runningSubtask2: &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: 2, Type: task1.Type, Step: proto.StepOne, State: proto.SubtaskStateRunning, ExecID: "id"}}, } - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), taskID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", subtasks[0].ID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err := taskExecutor.RunStep(nil) - require.EqualError(t, err, runSubtaskErr.Error()) - require.True(t, ctrl.Satisfied()) - - // 2. run one subtask, then no subtask anymore, show exit RunStep loop. - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), taskID, "id").Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = taskExecutor.RunStep(nil) - require.NoError(t, err) - require.True(t, ctrl.Satisfied()) } -func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockSubtaskTable := mock.NewMockTaskTable(ctrl) - mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl) - mockExtension := mock.NewMockExtension(ctrl) +func (e *taskExecutorRunEnv) mockForCheckBalanceSubtask() { + e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates( + gomock.Any(), "id", e.task1.ID, proto.StepOne, proto.SubtaskStateRunning). + DoAndReturn(func(context.Context, string, int64, proto.Step, ...proto.SubtaskState) ([]*proto.Subtask, error) { + return []*proto.Subtask{{SubtaskBase: proto.SubtaskBase{ID: e.taskExecutor.currSubtaskID.Load()}}}, nil + }).AnyTimes() +} - task := &proto.Task{TaskBase: proto.TaskBase{Step: proto.StepOne, Type: "example", ID: 1, Concurrency: 1}} - subtasks := []*proto.Subtask{ - {SubtaskBase: proto.SubtaskBase{ID: 1, Type: "example", Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "tidb1"}}, - } - ctx := context.Background() - taskExecutor := NewBaseTaskExecutor(ctx, "tidb1", task, mockSubtaskTable) - taskExecutor.Extension = mockExtension +func TestTaskExecutorRun(t *testing.T) { + t.Run("context done when run", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskExecutor.cancel() + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("task not found when run", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(nil, storage.ErrTaskNotFound) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("task state is not running when run", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.revertingTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("retry on error of GetTaskByID", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(nil, errors.New("some err")) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(nil, errors.New("some err")) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("retry on error of GetFirstSubtaskInStates", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + for i := 0; i < 3; i++ { + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nil, errors.New("some err")) + } + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("get step executor failed", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + taskExecutorRegisterErr := errors.Errorf("constructor of taskExecutor for key not found") + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, taskExecutorRegisterErr) + e.taskTable.EXPECT().FailSubtask(gomock.Any(), e.taskExecutor.id, e.task1.ID, taskExecutorRegisterErr).Return(nil) + // used to break the loop, below too + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("non retryable step executor Init error", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + initErr := errors.New("init error") + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(initErr) + e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(false) + e.taskTable.EXPECT().FailSubtask(gomock.Any(), e.taskExecutor.id, e.task1.ID, initErr).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) - // mock for checkBalanceSubtask - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", - task.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) - // mock for runStep - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil) - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task.ID, "tidb1").Return(nil) - mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockStepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subtask *proto.Subtask) error { - <-ctx.Done() - return ctx.Err() + t.Run("retryable step executor Init error", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + initErr := errors.New("init error") + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(initErr) + e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run one subtask failed with non-retryable error", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + runSubtaskErr := errors.New("run subtask error") + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) + e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(false) + e.taskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", e.task1.ID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run one subtask failed with retryable error, success after retry 3 times", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + runSubtaskErr := errors.New("run subtask error") + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) + e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true) + // already started by prev StartSubtask + runningSubtask := *e.pendingSubtask1 + runningSubtask.State = proto.SubtaskStateRunning + for i := 0; i < 3; i++ { + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(&runningSubtask, nil) + e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) + e.taskExecExt.EXPECT().IsIdempotent(gomock.Any()).Return(true) + if i < 2 { + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) + e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true) + } else { + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil) + } + } + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("subtask scheduled away during running, keep running next subtask", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + // mock for checkBalanceSubtask, returns empty subtask list + e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", + e.task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) + // this subtask is scheduled awsy during running + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subtask *proto.Subtask) error { + <-ctx.Done() + return ctx.Err() + }) + // keep running next subtask + nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}} + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nextSubtask, nil) + e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), nextSubtask.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), nextSubtask).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), nextSubtask).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", nextSubtask.ID, gomock.Any()).Return(nil) + // exit + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run one subtask success", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run subtasks one by one, and exit due to no subtask to run for a while", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + for i := 0; i < 5; i++ { + subtaskID := int64(i + 1) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + theSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: subtaskID, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}} + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(theSubtask, nil) + if i == 0 { + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + } else { + e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) + } + e.taskTable.EXPECT().StartSubtask(gomock.Any(), subtaskID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), theSubtask).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), theSubtask).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil) + } + // exit due to no subtask to run for a while + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil).Times(8) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nil, nil).Times(8) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run subtasks step by step", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + idAlloc := int64(1) + var currStepExecStep proto.Step + for i, s := range []struct { + step proto.Step + count int + }{ + {proto.StepOne, 5}, + {proto.StepTwo, 1}, + {proto.StepThree, 3}, + } { + taskOfStep := *e.task1 + taskOfStep.Step = s.step + for j := 0; j < s.count; j++ { + subtaskID := idAlloc + idAlloc++ + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), taskOfStep.ID).Return(&taskOfStep, nil) + theSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: subtaskID, Type: taskOfStep.Type, Step: s.step, State: proto.SubtaskStatePending, ExecID: "id"}} + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskOfStep.ID, s.step, + unfinishedNormalSubtaskStates...).Return(theSubtask, nil) + if j == 0 { + if i != 0 { + e.stepExecutor.EXPECT().GetStep().Return(currStepExecStep) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + } + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + currStepExecStep = s.step + } else { + e.stepExecutor.EXPECT().GetStep().Return(currStepExecStep) + } + e.taskTable.EXPECT().StartSubtask(gomock.Any(), subtaskID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), theSubtask).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), theSubtask).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil) + } + } + // end the loop + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("step executor cleanup failed, keeps running", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + step2Subtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: 2, Type: e.task1.Type, Step: proto.StepTwo, State: proto.SubtaskStatePending, ExecID: "id"}} + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(step2Subtask, nil) + e.stepExecutor.EXPECT().GetStep().Return(e.pendingSubtask1.Step) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(errors.New("some error")) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), step2Subtask.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), step2Subtask).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), step2Subtask).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", step2Subtask.ID, gomock.Any()).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(errors.New("some error 2")) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run previous left non-idempotent subtask in running state, fail it.", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + subtaskID := int64(2) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.runningSubtask2, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskExecExt.EXPECT().IsIdempotent(gomock.Any()).Return(false) + e.taskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", subtaskID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("run previous left idempotent subtask in running state, run it again.", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + subtaskID := int64(2) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.runningSubtask2, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + // first round of the run loop + e.taskExecExt.EXPECT().IsIdempotent(gomock.Any()).Return(true) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil) + // second round of the run loop + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("subtask cancelled during running", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { + e.taskExecutor.CancelRunningSubtask() + return ErrCancelSubtask + }) + e.taskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", e.task1.ID, proto.SubtaskStateCanceled, nil).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(nil, storage.ErrTaskNotFound) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("task executor cancelled for graceful shutdown during subtask running", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(func(context.Context, *proto.Subtask) error { + e.taskExecutor.Cancel() + return context.Canceled + }) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("subtask scheduled away right before we start it", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(storage.ErrSubtaskNotFound) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("start subtask failed after retry, will try again", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + reduceRetrySQLTimes(t, 1) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(errors.New("some error")) + // second round of the run loop + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("OnFinished failed for task, will run again", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(errors.New("some error")) + // second round of the run loop + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("no subtask to run, should exit the loop after some time", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil).Times(8) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nil, nil).Times(8) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + + t.Run("no-subtask check counter should be reset after a subtask is run.", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + // loop 4 times without subtask, then 1 time with subtask. + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil).Times(4) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nil, nil).Times(4) + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) + e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil) + // loop for 8 times without subtask, and exit + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil).Times(8) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nil, nil).Times(8) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) }) - mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - require.ErrorIs(t, taskExecutor.RunStep(nil), context.Canceled) - require.True(t, ctrl.Satisfied()) } func TestCheckBalanceSubtask(t *testing.T) { @@ -403,7 +614,7 @@ func TestCheckBalanceSubtask(t *testing.T) { mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) runCtx, cancelCause := context.WithCancelCause(ctx) - taskExecutor.registerRunStepCancelFunc(cancelCause) + taskExecutor.mu.runtimeCancel = cancelCause require.NoError(t, runCtx.Err()) taskExecutor.checkBalanceSubtask(ctx) require.ErrorIs(t, runCtx.Err(), context.Canceled) @@ -422,11 +633,7 @@ func TestCheckBalanceSubtask(t *testing.T) { require.True(t, ctrl.Satisfied()) // if we failed to change state of non-idempotent subtask, will retry - retryCntBak := scheduler.RetrySQLTimes - t.Cleanup(func() { - scheduler.RetrySQLTimes = retryCntBak - }) - scheduler.RetrySQLTimes = 1 + reduceRetrySQLTimes(t, 1) mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, task.Step, proto.SubtaskStateRunning).Return(subtasks, nil) mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) @@ -459,78 +666,10 @@ func TestCheckBalanceSubtask(t *testing.T) { }) } -func TestExecutorErrHandling(t *testing.T) { - var tp proto.TaskType = "test_task_executor" - var concurrency = 10 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockSubtaskTable := mock.NewMockTaskTable(ctrl) - mockSubtaskExecutor := mockexecute.NewMockStepExecutor(ctrl) - mockExtension := mock.NewMockExtension(ctrl) - task := &proto.Task{TaskBase: proto.TaskBase{Step: proto.StepOne, Type: tp, ID: 1, Concurrency: concurrency}} - taskExecutor := NewBaseTaskExecutor(ctx, "id", task, mockSubtaskTable) - taskExecutor.Extension = mockExtension - - // Init meet retryable error. - initErr := errors.New("executor init err") - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) - require.ErrorIs(t, taskExecutor.RunStep(nil), initErr) - require.True(t, ctrl.Satisfied()) - - // Init meet non retryable error. - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().FailSubtask(gomock.Any(), taskExecutor.id, gomock.Any(), initErr) - require.ErrorIs(t, taskExecutor.RunStep(nil), initErr) - require.True(t, ctrl.Satisfied()) - - // Cleanup meet error. - cleanupErr := errors.New("cleanup err") - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task.ID, "id").Return(nil) - mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(cleanupErr) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - - // subtask succeed. - mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) - mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task.ID, "id").Return(nil) - mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().FinishSubtask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) -} - func TestInject(t *testing.T) { e := &EmptyStepExecutor{} r := &proto.StepResource{CPU: proto.NewAllocatable(1)} - execute.SetFrameworkInfo(e, r) + execute.SetFrameworkInfo(e, proto.StepOne, r) got := e.GetResource() require.Equal(t, r, got) } From d1f310b8d1005b6438d2b69f3699e1fcaf2dc96e Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 3 Dec 2024 14:13:45 +0800 Subject: [PATCH 2/4] change --- .../framework/taskexecutor/task_executor.go | 6 +++++ .../taskexecutor/task_executor_test.go | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 39a5a64d669e4..1cf97b07cfd58 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -195,6 +195,12 @@ func (e *BaseTaskExecutor) Ctx() context.Context { // Run implements the TaskExecutor interface. func (e *BaseTaskExecutor) Run(resource *proto.StepResource) { defer func() { + if r := recover(); r != nil { + e.logger.Error("run task panicked, fail the task", zap.Any("recover", r), zap.Stack("stack")) + err4Panic := errors.Errorf("%v", r) + taskBase := e.task.Load() + e.failOneSubtask(e.ctx, taskBase.ID, err4Panic) + } if e.stepExec != nil { e.cleanStepExecutor() } diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index ac3d9d43c8e50..dceda8703c67b 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -211,6 +211,32 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, e.ctrl.Satisfied()) }) + t.Run("run subtask panic, fail the entire task", func(t *testing.T) { + e := newTaskExecutorRunEnv(t) + e.mockForCheckBalanceSubtask() + + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) + e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil) + e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).DoAndReturn( + func(context.Context, *proto.Subtask) error { + panic("run subtask panic") + }, + ) + e.taskTable.EXPECT().FailSubtask(gomock.Any(), "id", e.task1.ID, gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, _ int64, err error) error { + require.ErrorContains(t, err, "run subtask panic") + return nil + }, + ) + e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + e.taskExecutor.Run(nil) + require.True(t, e.ctrl.Satisfied()) + }) + t.Run("run one subtask failed with retryable error, success after retry 3 times", func(t *testing.T) { e := newTaskExecutorRunEnv(t) e.mockForCheckBalanceSubtask() From 7bec2e9c1906f02c13672906c2fd45b2daa3ca9a Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 3 Dec 2024 20:05:32 +0800 Subject: [PATCH 3/4] change --- pkg/disttask/framework/taskexecutor/BUILD.bazel | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel index 031a5b368f0b6..cf76a554bc2a8 100644 --- a/pkg/disttask/framework/taskexecutor/BUILD.bazel +++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel @@ -50,7 +50,7 @@ go_test( ], embed = [":taskexecutor"], flaky = True, - shard_count = 16, + shard_count = 13, deps = [ "//pkg/disttask/framework/mock", "//pkg/disttask/framework/mock/execute", @@ -69,8 +69,6 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//util", - "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//status", "@org_uber_go_goleak//:goleak", "@org_uber_go_mock//gomock", ], From 81e53aee036b21d31ad11b1577de5d47811eb9bb Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 3 Dec 2024 23:16:39 +0800 Subject: [PATCH 4/4] fix test --- pkg/disttask/framework/integrationtests/BUILD.bazel | 1 + .../framework/integrationtests/framework_test.go | 10 +++++++--- pkg/disttask/framework/storage/table_test.go | 11 ----------- pkg/disttask/framework/testutil/BUILD.bazel | 1 + pkg/disttask/framework/testutil/disttest_util.go | 12 ++++++++---- pkg/disttask/framework/testutil/executor_util.go | 5 ++++- 6 files changed, 21 insertions(+), 19 deletions(-) diff --git a/pkg/disttask/framework/integrationtests/BUILD.bazel b/pkg/disttask/framework/integrationtests/BUILD.bazel index 0ad38d06efa16..5290394c61708 100644 --- a/pkg/disttask/framework/integrationtests/BUILD.bazel +++ b/pkg/disttask/framework/integrationtests/BUILD.bazel @@ -28,6 +28,7 @@ go_test( "//pkg/disttask/framework/scheduler/mock", "//pkg/disttask/framework/storage", "//pkg/disttask/framework/taskexecutor", + "//pkg/disttask/framework/taskexecutor/execute", "//pkg/disttask/framework/testutil", "//pkg/domain", "//pkg/session", diff --git a/pkg/disttask/framework/integrationtests/framework_test.go b/pkg/disttask/framework/integrationtests/framework_test.go index 9eb26feb2279c..40bef11005606 100644 --- a/pkg/disttask/framework/integrationtests/framework_test.go +++ b/pkg/disttask/framework/integrationtests/framework_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "github.com/pingcap/tidb/pkg/disttask/framework/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" @@ -45,8 +46,9 @@ func registerExampleTask(t testing.TB, ctrl *gomock.Controller, schedulerExt sch if runSubtaskFn == nil { runSubtaskFn = getCommonSubtaskRunFn(testContext) } - stepExecutor := testutil.GetCommonStepExecutor(ctrl, runSubtaskFn) - executorExt := testutil.GetCommonTaskExecutorExt(ctrl, stepExecutor) + executorExt := testutil.GetCommonTaskExecutorExt(ctrl, func(task *proto.Task) (execute.StepExecutor, error) { + return testutil.GetCommonStepExecutor(ctrl, task.Step, runSubtaskFn), nil + }) testutil.RegisterExampleTask(t, schedulerExt, executorExt, testutil.GetCommonCleanUpRoutine(ctrl)) } @@ -174,7 +176,9 @@ func TestFrameworkSubTaskInitEnvFailed(t *testing.T) { schedulerExt := testutil.GetMockBasicSchedulerExt(c.MockCtrl) stepExec := mockexecute.NewMockStepExecutor(c.MockCtrl) stepExec.EXPECT().Init(gomock.Any()).Return(errors.New("mockExecSubtaskInitEnvErr")).AnyTimes() - executorExt := testutil.GetCommonTaskExecutorExt(c.MockCtrl, stepExec) + executorExt := testutil.GetCommonTaskExecutorExt(c.MockCtrl, func(task *proto.Task) (execute.StepExecutor, error) { + return stepExec, nil + }) testutil.RegisterExampleTask(t, schedulerExt, executorExt, testutil.GetCommonCleanUpRoutine(c.MockCtrl)) task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1) require.Equal(t, proto.TaskStateReverted, task.State) diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 37a1a7d280b93..13cc884163838 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -518,10 +518,6 @@ func TestSubTaskTable(t *testing.T) { require.Len(t, cntByStates, 1) require.Equal(t, int64(1), cntByStates[proto.SubtaskStatePending]) - ok, err := sm.HasSubtasksInStates(ctx, "tidb1", 1, proto.StepOne, proto.SubtaskStatePending) - require.NoError(t, err) - require.True(t, ok) - ts := time.Now() time.Sleep(time.Second) require.NoError(t, sm.StartSubtask(ctx, 1, "tidb1")) @@ -555,15 +551,8 @@ func TestSubTaskTable(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(0), cntByStates[proto.SubtaskStatePending]) - ok, err = sm.HasSubtasksInStates(ctx, "tidb1", 1, proto.StepOne, proto.SubtaskStatePending) - require.NoError(t, err) - require.False(t, ok) require.NoError(t, testutil.DeleteSubtasksByTaskID(ctx, sm, 1)) - ok, err = sm.HasSubtasksInStates(ctx, "tidb1", 1, proto.StepOne, proto.SubtaskStatePending, proto.SubtaskStateRunning) - require.NoError(t, err) - require.False(t, ok) - testutil.CreateSubTask(t, sm, 2, proto.StepOne, "tidb1", []byte("test"), proto.TaskTypeExample, 11) subtasks, err := sm.GetAllSubtasksByStepAndState(ctx, 2, proto.StepOne, proto.SubtaskStateSucceed) diff --git a/pkg/disttask/framework/testutil/BUILD.bazel b/pkg/disttask/framework/testutil/BUILD.bazel index a606528a86bcd..d442f345a409d 100644 --- a/pkg/disttask/framework/testutil/BUILD.bazel +++ b/pkg/disttask/framework/testutil/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/disttask/framework/scheduler/mock", "//pkg/disttask/framework/storage", "//pkg/disttask/framework/taskexecutor", + "//pkg/disttask/framework/taskexecutor/execute", "//pkg/kv", "//pkg/sessionctx", "//pkg/store/mockstore", diff --git a/pkg/disttask/framework/testutil/disttest_util.go b/pkg/disttask/framework/testutil/disttest_util.go index 57280dd574930..06e5fe52df141 100644 --- a/pkg/disttask/framework/testutil/disttest_util.go +++ b/pkg/disttask/framework/testutil/disttest_util.go @@ -25,24 +25,26 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) // GetCommonTaskExecutorExt returns a common task executor extension. -func GetCommonTaskExecutorExt(ctrl *gomock.Controller, stepExec *mockexecute.MockStepExecutor) *mock.MockExtension { +func GetCommonTaskExecutorExt(ctrl *gomock.Controller, getStepExecFn func(*proto.Task) (execute.StepExecutor, error)) *mock.MockExtension { executorExt := mock.NewMockExtension(ctrl) executorExt.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes() - executorExt.EXPECT().GetStepExecutor(gomock.Any()).Return(stepExec, nil).AnyTimes() + executorExt.EXPECT().GetStepExecutor(gomock.Any()).DoAndReturn(getStepExecFn).AnyTimes() executorExt.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() return executorExt } // GetCommonStepExecutor returns one mock subtaskExecutor. -func GetCommonStepExecutor(ctrl *gomock.Controller, runSubtaskFn func(ctx context.Context, subtask *proto.Subtask) error) *mockexecute.MockStepExecutor { +func GetCommonStepExecutor(ctrl *gomock.Controller, step proto.Step, runSubtaskFn func(ctx context.Context, subtask *proto.Subtask) error) *mockexecute.MockStepExecutor { executor := mockexecute.NewMockStepExecutor(ctrl) executor.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() executor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(runSubtaskFn).AnyTimes() + executor.EXPECT().GetStep().Return(step).AnyTimes() executor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes() executor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() executor.EXPECT().RealtimeSummary().Return(nil).AnyTimes() @@ -99,7 +101,9 @@ func RegisterTaskTypeForRollback(t testing.TB, ctrl *gomock.Controller, schedule testContext.CollectSubtask(subtask) return nil } - executorExt := GetCommonTaskExecutorExt(ctrl, GetCommonStepExecutor(ctrl, subtaskRunFn)) + executorExt := GetCommonTaskExecutorExt(ctrl, func(task *proto.Task) (execute.StepExecutor, error) { + return GetCommonStepExecutor(ctrl, task.Step, subtaskRunFn), nil + }) RegisterExampleTask(t, schedulerExt, executorExt, GetCommonCleanUpRoutine(ctrl)) } diff --git a/pkg/disttask/framework/testutil/executor_util.go b/pkg/disttask/framework/testutil/executor_util.go index 8338e1e1646b3..90a82ccfb6818 100644 --- a/pkg/disttask/framework/testutil/executor_util.go +++ b/pkg/disttask/framework/testutil/executor_util.go @@ -19,12 +19,15 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "go.uber.org/mock/gomock" ) // InitTaskExecutor inits all mock components for TaskExecutor. func InitTaskExecutor(ctrl *gomock.Controller, runSubtaskFn func(ctx context.Context, subtask *proto.Subtask) error) { - executorExt := GetCommonTaskExecutorExt(ctrl, GetCommonStepExecutor(ctrl, runSubtaskFn)) + executorExt := GetCommonTaskExecutorExt(ctrl, func(task *proto.Task) (execute.StepExecutor, error) { + return GetCommonStepExecutor(ctrl, task.Step, runSubtaskFn), nil + }) taskexecutor.RegisterTaskType(proto.TaskTypeExample, func(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor { s := taskexecutor.NewBaseTaskExecutor(ctx, id, task, taskTable)