diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go
index e9923fdd414c1..045c09a0221df 100644
--- a/ddl/ddl_db_test.go
+++ b/ddl/ddl_db_test.go
@@ -326,6 +326,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
 	}
 
 	var checkErr error
+	var c3IdxInfo *model.IndexInfo
 	hook := &ddl.TestDDLCallback{}
 	first := true
 	oldReorgWaitTimeout := ddl.ReorgWaitTimeout
@@ -338,6 +339,16 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
 		// If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes.
 		// When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
 		if !addIndexNotFirstReorg {
+			// Get the index's meta.
+			if c3IdxInfo != nil {
+				return
+			}
+			t := s.testGetTable(c, "t1")
+			for _, index := range t.WritableIndices() {
+				if index.Meta().Name.L == "c3_index" {
+					c3IdxInfo = index.Meta()
+				}
+			}
 			return
 		}
 		// The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes.
@@ -410,6 +421,10 @@ LOOP:
 		c.Assert(strings.EqualFold(tidx.Meta().Name.L, "c3_index"), IsFalse)
 	}
 
+	ctx := s.s.(sessionctx.Context)
+	idx := tables.NewIndex(t.Meta(), c3IdxInfo)
+	checkDelRangeDone(c, ctx, idx)
+
 	s.mustExec(c, "drop table t1")
 	ddl.ReorgWaitTimeout = oldReorgWaitTimeout
 }
@@ -716,6 +731,11 @@ LOOP:
 	c.Assert(nidx, IsNil)
 
 	idx := tables.NewIndex(t.Meta(), c3idx.Meta())
+	checkDelRangeDone(c, ctx, idx)
+	s.tk.MustExec("drop table test_drop_index")
+}
+
+func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
 	f := func() map[int64]struct{} {
 		handles := make(map[int64]struct{})
@@ -748,8 +768,6 @@
 		}
 	}
 	c.Assert(handles, HasLen, 0)
-
-	s.tk.MustExec("drop table test_drop_index")
 }
 
 func (s *testDBSuite) TestAddIndexWithDupCols(c *C) {
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 3045fee06d2b9..a7882f7183ec4 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -147,6 +147,16 @@ func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error {
 	return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs))
 }
 
+func (d *ddl) deleteRange(job *model.Job) error {
+	var err error
+	if job.Version <= currentVersion {
+		err = d.delRangeManager.addDelRangeJob(job)
+	} else {
+		err = errInvalidJobVersion.GenByArgs(job.Version, currentVersion)
+	}
+	return errors.Trace(err)
+}
+
 // finishDDLJob deletes the finished DDL job in the ddl queue and puts it to history queue.
 // If the DDL job need to handle in background, it will prepare a background job.
 func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
@@ -154,16 +164,19 @@ func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	defer func() {
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
 	}()
+
 	switch job.Type {
-	case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex:
-		if job.Version <= currentVersion {
-			err = d.delRangeManager.addDelRangeJob(job)
-		} else {
-			err = errInvalidJobVersion.GenByArgs(job.Version, currentVersion)
-		}
-		if err != nil {
-			return errors.Trace(err)
+	case model.ActionAddIndex:
+		if job.State != model.JobStateRollbackDone {
+			break
 		}
+		// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
+		err = d.deleteRange(job)
+	case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex:
+		err = d.deleteRange(job)
+	}
+	if err != nil {
+		return errors.Trace(err)
 	}
 
 	_, err = t.DeQueueDDLJob()
@@ -254,9 +267,9 @@ func (d *ddl) handleDDLJobQueue() error {
 			d.hookMu.Unlock()
 
 			// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
-			// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
+			// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
 			// the newest schema.
-			if job.State == model.JobStateRunning || job.State == model.JobStateDone {
+			if job.IsRunning() || job.IsRollingback() || job.IsDone() {
 				d.waitSchemaChanged(nil, waitTime, schemaVer)
 			}
 			if job.IsSynced() {
@@ -419,7 +432,7 @@ func (d *ddl) waitSchemaChanged(ctx context.Context, waitTime time.Duration, lat
 // So here we get the latest schema version to make sure all servers' schema version update to the latest schema version
 // in a cluster, or to wait for 2 * lease time.
 func (d *ddl) waitSchemaSynced(job *model.Job, waitTime time.Duration) {
-	if !job.IsRunning() && !job.IsDone() {
+	if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() {
 		return
 	}
 	// TODO: Make ctx exits when the d is close.
diff --git a/ddl/delete_range.go b/ddl/delete_range.go
index 34b2ae98da71f..18694eb696d8a 100644
--- a/ddl/delete_range.go
+++ b/ddl/delete_range.go
@@ -234,6 +234,13 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
 		startKey := tablecodec.EncodeTablePrefix(tableID)
 		endKey := tablecodec.EncodeTablePrefix(tableID + 1)
 		return doInsert(s, job.ID, tableID, startKey, endKey, now)
+	// ActionAddIndex needs to do this as well, because the half-done index data needs to be deleted when the job is canceled and rolled back.
+	case model.ActionAddIndex:
+		tableID := job.TableID
+		indexID := job.Args[1].(int64)
+		startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
+		endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
+		return doInsert(s, job.ID, indexID, startKey, endKey, now)
 	case model.ActionDropIndex:
 		tableID := job.TableID
 		var indexName interface{}
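
Note (not part of the patch): the new ActionAddIndex case in insertJobIntoDeleteRangeTable registers the half-open range [EncodeTableIndexPrefix(tableID, indexID), EncodeTableIndexPrefix(tableID, indexID+1)), which spans exactly the key space of the canceled index and nothing else, so the delete-range worker can purge the half-done index data later. A minimal standalone sketch of that range computation, assuming the tablecodec import path used by TiDB at this revision and placeholder IDs:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/tablecodec"
)

func main() {
	// Placeholder IDs; in the DDL job they come from job.TableID and job.Args[1].
	tableID, indexID := int64(45), int64(1)
	// [startKey, endKey) covers every key of index `indexID` on table `tableID`,
	// i.e. the range recorded for the delete-range worker to clean up.
	startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
	endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
	fmt.Printf("start=%x\nend=%x\n", startKey, endKey)
}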