-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: Speed up adding index phase #2341
Conversation
func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, batchOpInfo *indexBatchOpInfo, seekHandle int64) error { | ||
cols := t.Cols() | ||
idxInfo := batchOpInfo.tblIndex.Meta() | ||
func isFinish(limit, input int64) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isFinished
if err1 != nil { | ||
return errors.Trace(err1) | ||
for i := 0; i < batches; i++ { | ||
go d.backfillIndex(t, batchOpInfo, seekHandle, &wg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should Add waitgroup outside the goroutine
func (b batchRetSlice) Less(i, j int) bool { return b[i].doneHandle < b[j].doneHandle } | ||
func (b batchRetSlice) Swap(i, j int) { b[i], b[j] = b[j], b[i] } | ||
|
||
type indexRecord struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add comments for the following structs.
} | ||
|
||
const defaultBatchCnt = 1024 | ||
const defaultSmallBatchCnt = 128 | ||
type indexBatchOpInfo struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment on this type and its members.
defaultSmallBatches = 16 | ||
) | ||
|
||
type batchRet struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more comment on this type.
} | ||
} | ||
|
||
// recordIterFunc is used for low-level record iteration. | ||
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error) | ||
type handleInfo struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comments on the type and its members.
seekHandle := reorgInfo.Handle | ||
wg := sync.WaitGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put this in for loop is better.
) | ||
|
||
// batchRet is the result of the batch. | ||
type batchRet struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to batchResult
is better.
func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, batchOpInfo *indexBatchOpInfo, seekHandle int64) error { | ||
cols := t.Cols() | ||
idxInfo := batchOpInfo.tblIndex.Meta() | ||
func isFinished(limit, input int64) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function name is too generic.
make it as a method of handleInfo
is better.
cols := t.Cols() | ||
idxInfo := batchOpInfo.tblIndex.Meta() | ||
func isFinished(limit, input int64) bool { | ||
if limit == 0 || input < limit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A 0 handle can be valid.
ret.doneHandle = idxRecords[ret.count-1].handle | ||
} | ||
// Be sure to do this operation only once. | ||
handleInfo.once.Do(func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have to use once
, add a bool member in handleInfo is clearer.
} | ||
batchOpInfo.idxRecords = batchOpInfo.idxRecords[:0] | ||
err1 = d.backfillIndexInTxn(t, txn, batchOpInfo, seekHandle) | ||
seekHandle = handle + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename seekHandle
to batchStartHandle
is better.
for i := 0; i < batches; i++ { | ||
wg.Add(1) | ||
go d.backfillIndex(t, batchOpInfo, seekHandle, &wg) | ||
handle := <-batchOpInfo.nextCh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name it doneHandle
, keep it consistent with the sender's
colMap map[int64]*types.FieldType | ||
batchRetCh chan *batchRet | ||
nextCh chan int64 // It notifies to start the next batch. | ||
} | ||
|
||
// How to add index in reorganization state? | ||
// 1. Generate a snapshot with special version. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the comment.
PTAL |
PTAL @coocood |
// The above operations are completed in a transaction. | ||
// When concurrent tasks are processed, the batch result returned by each batch is sorted by handle. Then traverse the | ||
// batch results, gets the total number of row in the concurrent task and update the processed handle value. If | ||
// you encounter an error message, exit traversal. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/you/we
// 3. For one row, if the row has been already deleted, skip to next row. | ||
// 4. If not deleted, check whether index has existed, if existed, skip to next row. | ||
// 5. If index doesn't exist, create the index and then continue to handle next row. | ||
// Concurrently process defaultSmallBatches tasks. Each task deals with a handle interval of the index record. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can use handle range
uniformly.
// 4. If not deleted, check whether index has existed, if existed, skip to next row. | ||
// 5. If index doesn't exist, create the index and then continue to handle next row. | ||
// Concurrently process defaultSmallBatches tasks. Each task deals with a handle interval of the index record. | ||
// The handle interval is defaultSmallBatchCnt. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handle range size
// 5. If index doesn't exist, create the index and then continue to handle next row. | ||
// Concurrently process defaultSmallBatches tasks. Each task deals with a handle interval of the index record. | ||
// The handle interval is defaultSmallBatchCnt. | ||
// Although the length of each handle interval is controllable, but the range of the handle value can't be expected, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because each handle range depends on the previous one, it's necessary to obtain the handle range sequentially.
bc6b660
to
1b7e6e7
Compare
return errors.Trace(err) | ||
} | ||
rk := t.RecordKey(handle) | ||
func (d *ddl) backfillIndex(t table.Table, batchOpInfo *indexBatchOpInfo, seekHandle int64, wg *sync.WaitGroup) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/backfillIndex/doBackfillIndxTask
s/seekHandle/startHandle
break | ||
} else if err != nil { | ||
return errors.Trace(err) | ||
ret = d.backfillIndexInTxn(t, txn, batchOpInfo, handleInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/backfillIndexInTxn/doBackfillIndexTaskInTxn
go d.backfillIndex(t, batchOpInfo, batchStartHandle, &wg) | ||
doneHandle := <-batchOpInfo.nextCh | ||
// There is no data to seek. | ||
if doneHandle == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
zero handle may be valid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about check doneHandle == batchStartHandle
seekHandle := reorgInfo.Handle | ||
addedCount := job.GetRowCount() | ||
batchStartHandle := reorgInfo.Handle | ||
wg := sync.WaitGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move wg
in for loop
const ( | ||
defaultBatchCnt = 1024 | ||
defaultSmallBatchCnt = 128 | ||
defaultSmallBatches = 16 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defaultTasks
) | ||
|
||
// batchResult is the result of the batch. | ||
type batchResult struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskResult
// 3. For one row, if the row has been already deleted, skip to next row. | ||
// 4. If not deleted, check whether index has existed, if existed, skip to next row. | ||
// 5. If index doesn't exist, create the index and then continue to handle next row. | ||
// Concurrently process defaultSmallBatches tasks. Each task deals with a handle range of the index record. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defaultSmallBatches tasks -> the defaultSmallBatches tasks.
// Because each handle range depends on the previous one, it's necessary to obtain the handle range serially. | ||
// Real concurrent processing needs to perform after the handle range has been acquired. | ||
// The operation flow of the each batch of data is as follows: | ||
// 1. Open a goroutine. Traverse the snapshot to obtain the handle range, while access to the corresponding row key and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
access to ->accessing
// The operation flow of the each batch of data is as follows: | ||
// 1. Open a goroutine. Traverse the snapshot to obtain the handle range, while access to the corresponding row key and | ||
// raw index value. Then notify to start the next batch. | ||
// 2. Decoding this batch of raw index value gets the corresponding index value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decoding this batch of raw index value gets the corresponding index value. ->Decode this batch of raw index value to get the corresponding index value.
// 1. Open a goroutine. Traverse the snapshot to obtain the handle range, while access to the corresponding row key and | ||
// raw index value. Then notify to start the next batch. | ||
// 2. Decoding this batch of raw index value gets the corresponding index value. | ||
// 3. Deal with this index records one by one. If the index record exists, skip to the next row. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this index records -> these index records
// 2. Decoding this batch of raw index value gets the corresponding index value. | ||
// 3. Deal with this index records one by one. If the index record exists, skip to the next row. | ||
// If the index doesn't exist, create the index ande then continue to handle the next row. | ||
// 4. When the handle of a range is completed, returns the corresponding batch result. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returns ->return
// 4. When the handle of a range is completed, returns the corresponding batch result. | ||
// The above operations are completed in a transaction. | ||
// When concurrent tasks are processed, the batch result returned by each batch is sorted by the handle. Then traverse the | ||
// batch results, gets the total number of row in the concurrent task and update the processed handle value. If |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gets -> get
row -> rows
// The above operations are completed in a transaction. | ||
// When concurrent tasks are processed, the batch result returned by each batch is sorted by the handle. Then traverse the | ||
// batch results, gets the total number of row in the concurrent task and update the processed handle value. If | ||
// we encounter an error message, exit traversal. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> an error message is displayed, exit the traversal.
colMap: colMap, | ||
handle: reorgInfo.Handle, | ||
idxRecords: make([]*indexRecord, 0, batchCnt), | ||
tasks := defaultTaskCnt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskCnt
} | ||
} | ||
|
||
// recordIterFunc is used for low-level record iteration. | ||
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error) | ||
// handleInfo records start ande end handle that is used in a task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and
LGTM |
|
||
// taskResult is the result of the task. | ||
type taskResult struct { | ||
count int // The number of records that has been proceed in the task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
proceeded
var err error | ||
for _, ret := range taskRets { | ||
if ret.err != nil { | ||
err = ret.err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return directly here? then u don't need to use an err var.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's OK.
if err == nil { | ||
err = err1 | ||
} else { | ||
log.Warnf("[ddl] add index failed when update handle %d, err %v", doneHandle, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use err %s and err.Error() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't necessary. Using err
is clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then should you log err1 in this place ?
} | ||
|
||
// Create the index. | ||
handle, err := batchOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle) | ||
handle, err := taskOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle) | ||
if err != nil { | ||
if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When it returns err key exists but the handle is not the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a unique key, the key value is updated when we create the index.
ret.doneHandle = idxRecords[ret.count-1].handle | ||
} | ||
// Be sure to do this operation only once. | ||
if !handleInfo.isSent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the handleInfo's isSent
is true at this time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This transaction retries.
handleInfo.endHandle = ret.doneHandle | ||
handleInfo.isSent = true | ||
} | ||
if ret.count == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does ret.count
always equal to len(idxRecords)
? If so, you needn't record it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. But we need it to update statistics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Handle issue #2257
Remove the batch get operation when adding index.
Using 3 TiKV, 1 PD, 1 TiDB , the servers of TiKV and TiDB are not on a computer, two TiKV on a computer.
The lease of TiDB is 1s.
The number of records in this table is 3531200(3.4 M), and the table's structure is as follows:
+------------------+------------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------------------+------+------+---------+-------+
| a | varchar(64) | YES | | | |
| b | bigint(20) | NO | | 0 | |
| c | datetime | YES | | NULL | |
| d | int(11) UNSIGNED | NO | | 0 | |
| e | int(11) UNSIGNED | NO | | 0 | |
| f | int(11) UNSIGNED | NO | | 0 | |
| g | varchar(28) | NO | | | |
| h | varchar(28) | NO | | | |
+------------------+------------------+------+------+---------+-------+
tidb-before> alter table battle_begin add index b (event_id);
Query OK, 0 rows affected (8 min 36.40 sec)
tidb-before> alter table battle_begin add index g (sum_id);
Query OK, 0 rows affected (7 min 10.01 sec)
tidb-current> alter table battle_begin add index b (event_id);
Query OK, 0 rows affected (3 min 35.75 sec)
tidb-current> alter table battle_begin add index g (sum_id);
Query OK, 0 rows affected (3 min 46.25 sec)