Skip to content

Commit

Permalink
planner/core: always add projection to agg that is pushed to tiflash (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 committed Jul 19, 2021
1 parent b744e89 commit 3f09286
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
4 changes: 2 additions & 2 deletions planner/core/rule_eliminate_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
// the align the output schema. In the future, we can solve this in-compatibility by
// passing down the aggregation mode to TiFlash.
if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
if physicalAgg.isFinalAgg() {
return false
}
}
}
if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
if physicalAgg.isFinalAgg() {
return false
}
Expand Down
12 changes: 10 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1896,9 +1896,17 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType}
newMpp := mpp.enforceExchangerImpl(prop)
attachPlan2Task(finalAgg, newMpp)
if proj != nil {
attachPlan2Task(proj, newMpp)
if proj == nil {
proj = PhysicalProjection{
Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)),
}.Init(p.ctx, p.statsInfo(), p.SelectBlockOffset())
for _, col := range p.Schema().Columns {
proj.Exprs = append(proj.Exprs, col)
}
proj.SetSchema(p.schema)
}
attachPlan2Task(proj, newMpp)
newMpp.addCost(p.GetCost(inputRows, false, true))
return newMpp
default:
return invalidTask
Expand Down
1 change: 1 addition & 0 deletions planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@
"desc format = 'brief' select /*+hash_agg()*/ sum(id) from (select value, id from t where id > value group by id, value)A group by value /*the exchange should have only one partition column: test.t.value*/",
"desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/",
"desc format = 'brief' select count(distinct value) from t",
"desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t",
"desc format = 'brief' select count(distinct value), count(value), avg(value) from t"
]
},
Expand Down
29 changes: 24 additions & 5 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -2167,11 +2167,30 @@
"Plan": [
"TableReader 1.00 root data:ExchangeSender",
"└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
" └─ExchangeReceiver 1.00 batchCop[tiflash] ",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
" └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
" └─Projection 1.00 batchCop[tiflash] Column#4",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
" └─ExchangeReceiver 1.00 batchCop[tiflash] ",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
" └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
]
},
{
"SQL": "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t",
"Plan": [
"TableReader 1.00 root data:ExchangeSender",
"└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─Projection 1.00 batchCop[tiflash] Column#5",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct Column#4)->Column#5",
" └─ExchangeReceiver 1.00 batchCop[tiflash] ",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] group by:Column#4, ",
" └─Projection 1.00 batchCop[tiflash] Column#4",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
" └─ExchangeReceiver 1.00 batchCop[tiflash] ",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
" └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
]
},
{
Expand Down

0 comments on commit 3f09286

Please sign in to comment.