Skip to content

Commit c5f2935

Browse files
authored
disttask: add integration test for resource control (pingcap#50962)
ref pingcap#49008
1 parent 5c8b559 commit c5f2935

21 files changed

+519
-222
lines changed

pkg/disttask/framework/integrationtests/BUILD.bazel

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_test(
1111
"framework_rollback_test.go",
1212
"framework_test.go",
1313
"main_test.go",
14+
"resource_control_test.go",
1415
],
1516
flaky = True,
1617
race = "off",
@@ -19,6 +20,7 @@ go_test(
1920
"//pkg/disttask/framework/handle",
2021
"//pkg/disttask/framework/proto",
2122
"//pkg/disttask/framework/scheduler",
23+
"//pkg/disttask/framework/scheduler/mock",
2224
"//pkg/disttask/framework/storage",
2325
"//pkg/disttask/framework/taskexecutor",
2426
"//pkg/disttask/framework/testutil",
@@ -28,5 +30,6 @@ go_test(
2830
"@com_github_pingcap_failpoint//:failpoint",
2931
"@com_github_stretchr_testify//require",
3032
"@org_uber_go_goleak//:goleak",
33+
"@org_uber_go_mock//gomock",
3134
],
3235
)

pkg/disttask/framework/integrationtests/framework_err_handling_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) {
3232
c := testutil.NewTestDXFContext(t, 2)
3333

3434
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
35-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
35+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
3636
require.Equal(t, proto.TaskStateFailed, task.State)
3737
}

pkg/disttask/framework/integrationtests/framework_ha_test.go

+20-19
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ package integrationtests
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"math"
2021
"testing"
2122

22-
"github.com/pingcap/failpoint"
2323
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
2424
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
2525
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
2626
"github.com/pingcap/tidb/pkg/testkit"
27+
"github.com/pingcap/tidb/pkg/util"
2728
"github.com/stretchr/testify/require"
2829
)
2930

@@ -82,24 +83,24 @@ func TestHARandomShutdownInDifferentStep(t *testing.T) {
8283
submitTaskAndCheckSuccessForHA(c.Ctx, t, "😊", c.TestContext)
8384
}
8485

85-
func TestHAReplacedButRunning(t *testing.T) {
86-
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 4)
87-
defer ctrl.Finish()
86+
func TestHAMultipleOwner(t *testing.T) {
87+
c := testutil.NewDXFContextWithRandomNodes(t, 4, 8)
88+
prevCount := c.NodeCount()
89+
additionalOwnerCnt := c.Rand.Intn(2) + 1
90+
for i := 0; i < additionalOwnerCnt; i++ {
91+
c.ScaleOutBy(fmt.Sprintf("tidb-%d", i), true)
92+
}
93+
require.Equal(t, prevCount+additionalOwnerCnt, c.NodeCount())
8894

89-
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockHATestSchedulerExt(ctrl), testContext, nil)
90-
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume", "10*return(true)"))
91-
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
92-
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume"))
93-
distContext.Close()
95+
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockHATestSchedulerExt(c.MockCtrl), c.TestContext, nil)
96+
var wg util.WaitGroupWrapper
97+
for i := 0; i < 10; i++ {
98+
taskKey := fmt.Sprintf("key%d", i)
99+
wg.Run(func() {
100+
submitTaskAndCheckSuccessForHA(c.Ctx, t, taskKey, c.TestContext)
101+
})
102+
}
103+
wg.Wait()
94104
}
95105

96-
func TestHAReplacedButRunningManyNodes(t *testing.T) {
97-
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 30)
98-
defer ctrl.Finish()
99-
100-
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockHATestSchedulerExt(ctrl), testContext, nil)
101-
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume", "30*return(true)"))
102-
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
103-
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume"))
104-
distContext.Close()
105-
}
106+
// TODO add a case of real network partition, each owner should see different set of live nodes

pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestFrameworkPauseAndResume(t *testing.T) {
4747
// 1. schedule and pause one running task.
4848
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)"))
4949
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()"))
50-
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
50+
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
5151
require.Equal(t, proto.TaskStatePaused, task1.State)
5252
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
5353
// 4 subtask scheduled.
@@ -66,7 +66,7 @@ func TestFrameworkPauseAndResume(t *testing.T) {
6666
// 2. pause pending task.
6767
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)"))
6868
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()"))
69-
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2")
69+
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2", 1)
7070
require.Equal(t, proto.TaskStatePaused, task2.State)
7171
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
7272
// 4 subtask scheduled.

pkg/disttask/framework/integrationtests/framework_rollback_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,6 @@ func TestFrameworkRollback(t *testing.T) {
3131
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask"))
3232
}()
3333

34-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
34+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
3535
require.Equal(t, proto.TaskStateReverted, task.State)
3636
}

pkg/disttask/framework/integrationtests/framework_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func submitTaskAndCheckSuccessForBasic(ctx context.Context, t *testing.T, taskKe
4040

4141
func submitTaskAndCheckSuccess(ctx context.Context, t *testing.T, taskKey string,
4242
testContext *testutil.TestContext, subtaskCnts map[proto.Step]int) {
43-
task := testutil.SubmitAndWaitTask(ctx, t, taskKey)
43+
task := testutil.SubmitAndWaitTask(ctx, t, taskKey, 1)
4444
require.Equal(t, proto.TaskStateSucceed, task.State)
4545
for step, cnt := range subtaskCnts {
4646
require.Equal(t, cnt, testContext.CollectedSubtaskCnt(task.ID, step))
@@ -130,7 +130,7 @@ func TestFrameworkCancelTask(t *testing.T) {
130130
defer func() {
131131
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel"))
132132
}()
133-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
133+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
134134
require.Equal(t, proto.TaskStateReverted, task.State)
135135
}
136136

@@ -142,7 +142,7 @@ func TestFrameworkSubTaskFailed(t *testing.T) {
142142
defer func() {
143143
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr"))
144144
}()
145-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
145+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
146146
require.Equal(t, proto.TaskStateReverted, task.State)
147147
}
148148

@@ -153,7 +153,7 @@ func TestFrameworkSubTaskInitEnvFailed(t *testing.T) {
153153
defer func() {
154154
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr"))
155155
}()
156-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
156+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
157157
require.Equal(t, proto.TaskStateReverted, task.State)
158158
}
159159

@@ -212,7 +212,7 @@ func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
212212
defer func() {
213213
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel"))
214214
}()
215-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
215+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
216216
require.Equal(t, proto.TaskStateReverted, task.State)
217217
}
218218

@@ -221,7 +221,7 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) {
221221

222222
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
223223
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)"))
224-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
224+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
225225
require.Equal(t, proto.TaskStateReverted, task.State)
226226
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel"))
227227
}
@@ -270,7 +270,7 @@ func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
270270

271271
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
272272
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)"))
273-
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
273+
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
274274
require.Equal(t, proto.TaskStateReverted, task.State)
275275
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask"))
276276
}

0 commit comments

Comments
 (0)