Skip to content

Commit

Permalink
disttask: handle left subtask in running state (#47300)
Browse files Browse the repository at this point in the history
close #46735
  • Loading branch information
D3Hunter authored Sep 26, 2023
1 parent 70b0c1f commit 56a1af7
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 65 deletions.
4 changes: 4 additions & 0 deletions ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *pr
}
}

func (*backfillDistScheduler) IsIdempotent(*proto.Subtask) bool {
return true
}

func (s *backfillDistScheduler) Close() {
if s.backendCtx != nil {
ingest.LitBackCtxMgr.Unregister(s.jobID)
Expand Down
14 changes: 14 additions & 0 deletions disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions disttask/framework/scheduler/execute/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ type SubtaskExecutor interface {
// The subtask meta can be updated in place.
OnFinished(ctx context.Context, subtask *proto.Subtask) error
// Rollback is used to roll back all subtasks.
// TODO: right now all impl of Rollback is empty, maybe we can remove it.
Rollback(context.Context) error
}
5 changes: 5 additions & 0 deletions disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ type Scheduler interface {
// Extension extends the scheduler.
// each task type should implement this interface.
type Extension interface {
// IsIdempotent returns whether the subtask is idempotent.
// when tidb restart, the subtask might be left in the running state.
// if it's idempotent, the scheduler can rerun the subtask, else
// the scheduler will mark the subtask as failed.
IsIdempotent(subtask *proto.Subtask) bool
// GetSubtaskExecutor returns the subtask executor for the subtask.
// Note: summary is the summary manager of all subtask of the same type now.
GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error)
Expand Down
10 changes: 8 additions & 2 deletions disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
tasks = m.filterAlreadyHandlingTasks(tasks)
for _, task := range tasks {
exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRevertPending)
exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRevertPending,
// for the case that the tidb is restarted when the subtask is running.
proto.TaskStateRunning, proto.TaskStateReverting)
if err != nil {
logutil.Logger(m.logCtx).Error("check subtask exist failed", zap.Error(err))
m.onError(err)
Expand Down Expand Up @@ -323,7 +326,10 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
zap.Int64("task-id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State))
return
}
if exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRevertPending); err != nil {
if exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRevertPending,
// for the case that the tidb is restarted when the subtask is running.
proto.TaskStateRunning, proto.TaskStateReverting); err != nil {
m.onError(err)
return
} else if !exist {
Expand Down
35 changes: 20 additions & 15 deletions disttask/framework/scheduler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"go.uber.org/mock/gomock"
)

var unfinishedSubtaskStates = []interface{}{
proto.TaskStatePending, proto.TaskStateRevertPending,
proto.TaskStateRunning, proto.TaskStateReverting,
}

func getPoolRunFn() (*sync.WaitGroup, func(f func()) error) {
wg := &sync.WaitGroup{}
return wg, func(f func()) error {
Expand Down Expand Up @@ -120,43 +125,43 @@ func TestOnRunnableTasks(t *testing.T) {
// get subtask failed
mockInternalScheduler.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(false, errors.New("get subtask failed"))
mockInternalScheduler.EXPECT().Close()
m.onRunnableTasks(context.Background(), []*proto.Task{task})

// no subtask
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).Return(false, nil)
unfinishedSubtaskStates...).Return(false, nil)
m.onRunnableTasks(context.Background(), []*proto.Task{task})

// pool error
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).Return(true, nil)
unfinishedSubtaskStates...).Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).Return(errors.New("pool error"))
m.onRunnableTasks(context.Background(), []*proto.Task{task})

// step 0 succeed
// StepOne succeed
wg, runFn := getPoolRunFn()
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).Return(true, nil)
unfinishedSubtaskStates...).Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().GetGlobalTaskByID(taskID).Return(task, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).Return(true, nil)
unfinishedSubtaskStates...).Return(true, nil)
mockInternalScheduler.EXPECT().Run(gomock.Any(), task).Return(nil)

// step 1 canceled
// StepTwo failed
task1 := &proto.Task{ID: taskID, State: proto.TaskStateRunning, Step: proto.StepTwo}
mockTaskTable.EXPECT().GetGlobalTaskByID(taskID).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepTwo,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).Return(true, nil)
unfinishedSubtaskStates...).Return(true, nil)
mockInternalScheduler.EXPECT().Run(gomock.Any(), task1).Return(errors.New("run err"))

task2 := &proto.Task{ID: taskID, State: proto.TaskStateReverting, Step: proto.StepTwo}
mockTaskTable.EXPECT().GetGlobalTaskByID(taskID).Return(task2, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepTwo,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).Return(true, nil)
unfinishedSubtaskStates...).Return(true, nil)
mockInternalScheduler.EXPECT().Rollback(gomock.Any(), task2).Return(nil)

