Skip to content

Commit

Permalink
planner: fix wrong agg function when agg push down union (#17022) (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored May 21, 2020
1 parent f360ad7 commit 7e33460
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 27 deletions.
18 changes: 9 additions & 9 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ id estRows task access object operator info
Union_17 26000.00 root
├─HashAgg_21 16000.00 root group by:Column#10, funcs:firstrow(Column#12)->Column#10
│ └─Union_22 16000.00 root
│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12
│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10
│ │ └─IndexReader_40 10000.00 root index:IndexFullScan_39
│ │ └─IndexFullScan_39 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12
│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10
│ └─IndexReader_58 10000.00 root index:IndexFullScan_57
│ └─IndexFullScan_57 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
└─IndexReader_63 10000.00 root index:IndexFullScan_62
Expand All @@ -176,13 +176,13 @@ explain select c1 from t2 union all select c1 from t2 union select c1 from t2;
id estRows task access object operator info
HashAgg_18 24000.00 root group by:Column#10, funcs:firstrow(Column#11)->Column#10
└─Union_19 24000.00 root
├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11
├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
│ └─IndexReader_37 10000.00 root index:IndexFullScan_36
│ └─IndexFullScan_36 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11
├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
│ └─IndexReader_55 10000.00 root index:IndexFullScan_54
│ └─IndexFullScan_54 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
└─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11
└─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
└─IndexReader_73 10000.00 root index:IndexFullScan_72
└─IndexFullScan_72 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
select * from information_schema.tidb_indexes where table_name='t4';
Expand Down Expand Up @@ -669,17 +669,17 @@ id estRows task access object operator info
Sort_13 2.00 root Column#3:asc
└─HashAgg_17 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3
└─Union_18 2.00 root
├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6
├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3
│ └─TableDual_22 1.00 root rows:1
└─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6
└─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6, funcs:firstrow(1)->Column#3
└─TableDual_28 1.00 root rows:1
explain SELECT 0 AS a FROM dual UNION (SELECT 1 AS a FROM dual ORDER BY a);
id estRows task access object operator info
HashAgg_15 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3
└─Union_16 2.00 root
├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6
├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3
│ └─TableDual_20 1.00 root rows:1
└─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6
└─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6, funcs:firstrow(Column#1)->Column#3
└─Projection_32 1.00 root 1->Column#1
└─TableDual_33 1.00 root rows:1
create table t (i int key, j int, unique key (i, j));
Expand Down
32 changes: 32 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,38 @@ func (s *testSuiteAgg) TestIssue16279(c *C) {
tk.MustQuery("select count(a) , date_format(a, '%Y-%m-%d') as xx from s group by xx")
}

func (s *testSuiteAgg) TestAggPushDownPartitionTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec(`CREATE TABLE t1 (
a int(11) DEFAULT NULL,
b tinyint(4) NOT NULL,
PRIMARY KEY (b)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY RANGE ( b ) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (30),
PARTITION p3 VALUES LESS THAN (40),
PARTITION p4 VALUES LESS THAN (MAXVALUE)
)`)
tk.MustExec("insert into t1 values (0, 0), (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6), (3, 7), (3, 10), (3, 11), (12, 12), (12, 13), (14, 14), (14, 15), (20, 20), (20, 21), (20, 22), (23, 23), (23, 24), (23, 25), (31, 30), (31, 31), (31, 32), (33, 33), (33, 34), (33, 35), (36, 36), (80, 80), (90, 90), (100, 100)")
tk.MustExec("set @@tidb_opt_agg_push_down = 1")
tk.MustQuery("select /*+ AGG_TO_COP() */ sum(a), sum(b) from t1 where a < 40 group by a").Sort().Check(testkit.Rows(
"0 0",
"24 25",
"28 29",
"3 6",
"36 36",
"6 15",
"60 63",
"69 72",
"9 28",
"93 93",
"99 102"))
}

func (s *testSuiteAgg) TestIssue13652(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
17 changes: 13 additions & 4 deletions planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (a *aggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (pu

// pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key.
// We will return the new aggregation. Otherwise we will transform the aggregation to projection.
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan {
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) (LogicalPlan, error) {
ctx := agg.ctx
newAgg := LogicalAggregation{
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)),
Expand All @@ -340,6 +340,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
for _, gbyExpr := range agg.GroupByItems {
newExpr := expression.ColumnSubstitute(gbyExpr, unionSchema, expression.Column2Exprs(unionChild.Schema().Columns))
newAgg.GroupByItems = append(newAgg.GroupByItems, newExpr)
// TODO: if there is a duplicated first_row function, we can delete it.
firstRow, err := aggregation.NewAggFuncDesc(agg.ctx, ast.AggFuncFirstRow, []expression.Expression{gbyExpr}, false)
if err != nil {
return nil, err
}
newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow)
}
newAgg.collectGroupByColumns()
tmpSchema := expression.NewSchema(newAgg.groupByCols...)
Expand All @@ -350,13 +356,13 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
if tmpSchema.ColumnsIndices(key) != nil {
if ok, proj := ConvertAggToProj(newAgg, newAgg.schema); ok {
proj.SetChildren(unionChild)
return proj
return proj, nil
}
break
}
}
newAgg.SetChildren(unionChild)
return newAgg
return newAgg, nil
}

func (a *aggregationPushDownSolver) optimize(ctx context.Context, p LogicalPlan) (LogicalPlan, error) {
Expand Down Expand Up @@ -430,7 +436,10 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e
pushedAgg := a.splitPartialAgg(agg)
newChildren := make([]LogicalPlan, 0, len(union.children))
for _, child := range union.children {
newChild := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
if err != nil {
return p, err
}
newChildren = append(newChildren, newChild)
}
union.SetSchema(expression.NewSchema(newChildren[0].Schema().Columns...))
Expand Down
24 changes: 12 additions & 12 deletions planner/core/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1554,14 +1554,14 @@
{
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ sum(distinct b) from pt;",
"Plan": [
"HashAgg_11 1.00 root funcs:sum(distinct Column#7)->Column#4",
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#7",
"HashAgg_11 1.00 root funcs:sum(distinct Column#9)->Column#4",
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#9",
" └─Union_12 16000.00 root ",
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" │ └─TableReader_17 8000.00 root data:HashAgg_13",
" │ └─HashAgg_13 8000.00 cop[tikv] group by:test.pt.b, ",
" │ └─TableFullScan_15 10000.00 cop[tikv] table:pt, partition:p0 keep order:false, stats:pseudo",
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" └─TableReader_22 8000.00 root data:HashAgg_18",
" └─HashAgg_18 8000.00 cop[tikv] group by:test.pt.b, ",
" └─TableFullScan_20 10000.00 cop[tikv] table:pt, partition:p1 keep order:false, stats:pseudo"
Expand All @@ -1575,11 +1575,11 @@
"Plan": [
"HashAgg_14 1.00 root funcs:count(distinct Column#5)->Column#6",
"└─Union_15 16000.00 root ",
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5",
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5, funcs:firstrow(test.ta.a)->Column#5",
" │ └─TableReader_20 8000.00 root data:HashAgg_16",
" │ └─HashAgg_16 8000.00 cop[tikv] group by:test.ta.a, ",
" │ └─TableFullScan_18 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo",
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5",
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5, funcs:firstrow(test.tb.a)->Column#5",
" └─TableReader_25 8000.00 root data:HashAgg_21",
" └─HashAgg_21 8000.00 cop[tikv] group by:test.tb.a, ",
" └─TableFullScan_23 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo"
Expand Down Expand Up @@ -1683,14 +1683,14 @@
{
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ sum(distinct b) from pt;",
"Plan": [
"HashAgg_11 1.00 root funcs:sum(distinct Column#7)->Column#4",
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#7",
"HashAgg_11 1.00 root funcs:sum(distinct Column#9)->Column#4",
"└─Projection_23 16000.00 root cast(test.pt.b, decimal(65,0) BINARY)->Column#9",
" └─Union_12 16000.00 root ",
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" ├─HashAgg_16 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" │ └─TableReader_17 8000.00 root data:HashAgg_13",
" │ └─HashAgg_13 8000.00 cop[tikv] group by:test.pt.b, ",
" │ └─TableFullScan_15 10000.00 cop[tikv] table:pt, partition:p0 keep order:false, stats:pseudo",
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" └─HashAgg_21 8000.00 root group by:test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b, funcs:firstrow(test.pt.b)->test.pt.b",
" └─TableReader_22 8000.00 root data:HashAgg_18",
" └─HashAgg_18 8000.00 cop[tikv] group by:test.pt.b, ",
" └─TableFullScan_20 10000.00 cop[tikv] table:pt, partition:p1 keep order:false, stats:pseudo"
Expand All @@ -1704,11 +1704,11 @@
"Plan": [
"HashAgg_14 1.00 root funcs:count(distinct Column#5)->Column#6",
"└─Union_15 16000.00 root ",
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5",
" ├─HashAgg_19 8000.00 root group by:test.ta.a, funcs:firstrow(test.ta.a)->Column#5, funcs:firstrow(test.ta.a)->Column#5",
" │ └─TableReader_20 8000.00 root data:HashAgg_16",
" │ └─HashAgg_16 8000.00 cop[tikv] group by:test.ta.a, ",
" │ └─TableFullScan_18 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo",
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5",
" └─HashAgg_24 8000.00 root group by:test.tb.a, funcs:firstrow(test.tb.a)->Column#5, funcs:firstrow(test.tb.a)->Column#5",
" └─TableReader_25 8000.00 root data:HashAgg_21",
" └─HashAgg_21 8000.00 cop[tikv] group by:test.tb.a, ",
" └─TableFullScan_23 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo"
Expand Down
4 changes: 2 additions & 2 deletions planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"DataScan(t)->Aggr(sum(test.t.a))->Projection",
"UnionAll{DataScan(a)->Projection->Aggr(sum(test.t.c))->DataScan(b)->Projection->Aggr(sum(test.t.a))->DataScan(c)->Projection->Aggr(sum(test.t.b))}->Aggr(sum(Column#40))->Projection",
"UnionAll{DataScan(a)->Projection->Aggr(sum(test.t.c),firstrow(test.t.d))->DataScan(b)->Projection->Aggr(sum(test.t.a),firstrow(test.t.b))->DataScan(c)->Projection->Aggr(sum(test.t.b),firstrow(test.t.e))}->Aggr(sum(Column#40))->Projection",
"Join{DataScan(a)->DataScan(b)->Aggr(max(test.t.b),firstrow(test.t.c))}(test.t.c,test.t.c)->Projection->Projection",
"Join{DataScan(a)->DataScan(b)}(test.t.a,test.t.a)->Aggr(max(test.t.b),max(test.t.b))->Projection",
"UnionAll{DataScan(a)->Projection->Projection->Projection->DataScan(b)->Projection->Projection->Projection}->Aggr(max(Column#38))->Projection",
"Join{DataScan(a)->DataScan(b)}(test.t.a,test.t.a)(test.t.b,test.t.b)->Aggr(max(test.t.c))->Projection",
"Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection->Projection",
"UnionAll{DataScan(t1)->Projection->Aggr(count(test.t.a),sum(test.t.a))->DataScan(t2)->Projection->Aggr(count(test.t.a),sum(test.t.a))}->Aggr(avg(Column#38, Column#39))->Projection",
"UnionAll{DataScan(t1)->Projection->Projection->Projection->DataScan(t2)->Projection->Projection->Projection}->Aggr(count(distinct Column#25))->Projection",
"UnionAll{DataScan(t1)->Projection->Aggr(firstrow(test.t.b))->DataScan(t2)->Projection->Aggr(firstrow(test.t.b))}->Aggr(count(distinct Column#26))->Projection"
"UnionAll{DataScan(t1)->Projection->Aggr(firstrow(test.t.b),firstrow(test.t.b))->DataScan(t2)->Projection->Aggr(firstrow(test.t.b),firstrow(test.t.b))}->Aggr(count(distinct Column#26))->Projection"
]
},
{
Expand Down

0 comments on commit 7e33460

Please sign in to comment.