Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: support inline projection for TopN | tidb-test=pr/2464 #58500

Merged
merged 18 commits into from
Jan 6, 2025
134 changes: 134 additions & 0 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,140 @@ func BenchmarkLimitExec(b *testing.B) {
}
}

type topNTestCase struct {
rows int
offset int
count int
orderByIdx []int
usingInlineProjection bool
columnIdxsUsedByChild []bool
ctx sessionctx.Context
}

func (tc topNTestCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 2, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}

func (tc topNTestCase) String() string {
return fmt.Sprintf("(rows:%v, offset:%v, count:%v, orderByIdx:%v, inline_projection:%v)",
tc.rows, tc.offset, tc.count, tc.orderByIdx, tc.usingInlineProjection)
}

func defaultTopNTestCase() *topNTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
return &topNTestCase{
rows: 100000,
offset: 0,
count: 10,
orderByIdx: []int{0},
usingInlineProjection: false,
columnIdxsUsedByChild: []bool{false, true, false},
ctx: ctx,
}
}

func benchmarkTopNExec(b *testing.B, cas *topNTestCase) {
opt := testutil.MockDataSourceParameters{
DataSchema: expression.NewSchema(cas.columns()...),
Rows: cas.rows,
Ctx: cas.ctx,
}
dataSource := testutil.BuildMockDataSource(opt)
executorSort := sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 4, dataSource),
ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)),
ExecSchema: dataSource.Schema(),
}
for _, idx := range cas.orderByIdx {
executorSort.ByItems = append(executorSort.ByItems, &util.ByItems{Expr: cas.columns()[idx]})
}

executor := &sortexec.TopNExec{
SortExec: executorSort,
Limit: &core.PhysicalLimit{
Count: uint64(cas.count),
Offset: uint64(cas.offset),
},
}

executor.ExecSchema = dataSource.Schema().Clone()

var exe exec.Executor
if cas.usingInlineProjection {
if len(cas.columnIdxsUsedByChild) > 0 {
executor.ColumnIdxsUsedByChild = make([]int, 0, len(cas.columnIdxsUsedByChild))
for i, used := range cas.columnIdxsUsedByChild {
if used {
executor.ColumnIdxsUsedByChild = append(executor.ColumnIdxsUsedByChild, i)
}
}
}
exe = executor
} else {
columns := cas.columns()
usedCols := make([]*expression.Column, 0, len(columns))
exprs := make([]expression.Expression, 0, len(columns))
for i, used := range cas.columnIdxsUsedByChild {
if used {
usedCols = append(usedCols, columns[i])
exprs = append(exprs, columns[i])
}
}
proj := &ProjectionExec{
BaseExecutorV2: exec.NewBaseExecutorV2(cas.ctx.GetSessionVars(), expression.NewSchema(usedCols...), 0, executor),
numWorkers: 1,
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
exe = proj
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
tmpCtx := context.Background()
chk := exec.NewFirstChunk(exe)
dataSource.PrepareChunks()

b.StartTimer()
if err := exe.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
if err := exe.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
if chk.NumRows() == 0 {
break
}
}

if err := exe.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}

func BenchmarkTopNExec(b *testing.B) {
b.ReportAllocs()
usingInlineProjection := []bool{false, true}

for _, inlineProjection := range usingInlineProjection {
cas := defaultTopNTestCase()
cas.usingInlineProjection = inlineProjection
b.Run(fmt.Sprintf("TopNExec InlineProjection:%v", inlineProjection), func(b *testing.B) {
benchmarkTopNExec(b, cas)
})
}
}

