Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support trace topn push down #30800

Merged
merged 5 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,52 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
assertRuleName string
assertRuleSteps []assertTraceStep
}{
{
sql: "select * from t as t1 left join t as t2 on t1.a = t2.a order by t1.a limit 10;",
flags: []uint64{flagPrunColumns, flagBuildKeyInfo, flagPushDownTopN},
assertRuleName: "topn_push_down",
assertRuleSteps: []assertTraceStep{
{
assertAction: "Limit_6 is converted into TopN_7",
assertReason: "",
},
{
assertAction: "Sort_5 passes ByItems[test.t.a] to TopN_7",
assertReason: "TopN_7 is Limit originally",
},
{
assertAction: "TopN_8 is added and pushed into Join_3's left table",
assertReason: "Join_3's joinType is left outer join, and all ByItems[test.t.a] contained in left table",
},
{
assertAction: "TopN_8 is added as DataSource_1's parent",
assertReason: "TopN is pushed down",
},
{
assertAction: "TopN_7 is added as Join_3's parent",
assertReason: "TopN is pushed down",
},
},
},
{
sql: "select * from t order by a limit 10",
flags: []uint64{flagPrunColumns, flagBuildKeyInfo, flagPushDownTopN},
assertRuleName: "topn_push_down",
assertRuleSteps: []assertTraceStep{
{
assertAction: "Limit_4 is converted into TopN_5",
assertReason: "",
},
{
assertAction: "Sort_3 passes ByItems[test.t.a] to TopN_5",
assertReason: "TopN_5 is Limit originally",
},
{
assertAction: "TopN_5 is added as DataSource_1's parent",
assertReason: "TopN is pushed down",
},
},
},
{
sql: "select * from pt3 where ptn > 3;",
flags: []uint64{flagPartitionProcessor, flagPredicatePushDown, flagBuildKeyInfo, flagPrunColumns},
Expand Down
2 changes: 1 addition & 1 deletion planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ type LogicalPlan interface {
BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema)

// pushDownTopN will push down the topN or limit operator during logical optimization.
pushDownTopN(topN *LogicalTopN) LogicalPlan
pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan

// recursiveDeriveStats derives statistic info between plans.
recursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, error)
Expand Down
133 changes: 100 additions & 33 deletions planner/core/rule_topn_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package core

import (
"bytes"
"context"
"fmt"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/expression"
Expand All @@ -27,22 +29,22 @@ type pushDownTopNOptimizer struct {
}

// optimize starts the TopN push-down at the root plan p with no pending TopN
// (nil is passed as the TopN argument). The returned error is always nil.
func (s *pushDownTopNOptimizer) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) {
return p.pushDownTopN(nil), nil
return p.pushDownTopN(nil, opt), nil
}

// pushDownTopN is the default push-down step. Every child is rewritten with a
// nil TopN, i.e. the incoming topN is not pushed below this plan. A non-nil
// topN is then attached directly above this plan through setChild; otherwise
// the plan itself is returned.
func (s *baseLogicalPlan) pushDownTopN(topN *LogicalTopN) LogicalPlan {
func (s *baseLogicalPlan) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
p := s.self
// Children are replaced in place with their rewritten versions.
for i, child := range p.Children() {
p.Children()[i] = child.pushDownTopN(nil)
p.Children()[i] = child.pushDownTopN(nil, opt)
}
if topN != nil {
return topN.setChild(p)
return topN.setChild(p, opt)
}
return p
}

// setChild set p as topn's child.
func (lt *LogicalTopN) setChild(p LogicalPlan) LogicalPlan {
func (lt *LogicalTopN) setChild(p LogicalPlan, opt *logicalOptimizeOp) LogicalPlan {
// Remove this TopN if its child is a TableDual.
dual, isDual := p.(*LogicalTableDual)
if isDual {
Expand All @@ -62,57 +64,67 @@ func (lt *LogicalTopN) setChild(p LogicalPlan) LogicalPlan {
limitHints: lt.limitHints,
}.Init(lt.ctx, lt.blockOffset)
limit.SetChildren(p)
appendTopNPushDownTraceStep(limit, p, opt)
return limit
}
// Then lt must be topN.
lt.SetChildren(p)
appendTopNPushDownTraceStep(lt, p, opt)
return lt
}

// pushDownTopN handles a Sort node. With a nil topN it falls back to the base
// implementation. When topN.isLimit() holds, the Sort's ByItems are assigned
// to topN before pushing further. In both non-nil cases the call continues on
// the Sort's first child and the Sort node itself is not part of the result.
func (ls *LogicalSort) pushDownTopN(topN *LogicalTopN) LogicalPlan {
func (ls *LogicalSort) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
if topN == nil {
return ls.baseLogicalPlan.pushDownTopN(nil)
return ls.baseLogicalPlan.pushDownTopN(nil, opt)
} else if topN.isLimit() {
// The limit takes over this Sort's ordering.
topN.ByItems = ls.ByItems
return ls.children[0].pushDownTopN(topN)
appendSortPassByItemsTraceStep(ls, topN, opt)
return ls.children[0].pushDownTopN(topN, opt)
}
// If a TopN is pushed down, this sort is useless.
return ls.children[0].pushDownTopN(topN)
return ls.children[0].pushDownTopN(topN, opt)
}

// convertToTopN builds a LogicalTopN that carries this Limit's Offset, Count
// and limitHints, initialised with the Limit's ctx and blockOffset. The
// conversion is recorded through opt as a trace step with an empty reason.
func (p *LogicalLimit) convertToTopN() *LogicalTopN {
return LogicalTopN{Offset: p.Offset, Count: p.Count, limitHints: p.limitHints}.Init(p.ctx, p.blockOffset)
func (p *LogicalLimit) convertToTopN(opt *logicalOptimizeOp) *LogicalTopN {
topn := LogicalTopN{Offset: p.Offset, Count: p.Count, limitHints: p.limitHints}.Init(p.ctx, p.blockOffset)
opt.appendStepToCurrent(topn.ID(), topn.TP(), "", fmt.Sprintf("%v_%v is converted into %v_%v",
p.TP(), p.ID(), topn.TP(), topn.ID()))
return topn
}

// pushDownTopN converts this Limit into a TopN and pushes that TopN into the
// Limit's first child; the Limit node itself is not part of the result. A
// non-nil topN received from above is attached over the rewritten child
// through setChild.
func (p *LogicalLimit) pushDownTopN(topN *LogicalTopN) LogicalPlan {
child := p.children[0].pushDownTopN(p.convertToTopN())
func (p *LogicalLimit) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
child := p.children[0].pushDownTopN(p.convertToTopN(opt), opt)
if topN != nil {
return topN.setChild(child)
return topN.setChild(child, opt)
}
return child
}

// pushDownTopN handles a UnionAll node. For a non-nil topN, each child gets
// its own new TopN whose Count is topN.Count + topN.Offset (no Offset is set
// on the new TopN) and whose ByItems are rebuilt from topN's Expr and Desc.
// The original topN is then attached above the union through setChild. With a
// nil topN the children are simply rewritten with nil.
func (p *LogicalUnionAll) pushDownTopN(topN *LogicalTopN) LogicalPlan {
func (p *LogicalUnionAll) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
for i, child := range p.children {
var newTopN *LogicalTopN
if topN != nil {
newTopN = LogicalTopN{Count: topN.Count + topN.Offset, limitHints: topN.limitHints}.Init(p.ctx, topN.blockOffset)
// Each child receives its own ByItems entries rather than sharing topN's.
for _, by := range topN.ByItems {
newTopN.ByItems = append(newTopN.ByItems, &util.ByItems{Expr: by.Expr, Desc: by.Desc})
}
// newTopN is the TopN pushed down into this child of the union.
opt.appendStepToCurrent(newTopN.ID(), newTopN.TP(), "",
fmt.Sprintf("%v_%v is added and pushed down across %v_%v",
newTopN.TP(), newTopN.ID(), p.TP(), p.ID()))
}
p.children[i] = child.pushDownTopN(newTopN)
p.children[i] = child.pushDownTopN(newTopN, opt)
}
if topN != nil {
return topN.setChild(p)
return topN.setChild(p, opt)
}
return p
}

func (p *LogicalProjection) pushDownTopN(topN *LogicalTopN) LogicalPlan {
func (p *LogicalProjection) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
for _, expr := range p.Exprs {
if expression.HasAssignSetVarFunc(expr) {
return p.baseLogicalPlan.pushDownTopN(topN)
return p.baseLogicalPlan.pushDownTopN(topN, opt)
}
}
if topN != nil {
Expand All @@ -128,28 +140,28 @@ func (p *LogicalProjection) pushDownTopN(topN *LogicalTopN) LogicalPlan {
}
}
}
p.children[0] = p.children[0].pushDownTopN(topN)
p.children[0] = p.children[0].pushDownTopN(topN, opt)
return p
}

// pushDownTopN forwards a non-nil topN to the Lock's first child. When topN is
// nil the child is left untouched. p.self is returned in both cases.
func (p *LogicalLock) pushDownTopN(topN *LogicalTopN) LogicalPlan {
func (p *LogicalLock) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
if topN != nil {
p.children[0] = p.children[0].pushDownTopN(topN)
p.children[0] = p.children[0].pushDownTopN(topN, opt)
}
return p.self
}

// pushDownTopNToChild will push a topN to one child of join. The idx stands for join child index. 0 is for left child.
func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int) LogicalPlan {
func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *logicalOptimizeOp) LogicalPlan {
if topN == nil {
return p.children[idx].pushDownTopN(nil)
return p.children[idx].pushDownTopN(nil, opt)
}

for _, by := range topN.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
if !p.children[idx].Schema().Contains(col) {
return p.children[idx].pushDownTopN(nil)
return p.children[idx].pushDownTopN(nil, opt)
}
}
}
Expand All @@ -162,28 +174,83 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int) LogicalPla
for i := range topN.ByItems {
newTopN.ByItems[i] = topN.ByItems[i].Clone()
}
return p.children[idx].pushDownTopN(newTopN)
appendTopNPushDownJoinTraceStep(p, newTopN, idx, opt)
return p.children[idx].pushDownTopN(newTopN, opt)
}

