Skip to content

Commit

Permalink
ddl: fix canceling model.ActionRebaseAutoID and model.ActionShardRowI…
Browse files Browse the repository at this point in the history
…D ddl jobs (#9226)
  • Loading branch information
winkyao authored Feb 18, 2019
1 parent c468f02 commit 33a961d
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 32 deletions.
8 changes: 4 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, 0, errors.Trace(err)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {

func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.Colu
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, tab
}

func updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldColName *model.CIStr) (ver int64, _ error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
36 changes: 30 additions & 6 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func (s *testDDLSuite) TestTableError(c *C) {
job.SchemaID = -1
job.TableID = -1
t := meta.NewMeta(txn)
_, err1 := getTableInfo(t, job, job.SchemaID)
_, err1 := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
c.Assert(err1, NotNil)
job.SchemaID = dbInfo.ID
_, err1 = getTableInfo(t, job, job.SchemaID)
_, err1 = getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
c.Assert(err1, NotNil)
return nil
})
Expand Down Expand Up @@ -325,8 +325,9 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
// If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes.
if test.cancelState == job.SchemaState && !addIndexFirstReorg {
if job.SchemaState == model.StateNone && job.State != model.JobStateDone && job.Type != model.ActionCreateTable && job.Type != model.ActionCreateSchema {
// If the schema state is none, we only test the job is finished.
if job.SchemaState == model.StateNone && job.State != model.JobStateDone && job.Type != model.ActionCreateTable && job.Type != model.ActionCreateSchema && job.Type != model.ActionRebaseAutoID {
// If the schema state is none and is not equal to model.JobStateDone, we only test the job is finished.
// Unless the job is model.ActionCreateTable, model.ActionCreateSchema, model.ActionRebaseAutoID, we do the cancel anyway.
} else {
errs, err := admin.CancelJobs(txn, test.jobIDs)
if err != nil {
Expand Down Expand Up @@ -361,7 +362,6 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// Test cancel drop index job , see TestCancelDropIndex.

{act: model.ActionAddColumn, jobIDs: []int64{firstID + 5}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 6}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 7}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
Expand All @@ -375,6 +375,8 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 13)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 14)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 15)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionRebaseAutoID, jobIDs: []int64{firstID + 16}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},
{act: model.ActionShardRowID, jobIDs: []int64{firstID + 17}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},
}

return tests
Expand Down Expand Up @@ -429,6 +431,10 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
ctx := testNewContext(d)
err := ctx.NewTxn(context.Background())
c.Assert(err, IsNil)
tableAutoID := int64(100)
shardRowIDBits := uint64(5)
tblInfo.AutoIncID = tableAutoID
tblInfo.ShardRowIDBits = shardRowIDBits
job := testCreateTable(c, ctx, d, dbInfo, tblInfo)
// insert t values (1, 2, 3, 4, 5);
originTable := testGetTable(c, d, dbInfo.ID, tblInfo.ID)
Expand All @@ -446,7 +452,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
tests := buildCancelJobTests(firstJobID)
var checkErr error
var test *testCancelJob
tc.onJobUpdated = func(job *model.Job) {
hookCancelFunc := func(job *model.Job) {
if job.State == model.JobStateSynced || job.State == model.JobStateCancelled || job.State == model.JobStateCancelling {
return
}
Expand Down Expand Up @@ -481,6 +487,8 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
return
}
}
tc.onJobUpdated = hookCancelFunc
tc.onJobRunBefore = hookCancelFunc
d.SetHook(tc)

// for adding index
Expand Down Expand Up @@ -581,6 +589,22 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

// cancel rebase auto id
test = &tests[13]
rebaseIDArgs := []interface{}{int64(200)}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionRebaseAutoID, rebaseIDArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
changedTable := testGetTable(c, d, dbInfo.ID, tblInfo.ID)
c.Assert(changedTable.Meta().AutoIncID, Equals, tableAutoID)

// cancel shard bits
test = &tests[14]
shardRowIDArgs := []interface{}{uint64(7)}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionRebaseAutoID, shardRowIDArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
changedTable = testGetTable(c, d, dbInfo.ID, tblInfo.ID)
c.Assert(changedTable.Meta().ShardRowIDBits, Equals, shardRowIDBits)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

func onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error) {

func onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int

// Handle normal job.
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -403,7 +403,7 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -425,7 +425,7 @@ func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Inde
func checkRenameIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, model.CIStr, model.CIStr, error) {
var from, to model.CIStr
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, from, to, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func onTruncateTablePartition(t *meta.Meta, job *model.Job) (int64, error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
27 changes: 25 additions & 2 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.T
// to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started.
func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}

func rollingbackDropTableOrView(t *meta.Meta, job *model.Job) error {
tblInfo, err := checkTableExist(t, job, job.SchemaID)
tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, job.SchemaID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -205,6 +205,25 @@ func rollingbackRenameIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func cancelOnlyNotHandledJob(job *model.Job) (ver int64, err error) {
// We can only cancel the not handled job.
if job.SchemaState == model.StateNone {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
}

job.State = model.JobStateRunning

return ver, nil
}
func rollingbackRebaseAutoID(t *meta.Meta, job *model.Job) (ver int64, err error) {
return cancelOnlyNotHandledJob(job)
}

func rollingbackShardRowID(t *meta.Meta, job *model.Job) (ver int64, err error) {
return cancelOnlyNotHandledJob(job)
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand All @@ -221,6 +240,10 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
err = rollingbackDropSchema(t, job)
case model.ActionRenameIndex:
ver, err = rollingbackRenameIndex(t, job)
case model.ActionRebaseAutoID:
ver, err = rollingbackRebaseAutoID(t, job)
case model.ActionShardRowID:
ver, err = rollingbackShardRowID(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
24 changes: 11 additions & 13 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}

func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, err := checkTableExist(t, job, job.SchemaID)
tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -330,8 +330,8 @@ func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table
return tbl, errors.Trace(err)
}

func getTableInfo(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tblInfo, err := checkTableExist(t, job, schemaID)
func getTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, schemaID)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -344,7 +344,7 @@ func getTableInfo(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInf
return tblInfo, nil
}

func checkTableExist(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tableID := job.TableID
// Check this table's database.
tblInfo, err := t.GetTable(schemaID, tableID)
Expand Down Expand Up @@ -381,7 +381,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -433,7 +433,7 @@ func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job) (ver int64,
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -449,12 +449,10 @@ func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job) (ver int64,
// Its behavior is consistent with MySQL.
err = tbl.RebaseAutoID(nil, tblInfo.AutoIncID-1, false)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
Expand All @@ -468,7 +466,7 @@ func onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -492,7 +490,7 @@ func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, oldSchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, oldSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -557,7 +555,7 @@ func onModifyTableComment(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -578,7 +576,7 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -638,7 +636,7 @@ func onAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
4 changes: 4 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func isJobRollbackable(job *model.Job, id int64) error {
job.SchemaState == model.StateDeleteOnly {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
case model.ActionRebaseAutoID, model.ActionShardRowID:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
}
return nil
}
Expand Down

0 comments on commit 33a961d

Please sign in to comment.