Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support trace for min/max eliminate #30441

Merged
merged 3 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,36 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
},
},
},
{
sql: "select max(a)-min(a) from t",
flags: []uint64{flagBuildKeyInfo, flagPrunColumns, flagMaxMinEliminate},
assertRuleName: "max_min_eliminate",
assertRuleSteps: []assertTraceStep{
{
assertAction: "add sort[8],add limit[9] during eliminate agg[4] max function",
assertReason: "agg[4] has only one function[max] without group by, the columns in agg[4] should be sorted",
},
{
assertAction: "add sort[10],add limit[11] during eliminate agg[6] min function",
assertReason: "agg[6] has only one function[min] without group by, the columns in agg[6] should be sorted",
},
{
assertAction: "agg[2] splited into aggs[4,6], and add joins[12] as their parent during eliminate agg[2] multi min/max functions",
assertReason: "each column is sorted and has index in agg[4,6] and none of them has group by statement",
},
},
},
{
sql: "select max(f) from t",
flags: []uint64{flagBuildKeyInfo, flagPrunColumns, flagMaxMinEliminate},
assertRuleName: "max_min_eliminate",
assertRuleSteps: []assertTraceStep{
{
assertAction: "add sort[4],add limit[5] during eliminate agg[2] max function",
assertReason: "agg[2] has only one function[max] without group by, the columns in agg[2] should be sorted",
},
},
},
}

