Skip to content

Commit

Permalink
planner: fix the duplicate offset when pushing topn down across outer…
Browse files Browse the repository at this point in the history
… join (#57471)

ref #56321
  • Loading branch information
winoros authored Nov 19, 2024
1 parent 058d947 commit 66c7571
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 21 deletions.
23 changes: 17 additions & 6 deletions pkg/planner/core/operator/logicalop/logical_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,19 +369,25 @@ func (p *LogicalJoin) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimi
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
topnEliminated := false
switch p.JoinType {
case LeftOuterJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin:
p.Children()[0] = p.pushDownTopNToChild(topN, 0, opt)
p.Children()[0], topnEliminated = p.pushDownTopNToChild(topN, 0, opt)
p.Children()[1] = p.Children()[1].PushDownTopN(nil, opt)
case RightOuterJoin:
p.Children()[1] = p.pushDownTopNToChild(topN, 1, opt)
p.Children()[1], topnEliminated = p.pushDownTopNToChild(topN, 1, opt)
p.Children()[0] = p.Children()[0].PushDownTopN(nil, opt)
default:
return p.BaseLogicalPlan.PushDownTopN(topN, opt)
}

// The LogicalJoin may be also a LogicalApply. So we must use self to set parents.
if topN != nil {
if topnEliminated && len(topN.ByItems) > 0 {
sort := LogicalSort{ByItems: topN.ByItems}.Init(p.SCtx(), p.QueryBlockOffset())
sort.SetChildren(p.Self())
return sort
}
return topN.AttachChild(p.Self(), opt)
}
return p.Self()
Expand Down Expand Up @@ -1078,20 +1084,23 @@ func (p *LogicalJoin) MergeSchema() {
}

// pushDownTopNToChild will push a topN to one child of join. The idx stands for join child index. 0 is for left child.
func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
// When it's outer join and there's unique key information. The TopN can be totally pushed down to the join.
// We just need reserve the ORDER informaion
func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, bool) {
if topN == nil {
return p.Children()[idx].PushDownTopN(nil, opt)
return p.Children()[idx].PushDownTopN(nil, opt), false
}

for _, by := range topN.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
if !p.Children()[idx].Schema().Contains(col) {
return p.Children()[idx].PushDownTopN(nil, opt)
return p.Children()[idx].PushDownTopN(nil, opt), false
}
}
}
count, offset := topN.Count+topN.Offset, uint64(0)
selfEliminated := false
if p.JoinType == LeftOuterJoin {
innerChild := p.Children()[1]
innerJoinKey := make([]*expression.Column, 0, len(p.EqualConditions))
Expand All @@ -1108,6 +1117,7 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optim
if innerChild.Schema().IsUnique(true, innerJoinKey...) ||
(!isNullEQ && innerChild.Schema().IsUnique(false, innerJoinKey...)) {
count, offset = topN.Count, topN.Offset
selfEliminated = true
}
} else if p.JoinType == RightOuterJoin {
innerChild := p.Children()[0]
Expand All @@ -1122,6 +1132,7 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optim
if innerChild.Schema().IsUnique(true, innerJoinKey...) ||
(!isNullEQ && innerChild.Schema().IsUnique(false, innerJoinKey...)) {
count, offset = topN.Count, topN.Offset
selfEliminated = true
}
}

Expand All @@ -1135,7 +1146,7 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optim
newTopN.ByItems[i] = topN.ByItems[i].Clone()
}
appendTopNPushDownJoinTraceStep(p, newTopN, idx, opt)
return p.Children()[idx].PushDownTopN(newTopN, opt)
return p.Children()[idx].PushDownTopN(newTopN, opt), selfEliminated
}

