Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: remove revert subtask #50550

Merged
merged 8 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,16 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
subtaskMetas, err := sch.OnNextSubtasksBatch(ctx, sch, task, execIDs, sch.GetNextStep(task))
require.NoError(t, err)
require.Len(t, subtaskMetas, 1)
task.Step = ext.GetNextStep(task)
require.Equal(t, ddl.StepReadIndex, task.Step)
nextStep := ext.GetNextStep(task)
require.Equal(t, ddl.StepReadIndex, nextStep)
// update task/subtask, and finish subtask, so we can go to next stage
subtasks := make([]*proto.Subtask, 0, len(subtaskMetas))
for _, m := range subtaskMetas {
subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", 1, m, 0))
for i, m := range subtaskMetas {
subtasks = append(subtasks, proto.NewSubtask(nextStep, task.ID, task.Type, "", 1, m, i+1))
}
_, err = mgr.UpdateTaskAndAddSubTasks(ctx, task, subtasks, proto.TaskStatePending)
err = mgr.SwitchTaskStep(ctx, task, proto.TaskStatePending, nextStep, subtasks)
require.NoError(t, err)
task.Step = nextStep
gotSubtasks, err := mgr.GetSubtasksWithHistory(ctx, taskID, ddl.StepReadIndex)
require.NoError(t, err)

Expand Down Expand Up @@ -218,16 +219,17 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, execIDs, ext.GetNextStep(task))
require.NoError(t, err)
require.Len(t, subtaskMetas, 1)
task.Step = ext.GetNextStep(task)
require.Equal(t, ddl.StepMergeSort, task.Step)
nextStep = ext.GetNextStep(task)
require.Equal(t, ddl.StepMergeSort, nextStep)

// update meta, same as import into.
subtasks = make([]*proto.Subtask, 0, len(subtaskMetas))
for _, m := range subtaskMetas {
subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", 1, m, 0))
for i, m := range subtaskMetas {
subtasks = append(subtasks, proto.NewSubtask(nextStep, task.ID, task.Type, "", 1, m, i+1))
}
_, err = mgr.UpdateTaskAndAddSubTasks(ctx, task, subtasks, proto.TaskStatePending)
err = mgr.SwitchTaskStepInBatch(ctx, task, proto.TaskStatePending, nextStep, subtasks)
require.NoError(t, err)
task.Step = nextStep
gotSubtasks, err = mgr.GetSubtasksWithHistory(ctx, taskID, task.Step)
require.NoError(t, err)
mergeSortStepMeta := &ddl.BackfillSubTaskMeta{
Expand Down
5 changes: 0 additions & 5 deletions pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,3 @@ func (*cloudImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) er
logutil.Logger(ctx).Info("cloud import executor finish subtask")
return nil
}

func (*cloudImportExecutor) Rollback(ctx context.Context) error {
logutil.Logger(ctx).Info("cloud import executor rollback subtask")
return nil
}
5 changes: 0 additions & 5 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,3 @@ func (m *mergeSortExecutor) OnFinished(ctx context.Context, subtask *proto.Subta
subtask.Meta = newMeta
return nil
}

func (*mergeSortExecutor) Rollback(ctx context.Context) error {
logutil.Logger(ctx).Info("merge sort executor rollback backfill add index task")
return nil
}
6 changes: 0 additions & 6 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ func (r *readIndexExecutor) OnFinished(ctx context.Context, subtask *proto.Subta
return nil
}

func (r *readIndexExecutor) Rollback(ctx context.Context) error {
logutil.Logger(ctx).Info("read index executor rollback backfill add index task",
zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID))
return nil
}

