diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go
index 9c46fd727f1ce..fb82096dd3755 100644
--- a/ddl/column_type_change_test.go
+++ b/ddl/column_type_change_test.go
@@ -14,10 +14,13 @@
 package ddl_test
 
 import (
+	"context"
 	"errors"
 	"time"
 
 	. "github.com/pingcap/check"
+	errors2 "github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/parser/model"
 	parser_mysql "github.com/pingcap/parser/mysql"
 	"github.com/pingcap/parser/terror"
@@ -25,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/domain"
 	mysql "github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/store/helper"
@@ -1718,3 +1722,63 @@ func (s *testColumnTypeChangeSuite) TestChangingAttributeOfColumnWithFK(c *C) {
 
 	tk.MustExec("drop table if exists orders, users")
 }
+
+// TestDDLExitWhenCancelMeetPanic is the regression test for issue #23202: a DDL job that
+// keeps panicking must be cancelled once its error count exceeds
+// tidb_ddl_error_count_limit instead of being retried forever.
+func (s *testColumnTypeChangeSuite) TestDDLExitWhenCancelMeetPanic(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a int, b int)")
+	tk.MustExec("insert into t values(1,1),(2,2)")
+	tk.MustExec("alter table t add index(b)")
+
+	// Lower the global error limit for this test only and restore it afterwards, so the
+	// global setting does not leak into tests that run later.
+	originalLimit := tk.MustQuery("select @@global.tidb_ddl_error_count_limit").Rows()[0][0].(string)
+	tk.MustExec("set @@global.tidb_ddl_error_count_limit=3")
+	defer tk.MustExec("set @@global.tidb_ddl_error_count_limit=" + originalLimit)
+
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`), IsNil)
+	defer func() {
+		c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit"), IsNil)
+	}()
+
+	originalHook := s.dom.DDL().GetHook()
+	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
+
+	// Record the ID of the first drop-index job so that its history entry can be checked.
+	hook := &ddl.TestDDLCallback{Do: s.dom}
+	var jobID int64
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		if jobID != 0 {
+			return
+		}
+		if job.Type == model.ActionDropIndex {
+			jobID = job.ID
+		}
+	}
+	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
+
+	// When the job panics in the write-reorg state it is turned into a cancelling job.
+	// Because a drop-index job in write-reorg cannot be cancelled, it is converted back to
+	// the running state and retried; without the error count limit this loops forever.
+	_, err := tk.Exec("alter table t drop index b")
+	c.Assert(err, NotNil)
+	c.Assert(err.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
+	c.Assert(jobID > 0, Equals, true)
+
+	// Verify the state recorded for the job in the DDL history.
+	var job *model.Job
+	err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error {
+		t := meta.NewMeta(txn)
+		var err1 error
+		job, err1 = t.GetHistoryDDLJob(jobID)
+		return errors2.Trace(err1)
+	})
+	c.Assert(err, IsNil)
+	c.Assert(job.ErrorCount, Equals, int64(4))
+	c.Assert(job.Error.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
+}
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index c1fd476ba6801..1c263821b932b 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -621,12 +621,61 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {
 	return t
 }
 
+// countForPanic is called when running a DDL job panics. It marks the job as cancelling and
+// increases its error count; once the count exceeds the DDL error count limit the job is
+// marked as cancelled and the reason is stored in job.Error.
+func (w *worker) countForPanic(job *model.Job) {
+	// If running the DDL job panics, just cancel the DDL job.
+	job.State = model.JobStateCancelling
+	job.ErrorCount++
+
+	// Load global DDL variables.
+	if err1 := loadDDLVars(w); err1 != nil {
+		logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
+	}
+	errorCountLimit := variable.GetDDLErrorCountLimit()
+
+	// Check error limit to avoid falling into an infinite loop.
+	if job.ErrorCount > errorCountLimit {
+		msg := fmt.Sprintf("panic in handling DDL logic and error count beyond the limitation %d, cancelled", errorCountLimit)
+		logutil.Logger(w.logCtx).Warn("[ddl] "+msg, zap.Int64("jobID", job.ID))
+		job.Error = toTError(errors.New(msg))
+		job.State = model.JobStateCancelled
+	}
+}
+
+// countForError records err on the job and increases the job's error count. It returns nil
+// if the job is already cancelled, otherwise err. A running, rollbackable job whose error
+// count exceeds the DDL error count limit is switched to the cancelling state.
+func (w *worker) countForError(err error, job *model.Job) error {
+	job.Error = toTError(err)
+	job.ErrorCount++
+
+	// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
+	if job.State == model.JobStateCancelled {
+		logutil.Logger(w.logCtx).Info("[ddl] DDL job is cancelled normally", zap.Error(err))
+		return nil
+	}
+	logutil.Logger(w.logCtx).Error("[ddl] run DDL job error", zap.Error(err))
+
+	// Load global DDL variables.
+	if err1 := loadDDLVars(w); err1 != nil {
+		logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
+	}
+	// Check error limit to avoid falling into an infinite loop.
+	errorCountLimit := variable.GetDDLErrorCountLimit()
+	if job.ErrorCount > errorCountLimit && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
+		logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", errorCountLimit))
+		job.State = model.JobStateCancelling
+	}
+	return err
+}
+
 // runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
 func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
 	defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runDDLJob", w),
 		func() {
-			// If run DDL job panic, just cancel the DDL jobs.
-			job.State = model.JobStateCancelling
+			w.countForPanic(job)
 		}, false)
 
 	// Mock for run ddl job panic.
@@ -743,27 +792,9 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 		err = errInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type)
 	}
 
-	// Save errors in job, so that others can know errors happened.
+	// Save errors in job if any, so that others can know errors happened.
 	if err != nil {
-		job.Error = toTError(err)
-		job.ErrorCount++
-
-		// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
-		if job.State == model.JobStateCancelled {
-			logutil.Logger(w.logCtx).Info("[ddl] DDL job is cancelled normally", zap.Error(err))
-			return ver, nil
-		}
-		logutil.Logger(w.logCtx).Error("[ddl] run DDL job error", zap.Error(err))
-
-		// Load global ddl variables.
-		if err1 := loadDDLVars(w); err1 != nil {
-			logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
-		}
-		// Check error limit to avoid falling into an infinite loop.
-		if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
-			logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
-			job.State = model.JobStateCancelling
-		}
+		err = w.countForError(err, job)
 	}
 	return
 }
diff --git a/ddl/index.go b/ddl/index.go
index ebcdaddf48060..1e2423f82ef19 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -666,6 +666,12 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
 		// Set column index flag.
 		dropIndexColumnFlag(tblInfo, indexInfo)
 
+		failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
+			if val.(bool) {
+				panic("panic test in dropping index")
+			}
+		})
+
 		tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]
 
 		ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != model.StateNone)