Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: cancel subtask context if scheduled away #58615

Merged
merged 8 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/disttask/framework/scheduler/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
llog "github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/intest"
Expand Down Expand Up @@ -116,6 +117,9 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
// managed nodes, subtasks of task might not be balanced.
adjustedNodes := filterNodesWithEnoughSlots(b.currUsedSlots, b.slotMgr.getCapacity(),
eligibleNodes, subtasks[0].Concurrency)
failpoint.Inject("mockNoEnoughSlots", func(_ failpoint.Value) {
adjustedNodes = []string{}
})
if len(adjustedNodes) == 0 {
// no node has enough slots to run the subtasks, skip balance and skip
// update used slots.
Expand Down
23 changes: 17 additions & 6 deletions pkg/disttask/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba
// `pending` state, to make sure subtasks can be balanced later when node scale out.
// - If current running subtask are scheduled away from this node, i.e. this node
// is taken as down, cancel running.
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCancel context.CancelFunc) {
ticker := time.NewTicker(checkBalanceSubtaskInterval)
defer ticker.Stop()
for {
Expand All @@ -143,7 +143,10 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
e.logger.Info("subtask is scheduled away, cancel running",
zap.Int64("subtaskID", e.currSubtaskID.Load()))
// cancels runStep, but leave the subtask state unchanged.
e.cancelRunStepWith(nil)
if subtaskCtxCancel != nil {
subtaskCtxCancel()
}
failpoint.InjectCall("afterCancelSubtaskExec")
return
}

Expand Down Expand Up @@ -317,6 +320,12 @@ func (e *BaseTaskExecutor) Run() {
continue
}
}
if err := e.stepCtx.Err(); err != nil {
e.logger.Error("step executor context is done, the task should have been reverted",
zap.String("step", proto.Step2Str(task.Type, task.Step)),
zap.Error(err))
continue
}
err = e.runSubtask(subtask)
if err != nil {
// task executor keeps running its subtasks even though some subtask
Expand Down Expand Up @@ -418,23 +427,25 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
logTask := llog.BeginTask(logger, "run subtask")
subtaskErr := func() error {
e.currSubtaskID.Store(subtask.ID)
subtaskCtx, subtaskCtxCancel := context.WithCancel(e.stepCtx)

var wg util.WaitGroupWrapper
checkCtx, checkCancel := context.WithCancel(e.stepCtx)
checkCtx, checkCancel := context.WithCancel(subtaskCtx)
wg.RunWithLog(func() {
e.checkBalanceSubtask(checkCtx)
e.checkBalanceSubtask(checkCtx, subtaskCtxCancel)
})

if e.hasRealtimeSummary(e.stepExec) {
wg.RunWithLog(func() {
e.updateSubtaskSummaryLoop(checkCtx, e.stepCtx, e.stepExec)
e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec)
})
}
defer func() {
checkCancel()
wg.Wait()
subtaskCtxCancel()
}()
return e.stepExec.RunSubtask(e.stepCtx, subtask)
return e.stepExec.RunSubtask(subtaskCtx, subtask)
}()
failpoint.InjectCall("afterRunSubtask", e, &subtaskErr)
logTask.End2(zap.InfoLevel, subtaskErr)
Expand Down
16 changes: 8 additions & 8 deletions pkg/disttask/framework/taskexecutor/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestTaskExecutorRun(t *testing.T) {
// mock for checkBalanceSubtask, returns empty subtask list
e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id",
e.task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
// this subtask is scheduled awsy during running
// this subtask is scheduled away during running
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil)
Expand All @@ -326,6 +326,7 @@ func TestTaskExecutorRun(t *testing.T) {
<-ctx.Done()
return ctx.Err()
})
e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true)
// keep running next subtask
nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{
ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}
Expand Down Expand Up @@ -889,18 +890,17 @@ func TestCheckBalanceSubtask(t *testing.T) {
// context canceled
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
taskExecutor.checkBalanceSubtask(canceledCtx)
taskExecutor.checkBalanceSubtask(canceledCtx, nil)
})

t.Run("subtask scheduled away", func(t *testing.T) {
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error"))
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
runCtx, cancelCause := context.WithCancelCause(ctx)
taskExecutor.mu.runtimeCancel = cancelCause
runCtx, cancel := context.WithCancel(ctx)
require.NoError(t, runCtx.Err())
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, cancel)
require.ErrorIs(t, runCtx.Err(), context.Canceled)
require.True(t, ctrl.Satisfied())
})
Expand All @@ -913,7 +913,7 @@ func TestCheckBalanceSubtask(t *testing.T) {
mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())

// if we failed to change state of non-idempotent subtask, will retry
Expand All @@ -930,7 +930,7 @@ func TestCheckBalanceSubtask(t *testing.T) {
mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())
})

Expand All @@ -945,7 +945,7 @@ func TestCheckBalanceSubtask(t *testing.T) {
// used to break the loop
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())
})
}
Expand Down
45 changes: 45 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,48 @@ func TestAddIndexDistLockAcquireFailed(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)")
tk.MustExec("alter table t add index idx(b);")
}

func TestAddIndexScheduleAway(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_dist_task = on;")
t.Cleanup(func() {
tk.MustExec("set global tidb_enable_dist_task = off;")
})
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")

var jobID atomic.Int64
// Acquire the job ID.
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
if job.Type == model.ActionAddIndex {
jobID.Store(job.ID)
}
})
// Do not balance subtasks automatically.
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots", "return")
afterCancel := make(chan struct{})
// Capture the cancel operation from checkBalanceLoop.
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelSubtaskExec", func() {
close(afterCancel)
})
var once sync.Once
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", func() {
once.Do(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
updateExecID := fmt.Sprintf(`
update mysql.tidb_background_subtask set exec_id = 'other' where task_key in
(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
tk1.MustExec(updateExecID)
<-afterCancel
updateExecID = fmt.Sprintf(`
update mysql.tidb_background_subtask set exec_id = ':4000' where task_key in
(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
tk1.MustExec(updateExecID)
})
})
tk.MustExec("alter table t add index idx(b);")
require.NotEqual(t, int64(0), jobID.Load())
}