planner, executor: support mergeSort for partition tables in IndexLookUp #42483
@@ -377,7 +377,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
 			}
 			results = append(results, result)
 		}
-		e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
+		e.result = distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker)
 	}
 	return nil
 }
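A note on the signature change above: NewSortedSelectResults now takes a second argument carrying the physical partition IDs of the streams being merged, and the index reader passes nil because its rows need no partition-ID column. Below is a self-contained sketch of the underlying idea — a k-way merge of already-sorted per-partition streams through a heap, optionally tagging each value with the ID of the stream it came from. The types are stand-ins for illustration, not TiDB's distsql package.

package main

import (
	"container/heap"
	"fmt"
)

// item is one head-of-stream value plus the index of the stream it came from.
type item struct {
	val int
	src int
}

// mergeHeap orders items by value, ascending.
type mergeHeap []item

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].val < h[j].val }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// mergeSorted merges per-partition sorted slices into one sorted stream.
// pids, when non-nil, supplies an ID reported alongside each value,
// mirroring the pid column appended for partitioned tables.
func mergeSorted(parts [][]int, pids []int64) {
	h := &mergeHeap{}
	pos := make([]int, len(parts)) // next unread index per stream
	for i, p := range parts {
		if len(p) > 0 {
			heap.Push(h, item{val: p[0], src: i})
			pos[i] = 1
		}
	}
	for h.Len() > 0 {
		top := heap.Pop(h).(item)
		if pids != nil {
			fmt.Printf("%d (pid %d)\n", top.val, pids[top.src])
		} else {
			fmt.Println(top.val)
		}
		if pos[top.src] < len(parts[top.src]) {
			heap.Push(h, item{val: parts[top.src][pos[top.src]], src: top.src})
			pos[top.src]++
		}
	}
}

func main() {
	// Three "partitions", each already sorted by the index order.
	mergeSorted([][]int{{1, 4}, {2, 5}, {3}}, []int64{100, 101, 102})
}

Each pop yields the globally smallest remaining value, so the merged output preserves the index order without re-sorting any partition.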
@@ -428,6 +428,7 @@ type IndexLookUpExecutor struct {
 	kvRanges      []kv.KeyRange
 	workerStarted bool

+	byItems   []*plannerutil.ByItems
 	keepOrder bool
 	desc      bool
@@ -583,18 +584,27 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in
 	return nil
 }

+func (e *IndexLookUpExecutor) hasExtralPidCol() bool {
+	return e.index.Global || len(e.byItems) > 0
+}
+
 func (e *IndexLookUpExecutor) isCommonHandle() bool {
 	return !(len(e.handleCols) == 1 && e.handleCols[0].ID == model.ExtraHandleID) && e.table.Meta() != nil && e.table.Meta().IsCommonHandle
 }

-func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType {
+func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType {
 	var tps []*types.FieldType
+	if len(e.byItems) != 0 {
+		for _, item := range e.byItems {
+			tps = append(tps, item.Expr.GetType())
+		}
+	}
 	if e.isCommonHandle() {
 		for _, handleCol := range e.handleCols {
 			tps = append(tps, handleCol.RetType)
 		}
 	} else {
-		tps = []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
+		tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
 	}
 	if e.index.Global {
 		tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
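Reading the rewritten function as a whole, the index reader's output layout becomes: the byItems (ORDER BY) columns first, then the handle column(s) — the common-handle columns, or a single TypeLonglong for an int handle — and finally one extra TypeLonglong for the partition ID when the index is global. For instance (illustrative column names, not from the patch), ORDER BY c1 on an int-handle table yields [c1, handle], and a global index would append [pid] after that.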
@@ -618,7 +628,13 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 	if e.partitionTableMode {
 		kvRanges = e.partitionKVRanges
 	}
-	tps := e.getRetTpsByHandle()
+	// When len(kvRanges) == 1, no sorting is required,
+	// so remove byItems and unnecessary output columns.
+	if len(kvRanges) == 1 {
+		e.dagPB.OutputOffsets = e.dagPB.OutputOffsets[len(e.byItems):]
+		e.byItems = nil
+	}
Review comment (on lines +633 to +636): But here we think they're at the beginning of the output offsets?
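To make the reviewer's concern concrete: the slice expression above drops the first len(e.byItems) entries, which is only correct if the byItems columns sit at the front of the output offsets. A toy illustration with hypothetical offset values, not TiDB code:

package main

import "fmt"

func main() {
	byItems := 2                            // say, two ORDER BY expressions
	outputOffsets := []uint32{0, 1, 2}      // assumed layout: [byItem0, byItem1, handle]
	outputOffsets = outputOffsets[byItems:] // keeps only the handle offset
	fmt.Println(outputOffsets)              // [2]
}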
+	tps := e.getRetTpsForIndexReader()
 	idxID := e.getIndexPlanRootID()
 	e.idxWorkerWg.Add(1)
 	go func() {
@@ -649,6 +665,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 		SetMemTracker(tracker).
 		SetConnID(e.ctx.GetSessionVars().ConnectionID)

+	results := make([]distsql.SelectResult, 0, len(kvRanges))
+	pids := make([]int64, 0, len(kvRanges))
 	for partTblIdx, kvRange := range kvRanges {
 		// check if executor is closed
 		finished := false
@@ -660,9 +678,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 		if finished {
 			break
 		}
-		if worker.PushedLimit != nil && worker.scannedKeys >= worker.PushedLimit.Count+worker.PushedLimit.Offset {
-			break
-		}

 		// init kvReq, result and worker for this partition
 		// The key ranges should be ordered.
@@ -679,29 +694,31 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 			worker.syncErr(err)
 			break
 		}
+		results = append(results, result)
 		worker.batchSize = initBatchSize
 		if worker.batchSize > worker.maxBatchSize {
 			worker.batchSize = worker.maxBatchSize
 		}
 		if e.partitionTableMode {
-			worker.partitionTable = e.prunedPartitions[partTblIdx]
+			pids = append(pids, e.prunedPartitions[partTblIdx].GetPhysicalID())
 		}
-
-		// fetch data from this partition
-		ctx1, cancel := context.WithCancel(ctx)
-		fetchErr := worker.fetchHandles(ctx1, result)
-		if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
-			e.feedback.Invalidate()
-		}
-		cancel()
+	}
+	if len(results) > 1 && len(e.byItems) != 0 {
+		ssr := distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker)
+		results = []distsql.SelectResult{ssr}
+	}
+	ctx1, cancel := context.WithCancel(ctx)
+	fetchErr := worker.fetchHandles(ctx1, results)
+	if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
+		e.feedback.Invalidate()
+	}
+	cancel()
+	for _, result := range results {
 		if err := result.Close(); err != nil {
 			logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
 		}
-		e.ctx.StoreQueryFeedback(e.feedback)
-		if fetchErr != nil {
-			break // if any error occurs, exit after releasing all resources
-		}
 	}
+	e.ctx.StoreQueryFeedback(e.feedback)
 	close(workCh)
 	close(e.resultCh)
 	e.idxWorkerWg.Done()
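Taken together, this hunk changes the worker's control flow: previously each partition's result was opened, fully fetched, and closed inside the loop; now all results are opened first, wrapped into a single merge-sorted stream when ordering matters (more than one result and a non-empty byItems), fetched in one pass, and closed together at the end. A toy sketch of that open-all/consume-once/close-all shape, with a stand-in type for distsql.SelectResult:

package main

import "fmt"

// partResult stands in for one partition's distsql.SelectResult.
type partResult struct {
	name string
	rows []int
}

func (r *partResult) close() { fmt.Println("closed", r.name) }

func main() {
	// 1. Open every per-partition result before fetching anything.
	results := []*partResult{
		{name: "p0", rows: []int{1, 4}},
		{name: "p1", rows: []int{2, 3}},
	}
	// 2. (The real code may replace results with one merged stream here.)
	// 3. A single fetch pass over whatever streams remain.
	for _, r := range results {
		for _, v := range r.rows {
			fmt.Println("row", v)
		}
	}
	// 4. Close all results afterwards, even when the fetch failed early.
	for _, r := range results {
		r.close()
	}
}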
@@ -753,6 +770,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
 		corColInFilter: e.corColInTblSide,
 		plans:          e.tblPlans,
 		netDataSize:    e.avgRowSize * float64(len(task.handles)),
+		byItems:        e.byItems,
 	}
 	tableReaderExec.buildVirtualColumnInfo()
 	tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
@@ -907,8 +925,6 @@ type indexWorker struct {
 	PushedLimit *plannercore.PushedDownLimit
 	// scannedKeys indicates how many keys be scanned
 	scannedKeys uint64
-	// partitionTable indicates if this worker is accessing a particular partition table.
-	partitionTable table.PhysicalTable
 }

 func (w *indexWorker) syncErr(err error) {
@@ -922,7 +938,7 @@ func (w *indexWorker) syncErr(err error) {
 // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
 // The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
 // at the same time to keep data ordered.
-func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (err error) {
+func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) {
 	defer func() {
 		if r := recover(); r != nil {
 			logutil.Logger(ctx).Error("indexWorker in IndexLookupExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -933,15 +949,23 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 			}
 		}
 	}()
-	retTps := w.idxLookup.getRetTpsByHandle()
+	retTps := w.idxLookup.getRetTpsForIndexReader()
+	// for sortedSelectResult, add pids in last column
+	if !w.idxLookup.index.Global && len(w.idxLookup.byItems) > 0 {
+		retTps = append(retTps, types.NewFieldType(mysql.TypeLonglong))
+	}
Review comment (on lines +954 to +956): Can we move this code block into getRetTpsByHandle?
Reply: Can't do that. Because we use …
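Putting this block together with getRetTpsForIndexReader: for a non-global index with ORDER BY pushed down, the chunk fetched here is laid out as [byItems columns, handle, pid], the trailing pid being the extra column that the merge-sorted select result emits when built with a non-nil pids slice; for a global index the pid column already comes from the index reader itself, so nothing extra is appended. (This reading is inferred from the diff, not stated in it.)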
 	chk := w.idxLookup.ctx.GetSessionVars().GetNewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize, w.idxLookup.maxChunkSize, w.idxLookup.AllocPool)
 	idxID := w.idxLookup.getIndexPlanRootID()
 	if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
 		if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
 			w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID)
 		}
 	}
-	for {
+	for i := 0; i < len(results); {
+		result := results[i]
+		if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset {
+			break
+		}
 		startTime := time.Now()
 		handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
 		finishFetch := time.Now()
@@ -950,10 +974,14 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 			return err
 		}
 		if len(handles) == 0 {
-			return nil
+			i++
+			continue
 		}
 		task := w.buildTableTask(handles, retChunk)
 		finishBuild := time.Now()
+		if w.idxLookup.partitionTableMode {
+			task.partitionTable = w.idxLookup.prunedPartitions[i]
+		}
 		select {
 		case <-ctx.Done():
 			return nil
@@ -968,12 +996,13 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 			atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime)))
 		}
 	}
+	return nil
 }

 func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
 	handles []kv.Handle, retChk *chunk.Chunk, err error) {
 	numColsWithoutPid := chk.NumCols()
-	if w.idxLookup.index.Global {
+	if w.idxLookup.hasExtralPidCol() {
 		numColsWithoutPid = numColsWithoutPid - 1
 	}
 	handleOffset := make([]int, 0, len(w.idxLookup.handleCols))
@@ -1074,7 +1103,6 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) *
 		indexOrder:           indexOrder,
 		duplicatedIndexOrder: duplicatedIndexOrder,
 		idxRows:              retChk,
-		partitionTable:       w.partitionTable,
 	}

 	task.doneCh = make(chan error, 1)
@@ -1165,7 +1193,7 @@ func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int,
 			handle = kv.IntHandle(row.GetInt64(handleIdx[0]))
 		}
 	}
-	if e.index.Global {
+	if e.hasExtralPidCol() {
 		pidOffset := row.Len() - 1
 		pid := row.GetInt64(pidOffset)
 		handle = kv.NewPartitionHandle(pid, handle)
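Both this hunk and the extractTaskHandles change above rely on the same convention: whenever hasExtralPidCol() holds, the partition ID travels as the last column of the row. A self-contained sketch of that decode step — the types are stand-ins for illustration; only kv.NewPartitionHandle in the diff is the real API:

package main

import "fmt"

// row mimics chunk.Row just enough for this example.
type row struct{ cols []int64 }

func (r row) Len() int             { return len(r.cols) }
func (r row) GetInt64(i int) int64 { return r.cols[i] }

// partitionHandle mirrors the idea of kv.PartitionHandle: a handle
// tagged with the physical partition it belongs to.
type partitionHandle struct {
	pid, handle int64
}

func main() {
	r := row{cols: []int64{42 /* handle */, 7 /* pid */}}
	handle := r.GetInt64(0)
	pid := r.GetInt64(r.Len() - 1) // pid is always the last column
	fmt.Printf("%+v\n", partitionHandle{pid: pid, handle: handle})
}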
Review comment: In line 193, CopyConstruct creates a new chunk already. Prune will also create a new chunk. We should refine this.
Reply: The implementations of Prune and CopyConstruct are different: Prune will modify the original chunk in place.
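For readers outside the chunk package, the distinction in the reply can be shown with stand-in types (this is not TiDB's actual chunk implementation): a Prune-style operation mutates the receiver in place, so other holders of the chunk observe the change, whereas a CopyConstruct-style operation builds an independent copy.

package main

import "fmt"

type chunk struct{ cols [][]int64 }

// pruneStyle drops trailing columns in place, affecting the original.
func (c *chunk) pruneStyle(keep int) { c.cols = c.cols[:keep] }

// copyStyle returns a fresh chunk; the original stays intact.
func (c *chunk) copyStyle() *chunk {
	out := &chunk{cols: make([][]int64, len(c.cols))}
	for i, col := range c.cols {
		out.cols[i] = append([]int64(nil), col...)
	}
	return out
}

func main() {
	c := &chunk{cols: [][]int64{{1}, {2}, {3}}}
	cp := c.copyStyle()
	c.pruneStyle(2)
	fmt.Println(len(c.cols), len(cp.cols)) // 2 3
}

That in-place behavior is presumably why the suggested refinement is not a straight swap of one for the other.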