// Add a new selection between parent plan and current plan with candidate predicates
Expand Down
14 changes: 7 additions & 7 deletions pkg/planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,22 @@
"Join{DataScan(t)->DataScan(s)}->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)}->Limit->Projection",
"DataScan(t)->Projection->TopN([Column#13 true],0,1)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],5,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->TopN([test.t.f test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.g test.t.f],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->TopN([test.t.g test.t.f],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->Sort->Projection",
"Join{DataScan(t)->TopN([test.t.a],5,5)->DataScan(s)}(test.t.a,test.t.a)->Sort->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->Sort->Projection",
"Join{DataScan(t)->TopN([test.t.g test.t.f],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->Sort->Projection",
"Join{DataScan(t)->TopN([test.t.g],0,10)->DataScan(s)}(test.t.g,test.t.g)->TopN([test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],0,10)->DataScan(s)}(test.t.g,test.t.g)->TopN([test.t.f test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d test.t.e],5,5)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)(test.t.e,test.t.e)->TopN([test.t.c test.t.d test.t.e],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d test.t.e],5,5)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)(test.t.e,test.t.e)->Sort->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d],0,10)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)->TopN([test.t.c test.t.d],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.b],0,10)->DataScan(s)}(test.t.b,test.t.b)->TopN([test.t.b],5,5)->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->Sort->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)}(test.t.a,test.t.a)->TopN([Column#37],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)->TopN([test.t.a],0,5)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->DataScan(s)->TopN([test.t.a],0,5)}(test.t.a,test.t.a)->Sort->Projection",
"Join{DataScan(t)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a test.t.b],0,5)->Projection",
"UnionAll{DataScan(t)->TopN([test.t.a test.t.b],0,5)->Projection->DataScan(s)->TopN([test.t.a test.t.b],0,5)->Projection}->TopN([Column#25 Column#26],0,5)",
"UnionAll{DataScan(t)->TopN([test.t.a test.t.b],0,10)->Projection->DataScan(s)->TopN([test.t.a test.t.b],0,10)->Projection}->TopN([Column#25 Column#26],5,5)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,13 @@ LIMIT
240;
id estRows task access object operator info
Projection 41.67 root planner__core__casetest__predicate_simplification.dt.a, planner__core__casetest__predicate_simplification.dt.pk, planner__core__casetest__predicate_simplification.dt.b, planner__core__casetest__predicate_simplification.dt.c
└─Limit 41.67 root offset:0, count:240
└─IndexHashJoin 41.67 root left outer join, inner:TableReader, outer key:planner__core__casetest__predicate_simplification.it.pk, inner key:planner__core__casetest__predicate_simplification.dt.pk, equal cond:eq(planner__core__casetest__predicate_simplification.it.pk, planner__core__casetest__predicate_simplification.dt.pk), other cond:or(gt(planner__core__casetest__predicate_simplification.it.a, "a"), and(eq(planner__core__casetest__predicate_simplification.it.a, "a"), gt(planner__core__casetest__predicate_simplification.dt.pk, 1)))
├─Limit(Build) 33.33 root offset:0, count:240
│ └─IndexReader 33.33 root index:Limit
│ └─Limit 33.33 cop[tikv] offset:0, count:240
│ └─IndexRangeScan 33.33 cop[tikv] table:it, index:f(a, pk) range:("a" 1,"a" +inf], keep order:true, stats:pseudo
└─TableReader(Probe) 33.33 root data:TableRangeScan
└─TableRangeScan 33.33 cop[tikv] table:dt range: decided by [planner__core__casetest__predicate_simplification.it.pk], keep order:false, stats:pseudo
└─IndexHashJoin 41.67 root left outer join, inner:TableReader, outer key:planner__core__casetest__predicate_simplification.it.pk, inner key:planner__core__casetest__predicate_simplification.dt.pk, equal cond:eq(planner__core__casetest__predicate_simplification.it.pk, planner__core__casetest__predicate_simplification.dt.pk), other cond:or(gt(planner__core__casetest__predicate_simplification.it.a, "a"), and(eq(planner__core__casetest__predicate_simplification.it.a, "a"), gt(planner__core__casetest__predicate_simplification.dt.pk, 1)))
├─Limit(Build) 33.33 root offset:0, count:240
│ └─IndexReader 33.33 root index:Limit
│ └─Limit 33.33 cop[tikv] offset:0, count:240
│ └─IndexRangeScan 33.33 cop[tikv] table:it, index:f(a, pk) range:("a" 1,"a" +inf], keep order:true, stats:pseudo
└─TableReader(Probe) 33.33 root data:TableRangeScan
└─TableRangeScan 33.33 cop[tikv] table:dt range: decided by [planner__core__casetest__predicate_simplification.it.pk], keep order:false, stats:pseudo
explain format='brief' SELECT * FROM
(
SELECT
Expand Down

0 comments on commit 66c7571

Please sign in to comment.