task3 := &proto.Task{ID: taskID, State: proto.TaskStateReverted, Step: proto.StepTwo}
Expand Down Expand Up @@ -199,33 +204,33 @@ func TestManager(t *testing.T) {
mockInternalScheduler.EXPECT().Init(gomock.Any()).Return(nil)
// task1
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID1, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(true, nil)
wg, runFn := getPoolRunFn()
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().GetGlobalTaskByID(taskID1).Return(task1, nil).AnyTimes()
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID1, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(true, nil)
mockInternalScheduler.EXPECT().Run(gomock.Any(), task1).Return(nil)

mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID1, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(false, nil).AnyTimes()
mockInternalScheduler.EXPECT().Close()
// task2
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().GetGlobalTaskByID(taskID2).Return(task2, nil).AnyTimes()
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(true, nil)
mockInternalScheduler.EXPECT().Init(gomock.Any()).Return(nil)
mockInternalScheduler.EXPECT().Rollback(gomock.Any(), task2).Return(nil)
mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne,
[]interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}).
unfinishedSubtaskStates...).
Return(false, nil).AnyTimes()
mockInternalScheduler.EXPECT().Close()
// task3
Expand Down
49 changes: 35 additions & 14 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
wg.Wait()
}()

subtasks, err := s.taskTable.GetSubtasksInStates(s.id, task.ID, task.Step, proto.TaskStatePending)
subtasks, err := s.taskTable.GetSubtasksInStates(s.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -180,7 +181,8 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
break
}

subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
logutil.Logger(s.logCtx).Warn("GetFirstSubtaskInStates meets error", zap.Error(err))
continue
Expand All @@ -198,10 +200,23 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
continue
}

s.startSubtaskAndUpdateState(subtask)
if err := s.getError(); err != nil {
logutil.Logger(s.logCtx).Warn("startSubtaskAndUpdateState meets error", zap.Error(err))
continue
if subtask.State == proto.TaskStateRunning {
if !s.IsIdempotent(subtask) {
logutil.Logger(s.logCtx).Info("subtask in running state and is not idempotent, fail it",
zap.Int64("subtask-id", subtask.ID))
subtaskErr := errors.New("subtask in running state and is not idempotent")
s.onError(subtaskErr)
s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, subtaskErr)
s.markErrorHandled()
break
}
} else {
// subtask.State == proto.TaskStatePending
s.startSubtaskAndUpdateState(subtask)
if err := s.getError(); err != nil {
logutil.Logger(s.logCtx).Warn("startSubtaskAndUpdateState meets error", zap.Error(err))
continue
}
}

failpoint.Inject("mockCleanScheduler", func() {
Expand All @@ -218,8 +233,8 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
return s.getError()
}

func (s *BaseScheduler) runSubtask(ctx context.Context, scheduler execute.SubtaskExecutor, subtask *proto.Subtask) {
err := scheduler.RunSubtask(ctx, subtask)
func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.SubtaskExecutor, subtask *proto.Subtask) {
err := executor.RunSubtask(ctx, subtask)
failpoint.Inject("MockRunSubtaskCancel", func(val failpoint.Value) {
if val.(bool) {
err = context.Canceled
Expand Down Expand Up @@ -289,12 +304,12 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, scheduler execute.Subtas
}
}
})
s.onSubtaskFinished(ctx, scheduler, subtask)
s.onSubtaskFinished(ctx, executor, subtask)
}

func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, scheduler execute.SubtaskExecutor, subtask *proto.Subtask) {
func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, executor execute.SubtaskExecutor, subtask *proto.Subtask) {
if err := s.getError(); err == nil {
if err = scheduler.OnFinished(ctx, subtask); err != nil {
if err = executor.OnFinished(ctx, subtask); err != nil {
s.onError(err)
}
}
Expand Down Expand Up @@ -330,7 +345,8 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {

// We should cancel all subtasks before rolling back
for {
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -351,7 +367,8 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
s.onError(err)
return s.getError()
}
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step,
proto.TaskStateRevertPending, proto.TaskStateReverting)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -360,11 +377,15 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
logutil.BgLogger().Warn("scheduler rollback a step, but no subtask in revert_pending state", zap.Any("step", task.Step))
return nil
}
s.updateSubtaskStateAndError(subtask, proto.TaskStateReverting, nil)
if subtask.State == proto.TaskStateRevertPending {
s.updateSubtaskStateAndError(subtask, proto.TaskStateReverting, nil)
}
if err := s.getError(); err != nil {
return err
}

// right now all impl of Rollback is empty, so we don't check idempotent here.
// will try to remove this rollback completely in the future.
err = executor.Rollback(rollbackCtx)
if err != nil {
s.updateSubtaskStateAndError(subtask, proto.TaskStateRevertFailed, nil)
Expand Down
Loading

0 comments on commit 56a1af7

Please sign in to comment.