Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix canceling add index and add column, port from #8171 #8513

Merged
merged 6 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,15 @@ func (d *ddl) createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnIn
return colInfo, position, nil
}

func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = d.onDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -256,7 +264,11 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
Expand Down
55 changes: 55 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,61 @@ LOOP:
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
}

// TestCancelAddIndex1 tests canceling a DDL job when the add index worker has not started.
//
// A hook cancels the add-index job the first time it is observed running in the
// write-reorganization state with SnapshotVer == 0. The test then checks that the
// statement fails with the cancelled-job error, that idx_c2 is absent from the
// table, and that the same index can afterwards be added and dropped normally.
func (s *testDBSuite) TestCancelAddIndex1(c *C) {
	s.tk = testkit.NewTestKit(c, s.store)
	s.mustExec(c, "use test_db")
	s.mustExec(c, "drop table if exists t")
	s.mustExec(c, "create table t(c1 int, c2 int)")
	defer s.mustExec(c, "drop table t;")
	for i := 0; i < 50; i++ {
		s.mustExec(c, "insert into t values (?, ?)", i, i)
	}

	// checkErr carries any failure from inside the hook back to the test body.
	var checkErr error
	oldReorgWaitTimeout := ddl.ReorgWaitTimeout
	ddl.ReorgWaitTimeout = 50 * time.Millisecond
	defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()

	hook := &ddl.TestDDLCallback{}
	hook.OnJobRunBeforeExported = func(job *model.Job) {
		if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 {
			jobIDs := []int64{job.ID}
			// Cancel the job through a separate context and transaction.
			hookCtx := mock.NewContext()
			hookCtx.Store = s.store
			err := hookCtx.NewTxn()
			if err != nil {
				checkErr = errors.Trace(err)
				return
			}
			errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
			if err != nil {
				checkErr = errors.Trace(err)
				return
			}
			// Only one job ID was submitted, so only errs[0] is relevant.
			if errs[0] != nil {
				checkErr = errors.Trace(errs[0])
				return
			}
			checkErr = hookCtx.Txn(true).Commit(context.Background())
		}
	}
	s.dom.DDL().SetHook(hook)
	rs, err := s.tk.Exec("alter table t add index idx_c2(c2)")
	// Remove the cancelling hook before any assertion can abort the test, so a
	// failed assertion cannot leave the hook installed for later DDL statements.
	s.dom.DDL().SetHook(&ddl.TestDDLCallback{})
	if rs != nil {
		rs.Close()
	}
	c.Assert(checkErr, IsNil)
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")

	// The cancelled index must not be present on the table.
	t := s.testGetTable(c, "t")
	for _, idx := range t.Indices() {
		c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse)
	}

	// With the hook removed, adding and dropping the same index must succeed.
	s.mustExec(c, "alter table t add index idx_c2(c2)")
	s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
15 changes: 15 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,18 @@ func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, t
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

