From 85cd2463bcc0ccb82502d73aec12d11fc1b5e6f4 Mon Sep 17 00:00:00 2001 From: winkyao Date: Wed, 25 Jul 2018 18:52:35 +0800 Subject: [PATCH] ddl: fix a bug that we miss adding last handle index in some case. (#7142) --- ddl/db_integration_test.go | 13 +++++++ ddl/index.go | 75 ++++++++++++++++++++++++++++---------- 2 files changed, 68 insertions(+), 20 deletions(-) diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 04a2205372819..632d366dc9e6e 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -123,6 +123,19 @@ func (s *testIntegrationSuite) TestCreateTableIfNotExists(c *C) { c.Assert(terror.ErrorEqual(infoschema.ErrTableExists, lastWarn.Err), IsTrue) } +func (s *testIntegrationSuite) TestEndIncluded(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("USE test") + tk.MustExec("create table t(a int, b int)") + for i := 0; i < ddl.DefaultTaskHandleCnt+1; i++ { + tk.MustExec("insert into t values(1, 1)") + } + tk.MustExec("alter table t add index b(b);") + tk.MustExec("admin check index t b") + tk.MustExec("admin check table t") +} + func newStoreWithBootstrap() (kv.Storage, *domain.Domain, error) { store, err := mockstore.NewMockTikvStore() if err != nil { diff --git a/ddl/index.go b/ddl/index.go index f339d91f08671..80225cce262c4 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -15,6 +15,7 @@ package ddl import ( "context" + "math" "sync/atomic" "time" @@ -444,7 +445,8 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { } const ( - defaultTaskHandleCnt = 128 + // DefaultTaskHandleCnt is default batch size of adding indices. + DefaultTaskHandleCnt = 128 ) // indexRecord is the record information of an index. @@ -492,7 +494,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab return &addIndexWorker{ id: id, ddlWorker: worker, - batchCnt: defaultTaskHandleCnt, + batchCnt: DefaultTaskHandleCnt, sessCtx: sessCtx, taskCh: make(chan *reorgIndexTask, 1), resultCh: make(chan *addIndexResult, 1), @@ -549,11 +551,37 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor return idxRecord, nil } -func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, bool, error) { +// getNextHandle gets next handle of entry that we are going to process. +func (w *addIndexWorker) getNextHandle(taskRange reorgIndexTask, taskDone bool) (nextHandle int64) { + if !taskDone { + // The task is not done. So we need to pick the last processed entry's handle and add one. + return w.idxRecords[len(w.idxRecords)-1].handle + 1 + } + + // The task is done. So we need to choose a handle outside this range. + // Some corner cases should be considered: + // - The end of task range is MaxInt64. + // - The end of the task is excluded in the range. + if taskRange.endHandle == math.MaxInt64 || !taskRange.endIncluded { + return taskRange.endHandle + } + + return taskRange.endHandle + 1 +} + +// fetchRowColVals fetch w.batchCnt count rows that need to backfill indices, and build the corresponding indexRecord slice. +// fetchRowColVals returns: +// 1. The corresponding indexRecord slice. +// 2. Next handle of entry that we need to process. +// 3. Boolean indicates whether the task is done. +// 4. error occurs in fetchRowColVals. nil if no error occurs. +func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) { // TODO: use tableScan to prune columns. w.idxRecords = w.idxRecords[:0] startTime := time.Now() - handleOutOfRange := false + + // taskDone means that the added handle is out of taskRange.endHandle. + taskDone := false oprStartTime := time.Now() err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) { @@ -562,12 +590,12 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde oprStartTime = oprEndTime if !taskRange.endIncluded { - handleOutOfRange = handle >= taskRange.endHandle + taskDone = handle >= taskRange.endHandle } else { - handleOutOfRange = handle > taskRange.endHandle + taskDone = handle > taskRange.endHandle } - if handleOutOfRange || len(w.idxRecords) >= w.batchCnt { + if taskDone || len(w.idxRecords) >= w.batchCnt { return false, nil } @@ -579,14 +607,18 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde w.idxRecords = append(w.idxRecords, idxRecord) if handle == taskRange.endHandle { // If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle - handleOutOfRange = true + taskDone = true return false, nil } return true, nil }) + if len(w.idxRecords) == 0 { + taskDone = true + } + log.Debugf("[ddl] txn %v fetches handle info %v, takes time %v", txn.StartTS(), taskRange, time.Since(startTime)) - return w.idxRecords, handleOutOfRange, errors.Trace(err) + return w.idxRecords, w.getNextHandle(taskRange, taskDone), taskDone, errors.Trace(err) } func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) { @@ -603,13 +635,17 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string // indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry. // backfillIndexInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128. // TODO: make w.batchCnt can be modified by system variable. -func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, addedCount, scanCount int, errInTxn error) { +func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) { oprStartTime := time.Now() errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error { addedCount = 0 scanCount = 0 txn.SetOption(kv.Priority, kv.PriorityLow) - idxRecords, handleOutOfRange, err := w.fetchRowColVals(txn, handleRange) + var ( + idxRecords []*indexRecord + err error + ) + idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, handleRange) if err != nil { return errors.Trace(err) } @@ -635,11 +671,6 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan addedCount++ } - if handleOutOfRange || len(idxRecords) == 0 { - nextHandle = handleRange.endHandle - } else { - nextHandle = idxRecords[len(idxRecords)-1].handle + 1 - } return nil }) w.logSlowOperations(time.Since(oprStartTime), "backfillIndexInTxn", 3000) @@ -655,7 +686,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad startTime := time.Now() for { addedCount := 0 - nextHandle, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange) + nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange) if err == nil { // Because reorgIndexTask may run a long time, // we should check whether this ddl job is still runnable. @@ -678,12 +709,16 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad } handleRange.startHandle = nextHandle - if handleRange.startHandle >= handleRange.endHandle { + if taskDone { break } } - log.Infof("[ddl-reorg] worker(%v), finish region ranges [%v,%v) addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", - w.id, task.startHandle, task.endHandle, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds()) + rightParenthesis := ")" + if task.endIncluded { + rightParenthesis = "]" + } + log.Infof("[ddl-reorg] worker(%v), finish region ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", + w.id, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds()) return result }