Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dxf: merge loops in Run and RunStep of task executor #57944

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/disttask/framework/integrationtests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
"//pkg/disttask/framework/scheduler/mock",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor",
"//pkg/disttask/framework/taskexecutor/execute",
"//pkg/disttask/framework/testutil",
"//pkg/domain",
"//pkg/session",
Expand Down
10 changes: 7 additions & 3 deletions pkg/disttask/framework/integrationtests/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
Expand All @@ -45,8 +46,9 @@ func registerExampleTask(t testing.TB, ctrl *gomock.Controller, schedulerExt sch
if runSubtaskFn == nil {
runSubtaskFn = getCommonSubtaskRunFn(testContext)
}
stepExecutor := testutil.GetCommonStepExecutor(ctrl, runSubtaskFn)
executorExt := testutil.GetCommonTaskExecutorExt(ctrl, stepExecutor)
executorExt := testutil.GetCommonTaskExecutorExt(ctrl, func(task *proto.Task) (execute.StepExecutor, error) {
return testutil.GetCommonStepExecutor(ctrl, task.Step, runSubtaskFn), nil
})
testutil.RegisterExampleTask(t, schedulerExt, executorExt, testutil.GetCommonCleanUpRoutine(ctrl))
}

Expand Down Expand Up @@ -174,7 +176,9 @@ func TestFrameworkSubTaskInitEnvFailed(t *testing.T) {
schedulerExt := testutil.GetMockBasicSchedulerExt(c.MockCtrl)
stepExec := mockexecute.NewMockStepExecutor(c.MockCtrl)
stepExec.EXPECT().Init(gomock.Any()).Return(errors.New("mockExecSubtaskInitEnvErr")).AnyTimes()
executorExt := testutil.GetCommonTaskExecutorExt(c.MockCtrl, stepExec)
executorExt := testutil.GetCommonTaskExecutorExt(c.MockCtrl, func(task *proto.Task) (execute.StepExecutor, error) {
return stepExec, nil
})
testutil.RegisterExampleTask(t, schedulerExt, executorExt, testutil.GetCommonCleanUpRoutine(c.MockCtrl))
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
Expand Down
14 changes: 14 additions & 0 deletions pkg/disttask/framework/mock/execute/execute_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 0 additions & 20 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions pkg/disttask/framework/proto/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ func Step2Str(t TaskType, s Step) string {
return fmt.Sprintf("unknown type %s", t)
}

// Steps of example task type, they can either have 1 or 2 steps.
// Steps of example task type.
const (
StepOne Step = 1
StepTwo Step = 2
StepOne Step = 1
StepTwo Step = 2
StepThree Step = 3
)

func exampleStep2Str(s Step) string {
Expand All @@ -59,6 +60,8 @@ func exampleStep2Str(s Step) string {
return "one"
case StepTwo:
return "two"
case StepThree:
return "three"
default:
return fmt.Sprintf("unknown step %d", s)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/proto/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package proto

const (
// TaskTypeExample is TaskType of Example.
// TaskTypeExample is TaskType of Example, it's for test.
TaskTypeExample TaskType = "Example"
// ImportInto is TaskType of ImportInto.
ImportInto TaskType = "ImportInto"
Expand Down
11 changes: 0 additions & 11 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,6 @@ func TestSubTaskTable(t *testing.T) {
require.Len(t, cntByStates, 1)
require.Equal(t, int64(1), cntByStates[proto.SubtaskStatePending])

ok, err := sm.HasSubtasksInStates(ctx, "tidb1", 1, proto.StepOne, proto.SubtaskStatePending)
require.NoError(t, err)
require.True(t, ok)

ts := time.Now()
time.Sleep(time.Second)
require.NoError(t, sm.StartSubtask(ctx, 1, "tidb1"))
Expand Down Expand Up @@ -555,15 +551,8 @@ func TestSubTaskTable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(0), cntByStates[proto.SubtaskStatePending])

ok, err = sm.HasSubtasksInStates(ctx, "tidb1", 1, proto.StepOne, proto.SubtaskStatePending)
require.NoError(t, err)
require.False(t, ok)
require.NoError(t, testutil.DeleteSubtasksByTaskID(ctx, sm, 1))

ok, err = sm.HasSubtasksInStates(ctx, "tidb1", 1, proto.StepOne, proto.SubtaskStatePending, proto.SubtaskStateRunning)
require.NoError(t, err)
require.False(t, ok)

testutil.CreateSubTask(t, sm, 2, proto.StepOne, "tidb1", []byte("test"), proto.TaskTypeExample, 11)

subtasks, err := sm.GetAllSubtasksByStepAndState(ctx, 2, proto.StepOne, proto.SubtaskStateSucceed)
Expand Down
16 changes: 0 additions & 16 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,22 +567,6 @@ func (mgr *TaskManager) GetSubtaskErrors(ctx context.Context, taskID int64) ([]e
return subTaskErrors, nil
}

// HasSubtasksInStates checks if there are subtasks in the states.
func (mgr *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (bool, error) {
args := []any{tidbID, taskID, step}
for _, state := range states {
args = append(args, state)
}
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select 1 from mysql.tidb_background_subtask
where exec_id = %? and task_key = %? and step = %?
and state in (`+strings.Repeat("%?,", len(states)-1)+"%?) limit 1", args...)
if err != nil {
return false, err
}

return len(rs) > 0, nil
}

// UpdateSubtasksExecIDs update subtasks' execID.
func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error {
// skip the update process.
Expand Down
4 changes: 1 addition & 3 deletions pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 16,
shard_count = 13,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand All @@ -69,8 +69,6 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_mock//gomock",
],
Expand Down
15 changes: 13 additions & 2 deletions pkg/disttask/framework/taskexecutor/execute/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type StepExecutor interface {
// The subtask meta can be updated in place. only when OnFinished returns no
// err, a subtask can be marked as 'success', if it returns error, the subtask
// might be completely rerun, so don't put code that's prone to error in it.
// TODO merge with RunSubtask, seems no need to have a separate API.
OnFinished(ctx context.Context, subtask *proto.Subtask) error
// Cleanup is used to clean up the environment for this step.
// the returned error will not affect task/subtask state, it's only logged,
Expand All @@ -70,28 +71,38 @@ type StepExecFrameworkInfo interface {
// interfaces, the implementation of other interface must embed
// StepExecFrameworkInfo.
restricted()
// GetStep returns the step.
GetStep() proto.Step
// GetResource returns the expected resource of this step executor.
GetResource() *proto.StepResource
}

var stepExecFrameworkInfoName = reflect.TypeFor[StepExecFrameworkInfo]().Name()

type frameworkInfo struct {
step proto.Step
resource *proto.StepResource
}

func (*frameworkInfo) restricted() {}

func (f *frameworkInfo) GetStep() proto.Step {
return f.step
}

func (f *frameworkInfo) GetResource() *proto.StepResource {
return f.resource
}

// SetFrameworkInfo sets the framework info for the StepExecutor.
func SetFrameworkInfo(exec StepExecutor, resource *proto.StepResource) {
func SetFrameworkInfo(exec StepExecutor, step proto.Step, resource *proto.StepResource) {
if exec == nil {
return
}
toInject := &frameworkInfo{resource: resource}
toInject := &frameworkInfo{
step: step,
resource: resource,
}
// use reflection to set the framework info
e := reflect.ValueOf(exec)
if e.Kind() == reflect.Ptr || e.Kind() == reflect.Interface {
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type TaskTable interface {
// PauseSubtasks update subtasks state to paused.
PauseSubtasks(ctx context.Context, execID string, taskID int64) error

HasSubtasksInStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (bool, error)
// RunningSubtasksBack2Pending update the state of subtask which belongs to this
// node from running to pending.
// see subtask state machine for more detail.
Expand Down
7 changes: 1 addition & 6 deletions pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
Expand Down Expand Up @@ -464,11 +463,7 @@ func TestManagerInitMeta(t *testing.T) {
require.NoError(t, m.InitMeta())
require.True(t, ctrl.Satisfied())

bak := scheduler.RetrySQLTimes
t.Cleanup(func() {
scheduler.RetrySQLTimes = bak
})
scheduler.RetrySQLTimes = 1
reduceRetrySQLTimes(t, 1)
mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err"))
require.ErrorContains(t, m.InitMeta(), "mock err")
require.True(t, ctrl.Satisfied())
Expand Down
Loading