func BenchmarkReadLastLinesOfHugeLine(b *testing.B) {
// step 1. initial a huge line log file
hugeLine := make([]byte, 1024*1024*10)
Expand Down
44 changes: 43 additions & 1 deletion pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2489,11 +2489,27 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor {
ExecSchema: v.Schema(),
}
executor_metrics.ExecutorCounterTopNExec.Inc()
return &sortexec.TopNExec{
t := &sortexec.TopNExec{
SortExec: sortExec,
Limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset},
Concurrency: b.ctx.GetSessionVars().Concurrency.ExecutorConcurrency,
}
columnIdxsUsedByChild, columnMissing := retrieveColumnIdxsUsedByChild(v.Schema(), v.Children()[0].Schema())
if columnIdxsUsedByChild != nil && columnMissing {
// In the expected cases colMissing will never happen.
// However, suppose that childSchema contains generatedCol and is cloned by selfSchema.
// Then childSchema.generatedCol.UniqueID will not be equal to selfSchema.generatedCol.UniqueID.
// In this case, colMissing occurs, but it is not wrong.
// So here we cancel the inline projection, take all of columns from child.
// If the inline projection directly generates some error causes colMissing,
// notice that the error feedback given would be inaccurate.
columnIdxsUsedByChild = nil
// TODO: If there is valid verification logic, please uncomment the following code
// b.err = errors.Annotate(ErrBuildExecutor, "Inline projection occurs when `buildTopN` exectutor, columns should not missing in the child schema")
// return nil
}
t.ColumnIdxsUsedByChild = columnIdxsUsedByChild
return t
}

func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor {
Expand Down Expand Up @@ -3179,6 +3195,32 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
return e
}

// retrieveColumnIdxsUsedByChild retrieve column indices map from child physical plan schema columns.
//
// E.g. columnIdxsUsedByChild = [2, 3, 1] means child[col2, col3, col1] -> parent[col0, col1, col2].
// `columnMissing` indicates whether one or more columns in `selfSchema` are not found in `childSchema`.
// And `-1` in `columnIdxsUsedByChild` indicates the column not found.
// If columnIdxsUsedByChild == nil, means selfSchema and childSchema are equal.
func retrieveColumnIdxsUsedByChild(selfSchema *expression.Schema, childSchema *expression.Schema) ([]int, bool) {
equalSchema := (selfSchema.Len() == childSchema.Len())
columnMissing := false
columnIdxsUsedByChild := make([]int, 0, selfSchema.Len())
for selfIdx, selfCol := range selfSchema.Columns {
colIdxInChild := childSchema.ColumnIndex(selfCol)
if !columnMissing && colIdxInChild == -1 {
columnMissing = true
}
if equalSchema && selfIdx != colIdxInChild {
equalSchema = false
}
columnIdxsUsedByChild = append(columnIdxsUsedByChild, colIdxInChild)
}
if equalSchema {
columnIdxsUsedByChild = nil
}
return columnIdxsUsedByChild, columnMissing
}

// markChildrenUsedCols compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/executor/sortexec/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type TopNExec struct {
isSpillTriggeredInStage2ForTest bool

Concurrency int

