Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: delete ranges when rolling back 'add index' #6188

Merged
merged 4 commits into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
}

var checkErr error
var idxInfo *model.IndexInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c3IdxInfo is more readable.

hook := &ddl.TestDDLCallback{}
first := true
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
Expand All @@ -335,6 +336,16 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
// If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
if !addIndexNotFirstReorg {
// Get the index's meta.
if idxInfo != nil {
return
}
t := s.testGetTable(c, "t1")
for _, index := range t.WritableIndices() {
if index.Meta().Name.L == "c3_index" {
idxInfo = index.Meta()
}
}
return
}
// The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes.
Expand Down Expand Up @@ -407,6 +418,10 @@ LOOP:
c.Assert(strings.EqualFold(tidx.Meta().Name.L, "c3_index"), IsFalse)
}

ctx := s.s.(sessionctx.Context)
idx := tables.NewIndex(t.Meta(), idxInfo)
checkDelRangeDone(c, ctx, idx)

s.mustExec(c, "drop table t1")
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
}
Expand Down Expand Up @@ -713,6 +728,11 @@ LOOP:
c.Assert(nidx, IsNil)

idx := tables.NewIndex(t.Meta(), c3idx.Meta())
checkDelRangeDone(c, ctx, idx)
s.tk.MustExec("drop table test_drop_index")
}

func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
f := func() map[int64]struct{} {
handles := make(map[int64]struct{})

Expand Down Expand Up @@ -745,8 +765,6 @@ LOOP:
}
}
c.Assert(handles, HasLen, 0)

s.tk.MustExec("drop table test_drop_index")
}

func (s *testDBSuite) TestAddIndexWithDupCols(c *C) {
Expand Down
12 changes: 9 additions & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
switch job.Type {
case model.ActionAddIndex:
if job.State != model.JobStateRollbackDone {
break
}
// ActionAddIndex needs to delete ranges when it needs to be rolled back.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.

fallthrough
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fallthrough is rarely used, it's hard to read.
How about extracting a function shouldDeleteRange?

case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex:
if job.Version <= currentVersion {
err = d.delRangeManager.addDelRangeJob(job)
Expand Down Expand Up @@ -254,9 +260,9 @@ func (d *ddl) handleDDLJobQueue() error {
d.hookMu.Unlock()

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobStateRunning || job.State == model.JobStateDone {
if job.IsRunning() || job.IsRollingback() || job.IsDone() {
d.waitSchemaChanged(nil, waitTime, schemaVer)
}
if job.IsSynced() {
Expand Down Expand Up @@ -417,7 +423,7 @@ func (d *ddl) waitSchemaChanged(ctx context.Context, waitTime time.Duration, lat
// So here we get the latest schema version to make sure all servers' schema version update to the latest schema version
// in a cluster, or to wait for 2 * lease time.
func (d *ddl) waitSchemaSynced(job *model.Job, waitTime time.Duration) {
if !job.IsRunning() && !job.IsDone() {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() {
return
}
// TODO: Make ctx exits when the d is close.
Expand Down
7 changes: 7 additions & 0 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
startKey := tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
return doInsert(s, job.ID, tableID, startKey, endKey, now)
// ActionAddIndex needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex:
tableID := job.TableID
indexID := job.Args[1].(int64)
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(s, job.ID, indexID, startKey, endKey, now)
case model.ActionDropIndex:
tableID := job.TableID
var indexName interface{}
Expand Down
1 change: 1 addition & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}
job.Args = append(job.Args, indexInfo.ID)
log.Warnf("args %v, ts %v", job.Args, t.StartTS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the warn log.

default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
Expand Down