Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: add on subtask finished interface for scheduler #43155

Merged
merged 5 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func (b *backfillSchedulerHandle) SplitSubtask(_ context.Context, subtask []byte
return nil, consumer.getResult()
}

// OnSubtaskFinished implements the Scheduler interface.
func (*backfillSchedulerHandle) OnSubtaskFinished(context.Context, []byte) error {
return nil
}

// CleanupSubtaskExecEnv implements the Scheduler interface.
func (b *backfillSchedulerHandle) CleanupSubtaskExecEnv(context.Context) error {
logutil.BgLogger().Info("[ddl] lightning cleanup subtask exec env")
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (t *testScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto
}, nil
}

func (t *testScheduler) OnSubtaskFinished(_ context.Context, _ []byte) error {
return nil
}

type testSubtaskExecutor struct {
v *atomic.Int64
}
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Scheduler interface {
SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error)
// CleanupSubtaskExecEnv is used to clean up the environment for the subtask executor.
CleanupSubtaskExecEnv(context.Context) error
// OnSubtaskFinished is used to handle the subtask when it is finished.
OnSubtaskFinished(ctx context.Context, subtask []byte) error
// Rollback is used to rollback all subtasks.
Rollback(context.Context) error
}
Expand Down
10 changes: 8 additions & 2 deletions disttask/framework/scheduler/interface_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,20 @@ func (m *MockScheduler) InitSubtaskExecEnv(ctx context.Context) error {
}

// SplitSubtask implements Scheduler.SplitSubtask.
func (m *MockScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto.MinimalTask, error) {
args := m.Called(subtask)
func (m *MockScheduler) SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error) {
args := m.Called(ctx, subtask)
if args.Error(1) != nil {
return nil, args.Error(1)
}
return args.Get(0).([]proto.MinimalTask), nil
}

// OnSubtaskFinished implements Scheduler.OnSubtaskFinished.
func (m *MockScheduler) OnSubtaskFinished(ctx context.Context, subtask []byte) error {
args := m.Called(ctx, subtask)
return args.Error(0)
}

// CleanupSubtaskExecEnv implements Scheduler.CleanupSubtaskExecEnv.
func (m *MockScheduler) CleanupSubtaskExecEnv(ctx context.Context) error {
args := m.Called(ctx)
Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
tasks = m.filterAlreadyHandlingTasks(tasks)
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onRunnableTasks", zap.Any("task", task))
if _, ok := m.subtaskExecutorPools[task.Type]; !ok {
logutil.Logger(m.logCtx).Error("unknown task type", zap.String("type", task.Type))
continue
}
exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, proto.TaskStatePending, proto.TaskStateRevertPending)
if err != nil {
logutil.Logger(m.logCtx).Error("check subtask exist failed", zap.Error(err))
m.onError(err)
continue
}
Expand Down
11 changes: 10 additions & 1 deletion disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
break
}

minimalTasks, err := scheduler.SplitSubtask(context.Background(), subtask.Meta)
minimalTasks, err := scheduler.SplitSubtask(runCtx, subtask.Meta)
if err != nil {
s.onError(err)
break
Expand All @@ -162,6 +162,15 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
}
break
}
if err := scheduler.OnSubtaskFinished(runCtx, subtask.Meta); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put this logic before line157, then we can remove line167-173

s.onError(err)
if errors.Cause(err) == context.Canceled {
s.updateSubtaskState(subtask.ID, proto.TaskStateCanceled)
} else {
s.updateSubtaskState(subtask.ID, proto.TaskStateFailed)
}
break
}
s.updateSubtaskState(subtask.ID, proto.TaskStateSucceed)
}

Expand Down
11 changes: 6 additions & 5 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
err = scheduler.Run(runCtx, &proto.Task{Type: tp, ID: taskID, Concurrency: concurrency})
Expand All @@ -105,7 +105,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand All @@ -117,8 +117,9 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateSucceed).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(nil, nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand All @@ -130,7 +131,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(context.Canceled).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateCanceled).Return(nil).Once()
Expand Down Expand Up @@ -244,7 +245,7 @@ func TestScheduler(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand Down
6 changes: 6 additions & 0 deletions disttask/loaddata/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func (s *ImportScheduler) SplitSubtask(ctx context.Context, bs []byte) ([]proto.
return miniTask, nil
}

// OnSubtaskFinished implements the Scheduler.OnSubtaskFinished interface.
func (s *ImportScheduler) OnSubtaskFinished(context.Context, []byte) error {
logutil.BgLogger().Info("OnSubtaskFinished", zap.Any("taskMeta", s.taskMeta))
return nil
}

// CleanupSubtaskExecEnv implements the Scheduler.CleanupSubtaskExecEnv interface.
func (s *ImportScheduler) CleanupSubtaskExecEnv(ctx context.Context) (err error) {
defer func() {
Expand Down