Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: support mergeSort for partition tables in IndexLookUp #42483

Merged
merged 16 commits into from
Apr 4, 2023
16 changes: 14 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ func (h *chunkRowHeap) Pop() interface{} {
}

// NewSortedSelectResults is only for partition table
func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
// When pids != nil, the pid will be set in the last column of each chunk.Rows.
func NewSortedSelectResults(selectResult []SelectResult, pids []int64, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
s := &sortedSelectResults{
selectResult: selectResult,
byItems: byitems,
memTracker: memTracker,
pids: pids,
}
s.initCompareFuncs()
s.buildKeyColumns()

s.heap = &chunkRowHeap{s}
s.cachedChunks = make([]*chunk.Chunk, len(selectResult))
return s
Expand All @@ -132,6 +133,7 @@ type sortedSelectResults struct {
rowPtrs []chunk.RowPtr
heap *chunkRowHeap

pids []int64
memTracker *memory.Tracker
}

Expand Down Expand Up @@ -186,9 +188,16 @@ func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) {

func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) {
c.Reset()
r := make([]int, c.NumCols()-1)
for i := range r {
r[i] = i
}
for i := range ssr.cachedChunks {
if ssr.cachedChunks[i] == nil {
ssr.cachedChunks[i] = c.CopyConstruct()
if len(ssr.pids) != 0 {
ssr.cachedChunks[i] = ssr.cachedChunks[i].Prune(r)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 193, CopyConstruct creates a new chunk already. Prune will also create a new chunk. We should refine this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation in Prune and CopyConstruct is different. The Prune will influence the original chunk.

func (c *Chunk) Prune(usedColIdxs []int) *Chunk {
	...
	for i, idx := range usedColIdxs {
		chk.columns[i] = c.columns[idx]
	}
	...
}
func (c *Chunk) CopyConstruct() *Chunk {
    ...
	for i := range c.columns {
		newChk.columns[i] = c.columns[i].CopyConstruct(nil)
	}
    ...
}

}
ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage())
}
}
Expand All @@ -208,6 +217,9 @@ func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err e

idx := heap.Pop(ssr.heap).(chunk.RowPtr)
c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx)))
if len(ssr.pids) != 0 {
c.AppendInt64(c.NumCols()-1, ssr.pids[idx.ChkIdx])
}

if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 {
if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil {
Expand Down
29 changes: 27 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3809,6 +3809,25 @@ func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []pla
if len(indexReq.OutputOffsets) == 0 {
indexReq.OutputOffsets = []uint32{uint32(schemaLen)}
}

if len(plans[0].(*plannercore.PhysicalIndexScan).ByItems) != 0 {
idxScan := plans[0].(*plannercore.PhysicalIndexScan)
tblInfo := idxScan.Table
offset := make([]uint32, 0, len(idxScan.ByItems))
for _, item := range idxScan.ByItems {
c, ok := item.Expr.(*expression.Column)
if !ok {
return nil, errors.Errorf("Not support non-column in orderBy pushed down")
}
for _, c1 := range tblInfo.Columns {
if c1.ID == c.ID {
offset = append(offset, uint32(c1.Offset))
break
}
}
}
indexReq.OutputOffsets = append(offset, indexReq.OutputOffsets...)
}
return indexReq, err
}

Expand All @@ -3824,6 +3843,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
// Should output pid col.
handleLen++
}

indexReq, err := buildIndexReq(b.ctx, len(is.Index.Columns), handleLen, v.IndexPlans)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3854,6 +3874,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
Expand Down Expand Up @@ -3938,7 +3959,9 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
return ret
}

if is.Index.Global {
indexScanPlan := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)

