diff --git a/executor/aggregate.go b/executor/aggregate.go index aa7e42b5c0ff3..10075fb51ef20 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -163,6 +163,7 @@ type HashAggExec struct { isChildReturnEmpty bool childResult *chunk.Chunk + executed bool } // HashAggInput indicates the input of hash agg exec. @@ -216,6 +217,7 @@ func (e *HashAggExec) Close() error { } for range e.finalOutputCh { } + e.executed = false return e.baseExecutor.Close() } @@ -614,10 +616,14 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error } }) + if e.executed { + return nil + } for !chk.IsFull() { e.finalInputCh <- chk result, ok := <-e.finalOutputCh if !ok { // all finalWorkers exited + e.executed = true if chk.NumRows() > 0 { // but there are some data left return nil } diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index fca05420430ad..289b8d9a35d2e 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -735,6 +735,50 @@ func (s *testSuite1) TestIssue10608(c *C) { tk.MustExec("create table s(a int, b int)") tk.MustExec("insert into s values(100292, 508931), (120002, 508932)") tk.MustExec("insert into t values(508931), (508932)") - tk.MustQuery("select (select group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) + tk.MustQuery("select (select /*+ stream_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) + tk.MustQuery("select (select /*+ hash_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) } + +func (s *testSuite1) TestIssue12759HashAggCalledByApply(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.Se.GetSessionVars().HashAggFinalConcurrency = 4 + tk.MustExec(`insert into mysql.opt_rule_blacklist value("decorrelate");`) + defer func() { + tk.MustExec(`delete from mysql.opt_rule_blacklist where name = "decorrelate";`) + tk.MustExec(`admin reload opt_rule_blacklist;`) + }() + tk.MustExec(`drop table if exists test;`) + tk.MustExec("create table test (a int);") + tk.MustExec("insert into test value(1);") + tk.MustQuery("select /*+ hash_agg() */ sum(a), (select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1) from test tt;").Check(testkit.Rows("1 ")) + + // make sure the plan is Apply -> Apply -> Apply -> HashAgg, and the count of Apply is equal to HashAggFinalConcurrency-1. + tk.MustQuery("explain select /*+ hash_agg() */ sum(a), (select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1) from test tt;").Check(testkit.Rows("" + + "Projection_28 1.00 root 2_col_0, null, null, null]\n" + + "[└─Apply_30 1.00 root CARTESIAN left outer join, inner:Projection_65]\n" + + "[ ├─Apply_32 1.00 root CARTESIAN left outer join, inner:Projection_54]\n" + + "[ │ ├─Apply_34 1.00 root CARTESIAN left outer join, inner:Projection_43]\n" + + "[ │ │ ├─HashAgg_39 1.00 root funcs:sum(col_0), firstrow(col_1)]\n" + + "[ │ │ │ └─TableReader_40 1.00 root data:HashAgg_35]\n" + + "[ │ │ │ └─HashAgg_35 1.00 cop funcs:sum(test.tt.a), firstrow(test.tt.a)]\n" + + "[ │ │ │ └─TableScan_38 10000.00 cop table:tt, range:[-inf,+inf], keep order:false, stats:pseudo]\n" + + "[ │ │ └─Projection_43 1.00 root NULL]\n" + + "[ │ │ └─Limit_44 1.00 root offset:0, count:1]\n" + + "[ │ │ └─TableReader_50 1.00 root data:Limit_49]\n" + + "[ │ │ └─Limit_49 1.00 cop offset:0, count:1]\n" + + "[ │ │ └─Selection_48 1.00 cop eq(test.tt.a, test.test.a)]\n" + + "[ │ │ └─TableScan_47 1000.00 cop table:test, range:[-inf,+inf], keep order:false, stats:pseudo]\n" + + "[ │ └─Projection_54 1.00 root NULL]\n" + + "[ │ └─Limit_55 1.00 root offset:0, count:1]\n" + + "[ │ └─TableReader_61 1.00 root data:Limit_60]\n" + + "[ │ └─Limit_60 1.00 cop offset:0, count:1]\n" + + "[ │ └─Selection_59 1.00 cop eq(test.tt.a, test.test.a)]\n" + + "[ │ └─TableScan_58 1000.00 cop table:test, range:[-inf,+inf], keep order:false, stats:pseudo]\n" + + "[ └─Projection_65 1.00 root NULL]\n" + + "[ └─Limit_66 1.00 root offset:0, count:1]\n" + + "[ └─TableReader_72 1.00 root data:Limit_71]\n" + + "[ └─Limit_71 1.00 cop offset:0, count:1]\n" + + "[ └─Selection_70 1.00 cop eq(test.tt.a, test.test.a)]\n" + + "[ └─TableScan_69 1000.00 cop table:test, range:[-inf,+inf], keep order:false, stats:pseudo")) +}