From a22d3e770b8ab0a890cfc783a61dccda63740d37 Mon Sep 17 00:00:00 2001 From: pingcap-github-bot Date: Mon, 25 May 2020 15:10:26 +0800 Subject: [PATCH] planner: check required order property for enforced stream aggregation (#17338) (#17346) --- planner/core/exhaust_physical_plans.go | 4 +- planner/core/integration_test.go | 26 ++++++++ .../core/testdata/integration_suite_in.json | 9 +++ .../core/testdata/integration_suite_out.json | 60 +++++++++++++++++++ 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 237a8118ef89d..d5387c15b8cc4 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1234,7 +1234,9 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope Enforced: true, Items: property.ItemsFromCols(la.groupByCols, desc), } - + if !prop.IsPrefix(childProp) { + return enforcedAggs + } taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} if !la.aggHints.preferAggToCop { taskTypes = append(taskTypes, property.RootTaskType) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 5e28e12130201..2773b9d60141c 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -740,3 +740,29 @@ func (s *testIntegrationSuite) TestTableDualWithRequiredProperty(c *C) { tk.MustExec("create table t2 (a int, b int)") tk.MustExec("select /*+ MERGE_JOIN(t1, t2) */ * from t1 partition (p0), t2 where t1.a > 100 and t1.a = t2.a") } + +func (s *testIntegrationSuite) TestStreamAggProp(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values(1),(1),(2)") + + var input []string + var output []struct { + SQL string + Plan []string + Res []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows()) + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...)) + tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...)) + } +} diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index 4b1ac101c85ed..2ddc240535d2d 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -47,5 +47,14 @@ "select /*+ USE_INDEX(t1, a), USE_INDEX(t2, a), USE_INDEX(t3, a) */ * from t1, t2 where t1.a=t2.a", "select /*+ USE_INDEX(t3, a), USE_INDEX(t4, b), IGNORE_INDEX(t3, a) */ * from t1, t2 where t1.a=t2.a" ] + }, + { + "name": "TestStreamAggProp", + "cases": [ + "select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1", + "select /*+ stream_agg() */ count(*) c from t group by a order by c", + "select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1", + "select /*+ stream_agg() */ count(*) c from t group by a order by a" + ] } ] diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index 5ea394eb3fb47..d5967dd9d414b 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -201,5 +201,65 @@ ] } ] + }, + { + "Name": "TestStreamAggProp", + "Cases": [ + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c limit 1", + "Plan": [ + "TopN_10 1.00 root 2_col_0:asc, offset:0, count:1", + "└─StreamAgg_17 8000.00 root group by:test.t.a, funcs:count(1)", + " └─Sort_22 10000.00 root test.t.a:asc", + " └─TableReader_21 10000.00 root data:TableScan_20", + " └─TableScan_20 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Res": [ + "1" + ] + }, + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by c", + "Plan": [ + "Sort_5 8000.00 root c:asc", + "└─StreamAgg_11 8000.00 root group by:test.t.a, funcs:count(1)", + " └─Sort_16 10000.00 root test.t.a:asc", + " └─TableReader_15 10000.00 root data:TableScan_14", + " └─TableScan_14 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Res": [ + "1", + "2" + ] + }, + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a limit 1", + "Plan": [ + "Projection_8 1.00 root c", + "└─Limit_14 1.00 root offset:0, count:1", + " └─StreamAgg_27 1.00 root group by:test.t.a, funcs:count(1), firstrow(test.t.a)", + " └─Sort_32 1.25 root test.t.a:asc", + " └─TableReader_31 1.25 root data:TableScan_30", + " └─TableScan_30 1.25 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Res": [ + "2" + ] + }, + { + "SQL": "select /*+ stream_agg() */ count(*) c from t group by a order by a", + "Plan": [ + "Projection_6 8000.00 root c", + "└─StreamAgg_21 8000.00 root group by:test.t.a, funcs:count(1), firstrow(test.t.a)", + " └─Sort_17 10000.00 root test.t.a:asc", + " └─TableReader_16 10000.00 root data:TableScan_15", + " └─TableScan_15 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Res": [ + "2", + "1" + ] + } + ] } ]