Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add recover for run ddl job (#10981) #11022

Merged
merged 3 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/util"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -397,7 +399,14 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {

// If running job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)
tidbutil.WithRecovery(func() {
schemaVer, runJobErr = w.runDDLJob(d, t, job)
}, func(r interface{}) {
if r != nil {
// If run ddl job panic, just cancel the ddl jobs.
job.State = model.JobStateCancelling
}
})
if job.IsCancelled() {
txn.Reset()
err = w.finishDDLJob(t, job)
Expand Down Expand Up @@ -469,6 +478,9 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {

// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Mock for run ddl job panic.
failpoint.Inject("mockPanicInRunDDLJob", func(_ failpoint.Value) {})

logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
timeStart := time.Now()
defer func() {
Expand Down
14 changes: 14 additions & 0 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,17 @@ LOOP:
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}

// TestRunDDLJobPanic tests recover panic when run ddl job panic.
func (s *testFailDBSuite) TestRunDDLJobPanic(c *C) {
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockPanicInRunDDLJob"), IsNil)
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockPanicInRunDDLJob", `1*panic("panic test")`), IsNil)
_, err := tk.Exec("create table t(c1 int, c2 int)")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _

func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error {
// d.infoHandle maybe nil in some test.
if d.infoHandle == nil {
if d.infoHandle == nil || !d.infoHandle.IsValid() {
return checkTableNotExistsFromStore(t, schemaID, tableName)
}
// Try to use memory schema info to check first.
Expand Down
5 changes: 5 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (h *Handle) Get() InfoSchema {
return schema
}

// IsValid uses to check whether handle value is valid.
func (h *Handle) IsValid() bool {
return h.value.Load() != nil
}

// EmptyClone creates a new Handle with the same store and memSchema, but the value is not set.
func (h *Handle) EmptyClone() *Handle {
newHandle := &Handle{
Expand Down