// pushDownTopN handles a Join node. For LeftOuterJoin, LeftOuterSemiJoin and
// AntiLeftOuterSemiJoin the topN is offered to child 0 via pushDownTopNToChild
// and child 1 is rewritten with nil. For RightOuterJoin the roles are swapped.
// Any other join type falls back to the base implementation. After the switch
// a non-nil topN is attached above p.self through setChild.
func (p *LogicalJoin) pushDownTopN(topN *LogicalTopN) LogicalPlan {
func (p *LogicalJoin) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan {
switch p.JoinType {
case LeftOuterJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin:
p.children[0] = p.pushDownTopNToChild(topN, 0)
p.children[1] = p.children[1].pushDownTopN(nil)
p.children[0] = p.pushDownTopNToChild(topN, 0, opt)
p.children[1] = p.children[1].pushDownTopN(nil, opt)
case RightOuterJoin:
p.children[1] = p.pushDownTopNToChild(topN, 1)
p.children[0] = p.children[0].pushDownTopN(nil)
p.children[1] = p.pushDownTopNToChild(topN, 1, opt)
p.children[0] = p.children[0].pushDownTopN(nil, opt)
default:
return p.baseLogicalPlan.pushDownTopN(topN)
return p.baseLogicalPlan.pushDownTopN(topN, opt)
}

// The LogicalJoin may also be a LogicalApply, so we must use self to set parents.
if topN != nil {
return topN.setChild(p.self)
return topN.setChild(p.self, opt)
}
return p.self
}

// name returns the rule's name, "topn_push_down".
func (*pushDownTopNOptimizer) name() string {
	const ruleName = "topn_push_down"
	return ruleName
}

// appendTopNPushDownTraceStep records, through opt, a trace step stating that
// parent has been added as the parent of child. The step is filed under
// parent's ID and type, with "<parent type> is pushed down" as the reason.
func appendTopNPushDownTraceStep(parent LogicalPlan, child LogicalPlan, opt *logicalOptimizeOp) {
	parentTP, parentID := parent.TP(), parent.ID()
	opt.appendStepToCurrent(
		parentID,
		parentTP,
		fmt.Sprintf("%v is pushed down", parentTP),
		fmt.Sprintf("%v_%v is added as %v_%v's parent", parentTP, parentID, child.TP(), child.ID()),
	)
}

// appendTopNPushDownJoinTraceStep records, through opt, a trace step for a
// TopN that is pushed into one side of join p. idx selects the side named in
// the message: 0 is reported as the left table, any other value as the right
// table. The reason lists the join type and the TopN's ByItems, separated by
// commas. The step is filed under the join's ID and type.
func appendTopNPushDownJoinTraceStep(p *LogicalJoin, topN *LogicalTopN, idx int, opt *logicalOptimizeOp) {
	// Both messages end with the same "<side> table" suffix.
	side := "right "
	if idx == 0 {
		side = "left "
	}

	var action bytes.Buffer
	fmt.Fprintf(&action, "%v_%v is added and pushed into %v_%v's ", topN.TP(), topN.ID(), p.TP(), p.ID())
	action.WriteString(side)
	action.WriteString("table")

	var reason bytes.Buffer
	fmt.Fprintf(&reason, "%v_%v's joinType is %v, and all ByItems[", p.TP(), p.ID(), p.JoinType.String())
	for i := range topN.ByItems {
		if i != 0 {
			reason.WriteString(",")
		}
		reason.WriteString(topN.ByItems[i].String())
	}
	reason.WriteString("] contained in ")
	reason.WriteString(side)
	reason.WriteString("table")

	opt.appendStepToCurrent(p.ID(), p.TP(), reason.String(), action.String())
}

// appendSortPassByItemsTraceStep records, through opt, a trace step stating
// that sort hands its ByItems (comma-separated in the message) over to topN.
// The reason states that topN is originally a Limit. The step is filed under
// the Sort's ID and type.
func appendSortPassByItemsTraceStep(sort *LogicalSort, topN *LogicalTopN, opt *logicalOptimizeOp) {
	var action bytes.Buffer
	fmt.Fprintf(&action, "%v_%v passes ByItems[", sort.TP(), sort.ID())
	for i := range sort.ByItems {
		if i != 0 {
			action.WriteString(",")
		}
		action.WriteString(sort.ByItems[i].String())
	}
	fmt.Fprintf(&action, "] to %v_%v", topN.TP(), topN.ID())

	reason := fmt.Sprintf("%v_%v is Limit originally", topN.TP(), topN.ID())
	opt.appendStepToCurrent(sort.ID(), sort.TP(), reason, action.String())
}