Skip to content

Commit

Permalink
disttask: mock to start multiple scheduler/taskexecutor (#50801)
Browse files Browse the repository at this point in the history
ref #49008
  • Loading branch information
D3Hunter authored Jan 30, 2024
1 parent 4e41699 commit be57d2f
Show file tree
Hide file tree
Showing 21 changed files with 493 additions and 236 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func generateGlobalSortIngestPlan(
// Skip global sort for empty table.
return nil, nil
}
instanceIDs, err := scheduler.GenerateTaskExecutorNodes(ctx)
instanceIDs, err := scheduler.GetLiveExecIDs(ctx)
if err != nil {
return nil, err
}
Expand Down
16 changes: 6 additions & 10 deletions pkg/disttask/framework/framework_err_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@ import (
)

func TestRetryErrOnNextSubtasksBatch(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanErrSchedulerExt(ctrl, testContext), testContext, nil)
submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext)
distContext.Close()
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanErrSchedulerExt(c.MockCtrl, c.TestContext), c.TestContext, nil)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
}

func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 2)

testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanNotRetryableErrSchedulerExt(ctrl), testContext, nil)
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateFailed, task.State)
distContext.Close()
}
26 changes: 12 additions & 14 deletions pkg/disttask/framework/framework_pause_and_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,42 @@ func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state p
}

func TestFrameworkPauseAndResume(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
// 1. schedule and pause one running task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()"))
task1 := testutil.SubmitAndWaitTask(ctx, t, "key1")
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStatePaused, task1.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
// 4 subtask scheduled.
require.NoError(t, handle.ResumeTask(ctx, "key1"))
require.NoError(t, handle.ResumeTask(c.Ctx, "key1"))
<-scheduler.TestSyncChan
testutil.WaitTaskDoneOrPaused(ctx, t, task1.Key)
CheckSubtasksState(ctx, t, 1, proto.SubtaskStateSucceed, 4)
testutil.WaitTaskDoneOrPaused(c.Ctx, t, task1.Key)
CheckSubtasksState(c.Ctx, t, 1, proto.SubtaskStateSucceed, 4)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume"))

mgr, err := storage.GetTaskManager()
require.NoError(t, err)
errs, err := mgr.GetSubtaskErrors(ctx, 1)
errs, err := mgr.GetSubtaskErrors(c.Ctx, 1)
require.NoError(t, err)
require.Empty(t, errs)

// 2. pause pending task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()"))
task2 := testutil.SubmitAndWaitTask(ctx, t, "key2")
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2")
require.Equal(t, proto.TaskStatePaused, task2.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
// 4 subtask scheduled.
require.NoError(t, handle.ResumeTask(ctx, "key2"))
require.NoError(t, handle.ResumeTask(c.Ctx, "key2"))
<-scheduler.TestSyncChan
testutil.WaitTaskDoneOrPaused(ctx, t, task2.Key)
CheckSubtasksState(ctx, t, 1, proto.SubtaskStateSucceed, 4)
testutil.WaitTaskDoneOrPaused(c.Ctx, t, task2.Key)
CheckSubtasksState(c.Ctx, t, 1, proto.SubtaskStateSucceed, 4)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume"))

errs, err = mgr.GetSubtaskErrors(ctx, 1)
errs, err = mgr.GetSubtaskErrors(c.Ctx, 1)
require.NoError(t, err)
require.Empty(t, errs)
distContext.Close()
}
21 changes: 9 additions & 12 deletions pkg/disttask/framework/framework_role_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ func checkSubtaskOnNodes(ctx context.Context, t *testing.T, taskID int64, expect
}

func TestRoleBasic(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
tk := testkit.NewTestKit(t, distContext.Store)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
tk := testkit.NewTestKit(t, c.Store)

// 1. all "" role.
submitTaskAndCheckSuccessForBasic(ctx, t, "😁", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😁", c.TestContext)

checkSubtaskOnNodes(ctx, t, 1, []string{":4000", ":4001", ":4002"})
checkSubtaskOnNodes(c.Ctx, t, 1, []string{":4000", ":4001", ":4002"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))
Expand All @@ -60,29 +59,27 @@ func TestRoleBasic(t *testing.T) {

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))

tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))

checkSubtaskOnNodes(ctx, t, 2, []string{":4000"})
checkSubtaskOnNodes(c.Ctx, t, 2, []string{":4000"})

// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
time.Sleep(5 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(ctx, t, "😆", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😆", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))

checkSubtaskOnNodes(ctx, t, 3, []string{":4000", ":4001"})
checkSubtaskOnNodes(c.Ctx, t, 3, []string{":4000", ":4001"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))

distContext.Close()
}

func TestSetRole(t *testing.T) {
Expand Down
8 changes: 3 additions & 5 deletions pkg/disttask/framework/framework_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
)

func TestFrameworkRollback(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
testutil.RegisterRollbackTaskMeta(t, ctrl, testutil.GetMockRollbackSchedulerExt(ctrl), testContext)
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask"))
}()

task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
distContext.Close()
}
Loading

0 comments on commit be57d2f

Please sign in to comment.