Skip to content

Commit

Permalink
ddl: fix cancel drop index error (pingcap#8504) (pingcap#9495)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and zimulala committed Feb 28, 2019
1 parent 128052d commit b6a0381
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 64 deletions.
5 changes: 3 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/charset"
"github.com/pingcap/tidb/util/schemautil"
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt) (err error) {
Expand Down Expand Up @@ -1720,7 +1721,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, ind
indexName = getAnonymousIndex(t, idxColNames[0].Column.Name)
}

if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
if indexInfo := schemautil.FindIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
return ErrDupKeyName.Gen("index already exist %s", indexName)
}

Expand Down Expand Up @@ -1844,7 +1845,7 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
return errors.Trace(infoschema.ErrTableNotExists.GenByArgs(ti.Schema, ti.Name))
}

if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo == nil {
if indexInfo := schemautil.FindIndexByName(indexName.L, t.Meta().Indices); indexInfo == nil {
return ErrCantDropFieldOrKey.Gen("index %s doesn't exist", indexName)
}

Expand Down
96 changes: 96 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/schemautil"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -506,6 +507,101 @@ func (s *testDBSuite) TestCancelAddIndex1(c *C) {
s.mustExec(c, "alter table t drop index idx_c2")
}

// TestCancelDropIndex tests cancel ddl job which type is drop index.
func (s *testDBSuite) TestCancelDropIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")
for i := 0; i < 5; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}

testCases := []struct {
needAddIndex bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
// model.JobStateNone means the jobs is canceled before the first run.
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, true},
{false, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}

var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropIndex && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobID = job.ID
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}

if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}

checkErr = txn.Commit(context.Background())
}
}
s.dom.DDL().SetHook(hook)
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddIndex {
s.mustExec(c, "alter table t add index idx_c2(c2)")
}
rs, err := s.tk.Exec("alter table t drop index idx_c2")
if rs != nil {
rs.Close()
}

t := s.testGetTable(c, "t")
indexInfo := schemautil.FindIndexByName("idx_c2", t.Meta().Indices)
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")

c.Assert(indexInfo, NotNil)
c.Assert(indexInfo.State, Equals, model.StatePublic)
} else {
err1 := admin.ErrCannotCancelDDLJob.GenByArgs(jobID)
c.Assert(err, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, err1.Error())

c.Assert(indexInfo, IsNil)
}
}
s.dom.DDL().SetHook(&ddl.TestDDLCallback{})
s.mustExec(c, "alter table t add index idx_c2(c2)")
s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/schemautil"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -571,7 +572,7 @@ func (d *ddl) cleanAddIndexQueueJobs(txn kv.Transaction) error {
if err != nil {
return errors.Trace(err)
}
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
_, err = convertAddIdxJob2RollbackJob(m, job, tblInfo, indexInfo, nil)
if err == nil {
_, err = m.DeQueueDDLJob()
Expand Down
58 changes: 19 additions & 39 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,30 +469,26 @@ type testCancelJob struct {

func buildCancelJobTests(firstID int64) []testCancelJob {
err := errCancelledDDLJob
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},
// Test cancel drop index job , see TestCancelDropIndex.

// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 13)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 14)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 15)}, cancelState: model.StateDeleteReorganization, ddlRetErr: err},

{act: model.ActionAddColumn, jobIDs: []int64{firstID + 5}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 6}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 7}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 8)}, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionDropColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 9)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 10)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 11)}, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
}

return tests
Expand Down Expand Up @@ -626,23 +622,8 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Assert(txn.Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
test = &tests[4]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[5]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[6]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[7]
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for add column
test = &tests[8]
test = &tests[4]
addingColName := "colA"
newColumnDef := &ast.ColumnDef{
Name: &ast.ColumnName{Name: model.NewCIStr(addingColName)},
Expand All @@ -655,39 +636,38 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[9]
test = &tests[5]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[10]
test = &tests[6]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[11]
test = &tests[7]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)

// for drop column.
test = &tests[12]
test = &tests[8]
dropColName := "c1"
dropColumnArgs := []interface{}{model.NewCIStr(dropColName)}
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testCancelDropColumn(c, ctx, d, dbInfo, tblInfo, dropColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[13]

dropColName = "c2"
test = &tests[9]
dropColName = "c3"
dropColumnArgs = []interface{}{model.NewCIStr(dropColName)}
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testCancelDropColumn(c, ctx, d, dbInfo, tblInfo, dropColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[14]
dropColName = "c3"
test = &tests[10]
dropColName = "c4"
dropColumnArgs = []interface{}{model.NewCIStr(dropColName)}
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testCancelDropColumn(c, ctx, d, dbInfo, tblInfo, dropColumnArgs)
Expand Down
14 changes: 3 additions & 11 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/schemautil"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -206,7 +207,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo != nil && indexInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return ver, ErrDupKeyName.Gen("index already exist %s", indexName)
Expand Down Expand Up @@ -319,7 +320,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("index %s doesn't exist", indexName)
Expand Down Expand Up @@ -961,15 +962,6 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
return d.backfillKVRangesIndex(t, workers, kvRanges, job, reorgInfo)
}

func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
for _, idx := range indices {
if idx.Name.L == idxName {
return idx
}
}
return nil
}

func allocateIndexID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxIndexID++
return tblInfo.MaxIndexID
Expand Down
50 changes: 49 additions & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/util/schemautil"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -63,7 +64,7 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
Expand Down Expand Up @@ -106,6 +107,51 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
}
return ver, errCancelledDDLJob
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}

var indexName model.CIStr
err = job.DecodeArgs(&indexName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("index %s doesn't exist", indexName)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
// We can not rollback now, so just continue to drop index.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
job.State = model.JobStateRunning
return ver, nil
case model.StatePublic, model.StateWriteOnly:
job.State = model.JobStateRollbackDone
indexInfo.State = model.StatePublic
default:
return ver, ErrInvalidIndexState.Gen("invalid index state %v", indexInfo.State)
}

job.SchemaState = indexInfo.State
job.Args = []interface{}{indexInfo.Name}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}

func rollingbackAddindex(d *ddl, t *meta.Meta, job *model.Job) (ver int64, err error) {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
Expand Down Expand Up @@ -160,6 +206,8 @@ func convertJob2RollbackJob(d *ddl, t *meta.Meta, job *model.Job) (ver int64, er
ver, err = rollingbackAddindex(d, t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropIndex:
ver, err = rollingbackDropIndex(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
Loading

0 comments on commit b6a0381

Please sign in to comment.