From bb28f0236e8e4e225fc53a61b50ca9463e5e676b Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 5 Jun 2024 11:51:39 +0800 Subject: [PATCH 1/5] change --- pkg/ddl/cancel_test.go | 48 +++++++++++++++++++----------------- pkg/ddl/testutil/testutil.go | 4 +-- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go index 566655cbdd163..d7bb286adafe7 100644 --- a/pkg/ddl/cancel_test.go +++ b/pkg/ddl/cancel_test.go @@ -33,12 +33,12 @@ import ( ) type testCancelJob struct { - sql string - ok bool - cancelState any // model.SchemaState | []model.SchemaState - onJobBefore bool - onJobUpdate bool - prepareSQL []string + sql string + expectCancelled bool + cancelState any // model.SchemaState | []model.SchemaState + onJobBefore bool + onJobUpdate bool + prepareSQL []string } var allTestCase = []testCancelJob{ @@ -244,28 +244,30 @@ func TestCancel(t *testing.T) { hook := &callback.TestDDLCallback{Do: dom} i := atomicutil.NewInt64(0) - cancel := atomicutil.NewBool(false) + canceled := atomicutil.NewBool(false) cancelResult := atomicutil.NewBool(false) cancelWhenReorgNotStart := atomicutil.NewBool(false) + // there might be a time gap between tk.MustExec return and the calling of OnJobUpdated, + // and it causes we call the hook with an unexpected prepare job. hookFunc := func(job *model.Job) { - if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel.Load() { + if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !canceled.Load() { if !cancelWhenReorgNotStart.Load() && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 { return } rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) cancelResult.Store(cancelSuccess(rs)) - cancel.Store(true) + canceled.Store(true) } } dom.DDL().SetHook(hook.Clone()) - restHook := func(h *callback.TestDDLCallback) { + resetHook := func(h *callback.TestDDLCallback) { h.OnJobRunBeforeExported = nil h.OnJobUpdatedExported.Store(nil) dom.DDL().SetHook(h.Clone()) } - registHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) { + registerHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) { if onJobRunBefore { h.OnJobRunBeforeExported = hookFunc } else { @@ -278,37 +280,37 @@ func TestCancel(t *testing.T) { i.Store(int64(j)) msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState) if tc.onJobBefore { - restHook(hook) + resetHook(hook) for _, prepareSQL := range tc.prepareSQL { tk.MustExec(prepareSQL) } - cancel.Store(false) + canceled.Store(false) cancelWhenReorgNotStart.Store(true) - registHook(hook, true) - if tc.ok { + registerHook(hook, true) + if tc.expectCancelled { tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob) } else { tk.MustExec(tc.sql) } - if cancel.Load() { - require.Equal(t, tc.ok, cancelResult.Load(), msg) + if canceled.Load() { + require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg) } } if tc.onJobUpdate { - restHook(hook) + resetHook(hook) for _, prepareSQL := range tc.prepareSQL { tk.MustExec(prepareSQL) } - cancel.Store(false) + canceled.Store(false) cancelWhenReorgNotStart.Store(false) - registHook(hook, false) - if tc.ok { + registerHook(hook, false) + if tc.expectCancelled { tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob) } else { tk.MustExec(tc.sql) } - if cancel.Load() { - require.Equal(t, tc.ok, cancelResult.Load(), msg) + if canceled.Load() { + require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg) } } } diff --git a/pkg/ddl/testutil/testutil.go b/pkg/ddl/testutil/testutil.go index fb6e44df1c2fb..4b977654b7bd8 100644 --- a/pkg/ddl/testutil/testutil.go +++ b/pkg/ddl/testutil/testutil.go @@ -107,14 +107,14 @@ func TestMatchCancelState(t *testing.T, job *model.Job, cancelState any, sql str switch v := cancelState.(type) { case model.SchemaState: if job.Type == model.ActionMultiSchemaChange { - msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v) + msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s, job: %s)", sql, v, job.String()) require.Failf(t, msg, "use []model.SchemaState as cancel states instead") return false } return job.SchemaState == v case SubStates: // For multi-schema change sub-jobs. if job.MultiSchemaInfo == nil { - msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v) + msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v, job: %s)", sql, v, job.String()) require.Failf(t, msg, "use model.SchemaState as the cancel state instead") return false } From 33cd5b256692c1abfba0333d26eff4df5339a23c Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 7 Jun 2024 19:56:48 +0800 Subject: [PATCH 2/5] change --- pkg/ddl/cancel_test.go | 3 +-- pkg/ddl/job_table.go | 6 +++--- pkg/ddl/testutil/testutil.go | 3 +++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go index d7bb286adafe7..4dc6f1a5fc0ee 100644 --- a/pkg/ddl/cancel_test.go +++ b/pkg/ddl/cancel_test.go @@ -248,8 +248,6 @@ func TestCancel(t *testing.T) { cancelResult := atomicutil.NewBool(false) cancelWhenReorgNotStart := atomicutil.NewBool(false) - // there might be a time gap between tk.MustExec return and the calling of OnJobUpdated, - // and it causes we call the hook with an unexpected prepare job. hookFunc := func(job *model.Job) { if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !canceled.Load() { if !cancelWhenReorgNotStart.Load() && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 { @@ -277,6 +275,7 @@ func TestCancel(t *testing.T) { } for j, tc := range allTestCase { + t.Logf("running test case %d: %s", j, tc.sql) i.Store(int64(j)) msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState) if tc.onJobBefore { diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 7619147884189..5c463b1e3d8e0 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -561,7 +561,7 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model. // job is already moved to history. failpoint.InjectCall("beforeRefreshJob", job) for { - job, err = s.sysTblMgr.GetJobByID(s.schCtx, job.ID) + job, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID) failpoint.InjectCall("mockGetJobByIDFail", &err) if err == nil { break @@ -569,10 +569,10 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model. if err == systable.ErrNotFound { logutil.DDLLogger().Info("job not found, might already finished", - zap.Int64("job_id", job.ID), zap.Stringer("state", job.State)) + zap.Int64("job_id", jobID)) return } - logutil.DDLLogger().Error("get job failed", zap.Error(err)) + logutil.DDLLogger().Error("get job failed", zap.Int64("job_id", jobID), zap.Error(err)) select { case <-s.schCtx.Done(): return diff --git a/pkg/ddl/testutil/testutil.go b/pkg/ddl/testutil/testutil.go index 4b977654b7bd8..6420b10ac3acd 100644 --- a/pkg/ddl/testutil/testutil.go +++ b/pkg/ddl/testutil/testutil.go @@ -104,6 +104,9 @@ type SubStates = []model.SchemaState // TestMatchCancelState is used to test whether the cancel state matches. func TestMatchCancelState(t *testing.T, job *model.Job, cancelState any, sql string) bool { + if job.Query != sql { + return false + } switch v := cancelState.(type) { case model.SchemaState: if job.Type == model.ActionMultiSchemaChange { From 88c1e12b28a05de3727167136b0cfdc307115384 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Sat, 8 Jun 2024 16:14:34 +0800 Subject: [PATCH 3/5] change --- pkg/ddl/cancel_test.go | 15 +++++++++++++++ pkg/ddl/job_table.go | 1 + pkg/ddl/testutil/testutil.go | 3 --- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go index 4dc6f1a5fc0ee..e2d325b968e7b 100644 --- a/pkg/ddl/cancel_test.go +++ b/pkg/ddl/cancel_test.go @@ -17,6 +17,7 @@ package ddl_test import ( "fmt" "strings" + "sync/atomic" "testing" "time" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/stretchr/testify/require" atomicutil "go.uber.org/atomic" ) @@ -204,6 +206,14 @@ func cancelSuccess(rs *testkit.Result) bool { } func TestCancel(t *testing.T) { + var enterCnt, exitCnt atomic.Int32 + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker", func(job *model.Job) { enterCnt.Add(1) }) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { exitCnt.Add(1) }) + waitDDLWorkerExisted := func() { + require.Eventually(t, func() bool { + return enterCnt.Load() == exitCnt.Load() + }, 10*time.Second, 10*time.Millisecond) + } store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 100*time.Millisecond) tk := testkit.NewTestKit(t, store) tkCancel := testkit.NewTestKit(t, store) @@ -274,6 +284,7 @@ func TestCancel(t *testing.T) { dom.DDL().SetHook(h.Clone()) } + waitDDLWorkerExisted() for j, tc := range allTestCase { t.Logf("running test case %d: %s", j, tc.sql) i.Store(int64(j)) @@ -283,6 +294,7 @@ func TestCancel(t *testing.T) { for _, prepareSQL := range tc.prepareSQL { tk.MustExec(prepareSQL) } + waitDDLWorkerExisted() canceled.Store(false) cancelWhenReorgNotStart.Store(true) registerHook(hook, true) @@ -291,6 +303,7 @@ func TestCancel(t *testing.T) { } else { tk.MustExec(tc.sql) } + waitDDLWorkerExisted() if canceled.Load() { require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg) } @@ -300,6 +313,7 @@ func TestCancel(t *testing.T) { for _, prepareSQL := range tc.prepareSQL { tk.MustExec(prepareSQL) } + waitDDLWorkerExisted() canceled.Store(false) cancelWhenReorgNotStart.Store(false) registerHook(hook, false) @@ -308,6 +322,7 @@ func TestCancel(t *testing.T) { } else { tk.MustExec(tc.sql) } + waitDDLWorkerExisted() if canceled.Load() { require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg) } diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 5c463b1e3d8e0..fa8234e14916b 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -529,6 +529,7 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) { // delivery2Worker owns the worker, need to put it back to the pool in this function. func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { + failpoint.InjectCall("beforeDelivery2Worker", job) injectFailPointForGetJob(job) jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo() s.runningJobs.add(jobID, involvedSchemaInfos) diff --git a/pkg/ddl/testutil/testutil.go b/pkg/ddl/testutil/testutil.go index 6420b10ac3acd..4b977654b7bd8 100644 --- a/pkg/ddl/testutil/testutil.go +++ b/pkg/ddl/testutil/testutil.go @@ -104,9 +104,6 @@ type SubStates = []model.SchemaState // TestMatchCancelState is used to test whether the cancel state matches. func TestMatchCancelState(t *testing.T, job *model.Job, cancelState any, sql string) bool { - if job.Query != sql { - return false - } switch v := cancelState.(type) { case model.SchemaState: if job.Type == model.ActionMultiSchemaChange { From 42d21509c31407b41697c01fdc40071b09dc5b50 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 11 Jun 2024 11:29:25 +0800 Subject: [PATCH 4/5] fix test --- pkg/ddl/tests/adminpause/BUILD.bazel | 1 + pkg/ddl/tests/adminpause/pause_cancel_test.go | 7 ++++--- pkg/testkit/testfailpoint/failpoint.go | 5 +++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/tests/adminpause/BUILD.bazel b/pkg/ddl/tests/adminpause/BUILD.bazel index 30c658808ec7f..34034000e12f4 100644 --- a/pkg/ddl/tests/adminpause/BUILD.bazel +++ b/pkg/ddl/tests/adminpause/BUILD.bazel @@ -39,6 +39,7 @@ go_test( "//pkg/errno", "//pkg/parser/model", "//pkg/testkit", + "//pkg/testkit/testfailpoint", "//pkg/testkit/testsetup", "//pkg/util/sqlexec", "@com_github_pingcap_failpoint//:failpoint", diff --git a/pkg/ddl/tests/adminpause/pause_cancel_test.go b/pkg/ddl/tests/adminpause/pause_cancel_test.go index c526eab013236..7f7f642c68913 100644 --- a/pkg/ddl/tests/adminpause/pause_cancel_test.go +++ b/pkg/ddl/tests/adminpause/pause_cancel_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -90,7 +91,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit var isCancelled = &atomic.Bool{} var cancelResultChn = make(chan []sqlexec.RecordSet, 1) var cancelErrChn = make(chan error, 1) - var cancelFunc = func(jobType string) { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob", func(*model.Job) { Logger.Debug("pauseAndCancelStmt: OnGetJobBeforeExported, ", zap.String("Expected Schema State", stmtCase.schemaState.String())) @@ -107,7 +108,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit isCancelled.CompareAndSwap(false, true) // In case that it runs into this scope again and again } - } + }) var verifyCancelResult = func(t *testing.T, adminCommandKit *testkit.TestKit) { require.True(t, isCancelled.Load()) @@ -130,7 +131,6 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit originalHook := dom.DDL().GetHook() hook.OnJobRunBeforeExported = pauseFunc - hook.OnGetJobBeforeExported = cancelFunc dom.DDL().SetHook(hook.Clone()) isPaused.Store(false) @@ -152,6 +152,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit // Release the hook, so that we could run the `rollbackStmts` successfully. dom.DDL().SetHook(originalHook) + testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob") for _, rollbackStmt := range stmtCase.rollbackStmts { // no care about the result here, since the `statement` could have been cancelled OR finished successfully. diff --git a/pkg/testkit/testfailpoint/failpoint.go b/pkg/testkit/testfailpoint/failpoint.go index eec483115a077..381b53c129c23 100644 --- a/pkg/testkit/testfailpoint/failpoint.go +++ b/pkg/testkit/testfailpoint/failpoint.go @@ -36,3 +36,8 @@ func EnableCall(t testing.TB, name string, fn any) { require.NoError(t, failpoint.Disable(name)) }) } + +// Disable disables fail-point. +func Disable(t testing.TB, name string) { + require.NoError(t, failpoint.Disable(name)) +} From 3a889287d496e931768deb6dadffacee2f5c22ad Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 11 Jun 2024 11:30:35 +0800 Subject: [PATCH 5/5] fix comments --- pkg/ddl/cancel_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ddl/cancel_test.go b/pkg/ddl/cancel_test.go index e2d325b968e7b..22a3489b02dca 100644 --- a/pkg/ddl/cancel_test.go +++ b/pkg/ddl/cancel_test.go @@ -209,7 +209,7 @@ func TestCancel(t *testing.T) { var enterCnt, exitCnt atomic.Int32 testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker", func(job *model.Job) { enterCnt.Add(1) }) testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { exitCnt.Add(1) }) - waitDDLWorkerExisted := func() { + waitDDLWorkerExited := func() { require.Eventually(t, func() bool { return enterCnt.Load() == exitCnt.Load() }, 10*time.Second, 10*time.Millisecond) @@ -284,7 +284,7 @@ func TestCancel(t *testing.T) { dom.DDL().SetHook(h.Clone()) } - waitDDLWorkerExisted() + waitDDLWorkerExited() for j, tc := range allTestCase { t.Logf("running test case %d: %s", j, tc.sql) i.Store(int64(j)) @@ -294,7 +294,7 @@ func TestCancel(t *testing.T) { for _, prepareSQL := range tc.prepareSQL { tk.MustExec(prepareSQL) } - waitDDLWorkerExisted() + waitDDLWorkerExited() canceled.Store(false) cancelWhenReorgNotStart.Store(true) registerHook(hook, true) @@ -303,7 +303,7 @@ func TestCancel(t *testing.T) { } else { tk.MustExec(tc.sql) } - waitDDLWorkerExisted() + waitDDLWorkerExited() if canceled.Load() { require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg) } @@ -313,7 +313,7 @@ func TestCancel(t *testing.T) { for _, prepareSQL := range tc.prepareSQL { tk.MustExec(prepareSQL) } - waitDDLWorkerExisted() + waitDDLWorkerExited() canceled.Store(false) cancelWhenReorgNotStart.Store(false) registerHook(hook, false) @@ -322,7 +322,7 @@ func TestCancel(t *testing.T) { } else { tk.MustExec(tc.sql) } - waitDDLWorkerExisted() + waitDDLWorkerExited() if canceled.Load() { require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg) }