Skip to content

Commit

Permalink
planner: check required order property for enforced stream aggregation (
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored May 25, 2020
1 parent 0414aa5 commit a22d3e7
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 1 deletion.
4 changes: 3 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,9 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
Enforced: true,
Items: property.ItemsFromCols(la.groupByCols, desc),
}

if !prop.IsPrefix(childProp) {
return enforcedAggs
}
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
if !la.aggHints.preferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
Expand Down
26 changes: 26 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,29 @@ func (s *testIntegrationSuite) TestTableDualWithRequiredProperty(c *C) {
tk.MustExec("create table t2 (a int, b int)")
tk.MustExec("select /*+ MERGE_JOIN(t1, t2) */ * from t1 partition (p0), t2 where t1.a > 100 and t1.a = t2.a")
}

func (s *testIntegrationSuite) TestStreamAggProp(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1),(1),(2)")

var input []string
var output []struct {
SQL string
Plan []string
Res []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows())
output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...))
}
}
9 changes: 9 additions & 0 deletions planner/core/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,14 @@
"select /*+ USE_INDEX(t1, a), USE_INDEX(t2, a), USE_INDEX(t3, a) */ * from t1, t2 where t1.a=t2.a",
"select /*+ USE_INDEX(t3, a), USE_INDEX(t4, b), IGNORE_INDEX(t3, a) */ * from t1, t2 where t1.a=t2.a"
]
},
{
"name": "TestStreamAggProp",
"cases": [
"select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1",
"select /*+ stream_agg() */ count(*) c from t group by a order by c",
"select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1",
"select /*+ stream_agg() */ count(*) c from t group by a order by a"
]
}
]
60 changes: 60 additions & 0 deletions planner/core/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -201,5 +201,65 @@
]
}
]
},
{
"Name": "TestStreamAggProp",
"Cases": [
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1",
"Plan": [
"TopN_10 1.00 root 2_col_0:asc, offset:0, count:1",
"└─StreamAgg_17 8000.00 root group by:test.t.a, funcs:count(1)",
" └─Sort_22 10000.00 root test.t.a:asc",
" └─TableReader_21 10000.00 root data:TableScan_20",
" └─TableScan_20 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Res": [
"1"
]
},
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c",
"Plan": [
"Sort_5 8000.00 root c:asc",
"└─StreamAgg_11 8000.00 root group by:test.t.a, funcs:count(1)",
" └─Sort_16 10000.00 root test.t.a:asc",
" └─TableReader_15 10000.00 root data:TableScan_14",
" └─TableScan_14 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Res": [
"1",
"2"
]
},
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1",
"Plan": [
"Projection_8 1.00 root c",
"└─Limit_14 1.00 root offset:0, count:1",
" └─StreamAgg_27 1.00 root group by:test.t.a, funcs:count(1), firstrow(test.t.a)",
" └─Sort_32 1.25 root test.t.a:asc",
" └─TableReader_31 1.25 root data:TableScan_30",
" └─TableScan_30 1.25 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Res": [
"2"
]
},
{
"SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a",
"Plan": [
"Projection_6 8000.00 root c",
"└─StreamAgg_21 8000.00 root group by:test.t.a, funcs:count(1), firstrow(test.t.a)",
" └─Sort_17 10000.00 root test.t.a:asc",
" └─TableReader_16 10000.00 root data:TableScan_15",
" └─TableScan_15 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Res": [
"2",
"1"
]
}
]
}
]

0 comments on commit a22d3e7

Please sign in to comment.