Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix cancel drop index error (#8504) #9486

Merged
merged 2 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/schemautil"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -626,6 +627,101 @@ func (s *testDBSuite) TestCancelAddIndex1(c *C) {
s.mustExec(c, "alter table t drop index idx_c2")
}

// TestCancelDropIndex tests cancel ddl job which type is drop index.
func (s *testDBSuite) TestCancelDropIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")
for i := 0; i < 5; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}

testCases := []struct {
needAddIndex bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
// model.JobStateNone means the jobs is canceled before the first run.
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, true},
{false, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}

var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropIndex && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobID = job.ID
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}

if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}

checkErr = txn.Commit(context.Background())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddIndex {
s.mustExec(c, "alter table t add index idx_c2(c2)")
}
rs, err := s.tk.Exec("alter table t drop index idx_c2")
if rs != nil {
rs.Close()
}

t := s.testGetTable(c, "t")
indexInfo := schemautil.FindIndexByName("idx_c2", t.Meta().Indices)
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")

c.Assert(indexInfo, NotNil)
c.Assert(indexInfo.State, Equals, model.StatePublic)
} else {
err1 := admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID)
c.Assert(err, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, err1.Error())

c.Assert(indexInfo, IsNil)
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
s.mustExec(c, "alter table t add index idx_c2(c2)")
s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
5 changes: 3 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/schemautil"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -2236,7 +2237,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, ind
indexName = getAnonymousIndex(t, idxColNames[0].Column.Name)
}

if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
if indexInfo := schemautil.FindIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
return ErrDupKeyName.GenWithStack("index already exist %s", indexName)
}

Expand Down Expand Up @@ -2361,7 +2362,7 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}

if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo == nil {
if indexInfo := schemautil.FindIndexByName(indexName.L, t.Meta().Indices); indexInfo == nil {
return ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

Expand Down
51 changes: 16 additions & 35 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,30 +331,26 @@ type testCancelJob struct {

func buildCancelJobTests(firstID int64) []testCancelJob {
err := errCancelledDDLJob
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},
// Test cancel drop index job , see TestCancelDropIndex.

// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 5}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 6}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 7}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 13)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 14)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 15)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 9)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 10)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 11)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
}

return tests
Expand Down Expand Up @@ -489,23 +485,8 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Assert(txn.Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
test = &tests[4]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[5]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[6]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[7]
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for add column
test = &tests[8]
test = &tests[4]
addingColName := "colA"

newColumnDef := &ast.ColumnDef{
Expand All @@ -520,37 +501,37 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[9]
test = &tests[5]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[10]
test = &tests[6]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[11]
test = &tests[7]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)

// for drop column.
test = &tests[12]
test = &tests[8]
dropColName := "c3"
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[13]
test = &tests[9]
dropColName = "c4"
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[14]
test = &tests[10]
dropColName = "c5"
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
Expand Down
20 changes: 6 additions & 14 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/schemautil"
"github.com/pingcap/tidb/util/timeutil"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -183,7 +184,7 @@ func dropIndexColumnFlag(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) {
}

func validateRenameIndex(from, to model.CIStr, tbl *model.TableInfo) (ignore bool, err error) {
if fromIdx := findIndexByName(from.L, tbl.Indices); fromIdx == nil {
if fromIdx := schemautil.FindIndexByName(from.L, tbl.Indices); fromIdx == nil {
return false, errors.Trace(infoschema.ErrKeyNotExists.GenWithStackByArgs(from.O, tbl.Name))
}
// Take case-sensitivity into account, if `FromKey` and `ToKey` are the same, nothing need to be changed
Expand All @@ -193,7 +194,7 @@ func validateRenameIndex(from, to model.CIStr, tbl *model.TableInfo) (ignore boo
// If spec.FromKey.L == spec.ToKey.L, we operate on the same index(case-insensitive) and change its name (case-sensitive)
// e.g: from `inDex` to `IndEX`. Otherwise, we try to rename an index to another different index which already exists,
// that's illegal by rule.
if toIdx := findIndexByName(to.L, tbl.Indices); toIdx != nil && from.L != to.L {
if toIdx := schemautil.FindIndexByName(to.L, tbl.Indices); toIdx != nil && from.L != to.L {
return false, errors.Trace(infoschema.ErrKeyNameDuplicate.GenWithStackByArgs(toIdx.Name.O))
}
return false, nil
Expand All @@ -220,7 +221,7 @@ func onRenameIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
idx := findIndexByName(from.L, tblInfo.Indices)
idx := schemautil.FindIndexByName(from.L, tblInfo.Indices)
idx.Name = to
if ver, err = updateVersionAndTableInfo(t, job, tblInfo, true); err != nil {
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -259,7 +260,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo != nil && indexInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return ver, ErrDupKeyName.GenWithStack("index already exist %s", indexName)
Expand Down Expand Up @@ -372,7 +373,7 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
Expand Down Expand Up @@ -1232,15 +1233,6 @@ func findNextPartitionID(currentPartition int64, defs []model.PartitionDefinitio
return 0, errors.Errorf("partition id not found %d", currentPartition)
}

func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
for _, idx := range indices {
if idx.Name.L == idxName {
return idx
}
}
return nil
}

func allocateIndexID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxIndexID++
return tblInfo.MaxIndexID
Expand Down
49 changes: 48 additions & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/util/schemautil"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
Expand Down Expand Up @@ -129,6 +130,50 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
return ver, errCancelledDDLJob
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}

var indexName model.CIStr
err = job.DecodeArgs(&indexName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
// We can not rollback now, so just continue to drop index.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
job.State = model.JobStateRunning
return ver, nil
case model.StatePublic, model.StateWriteOnly:
job.State = model.JobStateRollbackDone
indexInfo.State = model.StatePublic
default:
return ver, ErrInvalidIndexState.GenWithStack("invalid index state %v", indexInfo.State)
}

job.SchemaState = indexInfo.State
job.Args = []interface{}{indexInfo.Name}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}

func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
Expand Down Expand Up @@ -219,6 +264,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackRenameIndex(job)
case model.ActionTruncateTable:
ver, err = rollingbackTruncateTable(t, job)
case model.ActionDropIndex:
ver, err = rollingbackDropIndex(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
Loading