diff --git a/distsql/BUILD.bazel b/distsql/BUILD.bazel index 5839f55fbc52c..1d9f169050a18 100644 --- a/distsql/BUILD.bazel +++ b/distsql/BUILD.bazel @@ -13,11 +13,13 @@ go_library( "//config", "//ddl/placement", "//errno", + "//expression", "//infoschema", "//kv", "//metrics", "//parser/mysql", "//parser/terror", + "//planner/util", "//sessionctx", "//sessionctx/stmtctx", "//sessionctx/variable", diff --git a/distsql/select_result.go b/distsql/select_result.go index 6d1f6308e4120..a9e6ee9b4124c 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -16,6 +16,7 @@ package distsql import ( "bytes" + "container/heap" "context" "fmt" "strconv" @@ -26,9 +27,11 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/copr" @@ -56,6 +59,7 @@ var ( var ( _ SelectResult = (*selectResult)(nil) _ SelectResult = (*serialSelectResults)(nil) + _ SelectResult = (*sortedSelectResults)(nil) ) // SelectResult is an iterator of coprocessor partial results. @@ -68,6 +72,160 @@ type SelectResult interface { Close() error } +type chunkRowHeap struct { + *sortedSelectResults +} + +func (h chunkRowHeap) Len() int { + return len(h.rowPtrs) +} + +func (h chunkRowHeap) Less(i, j int) bool { + iPtr := h.rowPtrs[i] + jPtr := h.rowPtrs[j] + return h.lessRow(h.cachedChunks[iPtr.ChkIdx].GetRow(int(iPtr.RowIdx)), + h.cachedChunks[jPtr.ChkIdx].GetRow(int(jPtr.RowIdx))) +} + +func (h chunkRowHeap) Swap(i, j int) { + h.rowPtrs[i], h.rowPtrs[j] = h.rowPtrs[j], h.rowPtrs[i] +} + +func (h *chunkRowHeap) Push(x interface{}) { + h.rowPtrs = append(h.rowPtrs, x.(chunk.RowPtr)) +} + +func (h *chunkRowHeap) Pop() interface{} { + ret := h.rowPtrs[len(h.rowPtrs)-1] + h.rowPtrs = h.rowPtrs[0 : len(h.rowPtrs)-1] + return ret +} + +// NewSortedSelectResults is only for partition table +func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult { + s := &sortedSelectResults{ + selectResult: selectResult, + byItems: byitems, + memTracker: memTracker, + } + s.initCompareFuncs() + s.buildKeyColumns() + + s.heap = &chunkRowHeap{s} + s.cachedChunks = make([]*chunk.Chunk, len(selectResult)) + return s +} + +type sortedSelectResults struct { + selectResult []SelectResult + compareFuncs []chunk.CompareFunc + byItems []*util.ByItems + keyColumns []int + + cachedChunks []*chunk.Chunk + rowPtrs []chunk.RowPtr + heap *chunkRowHeap + + memTracker *memory.Tracker +} + +func (ssr *sortedSelectResults) updateCachedChunk(ctx context.Context, idx uint32) error { + prevMemUsage := ssr.cachedChunks[idx].MemoryUsage() + if err := ssr.selectResult[idx].Next(ctx, ssr.cachedChunks[idx]); err != nil { + return err + } + ssr.memTracker.Consume(ssr.cachedChunks[idx].MemoryUsage() - prevMemUsage) + if ssr.cachedChunks[idx].NumRows() == 0 { + return nil + } + heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx, RowIdx: 0}) + return nil +} + +func (ssr *sortedSelectResults) initCompareFuncs() { + ssr.compareFuncs = make([]chunk.CompareFunc, len(ssr.byItems)) + for i, item := range ssr.byItems { + keyType := item.Expr.GetType() + ssr.compareFuncs[i] = chunk.GetCompareFunc(keyType) + } +} + +func (ssr *sortedSelectResults) buildKeyColumns() { + ssr.keyColumns = 
make([]int, 0, len(ssr.byItems)) + for _, by := range ssr.byItems { + col := by.Expr.(*expression.Column) + ssr.keyColumns = append(ssr.keyColumns, col.Index) + } +} + +func (ssr *sortedSelectResults) lessRow(rowI, rowJ chunk.Row) bool { + for i, colIdx := range ssr.keyColumns { + cmpFunc := ssr.compareFuncs[i] + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if ssr.byItems[i].Desc { + cmp = -cmp + } + if cmp < 0 { + return true + } else if cmp > 0 { + return false + } + } + return false +} + +func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) { + panic("Not support NextRaw for sortedSelectResults") +} + +func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) { + c.Reset() + for i := range ssr.cachedChunks { + if ssr.cachedChunks[i] == nil { + ssr.cachedChunks[i] = c.CopyConstruct() + ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage()) + } + } + + if ssr.heap.Len() == 0 { + for i := range ssr.cachedChunks { + if err = ssr.updateCachedChunk(ctx, uint32(i)); err != nil { + return err + } + } + } + + for c.NumRows() < c.RequiredRows() { + if ssr.heap.Len() == 0 { + break + } + + idx := heap.Pop(ssr.heap).(chunk.RowPtr) + c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx))) + + if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 { + if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil { + return err + } + } else { + heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx.ChkIdx, RowIdx: idx.RowIdx + 1}) + } + } + return nil +} + +func (ssr *sortedSelectResults) Close() (err error) { + for i, sr := range ssr.selectResult { + err = sr.Close() + if err != nil { + return err + } + ssr.memTracker.Consume(-ssr.cachedChunks[i].MemoryUsage()) + ssr.cachedChunks[i] = nil + } + return nil +} + // NewSerialSelectResults create a SelectResult which will read each SelectResult serially. 
func NewSerialSelectResults(selectResults []SelectResult) SelectResult {
	return &serialSelectResults{
diff --git a/executor/builder.go b/executor/builder.go
index ab0ad87efc891..437b4f5e6f2f5 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -3352,6 +3352,7 @@ go buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
 		table:          tbl,
 		keepOrder:      ts.KeepOrder,
 		desc:           ts.Desc,
+		byItems:        ts.ByItems,
 		columns:        ts.Columns,
 		paging:         paging,
 		corColInFilter: b.corColInDistPlan(v.TablePlans),
@@ -3636,6 +3637,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
 		keepOrder:      is.KeepOrder,
 		desc:           is.Desc,
 		columns:        is.Columns,
+		byItems:        is.ByItems,
 		paging:         paging,
 		corColInFilter: b.corColInDistPlan(v.IndexPlans),
 		corColInAccess: b.corColInAccess(v.IndexPlans[0]),
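Reviewer note: the merge loop in sortedSelectResults above is a textbook k-way merge over container/heap — each per-partition stream is already sorted, so the heap only ever needs to expose the smallest pending row. A minimal self-contained sketch of the same pattern, with plain int slices standing in for the per-partition chunk streams (illustrative code, not part of this PR):

package main

import (
	"container/heap"
	"fmt"
)

// head points at the current element of one sorted input stream,
// playing the role of chunk.RowPtr in sortedSelectResults.
type head struct {
	stream int // which input slice
	idx    int // position within that slice
}

type mergeHeap struct {
	streams [][]int
	heads   []head
}

func (h mergeHeap) Len() int { return len(h.heads) }
func (h mergeHeap) Less(i, j int) bool {
	a, b := h.heads[i], h.heads[j]
	return h.streams[a.stream][a.idx] < h.streams[b.stream][b.idx]
}
func (h mergeHeap) Swap(i, j int)       { h.heads[i], h.heads[j] = h.heads[j], h.heads[i] }
func (h *mergeHeap) Push(x interface{}) { h.heads = append(h.heads, x.(head)) }
func (h *mergeHeap) Pop() interface{} {
	last := h.heads[len(h.heads)-1]
	h.heads = h.heads[:len(h.heads)-1]
	return last
}

func mergeSorted(streams [][]int) []int {
	h := &mergeHeap{streams: streams}
	for i, s := range streams {
		if len(s) > 0 {
			h.heads = append(h.heads, head{stream: i})
		}
	}
	heap.Init(h)
	var out []int
	for h.Len() > 0 {
		top := heap.Pop(h).(head)
		out = append(out, streams[top.stream][top.idx])
		// Like sortedSelectResults.Next: after consuming a row, push the
		// next row of the same stream (or drop the stream when exhausted).
		if top.idx+1 < len(streams[top.stream]) {
			heap.Push(h, head{stream: top.stream, idx: top.idx + 1})
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSorted([][]int{{1, 4, 7}, {2, 5}, {0, 3, 6}}))
	// Output: [0 1 2 3 4 5 6 7]
}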
diff --git a/executor/distsql.go b/executor/distsql.go
index 3d7d54bf6824c..a456f06507d30 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -35,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/parser/terror"
 	plannercore "github.com/pingcap/tidb/planner/core"
+	plannerutil "github.com/pingcap/tidb/planner/util"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/statistics"
@@ -192,6 +193,8 @@ type IndexReaderExecutor struct {
 	keepOrder bool
 	desc      bool
+	// byItems is only used for partition tables with orderBy + pushedLimit
+	byItems   []*plannerutil.ByItems
 	corColInFilter bool
 	corColInAccess bool
@@ -293,6 +296,25 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
 	return e.open(ctx, kvRanges)
 }
 
+func (e *IndexReaderExecutor) buildKVReq(ctx context.Context, r []kv.KeyRange) (*kv.Request, error) {
+	var builder distsql.RequestBuilder
+	builder.SetKeyRanges(r).
+		SetDAGRequest(e.dagPB).
+		SetStartTS(e.startTS).
+		SetDesc(e.desc).
+		SetKeepOrder(e.keepOrder).
+		SetTxnScope(e.txnScope).
+		SetReadReplicaScope(e.readReplicaScope).
+		SetIsStaleness(e.isStaleness).
+		SetFromSessionVars(e.ctx.GetSessionVars()).
+		SetFromInfoSchema(e.ctx.GetInfoSchema()).
+		SetMemTracker(e.memTracker).
+		SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.netDataSize)).
+		SetConnID(e.ctx.GetSessionVars().ConnectionID)
+	kvReq, err := builder.Build()
+	return kvReq, err
+}
+
 func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
 	var err error
 	if e.corColInFilter {
@@ -323,6 +345,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
 	slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
 		return bytes.Compare(i.StartKey, j.StartKey) < 0
 	})
-	var builder distsql.RequestBuilder
-	builder.SetKeyRanges(kvRanges).
-		SetDAGRequest(e.dagPB).
@@ -345,6 +368,40 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
-	if err != nil {
-		e.feedback.Invalidate()
-		return err
+	// Use sortedSelectResults only when byItems is pushed down and the partition number is > 1.
+	if e.byItems == nil || len(e.partitions) <= 1 {
+		kvReq, err := e.buildKVReq(ctx, kvRanges)
+		if err != nil {
+			e.feedback.Invalidate()
+			return err
+		}
+		e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
+		if err != nil {
+			e.feedback.Invalidate()
+			return err
+		}
+	} else {
+		kvReqs := make([]*kv.Request, 0, len(kvRanges))
+		for _, kvRange := range kvRanges {
+			kvReq, err := e.buildKVReq(ctx, []kv.KeyRange{kvRange})
+			if err != nil {
+				e.feedback.Invalidate()
+				return err
+			}
+			kvReqs = append(kvReqs, kvReq)
+		}
+		var results []distsql.SelectResult
+		for _, kvReq := range kvReqs {
+			result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
+			if err != nil {
+				e.feedback.Invalidate()
+				return err
+			}
+			results = append(results, result)
+		}
+		e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
 	}
 	return nil
 }
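The open() change above is the heart of the executor side: with byItems pushed down and more than one partition, each kv range gets its own kv.Request, and the per-partition results are wrapped in NewSortedSelectResults instead of being read as one stream. A schematic of that dispatch, reusing the mergeSorted helper from the sketch above (hypothetical helper, not the real RequestBuilder/SelectResult API):

// readKeepingOrder mimics the branch in IndexReaderExecutor.open: a single
// partition can be served by one request, while multiple partitions each
// produce an independently sorted stream that must be merge-sorted globally.
// Each inner slice stands in for one partition's already-sorted result.
func readKeepingOrder(partitions [][]int, limit int) []int {
	var out []int
	if len(partitions) <= 1 {
		// Fast path, mirroring "e.byItems == nil || len(e.partitions) <= 1":
		// no merge is needed when there is at most one sorted stream.
		if len(partitions) == 1 {
			out = partitions[0]
		}
	} else {
		out = mergeSorted(partitions) // one kv.Request per partition, then k-way merge
	}
	if len(out) > limit {
		out = out[:limit] // the root Limit keeps only offset+count rows
	}
	return out
}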
diff --git a/executor/distsqltest/BUILD.bazel b/executor/distsqltest/BUILD.bazel
index e9beb087cd470..580a37416d63e 100644
--- a/executor/distsqltest/BUILD.bazel
+++ b/executor/distsqltest/BUILD.bazel
@@ -10,6 +10,7 @@ go_test(
         "//config",
         "//kv",
         "//meta/autoid",
+        "//sessionctx/variable",
         "//testkit",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//tikv",
diff --git a/executor/distsqltest/distsql_test.go b/executor/distsqltest/distsql_test.go
index 59017e055bf41..6d634b12f6a7f 100644
--- a/executor/distsqltest/distsql_test.go
+++ b/executor/distsqltest/distsql_test.go
@@ -21,6 +21,7 @@ import (
 	"testing"
 
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/stretchr/testify/require"
 )
@@ -61,18 +62,16 @@ func TestDistsqlPartitionTableConcurrency(t *testing.T) {
 	// 20-ranges-partitioned table checker
 	ctx3 := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
 		require.Equal(t, req.KeyRanges.PartitionNum(), 20)
-		require.Equal(t, req.Concurrency, 15)
+		require.Equal(t, req.Concurrency, variable.DefDistSQLScanConcurrency)
 	})
 	ctxs := []context.Context{ctx1, ctx2, ctx3}
 	for i, tbl := range []string{"t1", "t2", "t3"} {
 		ctx := ctxs[i]
-		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 1", tbl)).
-			Check(testkit.Rows("0 0"))
-		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 5", tbl)).
-			Check(testkit.Rows("0 0", "50 50", "100 100", "150 150", "200 200"))
-		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 1", tbl)).
-			Check(testkit.Rows("950 950"))
-		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 5", tbl)).
-			Check(testkit.Rows("950 950", "900 900", "850 850", "800 800", "750 750"))
+		// If ORDER BY were added here, the concurrency would always be 1,
+		// because a different kv.Request is built for each partition in TableReader.
+		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
+		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
+		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
+		tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
 	}
 }
diff --git a/executor/partition_table.go b/executor/partition_table.go
index de6f17e8d2cc0..3dec5d8caf392 100644
--- a/executor/partition_table.go
+++ b/executor/partition_table.go
@@ -23,6 +23,9 @@ import (
 )
 
 func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, recursive bool, partitionIDs []int64) error {
+	if exec == nil {
+		return nil
+	}
 	var child *tipb.Executor
 	switch exec.Tp {
 	case tipb.ExecType_TypeTableScan:
diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go
index 79e710d7e2e5b..a52d3070ea01b 100644
--- a/executor/partition_table_test.go
+++ b/executor/partition_table_test.go
@@ -473,7 +473,7 @@ func TestOrderByandLimit(t *testing.T) {
 		queryPartition := fmt.Sprintf("select * from trange use index(idx_a) where a > %v order by a, b limit %v;", x, y)
 		queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v order by a, b limit %v;", x, y)
 		require.True(t, tk.HasPlan(queryPartition, "IndexLookUp")) // check if IndexLookUp is used
-		tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
+		tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
 	}
 
 	// test indexLookUp with order property pushed down.
@@ -487,6 +487,10 @@ func TestOrderByandLimit(t *testing.T) {
 		maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0]
 		queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
 		queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
+		queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
 		queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y)
 		require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit"))
 		require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp"))
@@ -508,7 +512,7 @@ func TestOrderByandLimit(t *testing.T) {
 		queryPartition := fmt.Sprintf("select * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
 		queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
 		require.True(t, tk.HasPlan(queryPartition, "TableReader")) // check if tableReader is used
-		tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
+		tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
 	}
 
 	// test tableReader with order property pushed down.
@@ -524,9 +528,17 @@ func TestOrderByandLimit(t *testing.T) {
 		require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
 		require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
 		require.False(t, tk.HasPlan(queryHashPartition, "Limit"))
-		regularResult := tk.MustQuery(queryRegular).Sort().Rows()
-		tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
-		tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
+		require.False(t, tk.HasPlan(queryListPartition, "Limit"))
+		regularResult := tk.MustQuery(queryRegular).Rows()
+		tk.MustQuery(queryRangePartition).Check(regularResult)
+		tk.MustQuery(queryHashPartition).Check(regularResult)
+		tk.MustQuery(queryListPartition).Check(regularResult)
 	// test int pk
 	// To be simplified, we only read column a.
@@ -549,8 +561,15 @@ func TestOrderByandLimit(t *testing.T) {
 		require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
 		require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
 		require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
-		require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
-		require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
+		require.True(t, tk.HasPlan(queryListPartition, "Limit"))
+		require.False(t, tk.HasPlan(queryRangePartition, "TopN")) // the limit can be fully pushed for the TableScan executor
+		require.False(t, tk.HasPlan(queryHashPartition, "TopN"))
+		require.False(t, tk.HasPlan(queryListPartition, "TopN"))
 		regularResult = tk.MustQuery(queryRegular).Rows()
 		tk.MustQuery(queryRangePartition).Check(regularResult)
 		tk.MustQuery(queryHashPartition).Check(regularResult)
@@ -604,7 +623,7 @@ func TestOrderByandLimit(t *testing.T) {
 		queryPartition := fmt.Sprintf("select a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
 		queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y)
 		require.True(t, tk.HasPlan(queryPartition, "IndexReader")) // check if indexReader is used
-		tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
+		tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
 	}
 
 	// test indexReader with order property pushed down.
@@ -614,27 +633,40 @@ func TestOrderByandLimit(t *testing.T) {
 		x := rand.Intn(1099)
 		y := rand.Intn(2000) + 1
 		queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
-		queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
+		queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash use index(idx_a) where a > %v order by a limit %v;", x, y)
 		queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y)
 		require.True(t, tk.HasPlan(queryRangePartition, "IndexReader")) // check if indexReader is used
 		require.True(t, tk.HasPlan(queryHashPartition, "IndexReader"))
 		require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
 		require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
-		regularResult := tk.MustQuery(queryRegular).Sort().Rows()
-		tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
-		tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
+		require.False(t, tk.HasPlan(queryRangePartition, "TopN")) // fully pushed limit
+		require.False(t, tk.HasPlan(queryHashPartition, "TopN"))
+		regularResult := tk.MustQuery(queryRegular).Rows()
+		tk.MustQuery(queryRangePartition).Check(regularResult)
+		tk.MustQuery(queryHashPartition).Check(regularResult)
 	}
 
 	// test indexMerge
 	for i := 0; i < 100; i++ {
-		// explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // check if IndexMerge is used
-		// select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // can return the correct value
+		// explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a, b limit {x}; // check if IndexMerge is used
+		// select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a, b limit {x}; // can return the correct value
 		y := rand.Intn(2000) + 1
-		queryPartition := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > 2 or b < 5 order by a, b limit %v;", y)
+		queryHashPartition := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > 2 or b < 5 order by a, b limit %v;", y)
 		queryRegular := fmt.Sprintf("select * from tregular where a > 2 or b < 5 order by a, b limit %v;", y)
-		require.True(t, tk.HasPlan(queryPartition, "IndexMerge")) // check if indexMerge is used
-		tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
-	}
+		require.True(t, tk.HasPlan(queryHashPartition, "IndexMerge")) // check if indexMerge is used
+		tk.MustQuery(queryHashPartition).Check(tk.MustQuery(queryRegular).Rows())
+	}
+
+	// test that the sql is killed when memory usage exceeds `tidb_mem_quota_query`
+	originMemQuota := tk.MustQuery("show variables like 'tidb_mem_quota_query'").Rows()[0][1].(string)
+	originOOMAction := tk.MustQuery("show variables like 'tidb_mem_oom_action'").Rows()[0][1].(string)
+	tk.MustExec("set session tidb_mem_quota_query=128")
+	tk.MustExec("set global tidb_mem_oom_action=CANCEL")
+	err := tk.QueryToErr("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > 1 order by a limit 2000")
+	require.Error(t, err)
+	require.Regexp(t, "Out Of Memory Quota.*", err)
+	tk.MustExec(fmt.Sprintf("set session tidb_mem_quota_query=%s", originMemQuota))
+	tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action=%s", originOOMAction))
 }
 
 func TestOrderByOnUnsignedPk(t *testing.T) {
diff --git a/executor/table_reader.go b/executor/table_reader.go
index aa8fcf910be1b..f4a86ebd05572 100644
--- a/executor/table_reader.go
+++ b/executor/table_reader.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
 	plannercore "github.com/pingcap/tidb/planner/core"
+	"github.com/pingcap/tidb/planner/util"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessiontxn"
 	"github.com/pingcap/tidb/statistics"
@@ -105,6 +106,8 @@ type TableReaderExecutor struct {
 	keepOrder bool
 	desc      bool
+	// byItems is only used for partition tables with orderBy + pushedLimit
+	byItems   []*util.ByItems
 	paging    bool
 	storeType kv.StoreType
 	// corColInFilter tells whether there's correlated column in filter.
@@ -188,6 +191,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
 			e.feedback.Invalidate()
 		}
 	}
+
 	firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)
 
 	// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
@@ -314,6 +318,23 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
 		return result, nil
 	}
 
+	// Use sortedSelectResults here when the limit is pushed down for a partition table.
+	if e.kvRangeBuilder != nil && e.byItems != nil {
+		kvReqs, err := e.buildKVReqSeparately(ctx, ranges)
+		if err != nil {
+			return nil, err
+		}
+		var results []distsql.SelectResult
+		for _, kvReq := range kvReqs {
+			result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
+			if err != nil {
+				return nil, err
+			}
+			results = append(results, result)
+		}
+		return distsql.NewSortedSelectResults(results, e.byItems, e.memTracker), nil
+	}
+
 	kvReq, err := e.buildKVReq(ctx, ranges)
 	if err != nil {
 		return nil, err
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 537b0826eb381..fd33f30451d7e 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -667,6 +667,9 @@ type PhysicalIndexScan struct {
 	isPartition bool
 	Desc        bool
 	KeepOrder   bool
+	// ByItems is only used for partition tables with orderBy + pushedLimit
+	ByItems []*util.ByItems
+
 	// DoubleRead means if the index executor will read kv two times.
 	// If the query requires the columns that don't belong to index, DoubleRead will be true.
 	DoubleRead bool
@@ -822,6 +825,8 @@ type PhysicalTableScan struct {
 	// KeepOrder is true, if sort data by scanning pkcol,
 	KeepOrder bool
 	Desc      bool
+	// ByItems is only used for partition tables with orderBy + pushedLimit
+	ByItems []*util.ByItems
 
 	isChildOfIndexLookUp bool
diff --git a/planner/core/task.go b/planner/core/task.go
index 9198bcdaec0bd..9e6fd0e69fc26 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1167,6 +1167,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 			return nil, false
 		}
 		idxScan.Desc = isDesc
+		idxScan.ByItems = p.ByItems
 		childProfile := copTsk.plan().statsInfo()
 		newCount := p.Offset + p.Count
 		stats := deriveLimitStats(childProfile, float64(newCount))
@@ -1175,6 +1176,32 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 		}.Init(p.SCtx(), stats, p.SelectBlockOffset())
 		pushedLimit.SetSchema(copTsk.indexPlan.Schema())
 		copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask)
+
+		// Similar but simplified logic compared with the ExpectedCnt handling in getOriginalPhysicalIndexScan.
+		child := pushedLimit.Children()[0]
+		// The row count of the direct child of Limit should be adjusted to be no larger than Limit.Count.
+		child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount)))
+		// The Limit->Selection->IndexScan case:
+		// adjust the row count of the IndexScan according to the selectivity of the Selection.
+		if selSelectivity > 0 && selSelectivity < 1 {
+			scaledRowCount := child.Stats().RowCount / selSelectivity
+			idxScan.SetStats(idxScan.Stats().ScaleByExpectCnt(scaledRowCount))
+		}
+
+		rootTask := copTsk.convertToRootTask(p.ctx)
+		// only support IndexReader now.
+		if _, ok := rootTask.p.(*PhysicalIndexReader); ok {
+			rootLimit := PhysicalLimit{
+				Count:       p.Count,
+				Offset:      p.Offset,
+				PartitionBy: newPartitionBy,
+			}.Init(p.SCtx(), stats, p.SelectBlockOffset())
+			rootLimit.SetSchema(rootTask.plan().Schema())
+			return attachPlan2Task(rootLimit, rootTask), true
+		}
 	}
 } else if copTsk.indexPlan == nil {
 	if tblScan.HandleCols == nil {
 		return nil, false
 	}
@@ -1196,6 +1223,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 		tblScan.Desc = isDesc
 		// SplitRangesAcrossInt64Boundary needs the KeepOrder flag. See that func and the struct tableResultHandler for more details.
 		tblScan.KeepOrder = true
+		tblScan.ByItems = p.ByItems
 		childProfile := copTsk.plan().statsInfo()
 		newCount := p.Offset + p.Count
 		stats := deriveLimitStats(childProfile, float64(newCount))
@@ -1204,6 +1232,29 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 		}.Init(p.SCtx(), stats, p.SelectBlockOffset())
 		pushedLimit.SetSchema(copTsk.tablePlan.Schema())
 		copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask)
+
+		// Similar but simplified logic compared with the ExpectedCnt handling in getOriginalPhysicalTableScan.
+		child := pushedLimit.Children()[0]
+		// The row count of the direct child of Limit should be adjusted to be no larger than Limit.Count.
+		child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount)))
+		// The Limit->Selection->TableScan case:
+		// adjust the row count of the TableScan according to the selectivity of the Selection.
+		if selSelectivity > 0 && selSelectivity < 1 {
+			scaledRowCount := child.Stats().RowCount / selSelectivity
+			tblScan.SetStats(tblScan.Stats().ScaleByExpectCnt(scaledRowCount))
+		}
+
+		rootTask := copTsk.convertToRootTask(p.ctx)
+		rootLimit := PhysicalLimit{
+			Count:       p.Count,
+			Offset:      p.Offset,
+			PartitionBy: newPartitionBy,
+		}.Init(p.SCtx(), stats, p.SelectBlockOffset())
+		rootLimit.SetSchema(rootTask.plan().Schema())
+		return attachPlan2Task(rootLimit, rootTask), true
 	} else {
 		return nil, false
 	}
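The stats adjustment added above is simple arithmetic: the pushed Limit's child is capped at Offset+Count rows, and when a Selection sits between the Limit and the scan, the scan's row count is scaled back up by dividing by the Selection's selectivity, since the scan must over-produce to survive the filter. A toy version with illustrative numbers (not the planner API):

// scanRowsForLimit estimates how many rows the scan must produce so that,
// after a Selection with the given selectivity, the pushed Limit still sees
// offset+count rows. E.g. offset=0, count=10, selectivity=0.25 -> 40 rows.
func scanRowsForLimit(offset, count uint64, selSelectivity float64) float64 {
	newCount := float64(offset + count) // rows kept by the pushed Limit
	if selSelectivity > 0 && selSelectivity < 1 {
		return newCount / selSelectivity // rows the scan feeds the Selection
	}
	return newCount // no (or degenerate) Selection: the cap applies directly
}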
diff --git a/planner/core/testdata/integration_partition_suite_out.json b/planner/core/testdata/integration_partition_suite_out.json
index e496996969211..101a0581e98af 100644
--- a/planner/core/testdata/integration_partition_suite_out.json
+++ b/planner/core/testdata/integration_partition_suite_out.json
@@ -1157,5 +1157,263 @@
     ]
   }
 ]
+  },
+  {
+    "Name": "TestEstimationForTopNPushToDynamicPartition",
+    "Cases": [
+      {
+        "SQL": "explain format='brief' select a from t use index (ia) where a > 10 order by a limit 10",
+        "Plan": [
+          "Limit 10.00 root offset:0, count:10",
+          "└─IndexReader 10.00 root index:Limit",
+          "  └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "    └─IndexRangeScan 10.00 cop[tikv] table:t, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select a from trange use index (ia) where a > 10 order by a limit 10",
+        "Plan": [
+          "Limit 10.00 root offset:0, count:10",
+          "└─IndexReader 10.00 root partition:all index:Limit",
+          "  └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "    └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select a from tlist use index (ia) where a > 10 order by a limit 10",
+        "Plan": [
+          "Limit 10.00 root offset:0, count:10",
+          "└─IndexReader 10.00 root partition:all index:Limit",
+          "  └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "    └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select a from thash use index (ia) where a > 10 order by a limit 10",
+        "Plan": [
+          "Limit 10.00 root offset:0, count:10",
+          "└─IndexReader 10.00 root partition:all index:Limit",
+          "  └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "    └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select * from t use index (ia) where a > 10 order by a limit 10",
+        "Plan": [
+          "IndexLookUp 10.00 root limit embedded(offset:0, count:10)",
+          "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10",
+          "│ └─IndexRangeScan 10.00 cop[tikv] table:t, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo",
+          "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:t keep order:false, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select * from trange use index (ia) where a > 10 order by a limit 10",
+        "Plan": [
+          "TopN 10.00 root test.trange.a, offset:0, count:10",
+          "└─IndexLookUp 10.00 root partition:all ",
+          "  ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10",
+          "  │ └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo",
+          "  └─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo"
+        ]
+      },
+      
{ + "SQL": "explain format='brief' select * from tlist use index (ia) where a > 10 order by a limit 10", + "Plan": [ + "TopN 10.00 root test.tlist.a, offset:0, count:10", + "└─IndexLookUp 10.00 root partition:all ", + " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + " │ └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select * from thash use index (ia) where a > 10 order by a limit 10", + "Plan": [ + "TopN 10.00 root test.thash.a, offset:0, count:10", + "└─IndexLookUp 10.00 root partition:all ", + " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + " │ └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select * from t use index (ia) where a + 1 > 10 order by a limit 10", + "Plan": [ + "IndexLookUp 10.00 root limit embedded(offset:0, count:10)", + "├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + "│ └─Selection 10.00 cop[tikv] gt(plus(test.t.a, 1), 10)", + "│ └─IndexFullScan 12.50 cop[tikv] table:t, index:ia(a) keep order:true, stats:pseudo", + "└─TableRowIDScan(Probe) 10.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select * from trange use index (ia) where a + 1 > 10 order by a limit 10", + "Plan": [ + "TopN 10.00 root test.trange.a, offset:0, count:10", + "└─IndexLookUp 10.00 root partition:all ", + " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + " │ └─Selection 10.00 cop[tikv] gt(plus(test.trange.a, 1), 10)", + " │ └─IndexFullScan 12.50 cop[tikv] table:trange, index:ia(a) keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:trange keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select * from tlist use index (ia) where a + 1 > 10 order by a limit 10", + "Plan": [ + "TopN 10.00 root test.tlist.a, offset:0, count:10", + "└─IndexLookUp 10.00 root partition:all ", + " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + " │ └─Selection 10.00 cop[tikv] gt(plus(test.tlist.a, 1), 10)", + " │ └─IndexFullScan 12.50 cop[tikv] table:tlist, index:ia(a) keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:tlist keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select * from thash use index (ia) where a + 1 > 10 order by a limit 10", + "Plan": [ + "TopN 10.00 root test.thash.a, offset:0, count:10", + "└─IndexLookUp 10.00 root partition:all ", + " ├─Limit(Build) 10.00 cop[tikv] offset:0, count:10", + " │ └─Selection 10.00 cop[tikv] gt(plus(test.thash.a, 1), 10)", + " │ └─IndexFullScan 12.50 cop[tikv] table:thash, index:ia(a) keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:thash keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from t use index (ia) where a > 10 and c = 10 order by a limit 10", + "Plan": [ + "Projection 3.33 root test.t.a", + "└─Limit 3.33 root offset:0, count:10", + " └─Projection 3.33 root test.t.a, test.t.c", + " └─IndexLookUp 3.33 root ", + " ├─IndexRangeScan(Build) 3333.33 cop[tikv] table:t, index:ia(a) range:(10,+inf], keep order:true, stats:pseudo", + " └─Selection(Probe) 3.33 cop[tikv] eq(test.t.c, 10)", + " └─TableRowIDScan 3333.33 
cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from trange use index (ia) where a > 10 and c = 10 order by a limit 10", + "Plan": [ + "Projection 3.33 root test.trange.a", + "└─TopN 3.33 root test.trange.a, offset:0, count:10", + " └─IndexLookUp 3.33 root partition:all ", + " ├─IndexRangeScan(Build) 3333.33 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", + " └─TopN(Probe) 3.33 cop[tikv] test.trange.a, offset:0, count:10", + " └─Selection 3.33 cop[tikv] eq(test.trange.c, 10)", + " └─TableRowIDScan 3333.33 cop[tikv] table:trange keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from tlist use index (ia) where a > 10 and c = 10 order by a limit 10", + "Plan": [ + "Projection 3.33 root test.tlist.a", + "└─TopN 3.33 root test.tlist.a, offset:0, count:10", + " └─IndexLookUp 3.33 root partition:all ", + " ├─IndexRangeScan(Build) 3333.33 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", + " └─TopN(Probe) 3.33 cop[tikv] test.tlist.a, offset:0, count:10", + " └─Selection 3.33 cop[tikv] eq(test.tlist.c, 10)", + " └─TableRowIDScan 3333.33 cop[tikv] table:tlist keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from thash use index (ia) where a > 10 and c = 10 order by a limit 10", + "Plan": [ + "Projection 3.33 root test.thash.a", + "└─TopN 3.33 root test.thash.a, offset:0, count:10", + " └─IndexLookUp 3.33 root partition:all ", + " ├─IndexRangeScan(Build) 3333.33 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo", + " └─TopN(Probe) 3.33 cop[tikv] test.thash.a, offset:0, count:10", + " └─Selection 3.33 cop[tikv] eq(test.thash.c, 10)", + " └─TableRowIDScan 3333.33 cop[tikv] table:thash keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from t use index () where b > 10 order by b limit 10", + "Plan": [ + "Projection 10.00 root test.t.a", + "└─Limit 10.00 root offset:0, count:10", + " └─TableReader 10.00 root data:Limit", + " └─Limit 10.00 cop[tikv] offset:0, count:10", + " └─TableRangeScan 10.00 cop[tikv] table:t range:(10,+inf], keep order:true, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from trange use index () where b > 10 order by b limit 10", + "Plan": [ + "Projection 10.00 root test.trange.a", + "└─Limit 10.00 root offset:0, count:10", + " └─TableReader 10.00 root partition:all data:Limit", + " └─Limit 10.00 cop[tikv] offset:0, count:10", + " └─TableRangeScan 10.00 cop[tikv] table:trange range:(10,+inf], keep order:true, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from tlist use index () where b > 10 order by b limit 10", + "Plan": [ + "Projection 10.00 root test.tlist.a", + "└─Limit 10.00 root offset:0, count:10", + " └─TableReader 10.00 root partition:all data:Limit", + " └─Limit 10.00 cop[tikv] offset:0, count:10", + " └─TableRangeScan 10.00 cop[tikv] table:tlist range:(10,+inf], keep order:true, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from thash use index () where b > 10 order by b limit 10", + "Plan": [ + "Projection 10.00 root test.thash.a", + "└─Limit 10.00 root offset:0, count:10", + " └─TableReader 10.00 root partition:all data:Limit", + " └─Limit 10.00 cop[tikv] offset:0, count:10", + " └─TableRangeScan 10.00 cop[tikv] table:thash range:(10,+inf], keep order:true, stats:pseudo" + ] + }, + { + "SQL": "explain format='brief' select a from t 
use index () where a > 10 order by b limit 10",
+        "Plan": [
+          "Projection 10.00 root test.t.a",
+          "└─Limit 10.00 root offset:0, count:10",
+          "  └─TableReader 10.00 root data:Limit",
+          "    └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "      └─Selection 10.00 cop[tikv] gt(test.t.a, 10)",
+          "        └─TableFullScan 30.00 cop[tikv] table:t keep order:true, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select a from trange use index () where a > 10 order by b limit 10",
+        "Plan": [
+          "Projection 10.00 root test.trange.a",
+          "└─Limit 10.00 root offset:0, count:10",
+          "  └─TableReader 10.00 root partition:all data:Limit",
+          "    └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "      └─Selection 10.00 cop[tikv] gt(test.trange.a, 10)",
+          "        └─TableFullScan 30.00 cop[tikv] table:trange keep order:true, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select a from tlist use index () where a > 10 order by b limit 10",
+        "Plan": [
+          "Projection 10.00 root test.tlist.a",
+          "└─Limit 10.00 root offset:0, count:10",
+          "  └─TableReader 10.00 root partition:all data:Limit",
+          "    └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "      └─Selection 10.00 cop[tikv] gt(test.tlist.a, 10)",
+          "        └─TableFullScan 30.00 cop[tikv] table:tlist keep order:true, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "explain format='brief' select a from thash use index () where a > 10 order by b limit 10",
+        "Plan": [
+          "Projection 10.00 root test.thash.a",
+          "└─Limit 10.00 root offset:0, count:10",
+          "  └─TableReader 10.00 root partition:all data:Limit",
+          "    └─Limit 10.00 cop[tikv] offset:0, count:10",
+          "      └─Selection 10.00 cop[tikv] gt(test.thash.a, 10)",
+          "        └─TableFullScan 30.00 cop[tikv] table:thash keep order:true, stats:pseudo"
+        ]
+      }
+    ]
 }
]
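Overall effect: with byItems pushed into the readers and the per-partition results merge-sorted, an ORDER BY ... LIMIT over a partitioned table now returns globally ordered rows, which is why the tests above drop .Sort() on both sides of the comparison. A sketch of that check in the style of TestOrderByandLimit (assumes its trange/tregular fixtures; illustrative helper, not part of the PR):

// checkMergeSortedRead verifies that a partitioned, order-pushed read matches
// the non-partitioned reference row-for-row, without sorting either result.
func checkMergeSortedRead(t *testing.T, tk *testkit.TestKit) {
	queryPartition := "select a from trange use index(idx_a) where a > 100 order by a limit 5"
	queryRegular := "select a from tregular use index(idx_a) where a > 100 order by a limit 5"
	require.True(t, tk.HasPlan(queryPartition, "IndexReader"))
	require.True(t, tk.HasPlan(queryPartition, "Limit")) // order property pushed down
	tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
}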