diff --git a/distsql/select_result.go b/distsql/select_result.go
index 390707831c9bc..5d7bfdfc93d83 100644
--- a/distsql/select_result.go
+++ b/distsql/select_result.go
@@ -108,15 +108,16 @@ func (h *chunkRowHeap) Pop() interface{} {
 }
 
 // NewSortedSelectResults is only for partition table
-func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
+// When pids != nil, the pid will be set in the last column of each returned chunk's rows.
+func NewSortedSelectResults(selectResult []SelectResult, pids []int64, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
 	s := &sortedSelectResults{
 		selectResult: selectResult,
 		byItems:      byitems,
 		memTracker:   memTracker,
+		pids:         pids,
 	}
 	s.initCompareFuncs()
 	s.buildKeyColumns()
-
 	s.heap = &chunkRowHeap{s}
 	s.cachedChunks = make([]*chunk.Chunk, len(selectResult))
 	return s
@@ -132,6 +133,7 @@ type sortedSelectResults struct {
 	rowPtrs      []chunk.RowPtr
 	heap         *chunkRowHeap
 
+	pids       []int64
 	memTracker *memory.Tracker
 }
 
@@ -189,6 +191,13 @@ func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err e
 	for i := range ssr.cachedChunks {
 		if ssr.cachedChunks[i] == nil {
 			ssr.cachedChunks[i] = c.CopyConstruct()
+			if len(ssr.pids) != 0 {
+				r := make([]int, c.NumCols()-1)
+				for i := range r {
+					r[i] = i
+				}
+				ssr.cachedChunks[i] = ssr.cachedChunks[i].Prune(r)
+			}
 			ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage())
 		}
 	}
@@ -208,6 +217,9 @@ func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err e
 
 		idx := heap.Pop(ssr.heap).(chunk.RowPtr)
 		c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx)))
+		if len(ssr.pids) != 0 {
+			c.AppendInt64(c.NumCols()-1, ssr.pids[idx.ChkIdx])
+		}
 
 		if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 {
 			if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil {
diff --git a/executor/builder.go b/executor/builder.go
index e7065cf73aa2f..ff1c03fb59f69 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -3797,17 +3797,36 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
 	return tableReq, tbl, err
 }
 
-func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {
+// buildIndexReq is designed to create a DAG for the index request.
+// If len(ByItems) != 0, it means the index request should return the related columns
+// to sort the result rows on the TiDB side for partition tables.
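
The sortedSelectResults changes above merge the already-sorted per-partition index streams with a heap and, when pids is supplied, append the originating partition ID as an extra last column of every emitted row. Below is a minimal, self-contained sketch of that k-way-merge-with-pid idea using plain slices; all names here are illustrative stand-ins, not TiDB's SelectResult/Chunk APIs:

```go
package main

import (
	"container/heap"
	"fmt"
)

// row is a stand-in for one index row; val plays the role of the ORDER BY key.
type row struct {
	val int64
	pid int64 // partition ID appended as the "last column"
}

// cursor points at the next unread row of one per-partition stream.
type cursor struct {
	stream []int64
	pos    int
	pid    int64
}

// mergeHeap orders cursors by their current value, mirroring chunkRowHeap.
type mergeHeap []*cursor

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].stream[h[i].pos] < h[j].stream[h[j].pos] }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	c := old[len(old)-1]
	*h = old[:len(old)-1]
	return c
}

// mergeSorted merges sorted per-partition streams and tags every row with the
// partition it came from, like NewSortedSelectResults does when pids != nil.
func mergeSorted(streams [][]int64, pids []int64) []row {
	h := &mergeHeap{}
	for i, s := range streams {
		if len(s) > 0 {
			*h = append(*h, &cursor{stream: s, pid: pids[i]})
		}
	}
	heap.Init(h)

	var out []row
	for h.Len() > 0 {
		c := heap.Pop(h).(*cursor)
		out = append(out, row{val: c.stream[c.pos], pid: c.pid})
		c.pos++
		if c.pos < len(c.stream) {
			heap.Push(h, c)
		}
	}
	return out
}

func main() {
	streams := [][]int64{{1, 4, 9}, {2, 3, 10}, {5, 6}}
	pids := []int64{101, 102, 103}
	fmt.Println(mergeSorted(streams, pids))
}
```

In the real change, the cached chunks are pruned back to c.NumCols()-1 columns because the pid column only exists on the output side; it is appended per row via AppendInt64.
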
+func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {
 	indexReq, err := constructDAGReq(ctx, plans, kv.TiKV)
 	if err != nil {
 		return nil, err
 	}
+
 	indexReq.OutputOffsets = []uint32{}
-	for i := 0; i < handleLen; i++ {
-		indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(schemaLen+i))
+	if len(plans[0].(*plannercore.PhysicalIndexScan).ByItems) != 0 {
+		idxScan := plans[0].(*plannercore.PhysicalIndexScan)
+		tblInfo := idxScan.Table
+		for _, item := range idxScan.ByItems {
+			c, ok := item.Expr.(*expression.Column)
+			if !ok {
+				return nil, errors.Errorf("Not support non-column in orderBy pushed down")
+			}
+			column := model.FindColumnInfoByID(tblInfo.Columns, c.ID)
+			for i, idxColumn := range columns {
+				if idxColumn.Name.L == column.Name.L {
+					indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(i))
+					break
+				}
+			}
+		}
 	}
-	if len(indexReq.OutputOffsets) == 0 {
-		indexReq.OutputOffsets = []uint32{uint32(schemaLen)}
+
+	for i := 0; i < handleLen; i++ {
+		indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(len(columns)+i))
 	}
 	return indexReq, err
 }
@@ -3824,7 +3843,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
 		// Should output pid col.
 		handleLen++
 	}
-	indexReq, err := buildIndexReq(b.ctx, len(is.Index.Columns), handleLen, v.IndexPlans)
+	indexReq, err := buildIndexReq(b.ctx, is.Index.Columns, handleLen, v.IndexPlans)
 	if err != nil {
 		return nil, err
 	}
@@ -3854,6 +3873,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
 		table:            tbl,
 		index:            is.Index,
 		keepOrder:        is.KeepOrder,
+		byItems:          is.ByItems,
 		desc:             is.Desc,
 		tableRequest:     tableReq,
 		columns:          ts.Columns,
@@ -3938,7 +3958,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
 		return ret
 	}
 
-	if is.Index.Global {
+	if is.Index.Global || len(is.ByItems) != 0 {
 		tmp, ok := b.is.TableByID(ts.Table.ID)
 		if !ok {
 			b.err = err
@@ -3955,7 +3975,9 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
 			return nil
 		}
 
-		return ret
+		if is.Index.Global {
+			return ret
+		}
 	}
 	if ok, _ := is.IsPartition(); ok {
 		// Already pruned when translated to logical union.
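
buildIndexReq above resolves each pushed-down ORDER BY column to its position inside the index definition and emits that position as a coprocessor output offset, followed by the handle columns laid out after the index columns. A rough sketch of that mapping step, with simplified name-based types rather than the planner's model package:

```go
package main

import (
	"fmt"
	"strings"
)

// indexCol is a stand-in for model.IndexColumn: just the lower-case column name.
type indexCol struct{ name string }

// buildOutputOffsets maps ORDER BY column names to their offsets in the index,
// then appends the handle columns after the index columns, mirroring the shape
// of the OutputOffsets built in buildIndexReq.
func buildOutputOffsets(idxCols []indexCol, orderByCols []string, handleLen int) ([]uint32, error) {
	offsets := make([]uint32, 0, len(orderByCols)+handleLen)
	for _, name := range orderByCols {
		found := false
		for i, ic := range idxCols {
			if ic.name == strings.ToLower(name) {
				offsets = append(offsets, uint32(i))
				found = true
				break
			}
		}
		if !found {
			return nil, fmt.Errorf("ORDER BY column %q is not covered by the index", name)
		}
	}
	// Handle columns are laid out right after the index columns in the response.
	for i := 0; i < handleLen; i++ {
		offsets = append(offsets, uint32(len(idxCols)+i))
	}
	return offsets, nil
}

func main() {
	idx := []indexCol{{"b"}, {"c"}}
	offsets, err := buildOutputOffsets(idx, []string{"b"}, 1)
	fmt.Println(offsets, err) // [0 2] <nil>: column b at offset 0, the handle after the 2 index columns
}
```
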
@@ -3993,7 +4015,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
 		feedbacks = append(feedbacks, feedback)
 
 		if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
-			tempReq, err = buildIndexReq(b.ctx, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i])
+			tempReq, err = buildIndexReq(b.ctx, is.Index.Columns, ts.HandleCols.NumCols(), v.PartialPlans[i])
 			descs = append(descs, is.Desc)
 			indexes = append(indexes, is.Index)
 		} else {
diff --git a/executor/distsql.go b/executor/distsql.go
index 5460fc76825a7..7bcfcc24002ff 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -377,7 +377,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
 			}
 			results = append(results, result)
 		}
-		e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
+		e.result = distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker)
 	}
 	return nil
 }
@@ -428,6 +428,7 @@ type IndexLookUpExecutor struct {
 	kvRanges        []kv.KeyRange
 	workerStarted   bool
 
+	byItems   []*plannerutil.ByItems
 	keepOrder bool
 	desc      bool
 
@@ -583,18 +584,27 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in
 	return nil
 }
 
+func (e *IndexLookUpExecutor) hasExtralPidCol() bool {
+	return e.index.Global || len(e.byItems) > 0
+}
+
 func (e *IndexLookUpExecutor) isCommonHandle() bool {
 	return !(len(e.handleCols) == 1 && e.handleCols[0].ID == model.ExtraHandleID) && e.table.Meta() != nil && e.table.Meta().IsCommonHandle
 }
 
-func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType {
+func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType {
 	var tps []*types.FieldType
+	if len(e.byItems) != 0 {
+		for _, item := range e.byItems {
+			tps = append(tps, item.Expr.GetType())
+		}
+	}
 	if e.isCommonHandle() {
 		for _, handleCol := range e.handleCols {
 			tps = append(tps, handleCol.RetType)
 		}
 	} else {
-		tps = []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
+		tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
 	}
 	if e.index.Global {
 		tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
@@ -618,7 +628,13 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 	if e.partitionTableMode {
 		kvRanges = e.partitionKVRanges
 	}
-	tps := e.getRetTpsByHandle()
+	// When len(kvRanges) == 1, no sorting is required,
+	// so remove byItems and the unnecessary output columns.
+	if len(kvRanges) == 1 {
+		e.dagPB.OutputOffsets = e.dagPB.OutputOffsets[len(e.byItems):]
+		e.byItems = nil
+	}
+	tps := e.getRetTpsForIndexReader()
 	idxID := e.getIndexPlanRootID()
 	e.idxWorkerWg.Add(1)
 	go func() {
@@ -649,6 +665,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 			SetMemTracker(tracker).
 			SetConnID(e.ctx.GetSessionVars().ConnectionID)
 
+		results := make([]distsql.SelectResult, 0, len(kvRanges))
+		pids := make([]int64, 0, len(kvRanges))
 		for partTblIdx, kvRange := range kvRanges {
 			// check if executor is closed
 			finished := false
@@ -660,9 +678,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 			if finished {
 				break
 			}
-			if worker.PushedLimit != nil && worker.scannedKeys >= worker.PushedLimit.Count+worker.PushedLimit.Offset {
-				break
-			}
 
 			// init kvReq, result and worker for this partition
 			// The key ranges should be ordered.
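
getRetTpsForIndexReader above lays the index reader's row format out as: the pushed-down ORDER BY columns first, then the handle column(s), then an optional partition-ID column for global indexes; and startIndexWorker drops the ORDER BY prefix again when only one key range remains, since a single stream needs no merge. A small sketch of those two decisions, with hypothetical stand-in types rather than TiDB's types package:

```go
package main

import "fmt"

// fieldType is a stand-in for *types.FieldType; a name is enough for the sketch.
type fieldType string

// retTypes mirrors the layout built by getRetTpsForIndexReader:
// ORDER BY columns first, then handle column(s), then an optional pid column.
func retTypes(byItemTypes, commonHandleTypes []fieldType, globalIndex bool) []fieldType {
	var tps []fieldType
	tps = append(tps, byItemTypes...)
	if len(commonHandleTypes) > 0 {
		tps = append(tps, commonHandleTypes...)
	} else {
		tps = append(tps, "bigint") // int handle
	}
	if globalIndex {
		tps = append(tps, "bigint") // partition ID for global indexes
	}
	return tps
}

// trimForSingleRange mimics the len(kvRanges) == 1 shortcut: with one range
// there is nothing to merge, so the ORDER BY prefix of the output is dropped.
func trimForSingleRange(outputOffsets []uint32, byItemCount, rangeCount int) []uint32 {
	if rangeCount == 1 {
		return outputOffsets[byItemCount:]
	}
	return outputOffsets
}

func main() {
	fmt.Println(retTypes([]fieldType{"int"}, nil, false)) // [int bigint]
	fmt.Println(trimForSingleRange([]uint32{0, 2}, 1, 1)) // [2]: only the handle is needed
	fmt.Println(trimForSingleRange([]uint32{0, 2}, 1, 3)) // [0 2]: keep the sort column for the merge
}
```
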
@@ -679,29 +694,31 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 				worker.syncErr(err)
 				break
 			}
+			results = append(results, result)
 			worker.batchSize = initBatchSize
 			if worker.batchSize > worker.maxBatchSize {
 				worker.batchSize = worker.maxBatchSize
 			}
 			if e.partitionTableMode {
-				worker.partitionTable = e.prunedPartitions[partTblIdx]
+				pids = append(pids, e.prunedPartitions[partTblIdx].GetPhysicalID())
 			}
-
-			// fetch data from this partition
-			ctx1, cancel := context.WithCancel(ctx)
-			fetchErr := worker.fetchHandles(ctx1, result)
-			if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
-				e.feedback.Invalidate()
-			}
-			cancel()
+		}
+		if len(results) > 1 && len(e.byItems) != 0 {
+			ssr := distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker)
+			results = []distsql.SelectResult{ssr}
+		}
+		ctx1, cancel := context.WithCancel(ctx)
+		fetchErr := worker.fetchHandles(ctx1, results)
+		if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
+			e.feedback.Invalidate()
+		}
+		cancel()
+		for _, result := range results {
 			if err := result.Close(); err != nil {
 				logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
 			}
-			e.ctx.StoreQueryFeedback(e.feedback)
-			if fetchErr != nil {
-				break // if any error occurs, exit after releasing all resources
-			}
 		}
+		e.ctx.StoreQueryFeedback(e.feedback)
 		close(workCh)
 		close(e.resultCh)
 		e.idxWorkerWg.Done()
@@ -753,6 +770,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
 		corColInFilter: e.corColInTblSide,
 		plans:          e.tblPlans,
 		netDataSize:    e.avgRowSize * float64(len(task.handles)),
+		byItems:        e.byItems,
 	}
 	tableReaderExec.buildVirtualColumnInfo()
 	tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
@@ -907,8 +925,6 @@ type indexWorker struct {
 	PushedLimit *plannercore.PushedDownLimit
 	// scannedKeys indicates how many keys be scanned
 	scannedKeys uint64
-	// partitionTable indicates if this worker is accessing a particular partition table.
-	partitionTable table.PhysicalTable
 }
 
 func (w *indexWorker) syncErr(err error) {
@@ -922,7 +938,7 @@ func (w *indexWorker) syncErr(err error) {
 // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
 // The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
 // at the same time to keep data ordered.
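
The reworked fetchHandles below consumes a slice of SelectResults (either one merged, sorted stream or one stream per partition), drains them in order, applies the pushed-down limit across all of them via the shared scanned-keys counter, and advances to the next result when the current one is exhausted. A simplified sketch of that control flow over plain iterators (illustrative types only, not the executor's):

```go
package main

import "fmt"

// batchSource is a stand-in for distsql.SelectResult: it yields batches of handles.
type batchSource struct {
	batches [][]int64
	pos     int
}

// next returns the next batch, or nil when the source is exhausted.
func (s *batchSource) next() []int64 {
	if s.pos >= len(s.batches) {
		return nil
	}
	b := s.batches[s.pos]
	s.pos++
	return b
}

// fetchHandles drains each source in turn, stopping once limit handles have
// been scanned, similar to how the index worker walks its results slice.
func fetchHandles(sources []*batchSource, limit int) [][]int64 {
	var tasks [][]int64
	scanned := 0
	for i := 0; i < len(sources); {
		if scanned >= limit {
			break // pushed-down limit reached across all sources
		}
		batch := sources[i].next()
		if len(batch) == 0 {
			i++ // current result exhausted, move to the next one
			continue
		}
		scanned += len(batch)
		tasks = append(tasks, batch)
	}
	return tasks
}

func main() {
	s1 := &batchSource{batches: [][]int64{{1, 2}, {3}}}
	s2 := &batchSource{batches: [][]int64{{4, 5, 6}}}
	// [[1 2] [3] [4 5 6]]: like the executor, the check is per batch,
	// so the final batch may overshoot the limit slightly.
	fmt.Println(fetchHandles([]*batchSource{s1, s2}, 4))
}
```
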
-func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (err error) { +func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) { defer func() { if r := recover(); r != nil { logutil.Logger(ctx).Error("indexWorker in IndexLookupExecutor panicked", zap.Any("recover", r), zap.Stack("stack")) @@ -933,7 +949,11 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } }() - retTps := w.idxLookup.getRetTpsByHandle() + retTps := w.idxLookup.getRetTpsForIndexReader() + // for sortedSelectResult, add pids in last column + if !w.idxLookup.index.Global && len(w.idxLookup.byItems) > 0 { + retTps = append(retTps, types.NewFieldType(mysql.TypeLonglong)) + } chk := w.idxLookup.ctx.GetSessionVars().GetNewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize, w.idxLookup.maxChunkSize, w.idxLookup.AllocPool) idxID := w.idxLookup.getIndexPlanRootID() if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { @@ -941,7 +961,11 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID) } } - for { + for i := 0; i < len(results); { + result := results[i] + if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset { + break + } startTime := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result) finishFetch := time.Now() @@ -950,10 +974,14 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes return err } if len(handles) == 0 { - return nil + i++ + continue } task := w.buildTableTask(handles, retChunk) finishBuild := time.Now() + if w.idxLookup.partitionTableMode { + task.partitionTable = w.idxLookup.prunedPartitions[i] + } select { case <-ctx.Done(): return nil @@ -968,12 +996,13 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime))) } } + return nil } func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { numColsWithoutPid := chk.NumCols() - if w.idxLookup.index.Global { + if w.idxLookup.hasExtralPidCol() { numColsWithoutPid = numColsWithoutPid - 1 } handleOffset := make([]int, 0, len(w.idxLookup.handleCols)) @@ -1074,7 +1103,6 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) * indexOrder: indexOrder, duplicatedIndexOrder: duplicatedIndexOrder, idxRows: retChk, - partitionTable: w.partitionTable, } task.doneCh = make(chan error, 1) @@ -1165,7 +1193,7 @@ func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int, handle = kv.IntHandle(row.GetInt64(handleIdx[0])) } } - if e.index.Global { + if e.hasExtralPidCol() { pidOffset := row.Len() - 1 pid := row.GetInt64(pidOffset) handle = kv.NewPartitionHandle(pid, handle) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 094e113c01e37..ab52f41529778 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -367,7 +367,7 @@ func TestOrderByAndLimit(t *testing.T) { tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") // range partition table - tk.MustExec(`create table trange(a int, b int, index idx_a(a)) partition by range(a) ( + tk.MustExec(`create table trange(a int, b int, index idx_a(a), index 
idx_b(b)) partition by range(a) ( partition p0 values less than(300), partition p1 values less than (500), partition p2 values less than(1100));`) @@ -376,7 +376,7 @@ func TestOrderByAndLimit(t *testing.T) { tk.MustExec("create table thash(a int, b int, index idx_a(a), index idx_b(b)) partition by hash(a) partitions 4;") // regular table - tk.MustExec("create table tregular(a int, b int, index idx_a(a))") + tk.MustExec("create table tregular(a int, b int, index idx_a(a), index idx_b(b))") // range partition table with int pk tk.MustExec(`create table trange_intpk(a int primary key, b int) partition by range(a) ( @@ -437,7 +437,7 @@ func TestOrderByAndLimit(t *testing.T) { partition p2 values in (%s), partition p3 values in (%s) )`, listVals1, listVals2, listVals3)) - tk.MustExec(fmt.Sprintf(`create table tlist(a int, b int, index idx_a(a)) partition by list(a)( + tk.MustExec(fmt.Sprintf(`create table tlist(a int, b int, index idx_a(a), index idx_b(b)) partition by list(a)( partition p1 values in (%s), partition p2 values in (%s), partition p3 values in (%s) @@ -552,9 +552,35 @@ func TestOrderByAndLimit(t *testing.T) { require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp")) require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit")) require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp")) - require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed - require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) - require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN")) + require.False(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // fully pushed + require.False(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) + require.False(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult) + tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult) + tk.MustQuery(queryListPartitionWithLimitHint).Sort().Check(regularResult) + } + + // test indexLookUp with order property pushed down. 
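
The new test block below follows the same pattern as the existing ones: it builds equivalent queries against the partitioned tables and the regular table, asserts that the plan keeps an IndexLookUp with a fully pushed Limit (and no TopN left in the root), and then checks that both return the same rows. A condensed sketch of that checking pattern, reusing the testkit/require calls already used in this file (the helper name itself is hypothetical, not part of the change):

```go
package executor_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
	"github.com/stretchr/testify/require"
)

// checkFullyPushedLimit condenses the assertions used in TestOrderByAndLimit:
// the partitioned query must use IndexLookUp with a fully pushed Limit and no
// TopN, and its rows must match the equivalent query on the regular table.
func checkFullyPushedLimit(t *testing.T, tk *testkit.TestKit, partitionQuery, regularQuery string) {
	require.True(t, tk.HasPlan(partitionQuery, "Limit"))
	require.True(t, tk.HasPlan(partitionQuery, "IndexLookUp"))
	require.False(t, tk.HasPlan(partitionQuery, "TopN")) // fully pushed

	regularResult := tk.MustQuery(regularQuery).Sort().Rows()
	tk.MustQuery(partitionQuery).Sort().Check(regularResult)
}
```
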
+ for i := 0; i < 100; i++ { + // explain select * from t where b > {y} use index(idx_b) order by b limit {x}; // check if IndexLookUp is used + // select * from t where b > {y} use index(idx_b) order by b limit {x}; // it can return the correct result + x := rand.Intn(1999) + y := rand.Intn(2000) + 1 + maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(b), 2000) from (select * from tregular use index(idx_b) where b > %v order by b limit %v) t", x, y)).Rows()[0][0] + queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v", x, x+1, maxEle, y) + queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v", x, x+1, maxEle, y) + queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v", x, x+1, maxEle, y) + queryRegular := fmt.Sprintf("select * from tregular use index(idx_b) where b > %v and b < greatest(%v+1, %v) order by b limit %v;", x, x+1, maxEle, y) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp")) + require.False(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // fully pushed + require.False(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) + require.False(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN")) regularResult := tk.MustQuery(queryRegular).Sort().Rows() tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult) tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult) diff --git a/executor/table_reader.go b/executor/table_reader.go index 7385b5f8500c1..b3a2a5565614c 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -329,7 +329,10 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra } results = append(results, result) } - return distsql.NewSortedSelectResults(results, e.byItems, e.memTracker), nil + if len(results) == 1 { + return results[0], nil + } + return distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker), nil } kvReq, err := e.buildKVReq(ctx, ranges) diff --git a/planner/core/casetest/testdata/integration_partition_suite_out.json b/planner/core/casetest/testdata/integration_partition_suite_out.json index 2fbe7f417621f..c81544c819656 100644 --- a/planner/core/casetest/testdata/integration_partition_suite_out.json +++ b/planner/core/casetest/testdata/integration_partition_suite_out.json @@ -1176,7 +1176,7 @@ "Limit 10.00 root offset:0, count:10", "└─IndexReader 10.00 root partition:all index:Limit", " └─Limit 10.00 cop[tikv] offset:0, count:10", - " └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo" + " └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo" ] }, { @@ -1185,7 +1185,7 @@ "Limit 10.00 root offset:0, count:10", "└─IndexReader 10.00 root partition:all index:Limit", " └─Limit 10.00 cop[tikv] offset:0, count:10", - 
" └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo" + " └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo" ] }, { @@ -1194,7 +1194,7 @@ "Limit 10.00 root offset:0, count:10", "└─IndexReader 10.00 root partition:all index:Limit", " └─Limit 10.00 cop[tikv] offset:0, count:10", - " └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo" + " └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo" ] }, { @@ -1209,31 +1209,28 @@ { "SQL": "explain format='brief' select * from trange use index (ia) where a > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.trange.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from tlist use index (ia) where a > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.tlist.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from thash use index (ia) where a > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.thash.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" ] }, { @@ -1249,34 +1246,31 @@ { "SQL": "explain format='brief' select * from trange use index (ia) where a + 1 > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.trange.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─Selection 10.00 cop[tikv] gt(plus(test.trange.a, 1), 10)", - " │ └─IndexFullScan 12.50 cop[tikv] table:trange, index:ia(a) keep order:false, stats:pseudo", - " 
└─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.trange.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:trange, index:ia(a) keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from tlist use index (ia) where a + 1 > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.tlist.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─Selection 10.00 cop[tikv] gt(plus(test.tlist.a, 1), 10)", - " │ └─IndexFullScan 12.50 cop[tikv] table:tlist, index:ia(a) keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.tlist.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:tlist, index:ia(a) keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" ] }, { "SQL": "explain format='brief' select * from thash use index (ia) where a + 1 > 10 order by a limit 10", "Plan": [ - "TopN 10.00 root test.thash.a, offset:0, count:10", - "└─IndexLookUp 10.00 root partition:all ", - " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", - " │ └─Selection 10.00 cop[tikv] gt(plus(test.thash.a, 1), 10)", - " │ └─IndexFullScan 12.50 cop[tikv] table:thash, index:ia(a) keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" + "IndexLookUp 10.00 root partition:all limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.thash.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:thash, index:ia(a) keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" ] }, { diff --git a/planner/core/task.go b/planner/core/task.go index 01d31d204d724..3263fd31b7270 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1211,6 +1211,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) { return nil, false } idxScan.Desc = isDesc + idxScan.KeepOrder = true idxScan.ByItems = p.ByItems childProfile := copTsk.plan().statsInfo() newCount := p.Offset + p.Count @@ -1233,16 +1234,29 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) { } rootTask := copTsk.convertToRootTask(p.ctx) - // only support IndexReader now. - if _, ok := rootTask.p.(*PhysicalIndexReader); ok { - rootLimit := PhysicalLimit{ - Count: p.Count, - Offset: p.Offset, - PartitionBy: newPartitionBy, - }.Init(p.SCtx(), stats, p.SelectBlockOffset()) - rootLimit.SetSchema(rootTask.plan().Schema()) - return attachPlan2Task(rootLimit, rootTask), true + // embedded limit in indexLookUp, no more limit needed. 
+			if idxLookup, ok := rootTask.p.(*PhysicalIndexLookUpReader); ok {
+				idxLookup.PushedLimit = &PushedDownLimit{
+					Offset: p.Offset,
+					Count:  p.Count,
+				}
+				extraInfo, extraCol, hasExtraCol := tryGetPkExtraColumn(p.ctx.GetSessionVars(), tblInfo)
+				if hasExtraCol {
+					idxLookup.ExtraHandleCol = extraCol
+					ts := idxLookup.TablePlans[0].(*PhysicalTableScan)
+					ts.Columns = append(ts.Columns, extraInfo)
+					ts.schema.Append(extraCol)
+					ts.HandleIdx = []int{len(ts.Columns) - 1}
+				}
+				return rootTask, true
 			}
+			rootLimit := PhysicalLimit{
+				Count:       p.Count,
+				Offset:      p.Offset,
+				PartitionBy: newPartitionBy,
+			}.Init(p.SCtx(), stats, p.SelectBlockOffset())
+			rootLimit.SetSchema(rootTask.plan().Schema())
+			return attachPlan2Task(rootLimit, rootTask), true
 		}
 	} else if copTsk.indexPlan == nil {
 		if tblScan.HandleCols == nil {
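
To summarize the planner change in task.go: when the partial TopN can be pushed down, the index scan is marked keep order with the ByItems attached, and if the converted root plan is a PhysicalIndexLookUpReader the limit is embedded into it as PushedLimit instead of stacking a root-level Limit, which is why the explain outputs above now show `limit embedded(offset:0, count:10)` and `keep order:true`. A toy sketch of that final decision, with stand-in plan node types rather than the planner/core ones:

```go
package main

import "fmt"

// pushedDownLimit mirrors the offset/count pair embedded into the reader.
type pushedDownLimit struct{ offset, count uint64 }

// plan nodes below are illustrative stand-ins, not planner/core types.
type plan interface{ explain() string }

type indexLookUp struct{ pushedLimit *pushedDownLimit }

func (p *indexLookUp) explain() string {
	if p.pushedLimit != nil {
		return fmt.Sprintf("IndexLookUp limit embedded(offset:%d, count:%d)", p.pushedLimit.offset, p.pushedLimit.count)
	}
	return "IndexLookUp"
}

type limitPlan struct {
	offset, count uint64
	child         plan
}

func (p *limitPlan) explain() string {
	return fmt.Sprintf("Limit offset:%d, count:%d -> %s", p.offset, p.count, p.child.explain())
}

// attachLimit embeds the limit when the root is an index lookup, and otherwise
// falls back to a root-level Limit, echoing pushPartialTopNDownToCop above.
func attachLimit(root plan, offset, count uint64) plan {
	if lookup, ok := root.(*indexLookUp); ok {
		lookup.pushedLimit = &pushedDownLimit{offset: offset, count: count}
		return lookup
	}
	return &limitPlan{offset: offset, count: count, child: root}
}

func main() {
	fmt.Println(attachLimit(&indexLookUp{}, 0, 10).explain())
}
```
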