for i, tc := range tt {
Expand Down
90 changes: 78 additions & 12 deletions planner/core/rule_max_min_eliminate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package core

import (
"bytes"
"context"
"fmt"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -35,20 +37,23 @@ type maxMinEliminator struct {
}

// optimize applies max/min elimination to the plan rooted at p and returns the
// rewritten plan. opt is forwarded to eliminateMaxMin so that each rewrite can
// be recorded as a trace step. ctx is not used here and the returned error is
// always nil.
func (a *maxMinEliminator) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) {
	return a.eliminateMaxMin(p, opt), nil
}

// composeAggsByInnerJoin composes the scalar aggregations by cartesianJoin.
func (a *maxMinEliminator) composeAggsByInnerJoin(aggs []*LogicalAggregation) (plan LogicalPlan) {
func (a *maxMinEliminator) composeAggsByInnerJoin(originAgg *LogicalAggregation, aggs []*LogicalAggregation, opt *logicalOptimizeOp) (plan LogicalPlan) {
plan = aggs[0]
sctx := plan.SCtx()
joins := make([]*LogicalJoin, 0)
for i := 1; i < len(aggs); i++ {
join := LogicalJoin{JoinType: InnerJoin}.Init(sctx, plan.SelectBlockOffset())
join.SetChildren(plan, aggs[i])
join.schema = buildLogicalJoinSchema(InnerJoin, join)
join.cartesianJoin = true
plan = join
joins = append(joins, join)
}
appendEliminateMultiMinMaxTraceStep(originAgg, aggs, joins, opt)
return
}

Expand Down Expand Up @@ -132,7 +137,7 @@ func (a *maxMinEliminator) cloneSubPlans(plan LogicalPlan) LogicalPlan {
// `select max(a) from t` + `select min(a) from t` + `select max(b) from t`.
// Then we check whether `a` and `b` have indices. If any of the used column has no index, we cannot eliminate
// this aggregation.
func (a *maxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation) (aggs []*LogicalAggregation, canEliminate bool) {
func (a *maxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation, opt *logicalOptimizeOp) (aggs []*LogicalAggregation, canEliminate bool) {
for _, f := range agg.AggFuncs {
// We must make sure the args of max/min is a simple single column.
col, ok := f.Args[0].(*expression.Column)
Expand All @@ -158,16 +163,18 @@ func (a *maxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation)
}

// eliminateSingleMaxMin tries to convert a single max/min to Limit+Sort operators.
func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation) *LogicalAggregation {
func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation, opt *logicalOptimizeOp) *LogicalAggregation {
f := agg.AggFuncs[0]
child := agg.Children()[0]
ctx := agg.SCtx()

var sel *LogicalSelection
var sort *LogicalSort
// If there's no column in f.GetArgs()[0], we still need limit and read data from real table because the result should be NULL if the input is empty.
if len(expression.ExtractColumns(f.Args[0])) > 0 {
// If it can be NULL, we need to filter NULL out first.
if !mysql.HasNotNullFlag(f.Args[0].GetType().Flag) {
sel := LogicalSelection{}.Init(ctx, agg.blockOffset)
sel = LogicalSelection{}.Init(ctx, agg.blockOffset)
isNullFunc := expression.NewFunctionInternal(ctx, ast.IsNull, types.NewFieldType(mysql.TypeTiny), f.Args[0])
notNullFunc := expression.NewFunctionInternal(ctx, ast.UnaryNot, types.NewFieldType(mysql.TypeTiny), isNullFunc)
sel.Conditions = []expression.Expression{notNullFunc}
Expand All @@ -179,7 +186,7 @@ func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation) *Logic
// For max function, the sort order should be desc.
desc := f.Name == ast.AggFuncMax
// Compose Sort operator.
sort := LogicalSort{}.Init(ctx, agg.blockOffset)
sort = LogicalSort{}.Init(ctx, agg.blockOffset)
sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: f.Args[0], Desc: desc})
sort.SetChildren(child)
child = sort
Expand All @@ -192,14 +199,15 @@ func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation) *Logic
// If no data in the child, we need to return NULL instead of empty. This cannot be done by sort and limit themselves.
// Since now there would be at most one row returned, the remained agg operator is not expensive anymore.
agg.SetChildren(li)
appendEliminateSingleMaxMinTrace(agg, sel, sort, li, opt)
return agg
}

// eliminateMaxMin tries to convert max/min to Limit+Sort operators.
func (a *maxMinEliminator) eliminateMaxMin(p LogicalPlan) LogicalPlan {
func (a *maxMinEliminator) eliminateMaxMin(p LogicalPlan, opt *logicalOptimizeOp) LogicalPlan {
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
newChildren = append(newChildren, a.eliminateMaxMin(child))
newChildren = append(newChildren, a.eliminateMaxMin(child, opt))
}
p.SetChildren(newChildren...)
if agg, ok := p.(*LogicalAggregation); ok {
Expand All @@ -222,22 +230,80 @@ func (a *maxMinEliminator) eliminateMaxMin(p LogicalPlan) LogicalPlan {
if len(agg.AggFuncs) == 1 {
// If there is only one aggFunc, we don't need to guarantee that the child of it is a data
// source, or whether the sort can be eliminated. This transformation won't be worse than previous.
return a.eliminateSingleMaxMin(agg)
return a.eliminateSingleMaxMin(agg, opt)
}
// If we have more than one aggFunc, we can eliminate this agg only if all of the aggFuncs can benefit from
// their column's index.
aggs, canEliminate := a.splitAggFuncAndCheckIndices(agg)
aggs, canEliminate := a.splitAggFuncAndCheckIndices(agg, opt)
if !canEliminate {
return agg
}
for i := range aggs {
aggs[i] = a.eliminateSingleMaxMin(aggs[i])
aggs[i] = a.eliminateSingleMaxMin(aggs[i], opt)
}
return a.composeAggsByInnerJoin(aggs)
return a.composeAggsByInnerJoin(agg, aggs, opt)
}
return p
}

// name returns the identifier of this rule, "max_min_eliminate".
func (*maxMinEliminator) name() string {
	const ruleName = "max_min_eliminate"
	return ruleName
}

func appendEliminateSingleMaxMinTrace(agg *LogicalAggregation, sel *LogicalSelection, sort *LogicalSort, limit *LogicalLimit, opt *logicalOptimizeOp) {
action := func() string {
buffer := bytes.NewBufferString("")
if sel != nil {
buffer.WriteString(fmt.Sprintf("add selection[%v],", sel.ID()))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
if sort != nil {
buffer.WriteString(fmt.Sprintf("add sort[%v],", sort.ID()))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
buffer.WriteString(fmt.Sprintf("add limit[%v] during eliminate agg[%v] %s function", limit.ID(), agg.ID(), agg.AggFuncs[0].Name))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return buffer.String()
}()
reason := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("agg[%v] has only one function[%s] without group by", agg.ID(), agg.AggFuncs[0].Name))
if sel != nil {
buffer.WriteString(fmt.Sprintf(", the columns in agg[%v] shouldn't be NULL and needs to be filer NULL out", agg.ID()))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
if sort != nil {
buffer.WriteString(fmt.Sprintf(", the columns in agg[%v] should be sorted", agg.ID()))
}
return buffer.String()
}()
opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func appendEliminateMultiMinMaxTraceStep(originAgg *LogicalAggregation, aggs []*LogicalAggregation, joins []*LogicalJoin, opt *logicalOptimizeOp) {
action := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("agg[%v] splited into aggs[", originAgg.ID()))
for i, agg := range aggs {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(fmt.Sprintf("%v", agg.ID()))
}
buffer.WriteString("], and add joins[")
for i, join := range joins {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(fmt.Sprintf("%v", join.ID()))
}
buffer.WriteString(fmt.Sprintf("] as their parent during eliminate agg[%v] multi min/max functions", originAgg.ID()))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return buffer.String()
}()
reason := func() string {
buffer := bytes.NewBufferString("each column is sorted and has index in agg[")
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
for i, agg := range aggs {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(fmt.Sprintf("%v", agg.ID()))
}
buffer.WriteString("] and none of them has group by statement")
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return buffer.String()
}()
opt.appendStepToCurrent(originAgg.ID(), originAgg.TP(), reason, action)
}