Skip to content

Commit

Permalink
ddl: fix a bug that we miss adding last handle index in some case. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao authored Jul 25, 2018
1 parent 9a1a95d commit 85cd246
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 20 deletions.
13 changes: 13 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,19 @@ func (s *testIntegrationSuite) TestCreateTableIfNotExists(c *C) {
c.Assert(terror.ErrorEqual(infoschema.ErrTableExists, lastWarn.Err), IsTrue)
}

func (s *testIntegrationSuite) TestEndIncluded(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("USE test")
tk.MustExec("create table t(a int, b int)")
for i := 0; i < ddl.DefaultTaskHandleCnt+1; i++ {
tk.MustExec("insert into t values(1, 1)")
}
tk.MustExec("alter table t add index b(b);")
tk.MustExec("admin check index t b")
tk.MustExec("admin check table t")
}

func newStoreWithBootstrap() (kv.Storage, *domain.Domain, error) {
store, err := mockstore.NewMockTikvStore()
if err != nil {
Expand Down
75 changes: 55 additions & 20 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package ddl

import (
"context"
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -444,7 +445,8 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

const (
defaultTaskHandleCnt = 128
// DefaultTaskHandleCnt is default batch size of adding indices.
DefaultTaskHandleCnt = 128
)

// indexRecord is the record information of an index.
Expand Down Expand Up @@ -492,7 +494,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
return &addIndexWorker{
id: id,
ddlWorker: worker,
batchCnt: defaultTaskHandleCnt,
batchCnt: DefaultTaskHandleCnt,
sessCtx: sessCtx,
taskCh: make(chan *reorgIndexTask, 1),
resultCh: make(chan *addIndexResult, 1),
Expand Down Expand Up @@ -549,11 +551,37 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor
return idxRecord, nil
}

func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, bool, error) {
// getNextHandle gets next handle of entry that we are going to process.
func (w *addIndexWorker) getNextHandle(taskRange reorgIndexTask, taskDone bool) (nextHandle int64) {
if !taskDone {
// The task is not done. So we need to pick the last processed entry's handle and add one.
return w.idxRecords[len(w.idxRecords)-1].handle + 1
}

// The task is done. So we need to choose a handle outside this range.
// Some corner cases should be considered:
// - The end of task range is MaxInt64.
// - The end of the task is excluded in the range.
if taskRange.endHandle == math.MaxInt64 || !taskRange.endIncluded {
return taskRange.endHandle
}

return taskRange.endHandle + 1
}

// fetchRowColVals fetch w.batchCnt count rows that need to backfill indices, and build the corresponding indexRecord slice.
// fetchRowColVals returns:
// 1. The corresponding indexRecord slice.
// 2. Next handle of entry that we need to process.
// 3. Boolean indicates whether the task is done.
// 4. error occurs in fetchRowColVals. nil if no error occurs.
func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) {
// TODO: use tableScan to prune columns.
w.idxRecords = w.idxRecords[:0]
startTime := time.Now()
handleOutOfRange := false

// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := time.Now()
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
Expand All @@ -562,12 +590,12 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
oprStartTime = oprEndTime

if !taskRange.endIncluded {
handleOutOfRange = handle >= taskRange.endHandle
taskDone = handle >= taskRange.endHandle
} else {
handleOutOfRange = handle > taskRange.endHandle
taskDone = handle > taskRange.endHandle
}

if handleOutOfRange || len(w.idxRecords) >= w.batchCnt {
if taskDone || len(w.idxRecords) >= w.batchCnt {
return false, nil
}

Expand All @@ -579,14 +607,18 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
w.idxRecords = append(w.idxRecords, idxRecord)
if handle == taskRange.endHandle {
// If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle
handleOutOfRange = true
taskDone = true
return false, nil
}
return true, nil
})

if len(w.idxRecords) == 0 {
taskDone = true
}

log.Debugf("[ddl] txn %v fetches handle info %v, takes time %v", txn.StartTS(), taskRange, time.Since(startTime))
return w.idxRecords, handleOutOfRange, errors.Trace(err)
return w.idxRecords, w.getNextHandle(taskRange, taskDone), taskDone, errors.Trace(err)
}

func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
Expand All @@ -603,13 +635,17 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string
// indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry.
// backfillIndexInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128.
// TODO: make w.batchCnt can be modified by system variable.
func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, addedCount, scanCount int, errInTxn error) {
func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) {
oprStartTime := time.Now()
errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error {
addedCount = 0
scanCount = 0
txn.SetOption(kv.Priority, kv.PriorityLow)
idxRecords, handleOutOfRange, err := w.fetchRowColVals(txn, handleRange)
var (
idxRecords []*indexRecord
err error
)
idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, handleRange)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -635,11 +671,6 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan
addedCount++
}

if handleOutOfRange || len(idxRecords) == 0 {
nextHandle = handleRange.endHandle
} else {
nextHandle = idxRecords[len(idxRecords)-1].handle + 1
}
return nil
})
w.logSlowOperations(time.Since(oprStartTime), "backfillIndexInTxn", 3000)
Expand All @@ -655,7 +686,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
startTime := time.Now()
for {
addedCount := 0
nextHandle, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange)
nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange)
if err == nil {
// Because reorgIndexTask may run a long time,
// we should check whether this ddl job is still runnable.
Expand All @@ -678,12 +709,16 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
}

handleRange.startHandle = nextHandle
if handleRange.startHandle >= handleRange.endHandle {
if taskDone {
break
}
}
log.Infof("[ddl-reorg] worker(%v), finish region ranges [%v,%v) addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
w.id, task.startHandle, task.endHandle, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())
rightParenthesis := ")"
if task.endIncluded {
rightParenthesis = "]"
}
log.Infof("[ddl-reorg] worker(%v), finish region ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
w.id, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())

return result
}
Expand Down

0 comments on commit 85cd246

Please sign in to comment.