if is.Index.Global || len(indexScanPlan.ByItems) != 0 {
tmp, ok := b.is.TableByID(ts.Table.ID)
if !ok {
b.err = err
Expand All @@ -3955,7 +3978,9 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
return nil
}

return ret
if is.Index.Global {
return ret
}
}
if ok, _ := is.IsPartition(); ok {
// Already pruned when translated to logical union.
Expand Down
77 changes: 51 additions & 26 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
}
results = append(results, result)
}
e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
e.result = distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker)
}
return nil
}
Expand Down Expand Up @@ -428,6 +428,7 @@ type IndexLookUpExecutor struct {
kvRanges []kv.KeyRange
workerStarted bool

byItems []*plannerutil.ByItems
keepOrder bool
desc bool

Expand Down Expand Up @@ -589,12 +590,17 @@ func (e *IndexLookUpExecutor) isCommonHandle() bool {

func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType {
var tps []*types.FieldType
if len(e.byItems) != 0 {
for _, item := range e.byItems {
tps = append(tps, item.Expr.GetType())
}
}
if e.isCommonHandle() {
for _, handleCol := range e.handleCols {
tps = append(tps, handleCol.RetType)
}
} else {
tps = []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
if e.index.Global {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
Expand All @@ -618,6 +624,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
if e.partitionTableMode {
kvRanges = e.partitionKVRanges
}
// When len(kvrange) = 1, no sorting is required,
// so remove byItems and non-necessary output colums
if len(kvRanges) == 1 {
e.dagPB.OutputOffsets = e.dagPB.OutputOffsets[len(e.byItems):]
e.byItems = nil
}
Comment on lines +633 to +636
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But here we think they're at the beginning of the output offsets?

tps := e.getRetTpsByHandle()
idxID := e.getIndexPlanRootID()
e.idxWorkerWg.Add(1)
Expand Down Expand Up @@ -649,6 +661,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetMemTracker(tracker).
SetConnID(e.ctx.GetSessionVars().ConnectionID)

results := make([]distsql.SelectResult, 0, len(kvRanges))
pids := make([]int64, 0, len(kvRanges))
for partTblIdx, kvRange := range kvRanges {
// check if executor is closed
finished := false
Expand All @@ -660,9 +674,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
if finished {
break
}
if worker.PushedLimit != nil && worker.scannedKeys >= worker.PushedLimit.Count+worker.PushedLimit.Offset {
break
}

// init kvReq, result and worker for this partition
// The key ranges should be ordered.
Expand All @@ -679,29 +690,32 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
worker.syncErr(err)
break
}
results = append(results, result)
worker.batchSize = initBatchSize
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
}
if e.partitionTableMode {
worker.partitionTable = e.prunedPartitions[partTblIdx]
pids = append(pids, e.prunedPartitions[partTblIdx].GetPhysicalID())
}

// fetch data from this partition
ctx1, cancel := context.WithCancel(ctx)
fetchErr := worker.fetchHandles(ctx1, result)
if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
e.feedback.Invalidate()
}
cancel()
}
r := results
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
if len(results) > 1 && len(e.byItems) != 0 {
ssr := distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker)
r = []distsql.SelectResult{ssr}
}
ctx1, cancel := context.WithCancel(ctx)
fetchErr := worker.fetchHandles(ctx1, r)
if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
e.feedback.Invalidate()
}
cancel()
for _, result := range r {
if err := result.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
e.ctx.StoreQueryFeedback(e.feedback)
if fetchErr != nil {
break // if any error occurs, exit after releasing all resources
}
}
e.ctx.StoreQueryFeedback(e.feedback)
close(workCh)
close(e.resultCh)
e.idxWorkerWg.Done()
Expand Down Expand Up @@ -753,6 +767,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
netDataSize: e.avgRowSize * float64(len(task.handles)),
byItems: e.byItems,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
Expand Down Expand Up @@ -907,8 +922,6 @@ type indexWorker struct {
PushedLimit *plannercore.PushedDownLimit
// scannedKeys indicates how many keys be scanned
scannedKeys uint64
// partitionTable indicates if this worker is accessing a particular partition table.
partitionTable table.PhysicalTable
}

func (w *indexWorker) syncErr(err error) {
Expand All @@ -922,7 +935,7 @@ func (w *indexWorker) syncErr(err error) {
// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
// at the same time to keep data ordered.
func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (err error) {
func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) {
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Error("indexWorker in IndexLookupExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
Expand All @@ -934,14 +947,22 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}()
retTps := w.idxLookup.getRetTpsByHandle()
// for sortedSelectResult, add pids in last column
if !w.idxLookup.index.Global && len(w.idxLookup.byItems) > 0 {
retTps = append(retTps, types.NewFieldType(mysql.TypeLonglong))
}
Comment on lines +954 to +956
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this code block into getRetTpsByHandle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't do that. Because we use getRetTpsByHandle func in two different places. For create chk, we need add pid column, for create select result, we shouldn't do that.

chk := w.idxLookup.ctx.GetSessionVars().GetNewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize, w.idxLookup.maxChunkSize, w.idxLookup.AllocPool)
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID)
}
}
for {
for i := 0; i < len(results); {
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
result := results[i]
if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset {
break
}
startTime := time.Now()
handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
finishFetch := time.Now()
Expand All @@ -950,10 +971,14 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
return err
}
if len(handles) == 0 {
return nil
i++
continue
}
task := w.buildTableTask(handles, retChunk)
finishBuild := time.Now()
if w.idxLookup.partitionTableMode {
task.partitionTable = w.idxLookup.prunedPartitions[i]
}
select {
case <-ctx.Done():
return nil
Expand All @@ -968,12 +993,13 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime)))
}
}
return nil
}

func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
handles []kv.Handle, retChk *chunk.Chunk, err error) {
numColsWithoutPid := chk.NumCols()
if w.idxLookup.index.Global {
if w.idxLookup.index.Global || len(w.idxLookup.byItems) > 0 {
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
numColsWithoutPid = numColsWithoutPid - 1
}
handleOffset := make([]int, 0, len(w.idxLookup.handleCols))
Expand Down Expand Up @@ -1074,7 +1100,6 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) *
indexOrder: indexOrder,
duplicatedIndexOrder: duplicatedIndexOrder,
idxRows: retChk,
partitionTable: w.partitionTable,
}

task.doneCh = make(chan error, 1)
Expand Down Expand Up @@ -1165,7 +1190,7 @@ func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int,
handle = kv.IntHandle(row.GetInt64(handleIdx[0]))
}
}
if e.index.Global {
if e.index.Global || len(e.byItems) > 0 {
pidOffset := row.Len() - 1
pid := row.GetInt64(pidOffset)
handle = kv.NewPartitionHandle(pid, handle)
Expand Down
6 changes: 3 additions & 3 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,9 @@ func TestOrderByAndLimit(t *testing.T) {
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN"))
require.False(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // fully pushed
require.False(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN"))
require.False(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult)
tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult)
Expand Down
5 changes: 4 additions & 1 deletion executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,10 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
}
results = append(results, result)
}
return distsql.NewSortedSelectResults(results, e.byItems, e.memTracker), nil
if len(results) == 1 {
return results[0], nil
}
return distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker), nil
}

kvReq, err := e.buildKVReq(ctx, ranges)
Expand Down
Loading