executor: support mergeSort different selectResult in TableScan and IndexScan (#42024) #42224

Closed
2 changes: 2 additions & 0 deletions distsql/BUILD.bazel
@@ -13,11 +13,13 @@ go_library(
"//config",
"//ddl/placement",
"//errno",
"//expression",
"//infoschema",
"//kv",
"//metrics",
"//parser/mysql",
"//parser/terror",
"//planner/util",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
158 changes: 158 additions & 0 deletions distsql/select_result.go
@@ -16,6 +16,7 @@ package distsql

import (
"bytes"
"container/heap"
"context"
"fmt"
"strconv"
@@ -26,9 +27,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/copr"
@@ -56,6 +59,7 @@
var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
_ SelectResult = (*sortedSelectResults)(nil)
)

// SelectResult is an iterator of coprocessor partial results.
@@ -68,6 +72,160 @@ type SelectResult interface {
Close() error
}

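// chunkRowHeap adapts sortedSelectResults to container/heap: rowPtrs is
// kept as a min-heap of per-partition row cursors ordered by lessRow, so
// the heap top is always the globally smallest unread row.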
type chunkRowHeap struct {
*sortedSelectResults
}

func (h chunkRowHeap) Len() int {
return len(h.rowPtrs)
}

func (h chunkRowHeap) Less(i, j int) bool {
iPtr := h.rowPtrs[i]
jPtr := h.rowPtrs[j]
return h.lessRow(h.cachedChunks[iPtr.ChkIdx].GetRow(int(iPtr.RowIdx)),
h.cachedChunks[jPtr.ChkIdx].GetRow(int(jPtr.RowIdx)))
}

func (h chunkRowHeap) Swap(i, j int) {
h.rowPtrs[i], h.rowPtrs[j] = h.rowPtrs[j], h.rowPtrs[i]
}

func (h *chunkRowHeap) Push(x interface{}) {
h.rowPtrs = append(h.rowPtrs, x.(chunk.RowPtr))
}

func (h *chunkRowHeap) Pop() interface{} {
ret := h.rowPtrs[len(h.rowPtrs)-1]
h.rowPtrs = h.rowPtrs[0 : len(h.rowPtrs)-1]
return ret
}

// NewSortedSelectResults is used only for partition tables.
func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
s := &sortedSelectResults{
selectResult: selectResult,
byItems: byitems,
memTracker: memTracker,
}
s.initCompareFuncs()
s.buildKeyColumns()

s.heap = &chunkRowHeap{s}
s.cachedChunks = make([]*chunk.Chunk, len(selectResult))
return s
}

type sortedSelectResults struct {
selectResult []SelectResult
compareFuncs []chunk.CompareFunc
byItems []*util.ByItems
keyColumns []int

cachedChunks []*chunk.Chunk
rowPtrs []chunk.RowPtr
heap *chunkRowHeap

memTracker *memory.Tracker
}

func (ssr *sortedSelectResults) updateCachedChunk(ctx context.Context, idx uint32) error {
prevMemUsage := ssr.cachedChunks[idx].MemoryUsage()
if err := ssr.selectResult[idx].Next(ctx, ssr.cachedChunks[idx]); err != nil {
return err
}
ssr.memTracker.Consume(ssr.cachedChunks[idx].MemoryUsage() - prevMemUsage)
if ssr.cachedChunks[idx].NumRows() == 0 {
return nil
}
heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx, RowIdx: 0})
return nil
}

func (ssr *sortedSelectResults) initCompareFuncs() {
ssr.compareFuncs = make([]chunk.CompareFunc, len(ssr.byItems))
for i, item := range ssr.byItems {
keyType := item.Expr.GetType()
ssr.compareFuncs[i] = chunk.GetCompareFunc(keyType)
}
}

func (ssr *sortedSelectResults) buildKeyColumns() {
ssr.keyColumns = make([]int, 0, len(ssr.byItems))
for _, by := range ssr.byItems {
col := by.Expr.(*expression.Column)
ssr.keyColumns = append(ssr.keyColumns, col.Index)
}
}

func (ssr *sortedSelectResults) lessRow(rowI, rowJ chunk.Row) bool {
for i, colIdx := range ssr.keyColumns {
cmpFunc := ssr.compareFuncs[i]
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
if ssr.byItems[i].Desc {
cmp = -cmp
}
if cmp < 0 {
return true
} else if cmp > 0 {
return false
}
}
return false
}

func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) {
panic("NextRaw is not supported for sortedSelectResults")
}

func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) {
c.Reset()
for i := range ssr.cachedChunks {
if ssr.cachedChunks[i] == nil {
ssr.cachedChunks[i] = c.CopyConstruct()
ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage())
}
}

if ssr.heap.Len() == 0 {
for i := range ssr.cachedChunks {
if err = ssr.updateCachedChunk(ctx, uint32(i)); err != nil {
return err
}
}
}

for c.NumRows() < c.RequiredRows() {
if ssr.heap.Len() == 0 {
break
}

idx := heap.Pop(ssr.heap).(chunk.RowPtr)
c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx)))

if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 {
if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil {
return err
}
} else {
heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx.ChkIdx, RowIdx: idx.RowIdx + 1})
}
}
return nil
}

func (ssr *sortedSelectResults) Close() (err error) {
for i, sr := range ssr.selectResult {
err = sr.Close()
if err != nil {
return err
}
ssr.memTracker.Consume(-ssr.cachedChunks[i].MemoryUsage())
ssr.cachedChunks[i] = nil
}
return nil
}

// NewSerialSelectResults creates a SelectResult that reads each SelectResult serially.
func NewSerialSelectResults(selectResults []SelectResult) SelectResult {
return &serialSelectResults{
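The added sortedSelectResults is a textbook k-way merge: each partition's SelectResult is already sorted by the pushed-down ORDER BY, and a min-heap of per-partition cursors yields one globally sorted stream. Below is a minimal, self-contained Go sketch of the same pattern — not TiDB code; the names and the plain int slices standing in for per-partition results are illustrative only.

package main

import (
	"container/heap"
	"fmt"
)

// cursor marks the next unread element of one sorted source.
type cursor struct{ src, idx int }

type mergeHeap struct {
	sources [][]int // each inner slice is pre-sorted
	cursors []cursor
}

func (h *mergeHeap) Len() int { return len(h.cursors) }
func (h *mergeHeap) Less(i, j int) bool {
	a, b := h.cursors[i], h.cursors[j]
	return h.sources[a.src][a.idx] < h.sources[b.src][b.idx]
}
func (h *mergeHeap) Swap(i, j int)      { h.cursors[i], h.cursors[j] = h.cursors[j], h.cursors[i] }
func (h *mergeHeap) Push(x interface{}) { h.cursors = append(h.cursors, x.(cursor)) }
func (h *mergeHeap) Pop() interface{} {
	c := h.cursors[len(h.cursors)-1]
	h.cursors = h.cursors[:len(h.cursors)-1]
	return c
}

func main() {
	// Three pre-sorted "partitions", like per-partition SelectResults.
	h := &mergeHeap{sources: [][]int{{1, 4, 7}, {2, 5}, {0, 3, 6}}}
	for i := range h.sources {
		heap.Push(h, cursor{src: i, idx: 0})
	}
	for h.Len() > 0 {
		c := heap.Pop(h).(cursor)
		fmt.Print(h.sources[c.src][c.idx], " ")
		// Re-push the advanced cursor, as Next does after consuming a
		// row (updateCachedChunk refills an exhausted chunk instead).
		if c.idx+1 < len(h.sources[c.src]) {
			heap.Push(h, cursor{src: c.src, idx: c.idx + 1})
		}
	}
	// Prints: 0 1 2 3 4 5 6 7
}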
2 changes: 2 additions & 0 deletions executor/builder.go
@@ -3352,6 +3352,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
byItems: ts.ByItems,
columns: ts.Columns,
paging: paging,
corColInFilter: b.corColInDistPlan(v.TablePlans),
@@ -3636,6 +3637,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
57 changes: 57 additions & 0 deletions executor/distsql.go
@@ -35,6 +35,7 @@
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
plannerutil "github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
@@ -192,6 +193,8 @@ type IndexReaderExecutor struct {

keepOrder bool
desc bool
// byItems is used only for partition tables with orderBy + pushedLimit
byItems []*plannerutil.ByItems

corColInFilter bool
corColInAccess bool
@@ -293,6 +296,25 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
return e.open(ctx, kvRanges)
}

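// buildKVReq builds a single coprocessor request over the given key
// ranges. When byItems is pushed down for a partitioned table, it is
// called once per partition so each partition's stream stays sorted.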
func (e *IndexReaderExecutor) buildKVReq(ctx context.Context, r []kv.KeyRange) (*kv.Request, error) {
var builder distsql.RequestBuilder
builder.SetKeyRanges(r).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.netDataSize)).
SetConnID(e.ctx.GetSessionVars().ConnectionID)
kvReq, err := builder.Build()
return kvReq, err
}

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
@@ -323,6 +345,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
// Use sortedSelectResults only when byItems is pushed down and there is more than one partition.
if e.byItems == nil || len(e.partitions) <= 1 {
kvReq, err := e.buildKVReq(ctx, kvRanges)
if err != nil {
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
}
} else {
kvReqs := make([]*kv.Request, 0, len(kvRanges))
for _, kvRange := range kvRanges {
kvReq, err := e.buildKVReq(ctx, []kv.KeyRange{kvRange})
if err != nil {
e.feedback.Invalidate()
return err
}
kvReqs = append(kvReqs, kvReq)
}
var results []distsql.SelectResult
for _, kvReq := range kvReqs {
result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
}
results = append(results, result)
}
e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
}
return nil
}
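Condensed, the dispatch rule this hunk adds to IndexReaderExecutor.open looks as follows; openSingleRequest is a hypothetical helper standing in for the buildKVReq + SelectResult pair above, so treat this as a sketch of the control flow rather than the literal code.

// Merge-sort across partitions only when the ORDER BY was pushed
// down and more than one partition is read.
if e.byItems == nil || len(e.partitions) <= 1 {
	// One coprocessor request over all ranges suffices.
	e.result, err = openSingleRequest(ctx, kvRanges) // hypothetical helper
} else {
	// One request per partition range: each stream arrives sorted on
	// its own, and NewSortedSelectResults merge-sorts them globally.
	results := make([]distsql.SelectResult, 0, len(kvRanges))
	for _, r := range kvRanges {
		res, err := openSingleRequest(ctx, []kv.KeyRange{r}) // hypothetical helper
		if err != nil {
			return err
		}
		results = append(results, res)
	}
	e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
}

A consequence of the per-partition fan-out is that each kv.Request plans its own concurrency, which is what the test change below reflects.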
1 change: 1 addition & 0 deletions executor/distsqltest/BUILD.bazel
@@ -10,6 +10,7 @@ go_test(
"//config",
"//kv",
"//meta/autoid",
"//sessionctx/variable",
"//testkit",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
17 changes: 8 additions & 9 deletions executor/distsqltest/distsql_test.go
@@ -21,6 +21,7 @@
"testing"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
@@ -61,18 +62,16 @@ func TestDistsqlPartitionTableConcurrency(t *testing.T) {
// 20-ranges-partitioned table checker
ctx3 := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
require.Equal(t, req.KeyRanges.PartitionNum(), 20)
require.Equal(t, req.Concurrency, variable.DefDistSQLScanConcurrency)
})
ctxs := []context.Context{ctx1, ctx2, ctx3}
for i, tbl := range []string{"t1", "t2", "t3"} {
ctx := ctxs[i]
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 1", tbl)).
Check(testkit.Rows("0 0"))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 5", tbl)).
Check(testkit.Rows("0 0", "50 50", "100 100", "150 150", "200 200"))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 1", tbl)).
Check(testkit.Rows("950 950"))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 5", tbl)).
Check(testkit.Rows("950 950", "900 900", "850 850", "800 800", "750 750"))
// If ORDER BY were added here, the concurrency would always be 1,
// because a separate kv.Request is built for each partition in TableReader.
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
}
}
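A complementary check one could add, following the hook pattern above (the expected values here are assumptions about the per-partition path, not assertions taken from this PR): with ORDER BY pushed down, each partition gets its own kv.Request, so every request should carry a single partition and run with concurrency 1.

ctxOrdered := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
	require.Equal(t, req.KeyRanges.PartitionNum(), 1)
	require.Equal(t, req.Concurrency, 1)
})
tk.MustQueryWithContext(ctxOrdered, "select * from t3 order by id limit 5")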
3 changes: 3 additions & 0 deletions executor/partition_table.go
@@ -23,6 +23,9 @@
)

func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, recursive bool, partitionIDs []int64) error {
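	// The executor tree may contain a nil node; return early before
	// dereferencing exec.Tp below.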
if exec == nil {
return nil
}
var child *tipb.Executor
switch exec.Tp {
case tipb.ExecType_TypeTableScan:
Expand Down