Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix cancel drop index error #8504

Merged
merged 15 commits into from
Dec 7, 2018
91 changes: 91 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/schemautil"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -518,6 +519,96 @@ func (s *testDBSuite) TestCancelAddIndex1(c *C) {
s.mustExec(c, "alter table t drop index idx_c2")
}

// TestCancelDropIndex tests cancel ddl job which type is drop index.
func (s *testDBSuite) TestCancelDropIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")
for i := 0; i < 5; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}

testCases := []struct {
needAddIndex bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
// model.JobStateNone means the jobs is canceled before the first run.
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, true},
{false, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}

var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropIndex && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobID = job.ID
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}

if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}

checkErr = hookCtx.Txn(true).Commit(context.Background())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddIndex {
s.mustExec(c, "alter table t add index idx_c2(c2)")
}
rs, err := s.tk.Exec("alter table t drop index idx_c2")
if rs != nil {
rs.Close()
}

t := s.testGetTable(c, "t")
indexInfo := schemautil.FindIndexByName("idx_c2", t.Meta().Indices)
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")

c.Assert(indexInfo, NotNil)
c.Assert(indexInfo.State, Equals, model.StatePublic)
} else {
err1 := admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID)
c.Assert(err, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, err1.Error())

c.Assert(indexInfo, IsNil)
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
s.mustExec(c, "alter table t add index idx_c2(c2)")
s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
5 changes: 3 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/schemautil"
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt) (err error) {
Expand Down Expand Up @@ -1994,7 +1995,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, ind
indexName = getAnonymousIndex(t, idxColNames[0].Column.Name)
}

if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
if indexInfo := schemautil.FindIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
return ErrDupKeyName.GenWithStack("index already exist %s", indexName)
}

Expand Down Expand Up @@ -2119,7 +2120,7 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}

if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo == nil {
if indexInfo := schemautil.FindIndexByName(indexName.L, t.Meta().Indices); indexInfo == nil {
return ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

Expand Down
39 changes: 10 additions & 29 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,26 +331,22 @@ type testCancelJob struct {

func buildCancelJobTests(firstID int64) []testCancelJob {
err := errCancelledDDLJob
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},
// Test cancel drop index job , see TestCancelDropIndex.

// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 5}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 6}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 7}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StatePublic, ddlRetErr: err},
}

return tests
Expand Down Expand Up @@ -464,23 +460,8 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
test = &tests[4]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[5]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[6]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropIndex, idxName, &test.cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
test = &tests[7]
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for add column
test = &tests[8]
test = &tests[4]
addingColName := "colA"

newColumnDef := &ast.ColumnDef{
Expand All @@ -494,17 +475,17 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[9]
test = &tests[5]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[10]
test = &tests[6]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[11]
test = &tests[7]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)
Expand Down
20 changes: 6 additions & 14 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/schemautil"
"github.com/pingcap/tidb/util/timeutil"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -182,7 +183,7 @@ func dropIndexColumnFlag(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) {
}

func validateRenameIndex(from, to model.CIStr, tbl *model.TableInfo) (ignore bool, err error) {
if fromIdx := findIndexByName(from.L, tbl.Indices); fromIdx == nil {
if fromIdx := schemautil.FindIndexByName(from.L, tbl.Indices); fromIdx == nil {
return false, errors.Trace(infoschema.ErrKeyNotExists.GenWithStackByArgs(from.O, tbl.Name))
}
// Take case-sensitivity into account, if `FromKey` and `ToKey` are the same, nothing need to be changed
Expand All @@ -192,7 +193,7 @@ func validateRenameIndex(from, to model.CIStr, tbl *model.TableInfo) (ignore boo
// If spec.FromKey.L == spec.ToKey.L, we operate on the same index(case-insensitive) and change its name (case-sensitive)
// e.g: from `inDex` to `IndEX`. Otherwise, we try to rename an index to another different index which already exists,
// that's illegal by rule.
if toIdx := findIndexByName(to.L, tbl.Indices); toIdx != nil && from.L != to.L {
if toIdx := schemautil.FindIndexByName(to.L, tbl.Indices); toIdx != nil && from.L != to.L {
return false, errors.Trace(infoschema.ErrKeyNameDuplicate.GenWithStackByArgs(toIdx.Name.O))
}
return false, nil
Expand All @@ -219,7 +220,7 @@ func onRenameIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
idx := findIndexByName(from.L, tblInfo.Indices)
idx := schemautil.FindIndexByName(from.L, tblInfo.Indices)
idx.Name = to
if ver, err = updateVersionAndTableInfo(t, job, tblInfo, true); err != nil {
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -258,7 +259,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo != nil && indexInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return ver, ErrDupKeyName.GenWithStack("index already exist %s", indexName)
Expand Down Expand Up @@ -371,7 +372,7 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
Expand Down Expand Up @@ -1170,15 +1171,6 @@ func findNextPartitionID(currentPartition int64, defs []model.PartitionDefinitio
return 0, errors.Errorf("partition id not found %d", currentPartition)
}

func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
for _, idx := range indices {
if idx.Name.L == idxName {
return idx
}
}
return nil
}

func allocateIndexID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxIndexID++
return tblInfo.MaxIndexID
Expand Down
50 changes: 49 additions & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/util/schemautil"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
Expand Down Expand Up @@ -117,6 +118,51 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
return ver, errCancelledDDLJob
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
var indexName model.CIStr
err = job.DecodeArgs(&indexName)
if err != nil {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
// We can not rollback now, so just continue to drop index.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
job.State = model.JobStateRunning
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return ver, nil
case model.StatePublic, model.StateWriteOnly:
job.State = model.JobStateRollbackDone
indexInfo.State = model.StatePublic
default:
return ver, ErrInvalidIndexState.GenWithStack("invalid index state %v", indexInfo.State)
}

job.SchemaState = indexInfo.State
job.Args = []interface{}{indexInfo.Name}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}

func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
Expand All @@ -137,6 +183,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackAddColumn(t, job)
case model.ActionAddIndex:
ver, err = rollingbackAddindex(w, d, t, job)
case model.ActionDropIndex:
ver, err = rollingbackDropIndex(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
12 changes: 2 additions & 10 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/schemautil"
)

type visitInfo struct {
Expand Down Expand Up @@ -383,15 +384,6 @@ func removeIgnoredPaths(paths, ignoredPaths []*accessPath, tblInfo *model.TableI
return remainedPaths
}

func findIndexByName(indices []*model.IndexInfo, name model.CIStr) *model.IndexInfo {
for _, idx := range indices {
if idx.Name.L == name.L {
return idx
}
}
return nil
}

func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock ast.SelectLockType) *LogicalLock {
selectLock := LogicalLock{Lock: lock}.Init(b.ctx)
selectLock.SetChildren(src)
Expand Down Expand Up @@ -723,7 +715,7 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
return nil, err
}
for _, idxName := range as.IndexNames {
idx := findIndexByName(tblInfo.Indices, idxName)
idx := schemautil.FindIndexByName(idxName.L, tblInfo.Indices)
if idx == nil || idx.State != model.StatePublic {
return nil, ErrAnalyzeMissIndex.GenWithStackByArgs(idxName.O, tblInfo.Name.O)
}
Expand Down
Loading