Skip to content

Commit

Permalink
ddl, executor: executor change for pause/resume on ddl jobs (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dhysum committed May 11, 2023
1 parent 954b725 commit 8565dbe
Show file tree
Hide file tree
Showing 11 changed files with 670 additions and 67 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ go_test(
"mv_index_test.go",
"options_test.go",
"partition_test.go",
"pause_test.go",
"placement_policy_ddl_test.go",
"placement_policy_test.go",
"placement_sql_test.go",
Expand Down
72 changes: 70 additions & 2 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,8 +1106,10 @@ func TestCancelJobWriteConflict(t *testing.T) {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL", `return(true)`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()
rs, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}
Expand All @@ -1131,6 +1133,72 @@ func TestCancelJobWriteConflict(t *testing.T) {
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

func TestPauseJobWriteConflict(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)

tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)

tk1.MustExec("use test")

tk1.MustExec("create table t(id int)")

var jobID int64
var pauseErr error
var pauseRS []sqlexec.RecordSet
hook := &callback.TestDDLCallback{Do: dom}
d := dom.DDL()
originalHook := d.GetHook()
d.SetHook(hook)
defer d.SetHook(originalHook)

// Test when pause cannot be retried and adding index succeeds.
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()

jobID = job.ID
stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID)
pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt)
}
}
tk1.MustExec("alter table t add index (id)")
require.EqualError(t, pauseErr, "mock commit error")

var cancelRS []sqlexec.RecordSet
var cancelErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(false)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()

jobID = job.ID
stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID)
pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt)

time.Sleep(5 * time.Second)
stmt = fmt.Sprintf("admin cancel ddl jobs %d", jobID)
cancelRS, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}
tk1.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob)
require.NoError(t, pauseErr)
require.NoError(t, cancelErr)
result := tk2.ResultSetToResultWithCtx(context.Background(), pauseRS[0], "pause ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
result = tk2.ResultSetToResultWithCtx(context.Background(), cancelRS[0], "cancel ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

func TestTxnSavepointWithDDL(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
Expand Down
259 changes: 206 additions & 53 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
continue
}
sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat.
errs, err := CancelJobs(se, []int64{jobID})
errs, err := CancelJobsBySystem(se, []int64{jobID})
d.sessPool.Put(se)
if len(errs) > 0 {
logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
Expand Down Expand Up @@ -1425,85 +1425,238 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
return generalJob, reorgJob, nil
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) {
return cancelConcurrencyJobs(se, ids)
// cancelRunningJob cancel a DDL job that is in the concurrent state.
func cancelRunningJob(sess *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
return dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID)
}

// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
return nil
}

if !job.IsRollbackable() {
return dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
}
job.State = model.JobStateCancelling
job.AdminOperator = byWho

// Make sure RawArgs isn't overwritten.
return json.Unmarshal(job.RawArgs, &job.Args)
}

// cancelConcurrencyJobs cancels the DDL jobs that are in the concurrent state.
func cancelConcurrencyJobs(se sessionctx.Context, ids []int64) ([]error, error) {
failpoint.Inject("mockCancelConcurencyDDL", func(val failpoint.Value) {
// pauseRunningJob check and pause the running Job
func pauseRunningJob(sess *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// It would be much better doing this filter during `getJobsBySQL`, but not now.
if !job.IsPausable() {
err = dbterror.ErrCannotPauseDDLJob.GenWithStackByArgs(job.ID)
if err != nil {
return err
}
}

job.State = model.JobStatePausing
job.AdminOperator = byWho

return json.Unmarshal(job.RawArgs, &job.Args)
}

// resumePausedJob check and resume the Paused Job
func resumePausedJob(se *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if !job.IsResumable() ||
// The Paused job should only be resumed by who paused it
job.AdminOperator != byWho {
return dbterror.ErrCannotResumeDDLJob.GenWithStackByArgs(job.ID)
}

job.State = model.JobStateQueueing

return json.Unmarshal(job.RawArgs, &job.Args)
}

// processJobs command on the Job according to the process
func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
sessCtx sessionctx.Context,
ids []int64,
byWho model.AdminCommandOperator) ([]error, error) {
failpoint.Inject("mockFailedCommandOnConcurencyDDL", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mock commit error"))
}
})

if len(ids) == 0 {
return nil, nil
}
var jobMap = make(map[int64]int) // jobID -> error index

sessCtx := sess.NewSession(se)
err := sessCtx.Begin()
if err != nil {
return nil, err
}
ns := sess.NewSession(sessCtx)
var errs []error

idsStr := make([]string, 0, len(ids))
for idx, id := range ids {
jobMap[id] = idx
idsStr = append(idsStr, strconv.FormatInt(id, 10))
}
// We should process (and try) all the jobs in one Transaction.
for tryN := uint(0); tryN < 10; tryN += 1 {
errs = make([]error, len(ids))
// Need to figure out which one could not be paused
jobMap := make(map[int64]int, len(ids))
idsStr := make([]string, 0, len(ids))
for idx, id := range ids {
jobMap[id] = idx
idsStr = append(idsStr, strconv.FormatInt(id, 10))
}

jobs, err := getJobsBySQL(sessCtx, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", ")))
if err != nil {
sessCtx.Rollback()
return nil, err
}
err := ns.Begin()
if err != nil {
return nil, err
}
jobs, err := getJobsBySQL(ns, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", ")))
if err != nil {
ns.Rollback()
return nil, err
}

