Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#44206
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
tangenta authored and ti-chi-bot committed May 29, 2023
1 parent f8f6cda commit 835834b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 7 deletions.
5 changes: 0 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
if err != nil {
if !errorIsRetryable(err, job) {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
ingest.LitBackCtxMgr.Unregister(job.ID)
}
return false, ver, errors.Trace(err)
}
Expand All @@ -936,7 +935,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
if !errorIsRetryable(err, job) {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
ingest.LitBackCtxMgr.Unregister(job.ID)
}
}
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1106,9 +1104,6 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.Args[0] = indexInfo.ID
} else {
// the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility,
Expand Down
9 changes: 9 additions & 0 deletions ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,18 @@ go_test(
],
embed = [":ingest"],
flaky = True,
<<<<<<< HEAD
shard_count = 11,
deps = [
"//config",
=======
race = "on",
shard_count = 13,
deps = [
"//config",
"//ddl",
"//ddl/internal/callback",
>>>>>>> f13419846cf (ddl/ingest: unregister backend after job cancelled (#44206))
"//ddl/internal/session",
"//errno",
"//kv",
Expand Down
32 changes: 32 additions & 0 deletions ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
<<<<<<< HEAD
=======
"github.com/pingcap/tidb/ddl/internal/callback"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
>>>>>>> f13419846cf (ddl/ingest: unregister backend after job cancelled (#44206))
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -160,3 +166,29 @@ func TestAddIndexIngestClientError(t *testing.T) {
tk.MustExec(`insert into t1(f1) values (cast("null" as json));`)
tk.MustGetErrCode("create index i1 on t1((cast(f1 as unsigned array)));", errno.ErrInvalidJSONValueForFuncIndex)
}

func TestAddIndexCancelOnNoneState(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)
defer injectMockBackendMgr(t, store)()

tk.MustExec("use test")
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
tk.MustExec("insert into t values(1, 1, 1);")

hook := &callback.TestDDLCallback{Do: dom}
first := true
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == model.StateNone && first {
_, err := tkCancel.Exec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
assert.NoError(t, err)
first = false
}
}
dom.DDL().SetHook(hook.Clone())
tk.MustGetErrCode("alter table t add index idx1(c1)", errno.ErrCancelledDDLJob)
available, err := ingest.LitBackCtxMgr.CheckAvailable()
require.NoError(t, err)
require.True(t, available)
}
4 changes: 2 additions & 2 deletions ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}

// CheckAvailable implements BackendCtxMgr.Available interface.
func (*MockBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
return len(m.runningJobs) == 0, nil
}

// Register implements BackendCtxMgr.Register interface.
Expand Down
9 changes: 9 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -76,12 +77,20 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn
}
job.State = model.JobStateRollingback
err = completeErr(err, indexInfo)
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
return ver, errors.Trace(err)
}

// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,
// to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started.
func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {
defer func() {
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
}()
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
Expand Down

0 comments on commit 835834b

Please sign in to comment.