// ColumnIdxsUsedByChild keep column indexes of child executor used for inline projection
ColumnIdxsUsedByChild []int
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -240,7 +243,12 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error {
if !ok || row.err != nil {
return row.err
}
req.AppendRow(row.row)
// Be carefule, if inline projection occurs.
EricZequan marked this conversation as resolved.
Show resolved Hide resolved
// TopN's schema may be not match child executor's output columns.
// We should extract only the required columns from child's executor.
// Do not do it on `loadChunksUntilTotalLimit` or `processChildChk`,
// cauz it may destroy the correctness of executor's `keyColumns`.
req.AppendRowsByColIdxs([]chunk.Row{row.row}, e.ColumnIdxsUsedByChild)
}
}
return nil
Expand Down
10 changes: 8 additions & 2 deletions pkg/executor/sortexec/topn_chunk_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type topNChunkHeap struct {
func (h *topNChunkHeap) init(topnExec *TopNExec, memTracker *memory.Tracker, totalLimit uint64, idx int, greaterRow func(chunk.Row, chunk.Row) bool, fieldTypes []*types.FieldType) {
h.memTracker = memTracker

h.rowChunks = chunk.NewList(exec.RetTypes(topnExec), topnExec.InitCap(), topnExec.MaxChunkSize())
// Row size of new chunk list may not be enough to hold the result set from child executor when inline projection occurs.
// To avoid this problem, we use child executor's schmea to build new chunk list by default.
ch := topnExec.Children(0)
h.rowChunks = chunk.NewList(exec.RetTypes(ch), ch.InitCap(), ch.MaxChunkSize())
h.rowChunks.GetMemTracker().AttachTo(h.memTracker)
h.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)

Expand Down Expand Up @@ -112,7 +115,10 @@ func (h *topNChunkHeap) processChk(chk *chunk.Chunk) {
// but we want descending top N, then we will keep all data in memory.
// But if data is distributed randomly, this function will be called log(n) times.
func (h *topNChunkHeap) doCompaction(topnExec *TopNExec) error {
newRowChunks := chunk.NewList(exec.RetTypes(topnExec), topnExec.InitCap(), topnExec.MaxChunkSize())
// Row size of new chunk list may not be enough to hold the result set from child executor when inline projection occurs.
// To avoid this problem, we use child executor's schmea to build new chunk list by default.
ch := topnExec.Children(0)
newRowChunks := chunk.NewList(exec.RetTypes(ch), ch.InitCap(), ch.MaxChunkSize())
newRowPtrs := make([]chunk.RowPtr, 0, h.rowChunks.Len())
for _, rowPtr := range h.rowPtrs {
newRowPtr := newRowChunks.AppendRow(h.rowChunks.GetRow(rowPtr))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,11 @@
"explain format = 'brief' SELECT a FROM t WHERE b = 2 and c > 0 ORDER BY a limit 1"
],
"Plan": [
"Projection 1.00 root test.t.a",
"└─TopN 1.00 root test.t.a, offset:0, count:1",
" └─IndexReader 1.00 root index:TopN",
" └─TopN 1.00 cop[tikv] test.t.a, offset:0, count:1",
" └─Selection 6.00 cop[tikv] gt(test.t.c, 0)",
" └─IndexRangeScan 6.00 cop[tikv] table:t, index:idx(b, d, a, c) range:[2,2], keep order:false"
"TopN 1.00 root test.t.a, offset:0, count:1",
"└─IndexReader 1.00 root index:TopN",
" └─TopN 1.00 cop[tikv] test.t.a, offset:0, count:1",
" └─Selection 6.00 cop[tikv] gt(test.t.c, 0)",
" └─IndexRangeScan 6.00 cop[tikv] table:t, index:idx(b, d, a, c) range:[2,2], keep order:false"
]
}
]
Expand Down
22 changes: 11 additions & 11 deletions pkg/planner/core/casetest/dag/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
},
{
"SQL": "select c from t where t.c = 1 and t.e = 1 order by t.d limit 1",
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit)->Limit->Projection"
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit)->Limit"
},
{
"SQL": "select c from t order by t.a limit 1",
"Best": "TableReader(Table(t)->Limit)->Limit->Projection"
"Best": "TableReader(Table(t)->Limit)->Limit"
},
{
"SQL": "select c from t order by t.a + t.b limit 1",
"Best": "TableReader(Table(t)->TopN([plus(test.t.a, test.t.b)],0,1))->Projection->TopN([Column#14],0,1)->Projection->Projection"
"Best": "TableReader(Table(t)->TopN([plus(test.t.a, test.t.b)],0,1))->Projection->TopN([Column#14],0,1)->Projection"
},
{
"SQL": "select c from t limit 1",
Expand Down Expand Up @@ -92,11 +92,11 @@
},
{
"SQL": "select c from t where t.c = 1 and t.a > 1 order by t.d limit 1",
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit)->Limit->Projection"
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit)->Limit"
},
{
"SQL": "select c from t where t.c = 1 and t.d = 1 order by t.a limit 1",
"Best": "IndexReader(Index(t.c_d_e)[[1 1,1 1]]->TopN([test.t.a],0,1))->TopN([test.t.a],0,1)->Projection"
"Best": "IndexReader(Index(t.c_d_e)[[1 1,1 1]]->TopN([test.t.a],0,1))->TopN([test.t.a],0,1)"
},
{
"SQL": "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1",
Expand Down Expand Up @@ -451,7 +451,7 @@
},
{
"SQL": "delete from t where b < 1 order by d limit 1",
"Best": "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Projection->Delete",
"Best": "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Delete",
"Hints": "use_index(@`del_1` `test`.`t` ), no_order_index(@`del_1` `test`.`t` `primary`), limit_to_cop(@`del_1`)"
},
{
Expand Down Expand Up @@ -626,7 +626,7 @@
},
{
"SQL": "select count(*) from t group by g order by g limit 10",
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit->Projection"
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit"
},
{
"SQL": "select count(*) from t group by g limit 10",
Expand All @@ -638,11 +638,11 @@
},
{
"SQL": "select count(*) from t group by g order by g desc limit 1",
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit->Projection"
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit"
},
{
"SQL": "select count(*) from t group by b order by b limit 10",
"Best": "TableReader(Table(t)->HashAgg)->HashAgg->TopN([test.t.b],0,10)->Projection"
"Best": "TableReader(Table(t)->HashAgg)->HashAgg->TopN([test.t.b],0,10)"
},
{
"SQL": "select count(*) from t group by b order by b",
Expand All @@ -658,11 +658,11 @@
},
{
"SQL": "select /*+ tidb_inlj(a,b) */ sum(a.g), sum(b.g) from t a join t b on a.g = b.g and a.g > 60 group by a.g order by a.g limit 1",
"Best": "IndexJoin{IndexReader(Index(t.g)[(60,+inf]])->IndexReader(Index(t.g)[[NULL,NULL]]->Sel([gt(test.t.g, 60)]))}(test.t.g,test.t.g)->Projection->StreamAgg->Limit->Projection"
"Best": "IndexJoin{IndexReader(Index(t.g)[(60,+inf]])->IndexReader(Index(t.g)[[NULL,NULL]]->Sel([gt(test.t.g, 60)]))}(test.t.g,test.t.g)->Projection->StreamAgg->Limit"
},
{
"SQL": "select sum(a.g), sum(b.g) from t a join t b on a.g = b.g and a.a>5 group by a.g order by a.g limit 1",
"Best": "MergeInnerJoin{IndexReader(Index(t.g)[[NULL,+inf]]->Sel([gt(test.t.a, 5)]))->IndexReader(Index(t.g)[[NULL,+inf]])}(test.t.g,test.t.g)->Projection->StreamAgg->Limit->Projection"
"Best": "MergeInnerJoin{IndexReader(Index(t.g)[[NULL,+inf]]->Sel([gt(test.t.a, 5)]))->IndexReader(Index(t.g)[[NULL,+inf]])}(test.t.g,test.t.g)->Projection->StreamAgg->Limit"
},
{
"SQL": "select sum(d) from t",
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/vectorsearch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
data = glob(["testdata/**"]),
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,33 @@
"explain select id, vec_cosine_distance(vec, '[1,1,1]') as d, a, b from t1 order by d limit 10",
"explain select id, a, b, vec_cosine_distance(vec, '[1,1,1]') as d from t1 order by d limit 10"
]
},
{
"name": "TestVectorSearchHeavyFunction",
"cases": [
"explain select id from t1 order by vec_cosine_distance(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_l1_distance(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_l2_distance(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_negative_inner_product(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_dims(vec) limit 10",
"explain select id from t1 order by vec_l2_norm(vec) limit 10",
"explain select id from t1 order by MOD(a, 3) limit 10",

"explain select id, vec_cosine_distance(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_l1_distance(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_l2_distance(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_negative_inner_product(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_dims(vec) as d from t1 order by d limit 10",
"explain select id, vec_l2_norm(vec) as d from t1 order by d limit 10",
"explain select id, MOD(a, 3) as d from t1 order by d limit 10",

"explain select * from t1 order by vec_cosine_distance(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_l1_distance(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_l2_distance(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_negative_inner_product(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_dims(vec) limit 10",
"explain select * from t1 order by vec_l2_norm(vec) limit 10",
"explain select * from t1 order by MOD(a, 3) limit 10"
]
}
]
Loading