errs := make([]error, len(ids))
for _, job := range jobs {
i, ok := jobMap[job.ID]
if !ok {
logutil.BgLogger().Debug("Job ID from meta is not consistent with requested job id,",
zap.Int64("fetched job ID", job.ID))
errs[i] = dbterror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
continue
}
delete(jobMap, job.ID)

for _, job := range jobs {
i, ok := jobMap[job.ID]
if !ok {
logutil.BgLogger().Debug("the job that needs to be canceled isn't equal to current job",
zap.Int64("need to canceled job ID", job.ID),
zap.Int64("current job ID", job.ID))
continue
err = process(ns, job, byWho)
if err != nil {
errs[i] = err
break
}

err = updateDDLJob2Table(ns, job, true)
if err != nil {
break
}
}
delete(jobMap, job.ID)
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
errs[i] = dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID)
// We may meet some error on job update, try it again
if err != nil {
ns.Rollback()
continue
}
// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {

// There may be some conflict during the update, try it again
if ns.Commit() != nil {
continue
}
if !job.IsRollbackable() {
errs[i] = dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue

for id, idx := range jobMap {
errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id)
}
job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
err := json.Unmarshal(job.RawArgs, &job.Args)

break
}
return errs, nil
}

// CancelJobs cancels the DDL jobs according to user command.
func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(cancelRunningJob, se, ids, model.AdminCommandByEndUser)
}

// PauseJobs pause all the DDL jobs according to user command.
func PauseJobs(se sessionctx.Context, ids []int64) ([]error, error) {
return processJobs(pauseRunningJob, se, ids, model.AdminCommandByEndUser)
}

// ResumeJobs resume all the DDL jobs according to user command.
func ResumeJobs(se sessionctx.Context, ids []int64) ([]error, error) {
return processJobs(resumePausedJob, se, ids, model.AdminCommandByEndUser)
}

// CancelJobsBySystem cancels Jobs because of internal reasons.
func CancelJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(cancelRunningJob, se, ids, model.AdminCommandBySystem)
}

// PauseJobsBySystem pauses Jobs because of internal reasons.
func PauseJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(pauseRunningJob, se, ids, model.AdminCommandBySystem)
}

// ResumeJobsBySystem resumes Jobs that are paused by TiDB itself.
func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(resumePausedJob, se, ids, model.AdminCommandBySystem)
}

// pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage.
func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) {
var err error
var jobErrs = make(map[int64]error)

ns := sess.NewSession(se)
err = ns.Begin()
if err != nil {
return nil, err
}

var jobID int64 = 0
var jobIDMax int64 = 0
var limit int = 100
for {
var jobs []*model.Job
jobs, err = getJobsBySQL(ns, JobTable,
fmt.Sprintf("job_id >= %s order by job_id asc limit %s",
strconv.FormatInt(jobID, 10),
strconv.FormatInt(int64(limit), 10)))
if err != nil {
errs[i] = errors.Trace(err)
continue
ns.Rollback()
return nil, err
}
err = updateDDLJob2Table(sessCtx, job, true)
if err != nil {
errs[i] = errors.Trace(err)

for _, job := range jobs {
err = process(ns, job, byWho)
if err != nil {
jobErrs[job.ID] = err
ns.Rollback()
return jobErrs, err
}
err = updateDDLJob2Table(ns, job, true)
if err != nil {
ns.Rollback()
return jobErrs, err
}
}

// Just in case the job ID is not sequential
if jobs[len(jobs)-1].ID > jobIDMax {
jobIDMax = jobs[len(jobs)-1].ID
}

// If rows returned is smaller than $limit, then there is no more records
if len(jobs) < limit {
break
}

jobID = jobIDMax + 1
}
err = sessCtx.Commit()

err = ns.Commit()
if err != nil {
return nil, err
}
for id, idx := range jobMap {
errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id)
}
return errs, nil
return jobErrs, nil
}

// PauseAllJobsBySystem pauses all running Jobs because of internal reasons.
func PauseAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) {
return processAllJobs(pauseRunningJob, se, model.AdminCommandBySystem)
}

// ResumeAllJobsBySystem resumes all paused Jobs because of internal reasons.
func ResumeAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) {
return processAllJobs(resumePausedJob, se, model.AdminCommandBySystem)
}

// GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID.
Expand Down
Loading

0 comments on commit 8565dbe

Please sign in to comment.