Skip to content

Commit

Permalink
ddl: fix ddl hang over when it meets panic in cancelling path (#23204)
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Mar 13, 2021
1 parent b534ec7 commit 46f5f21
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 22 deletions.
55 changes: 55 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
package ddl_test

import (
"context"
"errors"
"time"

. "github.com/pingcap/check"
errors2 "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
parser_mysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -1718,3 +1722,54 @@ func (s *testColumnTypeChangeSuite) TestChangingAttributeOfColumnWithFK(c *C) {

tk.MustExec("drop table if exists orders, users")
}

// Close issue #23202
func (s *testColumnTypeChangeSuite) TestDDLExitWhenCancelMeetPanic(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("alter table t add index(b)")
tk.MustExec("set @@global.tidb_ddl_error_count_limit=3")

failpoint.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`)
defer func() {
failpoint.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit")
}()

originalHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)

hook := &ddl.TestDDLCallback{Do: s.dom}
var jobID int64
hook.OnJobRunBeforeExported = func(job *model.Job) {
if jobID != 0 {
return
}
if job.Type == model.ActionDropIndex {
jobID = job.ID
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)

// when it panics in write-reorg state, the job will be pulled up as a cancelling job. Since drop-index with
// write-reorg can't be cancelled, so it will be converted to running state and try again (dead loop).
_, err := tk.Exec("alter table t drop index b")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
c.Assert(jobID > 0, Equals, true)

// Verification of the history job state.
var job *model.Job
err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(jobID)
return errors2.Trace(err1)
})
c.Assert(err, IsNil)
c.Assert(job.ErrorCount, Equals, int64(4))
c.Assert(job.Error.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
}
69 changes: 47 additions & 22 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,55 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {
return t
}

// countForPanic records the error count for DDL job.
func (w *worker) countForPanic(job *model.Job) {
// If run DDL job panic, just cancel the DDL jobs.
job.State = model.JobStateCancelling
job.ErrorCount++

// Load global DDL variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
errorCount := variable.GetDDLErrorCountLimit()

if job.ErrorCount > errorCount {
msg := fmt.Sprintf("panic in handling DDL logic and error count beyond the limitation %d, cancelled", errorCount)
logutil.Logger(w.logCtx).Warn(msg)
job.Error = toTError(errors.New(msg))
job.State = model.JobStateCancelled
}
}

// countForError records the error count for DDL job.
func (w *worker) countForError(err error, job *model.Job) error {
job.Error = toTError(err)
job.ErrorCount++

// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
if job.State == model.JobStateCancelled {
logutil.Logger(w.logCtx).Info("[ddl] DDL job is cancelled normally", zap.Error(err))
return nil
}
logutil.Logger(w.logCtx).Error("[ddl] run DDL job error", zap.Error(err))

// Load global DDL variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
job.State = model.JobStateCancelling
}
return err
}

// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runDDLJob", w),
func() {
// If run DDL job panic, just cancel the DDL jobs.
job.State = model.JobStateCancelling
w.countForPanic(job)
}, false)

// Mock for run ddl job panic.
Expand Down Expand Up @@ -743,27 +786,9 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
err = errInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type)
}

// Save errors in job, so that others can know errors happened.
// Save errors in job if any, so that others can know errors happened.
if err != nil {
job.Error = toTError(err)
job.ErrorCount++

// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
if job.State == model.JobStateCancelled {
logutil.Logger(w.logCtx).Info("[ddl] DDL job is cancelled normally", zap.Error(err))
return ver, nil
}
logutil.Logger(w.logCtx).Error("[ddl] run DDL job error", zap.Error(err))

// Load global ddl variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
job.State = model.JobStateCancelling
}
err = w.countForError(err, job)
}
return
}
Expand Down
6 changes: 6 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,12 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)

failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
}
})

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]

ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != model.StateNone)
Expand Down

0 comments on commit 46f5f21

Please sign in to comment.