From 4058c5d8838775c77be6a159a9f2a648a6244bfe Mon Sep 17 00:00:00 2001
From: Han Fei
Date: Wed, 30 Jun 2021 14:55:26 +0800
Subject: [PATCH 1/3] cherry pick #25662 to release-5.0

Signed-off-by: ti-srebot
---
 planner/core/exhaust_physical_plans.go        |  6 ++++-
 planner/core/fragment.go                      |  7 +++++
 planner/core/physical_plans.go                |  2 ++
 planner/core/task.go                          | 26 +++++++++++++++++++
 .../testdata/integration_serial_suite_in.json |  4 ++-
 .../integration_serial_suite_out.json         | 25 ++++++++++++++++++
 6 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go
index dedf980ee063f..3e955c5848ec3 100644
--- a/planner/core/exhaust_physical_plans.go
+++ b/planner/core/exhaust_physical_plans.go
@@ -2418,7 +2418,11 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
 		childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64}
 		agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
 		agg.SetSchema(la.schema.Clone())
-		agg.MppRunMode = MppTiDB
+		if la.HasDistinct() {
+			agg.MppRunMode = MppScalar
+		} else {
+			agg.MppRunMode = MppTiDB
+		}
 		hashAggs = append(hashAggs, agg)
 	}
 	return
diff --git a/planner/core/fragment.go b/planner/core/fragment.go
index f329374d853f5..3f465cfabdc02 100644
--- a/planner/core/fragment.go
+++ b/planner/core/fragment.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tipb/go-tipb"
 	"go.uber.org/zap"
 )
 
@@ -40,6 +41,8 @@ type Fragment struct {
 	ExchangeSender *PhysicalExchangeSender // data exporter
 
 	IsRoot bool
+
+	singleton bool // indicates if this is a task running on a single node.
 }
 
 type tasksAndFrags struct {
@@ -121,6 +124,7 @@ func (f *Fragment) init(p PhysicalPlan) error {
 		}
 		f.TableScan = x
 	case *PhysicalExchangeReceiver:
+		f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
 		f.ExchangeReceivers = append(f.ExchangeReceivers, x)
 	case *PhysicalUnionAll:
 		return errors.New("unexpected union all detected")
@@ -246,6 +250,9 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
 		for _, r := range f.ExchangeReceivers {
 			childrenTasks = append(childrenTasks, r.Tasks...)
 		}
+		if f.singleton {
+			childrenTasks = childrenTasks[0:1]
+		}
 		tasks = e.constructMPPTasksByChildrenTasks(childrenTasks)
 	}
 	if err != nil {
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 292d9f8606d83..7819fdac20d34 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -1004,6 +1004,8 @@ const (
 	Mpp2Phase
 	// MppTiDB runs agg on TiDB (and a partial agg on TiFlash if in 2 phase agg)
 	MppTiDB
+	// MppScalar also has 2 phases. The second phase runs in a single task.
+	MppScalar
 )
 
 type basePhysicalAgg struct {
diff --git a/planner/core/task.go b/planner/core/task.go
index a4720e860a7b6..666e0c17112d6 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1874,6 +1874,13 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
 		}
 		// TODO: how to set 2-phase cost?
 		newMpp.addCost(p.GetCost(inputRows, false, true))
+<<<<<<< HEAD
+=======
+		finalAgg.SetCost(newMpp.cost())
+		if proj != nil {
+			proj.SetCost(newMpp.cost())
+		}
+>>>>>>> 3ad894da9... planner/core: thoroughly push down count-distinct agg in the MPP mode. (#25662)
 		return newMpp
 	case MppTiDB:
 		partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, false)
@@ -1886,6 +1893,25 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
 		attachPlan2Task(finalAgg, t)
 		t.addCost(p.GetCost(inputRows, true, false))
 		return t
+	case MppScalar:
+		proj := p.convertAvgForMPP()
+		partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, true)
+		if partialAgg == nil || finalAgg == nil {
+			return invalidTask
+		}
+		attachPlan2Task(partialAgg, mpp)
+		prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType}
+		newMpp := mpp.enforceExchangerImpl(prop)
+		attachPlan2Task(finalAgg, newMpp)
+		if proj != nil {
+			attachPlan2Task(proj, newMpp)
+		}
+		newMpp.addCost(p.GetCost(inputRows, false, true))
+		finalAgg.SetCost(newMpp.cost())
+		if proj != nil {
+			proj.SetCost(newMpp.cost())
+		}
+		return newMpp
 	default:
 		return invalidTask
 	}
diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json
index 887a4e9afd390..a3c1edee0ffb7 100644
--- a/planner/core/testdata/integration_serial_suite_in.json
+++ b/planner/core/testdata/integration_serial_suite_in.json
@@ -255,7 +255,9 @@
       "desc format = 'brief' select * from t join ( select count(distinct value), id from t group by id) as A on A.id = t.id",
       "desc format = 'brief' select * from t join ( select count(1/value), id from t group by id) as A on A.id = t.id",
       "desc format = 'brief' select /*+hash_agg()*/ sum(id) from (select value, id from t where id > value group by id, value)A group by value /*the exchange should have only one partition column: test.t.value*/",
-      "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/"
+      "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/",
+      "desc format = 'brief' select count(distinct value) from t",
+      "desc format = 'brief' select count(distinct value), count(value), avg(value) from t"
     ]
   },
   {
diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json
index a00ee8a2dd8cb..fd60231ad4928 100644
--- a/planner/core/testdata/integration_serial_suite_out.json
+++ b/planner/core/testdata/integration_serial_suite_out.json
@@ -2161,6 +2161,31 @@
         "  └─ExchangeSender 10000.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.id",
         "    └─TableFullScan 10000.00 batchCop[tiflash] table:B keep order:false, stats:pseudo"
       ]
+    },
+    {
+      "SQL": "desc format = 'brief' select count(distinct value) from t",
+      "Plan": [
+        "TableReader 1.00 root data:ExchangeSender",
+        "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "  └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
+        "    └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+        "      └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "        └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
+        "          └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+      ]
+    },
+    {
+      "SQL": "desc format = 'brief' select count(distinct value), count(value), avg(value) from t",
+      "Plan": [
+        "TableReader 1.00 root data:ExchangeSender",
+        "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "  └─Projection 1.00 batchCop[tiflash] Column#4, Column#5, div(Column#6, cast(case(eq(Column#7, 0), 1, Column#7), decimal(20,0) BINARY))->Column#6",
+        "    └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4, funcs:sum(Column#8)->Column#5, funcs:sum(Column#9)->Column#7, funcs:sum(Column#10)->Column#6",
+        "      └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+        "        └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "          └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, funcs:count(test.t.value)->Column#8, funcs:count(test.t.value)->Column#9, funcs:sum(test.t.value)->Column#10",
+        "            └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+      ]
     }
   ]
 },
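
Note on PATCH 1/3 above: #25662 makes a hash aggregate with DISTINCT run in the new MppScalar mode. The partial aggregate (the group-by on the distinct column) still runs on every TiFlash node, but the final aggregate sits behind a PassThrough exchange, and fragment.go pins such a singleton fragment to exactly one task. The following self-contained Go sketch illustrates only that scheduling idea; every identifier in it (exchangeType, fragment, scheduleTasks) is invented for illustration and is not TiDB code.

package main

import "fmt"

type exchangeType int

const (
	hashPartition exchangeType = iota
	passThrough
)

type task struct{ node string }

// fragment stands in for planner/core's Fragment: it receives data from its
// child fragments through an exchange of some type.
type fragment struct {
	receiverType exchangeType
	childTasks   []task
}

// scheduleTasks mirrors the idea in generateMPPTasksForFragment: a fragment
// fed through a PassThrough exchange must run as a single task, so only one
// of the candidate tasks is kept.
func scheduleTasks(f fragment) []task {
	tasks := f.childTasks
	if f.receiverType == passThrough && len(tasks) > 0 {
		tasks = tasks[0:1] // the singleton case: one task on one node
	}
	return tasks
}

func main() {
	f := fragment{
		receiverType: passThrough,
		childTasks:   []task{{"tiflash-0"}, {"tiflash-1"}, {"tiflash-2"}},
	}
	// Prints [{tiflash-0}]: the final distinct aggregation runs on one node.
	fmt.Println(scheduleTasks(f))
}
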
"└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Projection 1.00 batchCop[tiflash] Column#4, Column#5, div(Column#6, cast(case(eq(Column#7, 0), 1, Column#7), decimal(20,0) BINARY))->Column#6", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4, funcs:sum(Column#8)->Column#5, funcs:sum(Column#9)->Column#7, funcs:sum(Column#10)->Column#6", + " └─ExchangeReceiver 1.00 batchCop[tiflash] ", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, funcs:count(test.t.value)->Column#8, funcs:count(test.t.value)->Column#9, funcs:sum(test.t.value)->Column#10", + " └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ] } ] }, From b744e894869ec5dca17c748d76dd935e82e41eeb Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 1 Jul 2021 15:12:22 +0800 Subject: [PATCH 2/3] fix conflicts --- planner/core/task.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index 666e0c17112d6..a2134bb7b2810 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1874,13 +1874,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } // TODO: how to set 2-phase cost? newMpp.addCost(p.GetCost(inputRows, false, true)) -<<<<<<< HEAD -======= - finalAgg.SetCost(newMpp.cost()) - if proj != nil { - proj.SetCost(newMpp.cost()) - } ->>>>>>> 3ad894da9... planner/core: thoroughly push down count-distinct agg in the MPP mode. (#25662) return newMpp case MppTiDB: partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, false) @@ -1900,17 +1893,12 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { return invalidTask } attachPlan2Task(partialAgg, mpp) - prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType} + prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType} newMpp := mpp.enforceExchangerImpl(prop) attachPlan2Task(finalAgg, newMpp) if proj != nil { attachPlan2Task(proj, newMpp) } - newMpp.addCost(p.GetCost(inputRows, false, true)) - finalAgg.SetCost(newMpp.cost()) - if proj != nil { - proj.SetCost(newMpp.cost()) - } return newMpp default: return invalidTask From 3f0928615030c11861273456abab2008c776af99 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 15 Jul 2021 16:05:32 +0800 Subject: [PATCH 3/3] planner/core: always add projection to agg that is pushed to tiflash (#26263) --- planner/core/rule_eliminate_projection.go | 4 +-- planner/core/task.go | 12 ++++++-- .../testdata/integration_serial_suite_in.json | 1 + .../integration_serial_suite_out.json | 29 +++++++++++++++---- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go index 5731495f9c2d2..ebc6b23d2b57d 100644 --- a/planner/core/rule_eliminate_projection.go +++ b/planner/core/rule_eliminate_projection.go @@ -43,14 +43,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool { // the align the output schema. In the future, we can solve this in-compatibility by // passing down the aggregation mode to TiFlash. 
From 3f0928615030c11861273456abab2008c776af99 Mon Sep 17 00:00:00 2001
From: Han Fei
Date: Thu, 15 Jul 2021 16:05:32 +0800
Subject: [PATCH 3/3] planner/core: always add projection to agg that is
 pushed to tiflash (#26263)

---
 planner/core/rule_eliminate_projection.go     |  4 +--
 planner/core/task.go                          | 12 ++++++--
 .../testdata/integration_serial_suite_in.json |  1 +
 .../integration_serial_suite_out.json         | 29 +++++++++++++++----
 4 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go
index 5731495f9c2d2..ebc6b23d2b57d 100644
--- a/planner/core/rule_eliminate_projection.go
+++ b/planner/core/rule_eliminate_projection.go
@@ -43,14 +43,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
 	// the align the output schema. In the future, we can solve this in-compatibility by
 	// passing down the aggregation mode to TiFlash.
 	if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
-		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
+		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
 			if physicalAgg.isFinalAgg() {
 				return false
 			}
 		}
 	}
 	if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
-		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
+		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
 			if physicalAgg.isFinalAgg() {
 				return false
 			}
diff --git a/planner/core/task.go b/planner/core/task.go
index a2134bb7b2810..93776a059f8dd 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1896,9 +1896,17 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
 		prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType}
 		newMpp := mpp.enforceExchangerImpl(prop)
 		attachPlan2Task(finalAgg, newMpp)
-		if proj != nil {
-			attachPlan2Task(proj, newMpp)
+		if proj == nil {
+			proj = PhysicalProjection{
+				Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)),
+			}.Init(p.ctx, p.statsInfo(), p.SelectBlockOffset())
+			for _, col := range p.Schema().Columns {
+				proj.Exprs = append(proj.Exprs, col)
+			}
+			proj.SetSchema(p.schema)
 		}
+		attachPlan2Task(proj, newMpp)
+		newMpp.addCost(p.GetCost(inputRows, false, true))
 		return newMpp
 	default:
 		return invalidTask
diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json
index a3c1edee0ffb7..a63cf787828cd 100644
--- a/planner/core/testdata/integration_serial_suite_in.json
+++ b/planner/core/testdata/integration_serial_suite_in.json
@@ -257,6 +257,7 @@
       "desc format = 'brief' select /*+hash_agg()*/ sum(id) from (select value, id from t where id > value group by id, value)A group by value /*the exchange should have only one partition column: test.t.value*/",
       "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/",
       "desc format = 'brief' select count(distinct value) from t",
+      "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t",
       "desc format = 'brief' select count(distinct value), count(value), avg(value) from t"
     ]
   },
diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json
index fd60231ad4928..905312cacaa38 100644
--- a/planner/core/testdata/integration_serial_suite_out.json
+++ b/planner/core/testdata/integration_serial_suite_out.json
@@ -2167,11 +2167,30 @@
       "Plan": [
         "TableReader 1.00 root data:ExchangeSender",
         "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
-        "  └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
-        "    └─ExchangeReceiver 1.00 batchCop[tiflash] ",
-        "      └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
-        "        └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
-        "          └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+        "  └─Projection 1.00 batchCop[tiflash] Column#4",
+        "    └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
+        "      └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+        "        └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "          └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
+        "            └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
       ]
     },
+    {
+      "SQL": "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t",
+      "Plan": [
+        "TableReader 1.00 root data:ExchangeSender",
+        "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "  └─Projection 1.00 batchCop[tiflash] Column#5",
+        "    └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct Column#4)->Column#5",
+        "      └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+        "        └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "          └─HashAgg 1.00 batchCop[tiflash] group by:Column#4, ",
+        "            └─Projection 1.00 batchCop[tiflash] Column#4",
+        "              └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
+        "                └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+        "                  └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+        "                    └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
+        "                      └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+      ]
+    },
+    {