diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go
index 75634841bfbb6..8cd23af585df8 100644
--- a/ddl/ddl_db_test.go
+++ b/ddl/ddl_db_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/store/mockstore"
+	"github.com/pingcap/tidb/store/mockstore/mocktikv"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"
@@ -63,6 +64,8 @@ var _ = Suite(&testDBSuite{})
 const defaultBatchSize = 2048
 
 type testDBSuite struct {
+	cluster    *mocktikv.Cluster
+	mvccStore  mocktikv.MVCCStore
 	store      kv.Storage
 	dom        *domain.Domain
 	schemaName string
@@ -84,7 +87,16 @@ func (s *testDBSuite) SetUpSuite(c *C) {
 	s.autoIDStep = autoid.GetStep()
 	autoid.SetStep(5000)
 
-	s.store, err = mockstore.NewMockTikvStore()
+	s.cluster = mocktikv.NewCluster()
+	mocktikv.BootstrapWithSingleStore(s.cluster)
+	s.mvccStore = mocktikv.MustNewMVCCStore()
+	store, err := mockstore.NewMockTikvStore(
+		mockstore.WithCluster(s.cluster),
+		mockstore.WithMVCCStore(s.mvccStore),
+	)
+	c.Assert(err, IsNil)
+
+	s.store = store
 	c.Assert(err, IsNil)
 
 	s.dom, err = session.BootstrapSession(s.store)
@@ -1891,3 +1903,33 @@ func (s *testDBSuite) TestUpdateHandleFailed(c *C) {
 	result.Check(testkit.Rows("1"))
 	tk.MustExec("admin check index t idx_b")
 }
+
+func (s *testDBSuite) TestAddIndexFailed(c *C) {
+	gofail.Enable("github.com/pingcap/tidb/ddl/mockAddIndexErr", `return(true)`)
+	defer gofail.Disable("github.com/pingcap/tidb/ddl/mockAddIndexErr")
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("create database if not exists test_add_index_failed")
+	defer tk.MustExec("drop database test_add_index_failed")
+	tk.MustExec("use test_add_index_failed")
+
+	tk.MustExec("create table t(a bigint PRIMARY KEY, b int)")
+	for i := 0; i < 1000; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i))
+	}
+
+	// Get table ID for split.
+	dom := domain.GetDomain(tk.Se)
+	is := dom.InfoSchema()
+	tbl, err := is.TableByName(model.NewCIStr("test_add_index_failed"), model.NewCIStr("t"))
+	c.Assert(err, IsNil)
+	tblID := tbl.Meta().ID
+
+	// Split the table.
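+	// With multiple regions, the backfill gets several KV ranges, so more
+	// than one addIndexWorker runs while the mocked error is triggered.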
+	s.cluster.SplitTable(s.mvccStore, tblID, 100)
+
+	tk.MustExec("alter table t add index idx_b(b)")
+	tk.MustExec("admin check index t idx_b")
+	tk.MustExec("admin check table t")
+}
diff --git a/ddl/index.go b/ddl/index.go
index 1766d94f2885b..af75ab76a3afe 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -621,6 +621,8 @@ func (w *addIndexWorker) handleBackfillTask(task *reorgIndexTask) *addIndexResul
 	return result
 }
 
+var gofailMockAddindexErrOnceGuard bool
+
 func (w *addIndexWorker) run() {
 	log.Infof("[ddl-reorg] worker[%v] start", w.id)
 	defer func() {
@@ -637,8 +639,15 @@
 		if !more {
 			break
 		}
-		log.Debug("[ddl-reorg] got backfill index task:#v", task)
+
+		// gofail: var mockAddIndexErr bool
+		//if w.id == 0 && mockAddIndexErr && !gofailMockAddindexErrOnceGuard {
+		//	gofailMockAddindexErrOnceGuard = true
+		//	result := &addIndexResult{addedCount: 0, nextHandle: 0, err: errors.Errorf("mock add index error")}
+		//	w.resultCh <- result
+		//	continue
+		//}
 		result := w.handleBackfillTask(task)
 		w.resultCh <- result
 	}
 }
@@ -727,14 +736,16 @@ func (d *ddl) waitTaskResults(workers []*addIndexWorker, taskCnt int, totalAdded
 	return nextHandle, addedCount, errors.Trace(firstErr)
 }
 
-// backfillBatchTasks send tasks to workers, and waits all the running worker return back result,
+// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results;
 // there are taskCnt running workers.
-func (d *ddl) backfillBatchTasks(startTime time.Time, startHandle int64, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*addIndexWorker, batchTasks []*reorgIndexTask) error {
+func (d *ddl) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*addIndexWorker, batchTasks []*reorgIndexTask) error {
 	for i, task := range batchTasks {
 		workers[i].taskCh <- task
 	}
 
+	startHandle := batchTasks[0].startHandle
 	taskCnt := len(batchTasks)
+	startTime := time.Now()
 	nextHandle, taskAddedCount, err := d.waitTaskResults(workers, taskCnt, totalAddedCount, startHandle)
 	elapsedTime := time.Since(startTime).Seconds()
 	if err == nil {
@@ -760,23 +771,16 @@
 }
 
 func (d *ddl) backfillKVRangesIndex(t table.Table, workers []*addIndexWorker, kvRanges []kv.KeyRange, job *model.Job, reorgInfo *reorgInfo) error {
-	var (
-		startTime   time.Time
-		startHandle int64
-		endHandle   int64
-		err         error
-	)
 	totalAddedCount := job.GetRowCount()
 
 	batchTasks := make([]*reorgIndexTask, 0, len(workers))
 	log.Infof("[ddl-reorg] start to reorg index of %v region ranges.", len(kvRanges))
 	for i, keyRange := range kvRanges {
-		startTime = time.Now()
-
-		startHandle, endHandle, err = decodeHandleRange(keyRange)
+		startHandle, endHandle, err := decodeHandleRange(keyRange)
 		if err != nil {
 			return errors.Trace(err)
 		}
+
 		endKey := t.RecordKey(endHandle)
 		endIncluded := false
 		if endKey.Cmp(keyRange.EndKey) < 0 {
@@ -787,7 +791,7 @@
 		batchTasks = append(batchTasks, task)
 		if len(batchTasks) >= len(workers) || i == (len(kvRanges)-1) {
 			// Wait tasks finish.
-			err = d.backfillBatchTasks(startTime, startHandle, reorgInfo, &totalAddedCount, workers, batchTasks)
+			err = d.handleReorgTasks(reorgInfo, &totalAddedCount, workers, batchTasks)
 			if err != nil {
 				return errors.Trace(err)
 			}