func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) (
start, end kv.Key, tbl table.PhysicalTable, err error) {
currentVer, err1 := getValidCurrentVersion(r.d.store)
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/framework_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,5 @@ func TestFrameworkRollback(t *testing.T) {

task := testutil.SubmitAndWaitTask(ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
require.Equal(t, int32(2), testContext.RollbackCnt.Load())
testContext.RollbackCnt.Store(0)
distContext.Close()
}
14 changes: 0 additions & 14 deletions pkg/disttask/framework/mock/execute/execute_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 28 additions & 30 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 8 additions & 25 deletions pkg/disttask/framework/proto/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,13 @@ import (
// │ ┌────────┐
// └───────►│canceled│
// └────────┘
//
// for reverting subtask:
//
// ┌──────────────┐ ┌─────────┐ ┌─────────┐
// │revert_pending├───►│reverting├──►│ reverted│
// └──────────────┘ └────┬────┘ └─────────┘
// │ ┌─────────────┐
// └────────►│revert_failed│
// └─────────────┘
// 1. succeed/failed: pending -> running -> succeed/failed
// 2. canceled: pending -> running -> canceled
// 3. rollback: revert_pending -> reverting -> reverted/revert_failed
// 4. pause/resume: pending -> running -> paused -> running
const (
SubtaskStatePending SubtaskState = "pending"
SubtaskStateRunning SubtaskState = "running"
SubtaskStateSucceed SubtaskState = "succeed"
SubtaskStateFailed SubtaskState = "failed"
SubtaskStateCanceled SubtaskState = "canceled"
SubtaskStatePaused SubtaskState = "paused"
SubtaskStateRevertPending SubtaskState = "revert_pending"
SubtaskStateReverting SubtaskState = "reverting"
SubtaskStateReverted SubtaskState = "reverted"
SubtaskStateRevertFailed SubtaskState = "revert_failed"
SubtaskStatePending SubtaskState = "pending"
SubtaskStateRunning SubtaskState = "running"
SubtaskStateSucceed SubtaskState = "succeed"
SubtaskStateFailed SubtaskState = "failed"
SubtaskStateCanceled SubtaskState = "canceled"
SubtaskStatePaused SubtaskState = "paused"
)

type (
Expand Down Expand Up @@ -117,8 +100,8 @@ func (t *Subtask) String() string {

// IsDone checks if the subtask is done.
func (t *Subtask) IsDone() bool {
return t.State == SubtaskStateSucceed || t.State == SubtaskStateReverted || t.State == SubtaskStateCanceled ||
t.State == SubtaskStateFailed || t.State == SubtaskStateRevertFailed
return t.State == SubtaskStateSucceed || t.State == SubtaskStateCanceled ||
t.State == SubtaskStateFailed
}

// NewSubtask create a new subtask.
Expand Down
4 changes: 0 additions & 4 deletions pkg/disttask/framework/proto/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ func TestSubtaskIsDone(t *testing.T) {
{SubtaskStatePending, false},
{SubtaskStateRunning, false},
{SubtaskStateSucceed, true},
{SubtaskStateReverting, false},
{SubtaskStateRevertPending, false},
{SubtaskStateFailed, true},
{SubtaskStateRevertFailed, true},
{SubtaskStatePaused, false},
{SubtaskStateReverted, true},
{SubtaskStateCanceled, true},
}
for _, c := range cases {
Expand Down
61 changes: 29 additions & 32 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,37 @@ import (

// task state machine
//
// ┌────────┐
// ┌───────────│resuming│◄────────┐
// │ └────────┘ │
// ┌──────┐ │ ┌───────┐ ┌──┴───┐
// │failed│ │ ┌────────►│pausing├──────►│paused│
// └──────┘ │ │ └───────┘ └──────┘
// ▲ ▼ │
// ┌──┴────┐ ┌───┴───┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └──┬┬───┘ └────────┘
// │ ││ ┌─────────┐ ┌────────┐
// │ │└────────►│reverting├────►│reverted│
// │ ▼ └────┬────┘ └────────┘
// │ ┌──────────┐ ▲ │ ┌─────────────┐
// └─────────►│cancelling├────┘ └─────────►│revert_failed│
// └──────────┘ └─────────────┘
// 1. succeed: pending -> running -> succeed
// 2. failed: pending -> running -> reverting -> reverted/revert_failed, pending -> failed
// 3. canceled: pending -> running -> cancelling -> reverting -> reverted/revert_failed
// 4. pause/resume: pending -> running -> pausing -> paused -> running
// Note: if task fail during running, it will end with `reverted` state.
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
// The `failed` state is used to mean the framework cannot run the task, such as
// invalid task type, scheduler init error(fatal), etc.
//
// TODO: we don't have revert_failed task for now.
// ┌────────┐
// ┌───────────│resuming│◄────────┐
// │ └────────┘ │
// ┌──────┐ │ ┌───────┐ ┌──┴───┐
// │failed│ │ ┌────────►│pausing├──────►│paused│
// └──────┘ │ │ └───────┘ └──────┘
// ▲ ▼ │
// ┌──┴────┐ ┌───┴───┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └──┬┬───┘ └────────┘
// │ ││ ┌─────────┐ ┌────────┐
// │ │└────────►│reverting├────►│reverted│
// │ ▼ └─────────┘ └────────┘
// │ ┌──────────┐ ▲
// └─────────►│cancelling├────┘
// └──────────┘
const (
TaskStatePending TaskState = "pending"
TaskStateRunning TaskState = "running"
TaskStateSucceed TaskState = "succeed"
TaskStateFailed TaskState = "failed"
TaskStateReverting TaskState = "reverting"
TaskStateReverted TaskState = "reverted"
TaskStateRevertFailed TaskState = "revert_failed"
TaskStateCancelling TaskState = "cancelling"
TaskStatePausing TaskState = "pausing"
TaskStatePaused TaskState = "paused"
TaskStateResuming TaskState = "resuming"
TaskStatePending TaskState = "pending"
TaskStateRunning TaskState = "running"
TaskStateSucceed TaskState = "succeed"
TaskStateFailed TaskState = "failed"
TaskStateReverting TaskState = "reverting"
TaskStateReverted TaskState = "reverted"
TaskStateCancelling TaskState = "cancelling"
TaskStatePausing TaskState = "pausing"
TaskStatePaused TaskState = "paused"
TaskStateResuming TaskState = "resuming"
)

type (
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/proto/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func TestTaskIsDone(t *testing.T) {
{TaskStateSucceed, true},
{TaskStateReverting, false},
{TaskStateFailed, true},
{TaskStateRevertFailed, false},
{TaskStateCancelling, false},
{TaskStatePausing, false},
{TaskStatePaused, false},
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ go_test(
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 30,
shard_count = 29,
deps = [
"//pkg/config",
"//pkg/disttask/framework/mock",
Expand Down
9 changes: 5 additions & 4 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,25 @@ type TaskManager interface {
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
UpdateTaskAndAddSubTasks(ctx context.Context, task *proto.Task, subtasks []*proto.Subtask, prevState proto.TaskState) (bool, error)
GCSubtasks(ctx context.Context) error
GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
DeleteDeadNodes(ctx context.Context, nodes []string) error
// TransferTask2History transfer tasks and it's related subtasks to history tables.
// TransferTasks2History transfer tasks, and it's related subtasks to history tables.
TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
// CancelTask updated task state to canceling.
CancelTask(ctx context.Context, taskID int64) error
// FailTask updates task state to Failed and updates task error.
FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
// RevertTask updates task state to reverting, and task error.
RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
// RevertedTask updates task state to reverted.
RevertedTask(ctx context.Context, taskID int64) error
// PauseTask updated task state to pausing.
PauseTask(ctx context.Context, taskKey string) (bool, error)
// PausedTask updated task state to paused.
PausedTask(ctx context.Context, taskID int64) error
// ResumedTask updated task state from resuming to running.
ResumedTask(ctx context.Context, taskID int64) error
// SucceedTask updates a task to success state.
SucceedTask(ctx context.Context, taskID int64) error
// SwitchTaskStep switches the task to the next step and add subtasks in one
Expand Down Expand Up @@ -80,8 +83,6 @@ type TaskManager interface {
// else we use nodes without role.
// returned nodes are sorted by node id(host:port).
GetManagedNodes(ctx context.Context) ([]proto.ManagedNode, error)
// GetTaskExecutorIDsByTaskID gets the task executor IDs of the given task ID.
GetTaskExecutorIDsByTaskID(ctx context.Context, taskID int64) ([]string, error)

// GetAllSubtasksByStepAndState gets all subtasks by given states for one step.
GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)
Expand Down
Loading