From 3969aabc1ac4fe24051a00146fd2c04c03e52c7b Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 20 Mar 2023 14:59:42 +0800 Subject: [PATCH 01/15] add pids for sortedSelectResults --- distsql/select_result.go | 41 ++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index bf7b8d585d5d2..bfbb12747ef23 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -64,7 +64,7 @@ var ( var ( _ SelectResult = (*selectResult)(nil) - _ SelectResult = (*serialSelectResults)(nil) + _ SelectResult = (*SerialSelectResults)(nil) _ SelectResult = (*sortedSelectResults)(nil) ) @@ -108,15 +108,16 @@ func (h *chunkRowHeap) Pop() interface{} { } // NewSortedSelectResults is only for partition table -func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult { +// When pids != nil, the pid will be set in the last column of each chunk.Rows. +func NewSortedSelectResults(selectResult []SelectResult, pids []int64, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult { s := &sortedSelectResults{ selectResult: selectResult, byItems: byitems, memTracker: memTracker, + pids: pids, } s.initCompareFuncs() s.buildKeyColumns() - s.heap = &chunkRowHeap{s} s.cachedChunks = make([]*chunk.Chunk, len(selectResult)) return s @@ -132,6 +133,7 @@ type sortedSelectResults struct { rowPtrs []chunk.RowPtr heap *chunkRowHeap + pids []int64 memTracker *memory.Tracker } @@ -186,9 +188,16 @@ func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) { func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) { c.Reset() + r := make([]int, c.NumCols()-1) + for i := range r { + r[i] = i + } for i := range ssr.cachedChunks { if ssr.cachedChunks[i] == nil { ssr.cachedChunks[i] = c.CopyConstruct() + if len(ssr.pids) != 0 { + ssr.cachedChunks[i] = ssr.cachedChunks[i].Prune(r) + } ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage()) } } @@ -208,6 +217,9 @@ func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err e idx := heap.Pop(ssr.heap).(chunk.RowPtr) c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx))) + if len(ssr.pids) != 0 { + c.AppendInt64(c.NumCols()-1, ssr.pids[idx.ChkIdx]) + } if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 { if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil { @@ -233,20 +245,22 @@ func (ssr *sortedSelectResults) Close() (err error) { } // NewSerialSelectResults create a SelectResult which will read each SelectResult serially. 
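The sortedSelectResults above implements a k-way merge: each partition's SelectResult already returns rows ordered by the pushed-down ByItems, a min-heap repeatedly picks the globally smallest cached row, and when pids is set the chosen row is tagged with its partition ID in the last column so later stages know which physical partition a handle belongs to. A minimal standalone sketch of that merge follows; it uses plain int64 slices and illustrative names (cursor, mergeHeap) instead of chunk.Chunk and SelectResult, so it approximates the idea rather than reproducing the executor's code.

package main

import (
	"container/heap"
	"fmt"
)

// cursor points at the next unread row of one partition's sorted stream.
type cursor struct {
	rows []int64 // already sorted by the ORDER BY column
	pos  int
	pid  int64 // physical partition ID, emitted as the extra trailing column
}

type mergeHeap []*cursor

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].rows[h[i].pos] < h[j].rows[h[j].pos] }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	c := old[len(old)-1]
	*h = old[:len(old)-1]
	return c
}

// mergeSorted emits globally ordered (value, pid) pairs from per-partition
// sorted streams, the same shape of output the sorted select result produces.
func mergeSorted(parts []*cursor) [][2]int64 {
	h := &mergeHeap{}
	for _, p := range parts {
		if len(p.rows) > 0 {
			heap.Push(h, p)
		}
	}
	var out [][2]int64
	for h.Len() > 0 {
		c := heap.Pop(h).(*cursor)
		out = append(out, [2]int64{c.rows[c.pos], c.pid})
		if c.pos++; c.pos < len(c.rows) {
			heap.Push(h, c)
		}
	}
	return out
}

func main() {
	parts := []*cursor{
		{rows: []int64{1, 4, 9}, pid: 100},
		{rows: []int64{2, 3, 8}, pid: 101},
		{rows: []int64{5, 6, 7}, pid: 102},
	}
	fmt.Println(mergeSorted(parts)) // [[1 100] [2 101] [3 101] [4 100] ...]
}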
-func NewSerialSelectResults(selectResults []SelectResult) SelectResult { - return &serialSelectResults{ +func NewSerialSelectResults(selectResults []SelectResult, pids []int64) SelectResult { + return &SerialSelectResults{ selectResults: selectResults, + pids: pids, cur: 0, } } -// serialSelectResults reads each SelectResult serially -type serialSelectResults struct { +// SerialSelectResults reads each SelectResult serially +type SerialSelectResults struct { selectResults []SelectResult + pids []int64 cur int } -func (ssr *serialSelectResults) NextRaw(ctx context.Context) ([]byte, error) { +func (ssr *SerialSelectResults) NextRaw(ctx context.Context) ([]byte, error) { for ssr.cur < len(ssr.selectResults) { resultSubset, err := ssr.selectResults[ssr.cur].NextRaw(ctx) if err != nil { @@ -260,7 +274,7 @@ func (ssr *serialSelectResults) NextRaw(ctx context.Context) ([]byte, error) { return nil, nil } -func (ssr *serialSelectResults) Next(ctx context.Context, chk *chunk.Chunk) error { +func (ssr *SerialSelectResults) Next(ctx context.Context, chk *chunk.Chunk) error { for ssr.cur < len(ssr.selectResults) { if err := ssr.selectResults[ssr.cur].Next(ctx, chk); err != nil { return err @@ -273,7 +287,14 @@ func (ssr *serialSelectResults) Next(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (ssr *serialSelectResults) Close() (err error) { +func (ssr *SerialSelectResults) GetCurrentPID() (int64, error) { + if len(ssr.pids) == 0 { + return 0, errors.Errorf("No Partition ID info") + } + return ssr.pids[ssr.cur], nil +} + +func (ssr *SerialSelectResults) Close() (err error) { for _, r := range ssr.selectResults { if rerr := r.Close(); rerr != nil { err = rerr From 290064752788fd6370ae63945bfed86de0c5f1bc Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 22 Mar 2023 18:55:14 +0800 Subject: [PATCH 02/15] support mergeSort for partition tables in IndexLookUp --- executor/builder.go | 30 +++++++- executor/distsql.go | 69 +++++++++++++------ executor/partition_table_test.go | 6 +- executor/table_reader.go | 8 ++- .../integration_partition_suite_out.json | 66 ++++++++---------- planner/core/task.go | 32 ++++++--- 6 files changed, 137 insertions(+), 74 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 58ddbb4936100..9aa57ce89db34 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3809,6 +3809,26 @@ func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []pla if len(indexReq.OutputOffsets) == 0 { indexReq.OutputOffsets = []uint32{uint32(schemaLen)} } + + if len(plans[0].(*plannercore.PhysicalIndexScan).ByItems) != 0 { + idxScan := plans[0].(*plannercore.PhysicalIndexScan) + tblInfo := idxScan.Table + offset := make([]uint32, len(idxScan.ByItems)) + for i, item := range idxScan.ByItems { + if c, ok := item.Expr.(*expression.Column); !ok { + return nil, errors.Errorf("Not support non-column in orderBy pushed down") + } else { + for _, c1 := range tblInfo.Columns { + if c1.ID == c.ID { + offset[i] = uint32(c1.Offset) + break + } + } + } + } + + indexReq.OutputOffsets = append(offset, indexReq.OutputOffsets...) + } return indexReq, err } @@ -3824,6 +3844,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn // Should output pid col. 
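buildIndexReq above widens the index request's OutputOffsets when an ORDER BY is pushed down: offsets for the ORDER BY columns come first, followed by the handle offsets, so each partition's index rows come back carrying the sort keys the merge needs. A compact sketch of that offset computation, with a hypothetical column struct standing in for the model and plan types:

package main

import "fmt"

// column carries just the fields the offset computation needs; it stands in
// for model.ColumnInfo in this sketch.
type column struct {
	ID     int64
	Offset int
}

// orderByOffsets prepends the ORDER BY columns' offsets to the handle
// offsets, mirroring how the index request's output is widened when the sort
// is pushed down. byItemIDs are the column IDs referenced by the ORDER BY
// items (non-column expressions are rejected by the builder).
func orderByOffsets(byItemIDs []int64, tblCols []column, handleOffsets []uint32) ([]uint32, error) {
	out := make([]uint32, 0, len(byItemIDs)+len(handleOffsets))
	for _, id := range byItemIDs {
		found := false
		for _, c := range tblCols {
			if c.ID == id {
				out = append(out, uint32(c.Offset))
				found = true
				break
			}
		}
		if !found {
			return nil, fmt.Errorf("ORDER BY column %d not found", id)
		}
	}
	return append(out, handleOffsets...), nil
}

func main() {
	cols := []column{{ID: 10, Offset: 0}, {ID: 11, Offset: 1}, {ID: 12, Offset: 2}}
	offs, err := orderByOffsets([]int64{11}, cols, []uint32{3})
	fmt.Println(offs, err) // [1 3] <nil>: sort key offset first, then the handle offset
}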
handleLen++ } + indexReq, err := buildIndexReq(b.ctx, len(is.Index.Columns), handleLen, v.IndexPlans) if err != nil { return nil, err @@ -3854,6 +3875,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn table: tbl, index: is.Index, keepOrder: is.KeepOrder, + byItems: is.ByItems, desc: is.Desc, tableRequest: tableReq, columns: ts.Columns, @@ -3938,7 +3960,9 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo return ret } - if is.Index.Global { + indexScanPlan := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + + if is.Index.Global || len(indexScanPlan.ByItems) != 0 { tmp, ok := b.is.TableByID(ts.Table.ID) if !ok { b.err = err @@ -3955,7 +3979,9 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo return nil } - return ret + if is.Index.Global { + return ret + } } if ok, _ := is.IsPartition(); ok { // Already pruned when translated to logical union. diff --git a/executor/distsql.go b/executor/distsql.go index 5460fc76825a7..b60a4365d3871 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -377,7 +377,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } results = append(results, result) } - e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker) + e.result = distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker) } return nil } @@ -428,6 +428,7 @@ type IndexLookUpExecutor struct { kvRanges []kv.KeyRange workerStarted bool + byItems []*plannerutil.ByItems keepOrder bool desc bool @@ -589,12 +590,17 @@ func (e *IndexLookUpExecutor) isCommonHandle() bool { func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType { var tps []*types.FieldType + if len(e.byItems) != 0 { + for _, item := range e.byItems { + tps = append(tps, item.Expr.GetType()) + } + } if e.isCommonHandle() { for _, handleCol := range e.handleCols { tps = append(tps, handleCol.RetType) } } else { - tps = []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) } if e.index.Global { tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) @@ -618,6 +624,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< if e.partitionTableMode { kvRanges = e.partitionKVRanges } + // When len(kvrange) = 1, no sorting is required, + // so remove byItems and non-necessary output colums + if len(kvRanges) == 1 { + e.dagPB.OutputOffsets = e.dagPB.OutputOffsets[len(e.byItems):] + e.byItems = nil + } tps := e.getRetTpsByHandle() idxID := e.getIndexPlanRootID() e.idxWorkerWg.Add(1) @@ -649,6 +661,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< SetMemTracker(tracker). SetConnID(e.ctx.GetSessionVars().ConnectionID) + results := make([]distsql.SelectResult, len(kvRanges)) + pids := make([]int64, 0, len(kvRanges)) for partTblIdx, kvRange := range kvRanges { // check if executor is closed finished := false @@ -660,9 +674,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< if finished { break } - if worker.PushedLimit != nil && worker.scannedKeys >= worker.PushedLimit.Count+worker.PushedLimit.Offset { - break - } // init kvReq, result and worker for this partition // The key ranges should be ordered. 
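Here the executor builds one index result stream per pruned partition (collecting each partition's physical ID alongside it) and then decides how to combine them: a single kv range needs neither sorting nor the extra pid column, several ranges without ByItems are simply read one after another, and several ranges with ByItems go through the sorted merge. A condensed sketch of that selection logic, with a stand-in reader interface instead of distsql.SelectResult (all names illustrative):

package main

import "fmt"

// reader is a stand-in for distsql.SelectResult in this sketch.
type reader interface{ Name() string }

type plain struct{ n string }

func (p plain) Name() string { return p.n }

type serial struct{ parts []reader }

func (s serial) Name() string { return fmt.Sprintf("serial over %d parts", len(s.parts)) }

type sortedMerge struct {
	parts []reader
	pids  []int64
}

func (s sortedMerge) Name() string { return fmt.Sprintf("merge-sort over %d parts", len(s.parts)) }

// combine mirrors the executor's choice: one stream is used as-is, several
// streams are merge-sorted only when an ORDER BY was pushed down.
func combine(parts []reader, pids []int64, hasByItems bool) reader {
	switch {
	case len(parts) == 1:
		return parts[0] // single kv range: no sorting, no pid column needed
	case hasByItems:
		return sortedMerge{parts: parts, pids: pids}
	default:
		return serial{parts: parts}
	}
}

func main() {
	parts := []reader{plain{"p0"}, plain{"p1"}, plain{"p2"}}
	fmt.Println(combine(parts, []int64{100, 101, 102}, true).Name())  // merge-sort over 3 parts
	fmt.Println(combine(parts[:1], []int64{100}, true).Name())        // p0
	fmt.Println(combine(parts, []int64{100, 101, 102}, false).Name()) // serial over 3 parts
}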
@@ -679,29 +690,34 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< worker.syncErr(err) break } + results[partTblIdx] = result worker.batchSize = initBatchSize if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize } if e.partitionTableMode { worker.partitionTable = e.prunedPartitions[partTblIdx] + pids = append(pids, worker.partitionTable.GetPhysicalID()) } - - // fetch data from this partition - ctx1, cancel := context.WithCancel(ctx) - fetchErr := worker.fetchHandles(ctx1, result) - if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again - e.feedback.Invalidate() - } - cancel() - if err := result.Close(); err != nil { - logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) - } - e.ctx.StoreQueryFeedback(e.feedback) - if fetchErr != nil { - break // if any error occurs, exit after releasing all resources - } } + var ssr distsql.SelectResult + if len(results) > 1 && len(e.byItems) != 0 { + ssr = distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker) + } else if len(results) > 1 { + ssr = distsql.NewSerialSelectResults(results, pids) + } else { + ssr = results[0] + } + ctx1, cancel := context.WithCancel(ctx) + fetchErr := worker.fetchHandles(ctx1, ssr) + if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again + e.feedback.Invalidate() + } + cancel() + if err := ssr.Close(); err != nil { + logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) + } + e.ctx.StoreQueryFeedback(e.feedback) close(workCh) close(e.resultCh) e.idxWorkerWg.Done() @@ -753,6 +769,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup corColInFilter: e.corColInTblSide, plans: e.tblPlans, netDataSize: e.avgRowSize * float64(len(task.handles)), + byItems: e.byItems, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true) @@ -934,6 +951,9 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } }() retTps := w.idxLookup.getRetTpsByHandle() + if !w.idxLookup.index.Global && len(w.idxLookup.byItems) > 0 { + retTps = append(retTps, types.NewFieldType(mysql.TypeLonglong)) + } chk := w.idxLookup.ctx.GetSessionVars().GetNewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize, w.idxLookup.maxChunkSize, w.idxLookup.AllocPool) idxID := w.idxLookup.getIndexPlanRootID() if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { @@ -973,7 +993,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { numColsWithoutPid := chk.NumCols() - if w.idxLookup.index.Global { + if w.idxLookup.index.Global || len(w.idxLookup.byItems) > 0 { numColsWithoutPid = numColsWithoutPid - 1 } handleOffset := make([]int, 0, len(w.idxLookup.handleCols)) @@ -1028,6 +1048,11 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, if _, exist := w.idxLookup.partitionIDMap[ph.PartitionID]; !exist { continue } + } else if ssr, ok := idxResult.(*distsql.SerialSelectResults); ok { + pid, err := ssr.GetCurrentPID() + if err == nil { + h = kv.PartitionHandle{Handle: h, PartitionID: pid} + } } handles = append(handles, h) } @@ -1165,7 +1190,7 @@ func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int, 
handle = kv.IntHandle(row.GetInt64(handleIdx[0])) } } - if e.index.Global { + if e.index.Global || len(e.byItems) > 0 { pidOffset := row.Len() - 1 pid := row.GetInt64(pidOffset) handle = kv.NewPartitionHandle(pid, handle) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 094e113c01e37..945e9b0f2003f 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -552,9 +552,9 @@ func TestOrderByAndLimit(t *testing.T) { require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp")) require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit")) require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp")) - require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed - require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) - require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN")) + require.False(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // fully pushed + require.False(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) + require.False(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN")) regularResult := tk.MustQuery(queryRegular).Sort().Rows() tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult) tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult) diff --git a/executor/table_reader.go b/executor/table_reader.go index 7385b5f8500c1..6540996edec2b 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -301,7 +301,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } results = append(results, result) } - return distsql.NewSerialSelectResults(results), nil + return distsql.NewSerialSelectResults(results, nil), nil } // Use PartitionTable Scan kvReq, err := e.buildKVReqForPartitionTableScan(ctx, ranges) @@ -329,7 +329,11 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } results = append(results, result) } - return distsql.NewSortedSelectResults(results, e.byItems, e.memTracker), nil + if len(results) > 1 { + return distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker), nil + } else { + return results[0], nil + } } kvReq, err := e.buildKVReq(ctx, ranges) diff --git a/planner/core/casetest/testdata/integration_partition_suite_out.json b/planner/core/casetest/testdata/integration_partition_suite_out.json index 2fbe7f417621f..c81544c819656 100644 --- a/planner/core/casetest/testdata/integration_partition_suite_out.json +++ b/planner/core/casetest/testdata/integration_partition_suite_out.json @@ -1176,7 +1176,7 @@ "Limit 10.00 root offset:0, count:10", "└─IndexReader 10.00 root partition:all index:Limit", " └─Limit 10.00 cop[tikv] offset:0, count:10", - " └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo" + " └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo" ] }, { @@ -1185,7 +1185,7 @@ "Limit 10.00 root offset:0, count:10", "└─IndexReader 10.00 root partition:all index:Limit", " └─Limit 10.00 cop[tikv] offset:0, count:10", - " └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo" + " └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo" ] }, { @@ -1194,7 +1194,7 @@ "Limit 10.00 root offset:0, count:10", "└─IndexReader 10.00 root partition:all index:Limit", " └─Limit 
10.00 cop[tikv] offset:0, count:10", - " └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo" + " └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo" ] }, { @@ -1209,31 +1209,28 @@ { "SQL": "explain format='brief' select * from trange use index (ia) where a > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.trange.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from tlist use index (ia) where a > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.tlist.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from thash use index (ia) where a > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.thash.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" ] }, { @@ -1249,34 +1246,31 @@ { "SQL": "explain format='brief' select * from trange use index (ia) where a + 1 > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.trange.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─Selection 10.00 cop[tikv] gt(plus(test.trange.a, 1), 10)", - " │ └─IndexFullScan 12.50 cop[tikv] table:trange, index:ia(a) keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.trange.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:trange, index:ia(a) keep order:true, 
stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from tlist use index (ia) where a + 1 > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.tlist.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─Selection 10.00 cop[tikv] gt(plus(test.tlist.a, 1), 10)", - " │ └─IndexFullScan 12.50 cop[tikv] table:tlist, index:ia(a) keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.tlist.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:tlist, index:ia(a) keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from thash use index (ia) where a + 1 > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.thash.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─Selection 10.00 cop[tikv] gt(plus(test.thash.a, 1), 10)", - " │ └─IndexFullScan 12.50 cop[tikv] table:thash, index:ia(a) keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.thash.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:thash, index:ia(a) keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" ] }, { diff --git a/planner/core/task.go b/planner/core/task.go index 01d31d204d724..09646519e5b20 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1211,6 +1211,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) { return nil, false } idxScan.Desc = isDesc + idxScan.KeepOrder = true idxScan.ByItems = p.ByItems childProfile := copTsk.plan().statsInfo() newCount := p.Offset + p.Count @@ -1233,16 +1234,29 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) { } rootTask := copTsk.convertToRootTask(p.ctx) - // only support IndexReader now. - if _, ok := rootTask.p.(*PhysicalIndexReader); ok { - rootLimit := PhysicalLimit{ - Count: p.Count, - Offset: p.Offset, - PartitionBy: newPartitionBy, - }.Init(p.SCtx(), stats, p.SelectBlockOffset()) - rootLimit.SetSchema(rootTask.plan().Schema()) - return attachPlan2Task(rootLimit, rootTask), true + // embeded limit in indexLookUp, no more limit needed. 
+ if idxLookup, ok := rootTask.p.(*PhysicalIndexLookUpReader); ok { + idxLookup.PushedLimit = &PushedDownLimit{ + Offset: p.Offset, + Count: p.Count, + } + extraInfo, extraCol, hasExtraCol := tryGetPkExtraColumn(p.ctx.GetSessionVars(), tblInfo) + if hasExtraCol { + idxLookup.ExtraHandleCol = extraCol + ts := idxLookup.TablePlans[0].(*PhysicalTableScan) + ts.Columns = append(ts.Columns, extraInfo) + ts.schema.Append(extraCol) + ts.HandleIdx = []int{len(ts.Columns) - 1} + } + return rootTask, true } + rootLimit := PhysicalLimit{ + Count: p.Count, + Offset: p.Offset, + PartitionBy: newPartitionBy, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + rootLimit.SetSchema(rootTask.plan().Schema()) + return attachPlan2Task(rootLimit, rootTask), true } } else if copTsk.indexPlan == nil { if tblScan.HandleCols == nil { From b674a34eab93abc65bd7348b47ea5f61242766ce Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 22 Mar 2023 19:51:12 +0800 Subject: [PATCH 03/15] remove use serialSelectResults --- distsql/select_result.go | 21 ++++++------------ executor/distsql.go | 46 ++++++++++++++++++++++------------------ 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index bfbb12747ef23..07513a3b56f0d 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -64,7 +64,7 @@ var ( var ( _ SelectResult = (*selectResult)(nil) - _ SelectResult = (*SerialSelectResults)(nil) + _ SelectResult = (*serialSelectResults)(nil) _ SelectResult = (*sortedSelectResults)(nil) ) @@ -246,21 +246,21 @@ func (ssr *sortedSelectResults) Close() (err error) { // NewSerialSelectResults create a SelectResult which will read each SelectResult serially. func NewSerialSelectResults(selectResults []SelectResult, pids []int64) SelectResult { - return &SerialSelectResults{ + return &serialSelectResults{ selectResults: selectResults, pids: pids, cur: 0, } } -// SerialSelectResults reads each SelectResult serially -type SerialSelectResults struct { +// serialSelectResults reads each SelectResult serially +type serialSelectResults struct { selectResults []SelectResult pids []int64 cur int } -func (ssr *SerialSelectResults) NextRaw(ctx context.Context) ([]byte, error) { +func (ssr *serialSelectResults) NextRaw(ctx context.Context) ([]byte, error) { for ssr.cur < len(ssr.selectResults) { resultSubset, err := ssr.selectResults[ssr.cur].NextRaw(ctx) if err != nil { @@ -274,7 +274,7 @@ func (ssr *SerialSelectResults) NextRaw(ctx context.Context) ([]byte, error) { return nil, nil } -func (ssr *SerialSelectResults) Next(ctx context.Context, chk *chunk.Chunk) error { +func (ssr *serialSelectResults) Next(ctx context.Context, chk *chunk.Chunk) error { for ssr.cur < len(ssr.selectResults) { if err := ssr.selectResults[ssr.cur].Next(ctx, chk); err != nil { return err @@ -287,14 +287,7 @@ func (ssr *SerialSelectResults) Next(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (ssr *SerialSelectResults) GetCurrentPID() (int64, error) { - if len(ssr.pids) == 0 { - return 0, errors.Errorf("No Partition ID info") - } - return ssr.pids[ssr.cur], nil -} - -func (ssr *SerialSelectResults) Close() (err error) { +func (ssr *serialSelectResults) Close() (err error) { for _, r := range ssr.selectResults { if rerr := r.Close(); rerr != nil { err = rerr diff --git a/executor/distsql.go b/executor/distsql.go index b60a4365d3871..aa077887773a1 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -696,26 +696,24 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx 
context.Context, workCh chan< worker.batchSize = worker.maxBatchSize } if e.partitionTableMode { - worker.partitionTable = e.prunedPartitions[partTblIdx] - pids = append(pids, worker.partitionTable.GetPhysicalID()) + pids = append(pids, e.prunedPartitions[partTblIdx].GetPhysicalID()) } } - var ssr distsql.SelectResult + r := results if len(results) > 1 && len(e.byItems) != 0 { - ssr = distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker) - } else if len(results) > 1 { - ssr = distsql.NewSerialSelectResults(results, pids) - } else { - ssr = results[0] + ssr := distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker) + r = []distsql.SelectResult{ssr} } ctx1, cancel := context.WithCancel(ctx) - fetchErr := worker.fetchHandles(ctx1, ssr) + fetchErr := worker.fetchHandles(ctx1, r) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedback.Invalidate() } cancel() - if err := ssr.Close(); err != nil { - logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) + for _, ssr := range results { + if err := ssr.Close(); err != nil { + logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) + } } e.ctx.StoreQueryFeedback(e.feedback) close(workCh) @@ -924,8 +922,6 @@ type indexWorker struct { PushedLimit *plannercore.PushedDownLimit // scannedKeys indicates how many keys be scanned scannedKeys uint64 - // partitionTable indicates if this worker is accessing a particular partition table. - partitionTable table.PhysicalTable } func (w *indexWorker) syncErr(err error) { @@ -939,7 +935,7 @@ func (w *indexWorker) syncErr(err error) { // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks. // The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh // at the same time to keep data ordered. 
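The comment above is the ordering invariant this executor relies on: every lookup task is sent both to workCh, where table workers process tasks concurrently, and to resultCh, where the reader consumes them strictly in the order the index produced them, waiting on each task's done channel. A self-contained sketch of that two-channel pattern with a toy task type (not the executor's real structs):

package main

import (
	"fmt"
	"strings"
	"sync"
)

// task carries its own done channel so results can be consumed in the order
// the tasks were produced, even though workers finish out of order.
type task struct {
	id     int
	result string
	doneCh chan struct{}
}

func main() {
	workCh := make(chan *task, 4)   // consumed by workers, any order
	resultCh := make(chan *task, 4) // consumed by the reader, producer order

	// Producer: send every task to both channels, like the index worker does.
	go func() {
		for i := 0; i < 6; i++ {
			t := &task{id: i, doneCh: make(chan struct{})}
			workCh <- t
			resultCh <- t
		}
		close(workCh)
		close(resultCh)
	}()

	// Workers: fill in results concurrently and signal completion per task.
	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range workCh {
				t.result = strings.Repeat("*", t.id+1)
				close(t.doneCh)
			}
		}()
	}

	// Reader: results come back strictly in task order.
	for t := range resultCh {
		<-t.doneCh
		fmt.Println(t.id, t.result)
	}
	wg.Wait()
}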
-func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (err error) { +func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) { defer func() { if r := recover(); r != nil { logutil.Logger(ctx).Error("indexWorker in IndexLookupExecutor panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -961,7 +957,12 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID) } } + i := int(0) + result := results[i] for { + if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset { + break + } startTime := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result) finishFetch := time.Now() @@ -970,10 +971,18 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes return err } if len(handles) == 0 { - return nil + if i == len(results)-1 { + return nil + } + i++ + result = results[i] + continue } task := w.buildTableTask(handles, retChunk) finishBuild := time.Now() + if w.idxLookup.partitionTableMode { + task.partitionTable = w.idxLookup.prunedPartitions[i] + } select { case <-ctx.Done(): return nil @@ -988,6 +997,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime))) } } + return nil } func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( @@ -1048,11 +1058,6 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, if _, exist := w.idxLookup.partitionIDMap[ph.PartitionID]; !exist { continue } - } else if ssr, ok := idxResult.(*distsql.SerialSelectResults); ok { - pid, err := ssr.GetCurrentPID() - if err == nil { - h = kv.PartitionHandle{Handle: h, PartitionID: pid} - } } handles = append(handles, h) } @@ -1099,7 +1104,6 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) * indexOrder: indexOrder, duplicatedIndexOrder: duplicatedIndexOrder, idxRows: retChk, - partitionTable: w.partitionTable, } task.doneCh = make(chan error, 1) From 4a6ccd5ed24987c5dec1db51efac6f4b2c0bb940 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 22 Mar 2023 19:57:48 +0800 Subject: [PATCH 04/15] update --- distsql/select_result.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 07513a3b56f0d..7063c9d776a4b 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -245,10 +245,9 @@ func (ssr *sortedSelectResults) Close() (err error) { } // NewSerialSelectResults create a SelectResult which will read each SelectResult serially. 
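The reworked fetchHandles drains one partition's result until it returns no more handles, then advances to the next one, and gives up early once PushedLimit.Offset+Count keys have been scanned. A small sketch of that control flow, with a toy stream type standing in for distsql.SelectResult and batches of int64 handles (all names illustrative):

package main

import "fmt"

// stream is a stand-in for one partition's SelectResult: Next returns the
// next batch of handles, or an empty slice when the stream is exhausted.
type stream struct {
	batches [][]int64
	pos     int
}

func (s *stream) Next() []int64 {
	if s.pos >= len(s.batches) {
		return nil
	}
	b := s.batches[s.pos]
	s.pos++
	return b
}

// fetchAll mirrors the worker loop: drain stream i until it returns an empty
// batch, then move on to stream i+1, and stop once the pushed limit of
// scanned handles has been reached.
func fetchAll(streams []*stream, limit int) [][]int64 {
	var tasks [][]int64
	scanned := 0
	for i := 0; i < len(streams); {
		if scanned >= limit {
			break
		}
		batch := streams[i].Next()
		if len(batch) == 0 {
			i++ // this partition is exhausted, continue with the next one
			continue
		}
		scanned += len(batch)
		tasks = append(tasks, batch)
	}
	return tasks
}

func main() {
	streams := []*stream{
		{batches: [][]int64{{1, 2}, {3}}},
		{batches: [][]int64{{10, 11, 12}}},
	}
	fmt.Println(fetchAll(streams, 4)) // [[1 2] [3] [10 11 12]], then the limit check stops the loop
}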
-func NewSerialSelectResults(selectResults []SelectResult, pids []int64) SelectResult { +func NewSerialSelectResults(selectResults []SelectResult) SelectResult { return &serialSelectResults{ selectResults: selectResults, - pids: pids, cur: 0, } } @@ -256,7 +255,6 @@ func NewSerialSelectResults(selectResults []SelectResult, pids []int64) SelectRe // serialSelectResults reads each SelectResult serially type serialSelectResults struct { selectResults []SelectResult - pids []int64 cur int } From 661e0836d83ecbe04bc052e768d6c60605d67913 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 22 Mar 2023 20:04:40 +0800 Subject: [PATCH 05/15] update --- executor/distsql.go | 1 + executor/table_reader.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/distsql.go b/executor/distsql.go index aa077887773a1..62b200e499b5b 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -947,6 +947,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select } }() retTps := w.idxLookup.getRetTpsByHandle() + // for sortedSelectResult, add pids in last column if !w.idxLookup.index.Global && len(w.idxLookup.byItems) > 0 { retTps = append(retTps, types.NewFieldType(mysql.TypeLonglong)) } diff --git a/executor/table_reader.go b/executor/table_reader.go index 6540996edec2b..206dd71fe5863 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -301,7 +301,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } results = append(results, result) } - return distsql.NewSerialSelectResults(results, nil), nil + return distsql.NewSerialSelectResults(results), nil } // Use PartitionTable Scan kvReq, err := e.buildKVReqForPartitionTableScan(ctx, ranges) From 14461064c131d0d118d9100bbba9d3053b696cb2 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 23 Mar 2023 11:03:02 +0800 Subject: [PATCH 06/15] fix fmt --- executor/builder.go | 15 +++++++-------- executor/table_reader.go | 5 ++--- planner/core/task.go | 2 +- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 9aa57ce89db34..0d4ce8480bc09 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3815,18 +3815,17 @@ func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []pla tblInfo := idxScan.Table offset := make([]uint32, len(idxScan.ByItems)) for i, item := range idxScan.ByItems { - if c, ok := item.Expr.(*expression.Column); !ok { + c, ok := item.Expr.(*expression.Column) + if !ok { return nil, errors.Errorf("Not support non-column in orderBy pushed down") - } else { - for _, c1 := range tblInfo.Columns { - if c1.ID == c.ID { - offset[i] = uint32(c1.Offset) - break - } + } + for _, c1 := range tblInfo.Columns { + if c1.ID == c.ID { + offset[i] = uint32(c1.Offset) + break } } } - indexReq.OutputOffsets = append(offset, indexReq.OutputOffsets...) 
} return indexReq, err diff --git a/executor/table_reader.go b/executor/table_reader.go index 206dd71fe5863..b3a2a5565614c 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -329,11 +329,10 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } results = append(results, result) } - if len(results) > 1 { - return distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker), nil - } else { + if len(results) == 1 { return results[0], nil } + return distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker), nil } kvReq, err := e.buildKVReq(ctx, ranges) diff --git a/planner/core/task.go b/planner/core/task.go index 09646519e5b20..3263fd31b7270 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1234,7 +1234,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) { } rootTask := copTsk.convertToRootTask(p.ctx) - // embeded limit in indexLookUp, no more limit needed. + // embedded limit in indexLookUp, no more limit needed. if idxLookup, ok := rootTask.p.(*PhysicalIndexLookUpReader); ok { idxLookup.PushedLimit = &PushedDownLimit{ Offset: p.Offset, From e09b76708f4f8d2299f8fff5df5136821ff4c405 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 23 Mar 2023 11:19:39 +0800 Subject: [PATCH 07/15] fix fmt --- executor/builder.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 0d4ce8480bc09..cad05d815c084 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3813,15 +3813,15 @@ func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []pla if len(plans[0].(*plannercore.PhysicalIndexScan).ByItems) != 0 { idxScan := plans[0].(*plannercore.PhysicalIndexScan) tblInfo := idxScan.Table - offset := make([]uint32, len(idxScan.ByItems)) - for i, item := range idxScan.ByItems { + offset := make([]uint32, 0, len(idxScan.ByItems)) + for _, item := range idxScan.ByItems { c, ok := item.Expr.(*expression.Column) if !ok { return nil, errors.Errorf("Not support non-column in orderBy pushed down") } for _, c1 := range tblInfo.Columns { if c1.ID == c.ID { - offset[i] = uint32(c1.Offset) + offset = append(offset, uint32(c1.Offset)) break } } From 3827a80e574a2a9d99c6d85f7573cc78c003668f Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 23 Mar 2023 13:58:32 +0800 Subject: [PATCH 08/15] fix test --- executor/distsql.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 62b200e499b5b..abf67f6c08f47 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -710,8 +710,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< e.feedback.Invalidate() } cancel() - for _, ssr := range results { - if err := ssr.Close(); err != nil { + for _, result := range r { + if err := result.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) } } @@ -958,9 +958,8 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID) } } - i := int(0) - result := results[i] - for { + for i := 0; i < len(results); { + result := results[i] if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset { break } @@ -972,11 +971,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select return err } if 
len(handles) == 0 { - if i == len(results)-1 { - return nil - } i++ - result = results[i] continue } task := w.buildTableTask(handles, retChunk) From b1a728c1c67096ea23443acebe7b99191cc31d0e Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 23 Mar 2023 14:58:23 +0800 Subject: [PATCH 09/15] fix test --- executor/distsql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index abf67f6c08f47..c78dc42a890ed 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -661,7 +661,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< SetMemTracker(tracker). SetConnID(e.ctx.GetSessionVars().ConnectionID) - results := make([]distsql.SelectResult, len(kvRanges)) + results := make([]distsql.SelectResult, 0, len(kvRanges)) pids := make([]int64, 0, len(kvRanges)) for partTblIdx, kvRange := range kvRanges { // check if executor is closed @@ -690,7 +690,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< worker.syncErr(err) break } - results[partTblIdx] = result + results = append(results, result) worker.batchSize = initBatchSize if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize From eb9103dda256d8528ac616b1237ef29433f7f9dd Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 29 Mar 2023 18:03:37 +0800 Subject: [PATCH 10/15] fix --- executor/builder.go | 21 ++++++++++++--------- executor/partition_table_test.go | 32 +++++++++++++++++++++++++++++--- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index cad05d815c084..4e8064d0351c6 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3797,18 +3797,16 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic return tableReq, tbl, err } -func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) { +func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) { indexReq, err := constructDAGReq(ctx, plans, kv.TiKV) if err != nil { return nil, err } + schemaLen := len(columns) indexReq.OutputOffsets = []uint32{} for i := 0; i < handleLen; i++ { indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(schemaLen+i)) } - if len(indexReq.OutputOffsets) == 0 { - indexReq.OutputOffsets = []uint32{uint32(schemaLen)} - } if len(plans[0].(*plannercore.PhysicalIndexScan).ByItems) != 0 { idxScan := plans[0].(*plannercore.PhysicalIndexScan) @@ -3819,15 +3817,20 @@ func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []pla if !ok { return nil, errors.Errorf("Not support non-column in orderBy pushed down") } - for _, c1 := range tblInfo.Columns { - if c1.ID == c.ID { - offset = append(offset, uint32(c1.Offset)) + column := model.FindColumnInfoByID(tblInfo.Columns, c.ID) + for i, idxColumn := range columns { + if idxColumn.Name.L == column.Name.L { + offset = append(offset, uint32(i)) break } } } indexReq.OutputOffsets = append(offset, indexReq.OutputOffsets...) 
} + + if len(indexReq.OutputOffsets) == 0 { + indexReq.OutputOffsets = []uint32{uint32(schemaLen)} + } return indexReq, err } @@ -3844,7 +3847,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn handleLen++ } - indexReq, err := buildIndexReq(b.ctx, len(is.Index.Columns), handleLen, v.IndexPlans) + indexReq, err := buildIndexReq(b.ctx, is.Index.Columns, handleLen, v.IndexPlans) if err != nil { return nil, err } @@ -4018,7 +4021,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd feedbacks = append(feedbacks, feedback) if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok { - tempReq, err = buildIndexReq(b.ctx, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i]) + tempReq, err = buildIndexReq(b.ctx, is.Index.Columns, ts.HandleCols.NumCols(), v.PartialPlans[i]) descs = append(descs, is.Desc) indexes = append(indexes, is.Index) } else { diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 945e9b0f2003f..ab52f41529778 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -367,7 +367,7 @@ func TestOrderByAndLimit(t *testing.T) { tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") // range partition table - tk.MustExec(`create table trange(a int, b int, index idx_a(a)) partition by range(a) ( + tk.MustExec(`create table trange(a int, b int, index idx_a(a), index idx_b(b)) partition by range(a) ( partition p0 values less than(300), partition p1 values less than (500), partition p2 values less than(1100));`) @@ -376,7 +376,7 @@ func TestOrderByAndLimit(t *testing.T) { tk.MustExec("create table thash(a int, b int, index idx_a(a), index idx_b(b)) partition by hash(a) partitions 4;") // regular table - tk.MustExec("create table tregular(a int, b int, index idx_a(a))") + tk.MustExec("create table tregular(a int, b int, index idx_a(a), index idx_b(b))") // range partition table with int pk tk.MustExec(`create table trange_intpk(a int primary key, b int) partition by range(a) ( @@ -437,7 +437,7 @@ func TestOrderByAndLimit(t *testing.T) { partition p2 values in (%s), partition p3 values in (%s) )`, listVals1, listVals2, listVals3)) - tk.MustExec(fmt.Sprintf(`create table tlist(a int, b int, index idx_a(a)) partition by list(a)( + tk.MustExec(fmt.Sprintf(`create table tlist(a int, b int, index idx_a(a), index idx_b(b)) partition by list(a)( partition p1 values in (%s), partition p2 values in (%s), partition p3 values in (%s) @@ -561,6 +561,32 @@ func TestOrderByAndLimit(t *testing.T) { tk.MustQuery(queryListPartitionWithLimitHint).Sort().Check(regularResult) } + // test indexLookUp with order property pushed down. 
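The new test block here follows the same randomized pattern used throughout TestOrderByAndLimit: pick random bounds, run the ORDER BY ... LIMIT query against the partitioned tables and the regular table, and require identical sorted results while asserting the expected plan shape. A detached sketch of that comparison idiom, with an in-memory top-N query standing in for SQL (everything below is illustrative, not the actual test harness):

package ordertest

import (
	"math/rand"
	"reflect"
	"sort"
	"testing"
)

// queryTopN stands in for "SELECT ... WHERE v > x ORDER BY v LIMIT y" against
// one storage layout; the test only cares that both layouts agree.
func queryTopN(rows []int, x, y int) []int {
	var got []int
	for _, v := range rows {
		if v > x {
			got = append(got, v)
		}
	}
	sort.Ints(got)
	if len(got) > y {
		got = got[:y]
	}
	return got
}

// TestPartitionedMatchesRegular mirrors the randomized check added above: run
// the same query against a "partitioned" and a "regular" copy of the data and
// require identical results for many random (x, y) pairs.
func TestPartitionedMatchesRegular(t *testing.T) {
	regular := make([]int, 2000)
	for i := range regular {
		regular[i] = rand.Intn(2000)
	}
	partitions := [][]int{regular[:700], regular[700:1300], regular[1300:]}
	for i := 0; i < 100; i++ {
		x, y := rand.Intn(1999), rand.Intn(2000)+1
		var merged []int
		for _, p := range partitions {
			merged = append(merged, queryTopN(p, x, len(p))...)
		}
		sort.Ints(merged)
		if len(merged) > y {
			merged = merged[:y]
		}
		if want := queryTopN(regular, x, y); !reflect.DeepEqual(merged, want) {
			t.Fatalf("mismatch for x=%d y=%d", x, y)
		}
	}
}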
+ for i := 0; i < 100; i++ { + // explain select * from t where b > {y} use index(idx_b) order by b limit {x}; // check if IndexLookUp is used + // select * from t where b > {y} use index(idx_b) order by b limit {x}; // it can return the correct result + x := rand.Intn(1999) + y := rand.Intn(2000) + 1 + maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(b), 2000) from (select * from tregular use index(idx_b) where b > %v order by b limit %v) t", x, y)).Rows()[0][0] + queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v", x, x+1, maxEle, y) + queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v", x, x+1, maxEle, y) + queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v", x, x+1, maxEle, y) + queryRegular := fmt.Sprintf("select * from tregular use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v;", x, x+1, maxEle, y) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp")) + require.False(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // fully pushed + require.False(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) + require.False(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult) + tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult) + tk.MustQuery(queryListPartitionWithLimitHint).Sort().Check(regularResult) + } + // test tableReader for i := 0; i < 100; i++ { // explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used From 767182f0b5063abea2ecf98bf14855587a8f0bb1 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 30 Mar 2023 23:21:18 +0800 Subject: [PATCH 11/15] delete useless code --- executor/builder.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 4e8064d0351c6..f3c7f3b863b41 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3828,9 +3828,6 @@ func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleL indexReq.OutputOffsets = append(offset, indexReq.OutputOffsets...) 
} - if len(indexReq.OutputOffsets) == 0 { - indexReq.OutputOffsets = []uint32{uint32(schemaLen)} - } return indexReq, err } From 949f8e1cf05dffc0c8025dd9a40ddeabe82463ea Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 3 Apr 2023 13:20:59 +0800 Subject: [PATCH 12/15] update --- distsql/select_result.go | 8 ++++---- executor/builder.go | 6 +----- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 7063c9d776a4b..1f1da952bd761 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -188,14 +188,14 @@ func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) { func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) { c.Reset() - r := make([]int, c.NumCols()-1) - for i := range r { - r[i] = i - } for i := range ssr.cachedChunks { if ssr.cachedChunks[i] == nil { ssr.cachedChunks[i] = c.CopyConstruct() if len(ssr.pids) != 0 { + r := make([]int, c.NumCols()-1) + for i := range r { + r[i] = i + } ssr.cachedChunks[i] = ssr.cachedChunks[i].Prune(r) } ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage()) diff --git a/executor/builder.go b/executor/builder.go index f3c7f3b863b41..a3baed141ee91 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3827,7 +3827,6 @@ func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleL } indexReq.OutputOffsets = append(offset, indexReq.OutputOffsets...) } - return indexReq, err } @@ -3843,7 +3842,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn // Should output pid col. handleLen++ } - indexReq, err := buildIndexReq(b.ctx, is.Index.Columns, handleLen, v.IndexPlans) if err != nil { return nil, err @@ -3959,9 +3957,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo return ret } - indexScanPlan := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) - - if is.Index.Global || len(indexScanPlan.ByItems) != 0 { + if is.Index.Global || len(is.ByItems) != 0 { tmp, ok := b.is.TableByID(ts.Table.ID) if !ok { b.err = err From 9283be5728e3adb084fbec6491d6d5562effbc69 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 3 Apr 2023 13:36:48 +0800 Subject: [PATCH 13/15] update --- executor/builder.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index a3baed141ee91..b92db69e56634 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3802,16 +3802,11 @@ func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleL if err != nil { return nil, err } - schemaLen := len(columns) - indexReq.OutputOffsets = []uint32{} - for i := 0; i < handleLen; i++ { - indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(schemaLen+i)) - } + indexReq.OutputOffsets = []uint32{} if len(plans[0].(*plannercore.PhysicalIndexScan).ByItems) != 0 { idxScan := plans[0].(*plannercore.PhysicalIndexScan) tblInfo := idxScan.Table - offset := make([]uint32, 0, len(idxScan.ByItems)) for _, item := range idxScan.ByItems { c, ok := item.Expr.(*expression.Column) if !ok { @@ -3820,12 +3815,15 @@ func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleL column := model.FindColumnInfoByID(tblInfo.Columns, c.ID) for i, idxColumn := range columns { if idxColumn.Name.L == column.Name.L { - offset = append(offset, uint32(i)) + indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(i)) break } } } - indexReq.OutputOffsets = append(offset, 
indexReq.OutputOffsets...) + } + + for i := 0; i < handleLen; i++ { + indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(len(columns)+i)) } return indexReq, err } From 5ac3de1b915e7e4d7c701d186a2c39446d4232bb Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 3 Apr 2023 16:35:17 +0800 Subject: [PATCH 14/15] follow comments --- executor/builder.go | 3 +++ executor/distsql.go | 19 +++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index b92db69e56634..ad839570b340f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3797,6 +3797,9 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic return tableReq, tbl, err } +// buildIndexReq is designed to create a DAG for index request. +// If len(ByItems) != 0 means index request should return related columns +// to sort result rows in TiDB side for parition tables. func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) { indexReq, err := constructDAGReq(ctx, plans, kv.TiKV) if err != nil { diff --git a/executor/distsql.go b/executor/distsql.go index c78dc42a890ed..af52ab268a7c4 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -584,11 +584,15 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in return nil } +func (e *IndexLookUpExecutor) hasExtralPidCol() bool { + return e.index.Global || len(e.byItems) > 0 +} + func (e *IndexLookUpExecutor) isCommonHandle() bool { return !(len(e.handleCols) == 1 && e.handleCols[0].ID == model.ExtraHandleID) && e.table.Meta() != nil && e.table.Meta().IsCommonHandle } -func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType { +func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType { var tps []*types.FieldType if len(e.byItems) != 0 { for _, item := range e.byItems { @@ -630,7 +634,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< e.dagPB.OutputOffsets = e.dagPB.OutputOffsets[len(e.byItems):] e.byItems = nil } - tps := e.getRetTpsByHandle() + tps := e.getRetTpsForIndexReader() idxID := e.getIndexPlanRootID() e.idxWorkerWg.Add(1) go func() { @@ -699,18 +703,17 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< pids = append(pids, e.prunedPartitions[partTblIdx].GetPhysicalID()) } } - r := results if len(results) > 1 && len(e.byItems) != 0 { ssr := distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker) - r = []distsql.SelectResult{ssr} + results = []distsql.SelectResult{ssr} } ctx1, cancel := context.WithCancel(ctx) - fetchErr := worker.fetchHandles(ctx1, r) + fetchErr := worker.fetchHandles(ctx1, results) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedback.Invalidate() } cancel() - for _, result := range r { + for _, result := range results { if err := result.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) } @@ -946,7 +949,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select } } }() - retTps := w.idxLookup.getRetTpsByHandle() + retTps := w.idxLookup.getRetTpsForIndexReader() // for sortedSelectResult, add pids in last column if !w.idxLookup.index.Global && len(w.idxLookup.byItems) > 0 { retTps = append(retTps, types.NewFieldType(mysql.TypeLonglong)) @@ -1190,7 +1193,7 @@ func (e *IndexLookUpExecutor) getHandle(row 
chunk.Row, handleIdx []int, handle = kv.IntHandle(row.GetInt64(handleIdx[0])) } } - if e.index.Global || len(e.byItems) > 0 { + if e.hasExtralPidCol() { pidOffset := row.Len() - 1 pid := row.GetInt64(pidOffset) handle = kv.NewPartitionHandle(pid, handle) From fbff55a930085177f318f70adfbe9c87473c9a0a Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 3 Apr 2023 16:53:57 +0800 Subject: [PATCH 15/15] update --- executor/distsql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/distsql.go b/executor/distsql.go index af52ab268a7c4..7bcfcc24002ff 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -1002,7 +1002,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { numColsWithoutPid := chk.NumCols() - if w.idxLookup.index.Global || len(w.idxLookup.byItems) > 0 { + if w.idxLookup.hasExtralPidCol() { numColsWithoutPid = numColsWithoutPid - 1 } handleOffset := make([]int, 0, len(w.idxLookup.handleCols))
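After this series, hasExtralPidCol means every row handed back by the index side ends with one extra trailing column holding the physical partition ID, and getHandle turns that into a partition-aware handle before the table lookup. A small sketch of that decoding step, using a flat int64 slice in place of chunk.Row and an illustrative handle struct rather than kv.PartitionHandle:

package main

import "fmt"

// row is a stand-in for chunk.Row: a flat slice of int64 columns where the
// last column optionally carries the physical partition ID.
type row []int64

type handle struct {
	IntHandle   int64
	PartitionID int64 // zero when no pid column is present
}

// getHandle mirrors the executor logic: read the handle from its column
// offset and, when an extra pid column was requested (global index, or ORDER
// BY pushed down across partitions), wrap it with the partition ID taken from
// the last column.
func getHandle(r row, handleIdx int, hasExtraPidCol bool) handle {
	h := handle{IntHandle: r[handleIdx]}
	if hasExtraPidCol {
		h.PartitionID = r[len(r)-1]
	}
	return h
}

func main() {
	// columns: [order-by col, handle col, pid col]
	r := row{42, 7, 101}
	fmt.Println(getHandle(r, 1, true))      // {7 101}
	fmt.Println(getHandle(r[:2], 1, false)) // {7 0}
}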