From 835834b5e6fb46ce28cecde90c1f32396b884772 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 29 May 2023 08:20:40 +0800 Subject: [PATCH] This is an automated cherry-pick of #44206 Signed-off-by: ti-chi-bot --- ddl/index.go | 5 ----- ddl/ingest/BUILD.bazel | 9 +++++++++ ddl/ingest/integration_test.go | 32 ++++++++++++++++++++++++++++++++ ddl/ingest/mock.go | 4 ++-- ddl/rollingback.go | 9 +++++++++ 5 files changed, 52 insertions(+), 7 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 7c5a6c4890940..ba28adbea0f35 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -917,7 +917,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, if err != nil { if !errorIsRetryable(err, job) { ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) - ingest.LitBackCtxMgr.Unregister(job.ID) } return false, ver, errors.Trace(err) } @@ -936,7 +935,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) if !errorIsRetryable(err, job) { ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) - ingest.LitBackCtxMgr.Unregister(job.ID) } } return false, ver, errors.Trace(err) @@ -1106,9 +1104,6 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { // Finish this job. if job.IsRollingback() { job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) - if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { - ingest.LitBackCtxMgr.Unregister(job.ID) - } job.Args[0] = indexInfo.ID } else { // the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility, diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 9a5b2d120b951..14eed3a620afb 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -61,9 +61,18 @@ go_test( ], embed = [":ingest"], flaky = True, +<<<<<<< HEAD shard_count = 11, deps = [ "//config", +======= + race = "on", + shard_count = 13, + deps = [ + "//config", + "//ddl", + "//ddl/internal/callback", +>>>>>>> f13419846cf (ddl/ingest: unregister backend after job cancelled (#44206)) "//ddl/internal/session", "//errno", "//kv", diff --git a/ddl/ingest/integration_test.go b/ddl/ingest/integration_test.go index bbc7363e4d272..22aff9e4de2cc 100644 --- a/ddl/ingest/integration_test.go +++ b/ddl/ingest/integration_test.go @@ -21,6 +21,12 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/ingest" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/ddl/internal/callback" + "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/domain" +>>>>>>> f13419846cf (ddl/ingest: unregister backend after job cancelled (#44206)) "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -160,3 +166,29 @@ func TestAddIndexIngestClientError(t *testing.T) { tk.MustExec(`insert into t1(f1) values (cast("null" as json));`) tk.MustGetErrCode("create index i1 on t1((cast(f1 as unsigned array)));", errno.ErrInvalidJSONValueForFuncIndex) } + +func TestAddIndexCancelOnNoneState(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tkCancel := testkit.NewTestKit(t, store) + defer injectMockBackendMgr(t, store)() + + tk.MustExec("use test") + tk.MustExec(`create table t (c1 int, c2 int, c3 int)`) + tk.MustExec("insert into t values(1, 1, 1);") + + hook := &callback.TestDDLCallback{Do: dom} + first := true + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.SchemaState == model.StateNone && first { + _, err := tkCancel.Exec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) + assert.NoError(t, err) + first = false + } + } + dom.DDL().SetHook(hook.Clone()) + tk.MustGetErrCode("alter table t add index idx1(c1)", errno.ErrCancelledDDLJob) + available, err := ingest.LitBackCtxMgr.CheckAvailable() + require.NoError(t, err) + require.True(t, available) +} diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index d6aa6ee5a4b89..b95a16eb52a32 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -41,8 +41,8 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken } // CheckAvailable implements BackendCtxMgr.Available interface. -func (*MockBackendCtxMgr) CheckAvailable() (bool, error) { - return true, nil +func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) { + return len(m.runningJobs) == 0, nil } // Register implements BackendCtxMgr.Register interface. diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 100ea0a1d88f8..2517c85c56f2b 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" @@ -76,12 +77,20 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn } job.State = model.JobStateRollingback err = completeErr(err, indexInfo) + if ingest.LitBackCtxMgr != nil { + ingest.LitBackCtxMgr.Unregister(job.ID) + } return ver, errors.Trace(err) } // convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, // to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) { + defer func() { + if ingest.LitBackCtxMgr != nil { + ingest.LitBackCtxMgr.Unregister(job.ID) + } + }() schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil {