From 2ef8b3dd1b3575b369eb3b8315dac922251114c2 Mon Sep 17 00:00:00 2001 From: winkyao Date: Mon, 25 Jun 2018 19:36:10 +0800 Subject: [PATCH 1/3] ddl: set correct startHandle when add indices meets some errors (#6897) --- ddl/ddl_db_test.go | 131 ++++++++++++++++++++++++++++++++++++++++++++- ddl/index.go | 30 ++++++----- 2 files changed, 147 insertions(+), 14 deletions(-) diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index 75634841bfbb6..032005b653e0f 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -26,6 +26,7 @@ import ( gofail "github.com/coreos/gofail/runtime" "github.com/juju/errors" . "github.com/pingcap/check" + "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" @@ -38,6 +39,7 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/mocktikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -63,6 +65,8 @@ var _ = Suite(&testDBSuite{}) const defaultBatchSize = 2048 type testDBSuite struct { + cluster *mocktikv.Cluster + mvccStore mocktikv.MVCCStore store kv.Storage dom *domain.Domain schemaName string @@ -84,7 +88,16 @@ func (s *testDBSuite) SetUpSuite(c *C) { s.autoIDStep = autoid.GetStep() autoid.SetStep(5000) - s.store, err = mockstore.NewMockTikvStore() + s.cluster = mocktikv.NewCluster() + mocktikv.BootstrapWithSingleStore(s.cluster) + s.mvccStore = mocktikv.MustNewMVCCStore() + store, err := mockstore.NewMockTikvStore( + mockstore.WithCluster(s.cluster), + mockstore.WithMVCCStore(s.mvccStore), + ) + c.Assert(err, IsNil) + + s.store = store c.Assert(err, IsNil) s.dom, err = session.BootstrapSession(s.store) @@ -1891,3 +1904,119 @@ func (s *testDBSuite) TestUpdateHandleFailed(c *C) { result.Check(testkit.Rows("1")) tk.MustExec("admin check index t idx_b") } + +func (s *testDBSuite) 
TestAddIndexFailed(c *C) { + gofail.Enable("github.com/pingcap/tidb/ddl/mockAddIndexErr", `return(true)`) + defer gofail.Disable("github.com/pingcap/tidb/ddl/mockAddIndexErr") + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_add_index_failed") + defer tk.MustExec("drop database test_add_index_failed") + tk.MustExec("use test_add_index_failed") + + tk.MustExec("create table t(a bigint PRIMARY KEY, b int)") + for i := 0; i < 1000; i++ { + tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i)) + } + + // Get table ID for split. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test_add_index_failed"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblID := tbl.Meta().ID + + // Split the table. + s.cluster.SplitTable(s.mvccStore, tblID, 100) + + tk.MustExec("alter table t add index idx_b(b)") + tk.MustExec("admin check index t idx_b") + tk.MustExec("admin check table t") +} + +func (s *testDBSuite) getHistoryDDLJob(id int64) (*model.Job, error) { + var job *model.Job + + err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + var err1 error + job, err1 = t.GetHistoryDDLJob(id) + return errors.Trace(err1) + }) + + return job, errors.Trace(err) +} + +func (s *testDBSuite) TestBackwardCompatibility(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_backward_compatibility") + defer tk.MustExec("drop database test_backward_compatibility") + tk.MustExec("use test_backward_compatibility") + tk.MustExec("create table t(a int primary key, b int)") + for i := 0; i < 200; i++ { + tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i)) + } + + // alter table t add index idx_b(b); + is := s.dom.InfoSchema() + schemaName := model.NewCIStr("test_backward_compatibility") + tableName := model.NewCIStr("t") + schema, ok := is.SchemaByName(schemaName) + c.Assert(ok, IsTrue) + tbl, err := 
is.TableByName(schemaName, tableName) + c.Assert(err, IsNil) + + // Split the table. + s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100) + + unique := false + indexName := model.NewCIStr("idx_b") + idxColName := &ast.IndexColName{ + Column: &ast.ColumnName{ + Schema: schemaName, + Table: tableName, + Name: model.NewCIStr("b"), + }, + Length: types.UnspecifiedLength, + } + idxColNames := []*ast.IndexColName{idxColName} + var indexOption *ast.IndexOption + job := &model.Job{ + SchemaID: schema.ID, + TableID: tbl.Meta().ID, + Type: model.ActionAddIndex, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{unique, indexName, idxColNames, indexOption}, + } + txn, err := s.store.Begin() + c.Assert(err, IsNil) + t := meta.NewMeta(txn) + job.ID, err = t.GenGlobalID() + c.Assert(err, IsNil) + job.Version = 1 + job.StartTS = txn.StartTS() + + // Simulate old TiDB init the add index job, old TiDB will not init the model.Job.ReorgMeta field, + // if we set job.SnapshotVer here, can simulate the behavior. 
+ job.SnapshotVer = txn.StartTS() + err = t.EnQueueDDLJob(job) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + ticker := time.NewTicker(s.lease) + for range ticker.C { + historyJob, err := s.getHistoryDDLJob(job.ID) + c.Assert(err, IsNil) + if historyJob == nil { + + continue + } + c.Assert(historyJob.Error, IsNil) + + if historyJob.IsSynced() { + break + } + } + + // finished add index + tk.MustExec("admin check index t idx_b") +} diff --git a/ddl/index.go b/ddl/index.go index 1766d94f2885b..e5c8c2a7af5a4 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -621,6 +621,8 @@ func (w *addIndexWorker) handleBackfillTask(task *reorgIndexTask) *addIndexResul return result } +var gofailMockAddindexErrOnceGuard bool + func (w *addIndexWorker) run() { log.Infof("[ddl-reorg] worker[%v] start", w.id) defer func() { @@ -637,8 +639,15 @@ func (w *addIndexWorker) run() { if !more { break } - log.Debug("[ddl-reorg] got backfill index task:#v", task) + + // gofail: var mockAddIndexErr bool + //if mockAddIndexErr && !gofailMockAddindexErrOnceGuard && w.id == 0 { + // gofailMockAddindexErrOnceGuard = true + // result := &addIndexResult{addedCount: 0, nextHandle: 0, err: errors.Errorf("mock add index error")} + // w.resultCh <- result + // continue + //} result := w.handleBackfillTask(task) w.resultCh <- result } @@ -727,14 +736,16 @@ func (d *ddl) waitTaskResults(workers []*addIndexWorker, taskCnt int, totalAdded return nextHandle, addedCount, errors.Trace(firstErr) } -// backfillBatchTasks send tasks to workers, and waits all the running worker return back result, +// handleReorgTasks sends tasks to workers, and waits for all the running workers to return their results, // there are taskCnt running workers. 
-func (d *ddl) backfillBatchTasks(startTime time.Time, startHandle int64, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*addIndexWorker, batchTasks []*reorgIndexTask) error { +func (d *ddl) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*addIndexWorker, batchTasks []*reorgIndexTask) error { for i, task := range batchTasks { workers[i].taskCh <- task } + startHandle := batchTasks[0].startHandle taskCnt := len(batchTasks) + startTime := time.Now() nextHandle, taskAddedCount, err := d.waitTaskResults(workers, taskCnt, totalAddedCount, startHandle) elapsedTime := time.Since(startTime).Seconds() if err == nil { @@ -760,23 +771,16 @@ func (d *ddl) backfillBatchTasks(startTime time.Time, startHandle int64, reorgIn } func (d *ddl) backfillKVRangesIndex(t table.Table, workers []*addIndexWorker, kvRanges []kv.KeyRange, job *model.Job, reorgInfo *reorgInfo) error { - var ( - startTime time.Time - startHandle int64 - endHandle int64 - err error - ) totalAddedCount := job.GetRowCount() batchTasks := make([]*reorgIndexTask, 0, len(workers)) log.Infof("[ddl-reorg] start to reorg index of %v region ranges.", len(kvRanges)) for i, keyRange := range kvRanges { - startTime = time.Now() - - startHandle, endHandle, err = decodeHandleRange(keyRange) + startHandle, endHandle, err := decodeHandleRange(keyRange) if err != nil { return errors.Trace(err) } + endKey := t.RecordKey(endHandle) endIncluded := false if endKey.Cmp(keyRange.EndKey) < 0 { @@ -787,7 +791,7 @@ func (d *ddl) backfillKVRangesIndex(t table.Table, workers []*addIndexWorker, kv batchTasks = append(batchTasks, task) if len(batchTasks) >= len(workers) || i == (len(kvRanges)-1) { // Wait tasks finish. 
- err = d.backfillBatchTasks(startTime, startHandle, reorgInfo, &totalAddedCount, workers, batchTasks) + err = d.handleReorgTasks(reorgInfo, &totalAddedCount, workers, batchTasks) if err != nil { return errors.Trace(err) } From 320e4108483e24fdd8eb67db00c135f91e0d8370 Mon Sep 17 00:00:00 2001 From: winkyao Date: Mon, 25 Jun 2018 19:58:07 +0800 Subject: [PATCH 2/3] remove tests --- ddl/ddl_db_test.go | 89 ---------------------------------------------- 1 file changed, 89 deletions(-) diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index 032005b653e0f..8cd23af585df8 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -26,7 +26,6 @@ import ( gofail "github.com/coreos/gofail/runtime" "github.com/juju/errors" . "github.com/pingcap/check" - "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" @@ -1932,91 +1931,3 @@ func (s *testDBSuite) TestAddIndexFailed(c *C) { tk.MustExec("admin check index t idx_b") tk.MustExec("admin check table t") } - -func (s *testDBSuite) getHistoryDDLJob(id int64) (*model.Job, error) { - var job *model.Job - - err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - var err1 error - job, err1 = t.GetHistoryDDLJob(id) - return errors.Trace(err1) - }) - - return job, errors.Trace(err) -} - -func (s *testDBSuite) TestBackwardCompatibility(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("create database if not exists test_backward_compatibility") - defer tk.MustExec("drop database test_backward_compatibility") - tk.MustExec("use test_backward_compatibility") - tk.MustExec("create table t(a int primary key, b int)") - for i := 0; i < 200; i++ { - tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i)) - } - - // alter table t add index idx_b(b); - is := s.dom.InfoSchema() - schemaName := model.NewCIStr("test_backward_compatibility") - tableName := model.NewCIStr("t") - schema, ok := 
is.SchemaByName(schemaName) - c.Assert(ok, IsTrue) - tbl, err := is.TableByName(schemaName, tableName) - c.Assert(err, IsNil) - - // Split the table. - s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100) - - unique := false - indexName := model.NewCIStr("idx_b") - idxColName := &ast.IndexColName{ - Column: &ast.ColumnName{ - Schema: schemaName, - Table: tableName, - Name: model.NewCIStr("b"), - }, - Length: types.UnspecifiedLength, - } - idxColNames := []*ast.IndexColName{idxColName} - var indexOption *ast.IndexOption - job := &model.Job{ - SchemaID: schema.ID, - TableID: tbl.Meta().ID, - Type: model.ActionAddIndex, - BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{unique, indexName, idxColNames, indexOption}, - } - txn, err := s.store.Begin() - c.Assert(err, IsNil) - t := meta.NewMeta(txn) - job.ID, err = t.GenGlobalID() - c.Assert(err, IsNil) - job.Version = 1 - job.StartTS = txn.StartTS() - - // Simulate old TiDB init the add index job, old TiDB will not init the model.Job.ReorgMeta field, - // if we set job.SnapshotVer here, can simulate the behavior. 
- job.SnapshotVer = txn.StartTS() - err = t.EnQueueDDLJob(job) - c.Assert(err, IsNil) - err = txn.Commit(context.Background()) - c.Assert(err, IsNil) - ticker := time.NewTicker(s.lease) - for range ticker.C { - historyJob, err := s.getHistoryDDLJob(job.ID) - c.Assert(err, IsNil) - if historyJob == nil { - - continue - } - c.Assert(historyJob.Error, IsNil) - - if historyJob.IsSynced() { - break - } - } - - // finished add index - tk.MustExec("admin check index t idx_b") -} From 8ea2fa35337ae72075214ad58f8dcd50fef50b9d Mon Sep 17 00:00:00 2001 From: winkyao Date: Mon, 25 Jun 2018 21:01:44 +0800 Subject: [PATCH 3/3] fix data race --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index e5c8c2a7af5a4..af75ab76a3afe 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -642,7 +642,7 @@ func (w *addIndexWorker) run() { log.Debug("[ddl-reorg] got backfill index task:#v", task) // gofail: var mockAddIndexErr bool - //if mockAddIndexErr && !gofailMockAddindexErrOnceGuard && w.id == 0 { + //if w.id == 0 && mockAddIndexErr && !gofailMockAddindexErrOnceGuard { // gofailMockAddindexErrOnceGuard = true // result := &addIndexResult{addedCount: 0, nextHandle: 0, err: errors.Errorf("mock add index error")} // w.resultCh <- result