Skip to content

Commit

Permalink
disttask: decouple handleExecutableTask from manager & separate execu…
Browse files Browse the repository at this point in the history
…tor context (#50724)

ref #49008
  • Loading branch information
D3Hunter authored Jan 30, 2024
1 parent 7087f70 commit b5542c5
Show file tree
Hide file tree
Showing 16 changed files with 621 additions and 617 deletions.
10 changes: 6 additions & 4 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type BackfillSubTaskMeta struct {
}

// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl,
bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.StepExecutor, error) {
bgm := &BackfillTaskMeta{}
err := json.Unmarshal(taskMeta, bgm)
Expand Down Expand Up @@ -142,7 +142,9 @@ func (s *backfillDistExecutor) Init(ctx context.Context) error {
return err
}
pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
bc, err := ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
// TODO: local backend should be inited when step executor is created.
// TODO here we have to use executor ctx to avoid it keeps running when task is canceled.
bc, err := ingest.LitBackCtxMgr.Register(s.BaseTaskExecutor.Ctx(), unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -165,10 +167,10 @@ func decodeIndexUniqueness(job *model.Job) (bool, error) {
return unique[0], nil
}

func (s *backfillDistExecutor) GetStepExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) {
func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, summary *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) {
switch task.Step {
case proto.BackfillStepReadIndex, proto.BackfillStepMergeSort, proto.BackfillStepWriteAndIngest:
return NewBackfillSubtaskExecutor(ctx, task.Meta, s.d, s.backendCtx, task.Step, summary)
return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step, summary)
default:
return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_test(
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
"//pkg/util",
Expand Down
6 changes: 6 additions & 0 deletions pkg/disttask/framework/framework_ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package framework_test

import (
"context"
"sync"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -32,6 +34,7 @@ func submitTaskAndCheckSuccessForHA(ctx context.Context, t *testing.T, taskKey s
}

func TestHABasic(t *testing.T) {
taskexecutor.TestContexts = sync.Map{}
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 4)
defer ctrl.Finish()

Expand All @@ -47,6 +50,7 @@ func TestHABasic(t *testing.T) {
}

func TestHAManyNodes(t *testing.T) {
taskexecutor.TestContexts = sync.Map{}
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 30)
defer ctrl.Finish()

Expand All @@ -62,6 +66,7 @@ func TestHAManyNodes(t *testing.T) {
}

func TestHAFailInDifferentStage(t *testing.T) {
taskexecutor.TestContexts = sync.Map{}
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 6)
defer ctrl.Finish()

Expand All @@ -82,6 +87,7 @@ func TestHAFailInDifferentStage(t *testing.T) {
}

func TestHAFailInDifferentStageManyNodes(t *testing.T) {
taskexecutor.TestContexts = sync.Map{}
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 30)
defer ctrl.Finish()

Expand Down
9 changes: 6 additions & 3 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -105,9 +107,8 @@ func TestFrameworkScaleInAndOut(t *testing.T) {
time.Sleep(time.Duration(random.Intn(2000)) * time.Millisecond)
idx := int(random.Int31n(int32(distContext.GetDomainCnt())))
distContext.SetOwner(idx)
require.Eventually(t, func() bool {
return distContext.GetDomain(idx).DDL().OwnerManager().IsOwner()
}, 10*time.Second, 100*time.Millisecond)
// TODO we don't wait owner ready, it's not stable, will try refactor
// how we start multiple schedulers/task-executors later.
}
})
wg.Wait()
Expand Down Expand Up @@ -197,6 +198,7 @@ func TestOwnerChangeWhenSchedule(t *testing.T) {
}

func TestTaskExecutorDownBasic(t *testing.T) {
taskexecutor.TestContexts = sync.Map{}
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 4)
defer ctrl.Finish()

Expand All @@ -215,6 +217,7 @@ func TestTaskExecutorDownBasic(t *testing.T) {
}

func TestTaskExecutorDownManyNodes(t *testing.T) {
taskexecutor.TestContexts = sync.Map{}
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 30)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
Expand Down
74 changes: 48 additions & 26 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 29 additions & 4 deletions pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,37 @@ type Pool interface {
ReleaseAndWait()
}

// TaskExecutor is the subtask executor for a task.
// TaskExecutor is the executor for a task.
// Each task type should implement this interface.
// context tree of task execution:
//
// Manager.ctx
// └── TaskExecutor.ctx: Cancel cancels this one
// └── RunStep.ctx: CancelRunningSubtask cancels this one
type TaskExecutor interface {
// Init initializes the TaskExecutor, the returned error is fatal, it will fail
// the task directly, so be careful what to put into it.
// The context passing in is Manager.ctx, don't use it to init long-running routines,
// as it will NOT be cancelled when the task is finished.
Init(context.Context) error
RunStep(context.Context, *proto.Task, *proto.StepResource) error
Rollback(context.Context, *proto.Task) error
// Run runs the task with given resource, it will try to run each step one by
// one, if it cannot find any subtask to run for a while(10s now), it will exit,
// so manager can free and reuse the resource.
// we assume that all steps will have same resource usage now, will change it
// when we support different resource usage for different steps.
Run(resource *proto.StepResource)
// GetTask returns the task, returned value is for read only, don't change it.
GetTask() *proto.Task
// CancelRunningSubtask cancels the running subtask and change its state to `cancelled`,
// the task executor will keep running, so we can have a context to update the
// subtask state or keep handling revert logic.
CancelRunningSubtask()
// Cancel cancels the task executor, the state of running subtask is not changed.
// it's separated with Close as Close mostly mean will wait all resource released
// before return, but we only want its context cancelled and check whether it's
// closed later.
Cancel()
// Close closes the TaskExecutor.
Close()
IsRetryableError(err error) bool
}
Expand All @@ -88,7 +113,7 @@ type Extension interface {
// Note:
// 1. summary is the summary manager of all subtask of the same type now.
// 2. should not retry the error from it.
GetStepExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary, resource *proto.StepResource) (execute.StepExecutor, error)
GetStepExecutor(task *proto.Task, summary *execute.Summary, resource *proto.StepResource) (execute.StepExecutor, error)
// IsRetryableError returns whether the error is transient.
// When error is transient, the framework won't mark subtasks as failed,
// then the TaskExecutor can load the subtask again and redo it.
Expand Down
11 changes: 11 additions & 0 deletions pkg/disttask/framework/taskexecutor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,22 @@ package taskexecutor

import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/testkit/testsetup"
"go.uber.org/goleak"
)

func ReduceCheckInterval(t *testing.T) {
checkIntervalBak := defaultCheckInterval
maxIntervalBak := maxCheckInterval
t.Cleanup(func() {
defaultCheckInterval = checkIntervalBak
maxCheckInterval = maxIntervalBak
})
maxCheckInterval, defaultCheckInterval = time.Millisecond, time.Millisecond
}

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
opts := []goleak.Option{
Expand Down
Loading

0 comments on commit b5542c5

Please sign in to comment.