// testAddColumn runs an ActionAddColumn job built from args against tblInfo in
// dbInfo, asserts that the job finishes without error, checks the recorded
// history job against the current schema version and tblInfo, and returns the job.
func testAddColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, args []interface{}) *model.Job {
	addColJob := &model.Job{
		SchemaID:   dbInfo.ID,
		TableID:    tblInfo.ID,
		Type:       model.ActionAddColumn,
		Args:       args,
		BinlogInfo: &model.HistoryInfo{},
	}
	c.Assert(d.doDDLJob(ctx, addColJob), IsNil)

	schemaVer := getSchemaVer(c, ctx)
	wantArgs := &historyJobArgs{ver: schemaVer, tbl: tblInfo}
	checkHistoryJobArgs(c, ctx, addColJob.ID, wantArgs)
	return addColJob
}
13 changes: 2 additions & 11 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,16 +348,7 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64, err error) {
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
log.Infof("[ddl] run the cancelling DDL job %s", job)
d.reorgCtx.notifyReorgCancel()
} else {
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
return convertJob2RollbackJob(d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down Expand Up @@ -581,7 +572,7 @@ func (d *ddl) cleanAddIndexQueueJobs(txn kv.Transaction) error {
return errors.Trace(err)
}
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
_, err = d.convert2RollbackJob(m, job, tblInfo, indexInfo, nil)
_, err = convertAddIdxJob2RollbackJob(m, job, tblInfo, indexInfo, nil)
if err == nil {
_, err = m.DeQueueDDLJob()
}
Expand Down
115 changes: 88 additions & 27 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -439,10 +440,7 @@ func doDDLJobErr(c *C, schemaID, tableID int64, tp model.ActionType, args []inte

func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) error {
var checkErr error
addIndexFirstReorg := test.act == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0
// If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes.
if test.cancelState == job.SchemaState && !addIndexFirstReorg {
if test.cancelState == job.SchemaState {
if job.SchemaState == model.StateNone && job.State != model.JobStateDone {
// If the schema state is none, we only test the job is finished.
} else {
Expand All @@ -452,7 +450,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
return checkErr
}
// It only tests cancel one DDL job.
if errs[0] != test.cancelRetErrs[0] {
if !terror.ErrorEqual(errs[0], test.cancelRetErrs[0]) {
checkErr = errors.Trace(errs[0])
return checkErr
}
Expand All @@ -474,22 +472,51 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: errs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},

{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},

// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},
}

return tests
}

// checkAddIdx asserts that an index whose original-case name equals idxName is
// present in the table's metadata exactly when success is true.
func (s *testDDLSuite) checkAddIdx(c *C, d *ddl, schemaID int64, tableID int64, idxName string, success bool) {
	tbl := testGetTable(c, d, schemaID, tableID)
	exists := false
	for _, index := range tbl.Meta().Indices {
		if index.Name.O != idxName {
			continue
		}
		exists = true
		break
	}
	c.Assert(exists, Equals, success)
}
// checkAddColumn asserts that a column whose original-case name equals colName
// is present in the table's metadata exactly when success is true.
func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
	tbl := testGetTable(c, d, schemaID, tableID)
	exists := false
	for _, column := range tbl.Meta().Columns {
		if column.Name.O != colName {
			continue
		}
		exists = true
		break
	}
	c.Assert(exists, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand Down Expand Up @@ -519,47 +546,60 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
var checkErr error
var test *testCancelJob
tc.onJobUpdated = func(job *model.Job) {
if job.State == model.JobStateSynced || job.State == model.JobStateCancelled || job.State == model.JobStateCancelling {
return
}

if checkErr != nil {
return
}
hookCtx := mock.NewContext()
hookCtx.Store = store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
var err1 error
err1 = hookCtx.NewTxn()
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
checkErr = checkCancelState(hookCtx.Txn(true), job, test)
if checkErr != nil {
return
}
checkCancelState(hookCtx.Txn(true), job, test)
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
err1 = hookCtx.Txn(true).Commit(context.Background())
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
}
d.SetHook(tc)

// for adding index
test = &tests[0]
validArgs := []interface{}{false, model.NewCIStr("idx"),
idxOrigName := "idx"
validArgs := []interface{}{false, model.NewCIStr(idxOrigName),
[]*ast.IndexColName{{
Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
Length: -1,
}}, nil}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)
// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[1]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[2]
// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[3]
testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
Expand All @@ -576,10 +616,31 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for creating table
// for add column
test = &tests[8]
tblInfo = testTableInfo(c, d, "t1", 3)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
addingColName := "colA"
newColumnDef := &ast.ColumnDef{
Name: &ast.ColumnName{Name: model.NewCIStr(addingColName)},
Tp: &types.FieldType{Tp: mysql.TypeLonglong},
Options: []*ast.ColumnOption{},
}
col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef)
addColumnArgs := []interface{}{col, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 0}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[9]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[10]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[11]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
28 changes: 3 additions & 25 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
}
if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) {
log.Warnf("[ddl] run DDL job %v err %v, convert job to rollback job", job, err)
ver, err = d.convert2RollbackJob(t, job, tblInfo, indexInfo, err)
ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
d.reorgCtx.cleanNotifyReorgCancel()
Expand All @@ -300,29 +300,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
default:
err = ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State)
}

return ver, errors.Trace(err)
}

func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
job.State = model.JobStateRollingback
job.Args = []interface{}{indexInfo.Name}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
// Its work is the same as drop index job do.
// The write reorganization state in add index job that likes write only state in drop index job.
// So the next state is delete only state.
indexInfo.State = model.StateDeleteOnly
originalState := indexInfo.State
job.SchemaState = model.StateDeleteOnly
ver, err1 := updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err1 != nil {
return ver, errors.Trace(err1)
}

if kv.ErrKeyExists.Equal(err) {
return ver, kv.ErrKeyExists.Gen("Duplicate for key %s", indexInfo.Name.O)
err = ErrInvalidIndexState.Gen("invalid index state %v", indexInfo.State)
}

return ver, errors.Trace(err)
Expand Down Expand Up @@ -390,7 +368,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.Args = append(job.Args, indexInfo.ID)
}
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
err = ErrInvalidTableState.Gen("invalid index state %v", indexInfo.State)
}
return ver, errors.Trace(err)
}
Expand Down
Loading