Skip to content

Commit

Permalink
ddl: add recover for run ddl job (#10981) (#11022)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and winkyao committed Jul 2, 2019
1 parent f865967 commit e04723a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 2 deletions.
14 changes: 13 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/util"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -397,7 +399,14 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {

// If running job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)
tidbutil.WithRecovery(func() {
schemaVer, runJobErr = w.runDDLJob(d, t, job)
}, func(r interface{}) {
if r != nil {
// If run ddl job panic, just cancel the ddl jobs.
job.State = model.JobStateCancelling
}
})
if job.IsCancelled() {
txn.Reset()
err = w.finishDDLJob(t, job)
Expand Down Expand Up @@ -469,6 +478,9 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {

// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Mock for run ddl job panic.
failpoint.Inject("mockPanicInRunDDLJob", func(_ failpoint.Value) {})

logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
timeStart := time.Now()
defer func() {
Expand Down
14 changes: 14 additions & 0 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,17 @@ LOOP:
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}

// TestRunDDLJobPanic tests recover panic when run ddl job panic.
func (s *testFailDBSuite) TestRunDDLJobPanic(c *C) {
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockPanicInRunDDLJob"), IsNil)
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockPanicInRunDDLJob", `1*panic("panic test")`), IsNil)
_, err := tk.Exec("create table t(c1 int, c2 int)")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _

func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error {
// d.infoHandle maybe nil in some test.
if d.infoHandle == nil {
if d.infoHandle == nil || !d.infoHandle.IsValid() {
return checkTableNotExistsFromStore(t, schemaID, tableName)
}
// Try to use memory schema info to check first.
Expand Down
5 changes: 5 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (h *Handle) Get() InfoSchema {
return schema
}

// IsValid uses to check whether handle value is valid.
func (h *Handle) IsValid() bool {
return h.value.Load() != nil
}

// EmptyClone creates a new Handle with the same store and memSchema, but the value is not set.
func (h *Handle) EmptyClone() *Handle {
newHandle := &Handle{
Expand Down

0 comments on commit e04723a

Please sign in to comment.