diff --git a/ddl/fail_db_test.go b/ddl/fail_db_test.go index ec7316baa6847..360df99dac420 100644 --- a/ddl/fail_db_test.go +++ b/ddl/fail_db_test.go @@ -15,9 +15,12 @@ package ddl_test import ( "fmt" + "math/rand" + "sync/atomic" "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" gofail "github.com/pingcap/gofail/runtime" "github.com/pingcap/parser" "github.com/pingcap/parser/model" @@ -25,6 +28,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" @@ -238,3 +242,61 @@ func (s *testDBSuite) TestFailSchemaSyncer(c *C) { _, err = tk.Exec("insert into t values(1)") c.Assert(err, IsNil) } + +func (s *testDBSuite) TestAddIndexWorkerNum(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_db") + tk.MustExec("use test_db") + tk.MustExec("drop table if exists test_add_index") + tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))") + + done := make(chan error, 1) + start := -10 + num := 4096 + // first add some rows + for i := start; i < num; i++ { + sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i) + tk.MustExec(sql) + } + + is := s.dom.InfoSchema() + schemaName := model.NewCIStr("test_db") + tableName := model.NewCIStr("test_add_index") + tbl, err := is.TableByName(schemaName, tableName) + c.Assert(err, IsNil) + + splitCount := 100 + // Split table to multi region. + s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount) + + originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter() + lastSetWorkerCnt := originDDLAddIndexWorkerCnt + atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt) + ddl.TestCheckWorkerNumber = lastSetWorkerCnt + defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt) + + gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`) + defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum") + + sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done) + checkNum := 0 + +LOOP: + for { + select { + case err = <-done: + if err == nil { + break LOOP + } + c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) + case <-ddl.TestCheckWorkerNumCh: + lastSetWorkerCnt = int32(rand.Intn(8) + 8) + tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt)) + atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt) + checkNum++ + } + } + c.Assert(checkNum, Greater, 5) + tk.MustExec("admin check table test_add_index") + tk.MustExec("drop table test_add_index") +} diff --git a/ddl/index.go b/ddl/index.go index b04c0199fedd4..53310b804d1a3 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -822,7 +822,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad if task.endIncluded { rightParenthesis = "]" } - log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", + log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds()) return result @@ -1045,34 +1045,12 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker return nil, nil } -// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle), -// and send these tasks to add index workers, till we finish adding the indices. -func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error { - totalAddedCount := job.GetRowCount() - - startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle - for { - kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle) - if err != nil { - return errors.Trace(err) - } - - log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle) - remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges) - if err != nil { - return errors.Trace(err) - } - - if len(remains) == 0 { - break - } - startHandle, _, err = decodeHandleRange(remains[0]) - if err != nil { - return errors.Trace(err) - } - } - return nil -} +var ( + // TestCheckWorkerNumCh use for test adjust add index worker. + TestCheckWorkerNumCh = make(chan struct{}, 0) + // TestCheckWorkerNumber use for test adjust add index worker. + TestCheckWorkerNumber = int32(16) +) // addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition. // For a partitioned table, it should be handled partition by partition. @@ -1091,6 +1069,9 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error { job := reorgInfo.Job log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo) + totalAddedCount := job.GetRowCount() + + startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle sessCtx := newContext(reorgInfo.d.store) decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo) if err != nil { @@ -1099,16 +1080,68 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I // variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt". workerCnt := variable.GetDDLReorgWorkerCounter() - idxWorkers := make([]*addIndexWorker, workerCnt) - for i := 0; i < int(workerCnt); i++ { - sessCtx := newContext(reorgInfo.d.store) - idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap) - idxWorkers[i].priority = job.Priority - go idxWorkers[i].run(reorgInfo.d) - } - defer closeAddIndexWorkers(idxWorkers) - err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo) - return errors.Trace(err) + idxWorkers := make([]*addIndexWorker, 0, workerCnt) + defer func() { + closeAddIndexWorkers(idxWorkers) + }() + + for { + kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle) + if err != nil { + return errors.Trace(err) + } + + // For dynamic adjust add index worker number. + workerCnt = variable.GetDDLReorgWorkerCounter() + // If only have 1 range, we can only start 1 worker. + if len(kvRanges) < int(workerCnt) { + workerCnt = int32(len(kvRanges)) + } + // Enlarge the worker size. + for i := len(idxWorkers); i < int(workerCnt); i++ { + sessCtx := newContext(reorgInfo.d.store) + idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap) + idxWorker.priority = job.Priority + idxWorkers = append(idxWorkers, idxWorker) + go idxWorkers[i].run(reorgInfo.d) + } + // Shrink the worker size. + if len(idxWorkers) > int(workerCnt) { + workers := idxWorkers[workerCnt:] + idxWorkers = idxWorkers[:workerCnt] + closeAddIndexWorkers(workers) + } + + // gofail: var checkIndexWorkerNum bool + // if checkIndexWorkerNum { + // num := int(atomic.LoadInt32(&TestCheckWorkerNumber)) + // if num != 0 { + // if num > len(kvRanges) { + // if len(idxWorkers) != len(kvRanges) { + // return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers)) + // } + // } else if num != len(idxWorkers) { + // return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers)) + // } + // TestCheckWorkerNumCh <- struct{}{} + // } + //} + + log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle) + remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges) + if err != nil { + return errors.Trace(err) + } + + if len(remains) == 0 { + break + } + startHandle, _, err = decodeHandleRange(remains[0]) + if err != nil { + return errors.Trace(err) + } + } + return nil } // addTableIndex handles the add index reorganization state for a table.