Skip to content

Commit

Permalink
disttask: refine planErr state transition (#51279)
Browse files Browse the repository at this point in the history
ref #49008
  • Loading branch information
ywqzzy authored Feb 27, 2024
1 parent 0df9170 commit 7b61b1a
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) {

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateFailed, task.State)
require.Equal(t, proto.TaskStateReverted, task.State)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetStepTwoPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
task = testutil.SubmitAndWaitTask(c.Ctx, t, "key2", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
8 changes: 1 addition & 7 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,7 @@ func (s *BaseScheduler) handlePlanErr(err error) error {
}
task.Error = err
s.task.Store(&task)

if err = s.OnDone(s.ctx, s, &task); err != nil {
return errors.Trace(err)
}

// TODO: to reverting state?
return s.taskMgr.FailTask(s.ctx, task.ID, task.State, task.Error)
return s.taskMgr.RevertTask(s.ctx, task.ID, task.State, task.Error)
}

// MockServerInfo exported for scheduler_test.go
Expand Down
7 changes: 0 additions & 7 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package taskexecutor
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -279,12 +278,6 @@ func (m *Manager) cancelTaskExecutors(tasks []*proto.TaskBase) {
}
}

// TestContext only used in tests.
type TestContext struct {
TestSyncSubtaskRun chan struct{}
mockDown atomic.Bool
}

// startTaskExecutor handles a runnable task.
func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) {
// TODO: remove it when we can create task executor with task base.
Expand Down
11 changes: 11 additions & 0 deletions pkg/disttask/framework/testutil/scheduler_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func GetPlanNotRetryableErrSchedulerExt(ctrl *gomock.Controller) scheduler.Exten
})
}

// GetStepTwoPlanNotRetryableErrSchedulerExt returns mock scheduler.Extension which will generate non retryable error when planning for step two.
func GetStepTwoPlanNotRetryableErrSchedulerExt(ctrl *gomock.Controller) scheduler.Extension {
return GetMockSchedulerExt(ctrl, SchedulerInfo{
AllErrorRetryable: false,
StepInfos: []StepInfo{
{Step: proto.StepOne, SubtaskCnt: 10},
{Step: proto.StepTwo, Err: errors.New("not retryable err"), ErrRepeatCount: math.MaxInt64},
},
})
}

// GetPlanErrSchedulerExt returns mock scheduler.Extension which will generate error when planning.
func GetPlanErrSchedulerExt(ctrl *gomock.Controller, testContext *TestContext) scheduler.Extension {
mockScheduler := mockDispatch.NewMockExtension(ctrl)
Expand Down
3 changes: 2 additions & 1 deletion tests/realtikvtest/importintotest4/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/importinto"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
s.Eventually(func() bool {
task, err2 = taskManager.GetTaskByKeyWithHistory(ctx, importinto.TaskKey(int64(jobID)))
s.NoError(err2)
return task.State == "failed"
return task.State == proto.TaskStateReverted
}, 30*time.Second, 300*time.Millisecond)
// check all sorted data cleaned up
<-scheduler.WaitCleanUpFinished
Expand Down

0 comments on commit 7b61b1a

Please sign in to comment.