Skip to content

Commit

Permalink
ddl/ingest: unregister backend after job cancelled (#44206) (#44224)
Browse files Browse the repository at this point in the history
close #44205
  • Loading branch information
ti-chi-bot authored Jul 13, 2023
1 parent 503a6c5 commit d3b7615
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
5 changes: 0 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
logutil.BgLogger().Warn("[ddl] run reorg job failed, convert job to rollback",
zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
ingest.LitBackCtxMgr.Unregister(job.ID)
}
return false, ver, errors.Trace(err)
}
Expand All @@ -943,7 +942,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
if !errorIsRetryable(err, job) {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
ingest.LitBackCtxMgr.Unregister(job.ID)
}
}
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1118,9 +1116,6 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.Args[0] = indexInfo.ID
} else {
// the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility,
Expand Down
5 changes: 4 additions & 1 deletion ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,20 @@ go_test(
],
embed = [":ingest"],
flaky = True,
shard_count = 11,
shard_count = 12,
deps = [
"//config",
"//ddl/internal/session",
"//ddl/util/callback",
"//errno",
"//kv",
"//parser/model",
"//sessionctx",
"//testkit",
"//tests/realtikvtest",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
Expand Down
29 changes: 29 additions & 0 deletions ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -160,3 +163,29 @@ func TestAddIndexIngestClientError(t *testing.T) {
tk.MustExec(`insert into t1(f1) values (cast("null" as json));`)
tk.MustGetErrCode("create index i1 on t1((cast(f1 as unsigned array)));", errno.ErrInvalidJSONValueForFuncIndex)
}

func TestAddIndexCancelOnNoneState(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)
defer injectMockBackendMgr(t, store)()

tk.MustExec("use test")
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
tk.MustExec("insert into t values(1, 1, 1);")

hook := &callback.TestDDLCallback{Do: dom}
first := true
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == model.StateNone && first {
_, err := tkCancel.Exec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
assert.NoError(t, err)
first = false
}
}
dom.DDL().SetHook(hook.Clone())
tk.MustGetErrCode("alter table t add index idx1(c1)", errno.ErrCancelledDDLJob)
available, err := ingest.LitBackCtxMgr.CheckAvailable()
require.NoError(t, err)
require.True(t, available)
}
4 changes: 2 additions & 2 deletions ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}

// CheckAvailable implements BackendCtxMgr.Available interface.
func (*MockBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
return len(m.runningJobs) == 0, nil
}

// Register implements BackendCtxMgr.Register interface.
Expand Down
9 changes: 9 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -76,12 +77,20 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn
}
job.State = model.JobStateRollingback
err = completeErr(err, indexInfo)
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
return ver, errors.Trace(err)
}

// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,
// to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started.
func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {
defer func() {
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
}()
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
Expand Down

0 comments on commit d3b7615

Please sign in to comment.