Skip to content

Commit

Permalink
ddl, parser, executor, ddl worker: admin pause and resume ddl jobs (#…
Browse files Browse the repository at this point in the history
…43674)

close #18015, ref #40041
  • Loading branch information
dhysum authored May 11, 2023
1 parent 845fe40 commit 18783b9
Show file tree
Hide file tree
Showing 33 changed files with 6,536 additions and 5,578 deletions.
20 changes: 10 additions & 10 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -786,8 +786,8 @@ def go_deps():
name = "com_github_cznic_strutil",
build_file_proto_mode = "disable_global",
importpath = "github.com/cznic/strutil",
sum = "h1:0rkFMAbn5KBKNpJyHQ6Prb95vIKanmAe62KxsrN+sqA=",
version = "v0.0.0-20171016134553-529a34b1c186",
sum = "h1:MZRmHqDBd0vxNwenEbKSQqRVT24d3C05ft8kduSwlqM=",
version = "v0.0.0-20181122101858-275e90344537",
)
go_repository(
name = "com_github_daixiang0_gci",
Expand Down Expand Up @@ -6076,8 +6076,8 @@ def go_deps():
name = "org_modernc_golex",
build_file_proto_mode = "disable_global",
importpath = "modernc.org/golex",
sum = "h1:EYKY1a3wStt0RzHaH8mdSRNg78Ub0OHxYfCRWw35YtM=",
version = "v1.0.1",
sum = "h1:IaPGsXg0FX9vkth++hQ/YIHjj0v4eZGJwoMrejWR1Ts=",
version = "v1.0.4",
)
go_repository(
name = "org_modernc_lex",
Expand All @@ -6097,8 +6097,8 @@ def go_deps():
name = "org_modernc_mathutil",
build_file_proto_mode = "disable_global",
importpath = "modernc.org/mathutil",
sum = "h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=",
version = "v1.4.1",
sum = "h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=",
version = "v1.5.0",
)
go_repository(
name = "org_modernc_parser",
Expand All @@ -6118,15 +6118,15 @@ def go_deps():
name = "org_modernc_sortutil",
build_file_proto_mode = "disable_global",
importpath = "modernc.org/sortutil",
sum = "h1:SUTM1sCR0Ldpv7dbB/KCPC2zHHsZ1KrSkhmGmmV22CQ=",
version = "v1.0.0",
sum = "h1:VQGxbQGcHaQeB/BX9TQjrHFmOA0bounO1X/jvOfRo6Q=",
version = "v1.1.1",
)
go_repository(
name = "org_modernc_strutil",
build_file_proto_mode = "disable_global",
importpath = "modernc.org/strutil",
sum = "h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=",
version = "v1.1.0",
sum = "h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=",
version = "v1.1.3",
)
go_repository(
name = "org_modernc_y",
Expand Down
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ go_test(
"mv_index_test.go",
"options_test.go",
"partition_test.go",
"pause_test.go",
"placement_policy_ddl_test.go",
"placement_policy_test.go",
"placement_sql_test.go",
Expand Down
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
rc := d.getReorgCtx(jobID)

for {
// Give job chance to be canceled, if we not check it here,
// if there is panic in bf.BackfillData we will never cancel the job.
// Give the job a chance to be canceled or paused. If we do not check it here,
// we will never cancel the job once there is a panic in bf.BackfillData.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(jobID, false)
Expand Down Expand Up @@ -721,7 +721,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
if len(kvRanges) == 0 {
break
}

logutil.BgLogger().Info("[ddl] start backfill workers to reorg record",
zap.Stringer("type", bfWorkerType),
zap.Int("workerCnt", scheduler.currentWorkerSize()),
Expand Down
6 changes: 5 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,10 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
return w.updateCurrentElement(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
return false, ver, nil
}

if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
Expand Down Expand Up @@ -1100,7 +1104,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
if w.getReorgCtx(reorgInfo.Job.ID).isReorgCanceled() {
if w.isReorgCancelled(reorgInfo.Job.ID) {
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
}
Expand Down
72 changes: 70 additions & 2 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,8 +1106,10 @@ func TestCancelJobWriteConflict(t *testing.T) {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL", `return(true)`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()
rs, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}
Expand All @@ -1131,6 +1133,72 @@ func TestCancelJobWriteConflict(t *testing.T) {
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

// TestPauseJobWriteConflict issues `admin pause ddl jobs` from a second session
// while an add-index job is running in the write-reorganization state, with the
// kv/mockCommitErrorInNewTxn and ddl/mockFailedCommandOnConcurencyDDL failpoints
// enabled around the admin command.
//
// Two scenarios are exercised in sequence:
//  1. mockFailedCommandOnConcurencyDDL returns true: the pause statement is
//     expected to fail with "mock commit error" while the ALTER still finishes.
//  2. mockFailedCommandOnConcurencyDDL returns false: the pause and a following
//     cancel are both expected to report "<jobID> successful", and the ALTER is
//     expected to fail with ErrCancelledDDLJob.
func TestPauseJobWriteConflict(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)

// tk1 runs the DDL statements; tk2 runs the admin commands from inside the hook.
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)

tk1.MustExec("use test")

tk1.MustExec("create table t(id int)")

// These are assigned inside the hook closures below and inspected after the
// ALTER statement returns; they hold the values from the latest matching
// hook invocation.
var jobID int64
var pauseErr error
var pauseRS []sqlexec.RecordSet
// Install a test callback on the DDL instance and restore the original hook
// when the test function returns.
hook := &callback.TestDDLCallback{Do: dom}
d := dom.DDL()
originalHook := d.GetHook()
d.SetHook(hook)
defer d.SetHook(originalHook)

// Test when pause cannot be retried and adding index succeeds.
hook.OnJobRunBeforeExported = func(job *model.Job) {
// Only act on the add-index job while it is running in write-reorganization.
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
// Both failpoints are active only for this hook invocation: the deferred
// Disable calls run when the closure returns.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
// NOTE(review): return(true) presumably makes the admin command surface the
// injected commit error instead of succeeding — confirm at the failpoint site.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()

jobID = job.ID
stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID)
pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt)
}
}
tk1.MustExec("alter table t add index (id)")
// The pause attempt must have reported the injected commit error.
require.EqualError(t, pauseErr, "mock commit error")

// Second scenario: pause the job, then cancel it from the same hook invocation.
var cancelRS []sqlexec.RecordSet
var cancelErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
// Same failpoint as in the first scenario, but with return(false) this time.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(false)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()

jobID = job.ID
stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID)
pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt)

// NOTE(review): the 5s wait presumably lets the pause take effect before the
// cancel is issued — confirm; it also makes this test slow.
time.Sleep(5 * time.Second)
stmt = fmt.Sprintf("admin cancel ddl jobs %d", jobID)
cancelRS, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}
// The ALTER is expected to end with the cancelled-DDL-job error code.
tk1.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob)
require.NoError(t, pauseErr)
require.NoError(t, cancelErr)
// Both admin statements must report "<jobID> successful" in their first result set.
result := tk2.ResultSetToResultWithCtx(context.Background(), pauseRS[0], "pause ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
result = tk2.ResultSetToResultWithCtx(context.Background(), cancelRS[0], "cancel ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

func TestTxnSavepointWithDDL(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
Expand Down
Loading

0 comments on commit 18783b9

Please sign in to comment.