Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix forever hanging when HashAgg is called by apply (#12760) #12765

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type HashAggExec struct {
isChildReturnEmpty bool

childResult *chunk.Chunk
executed bool
}

// HashAggInput indicates the input of hash agg exec.
Expand Down Expand Up @@ -216,6 +217,7 @@ func (e *HashAggExec) Close() error {
}
for range e.finalOutputCh {
}
e.executed = false
return e.baseExecutor.Close()
}

Expand Down Expand Up @@ -614,10 +616,14 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
}
})

if e.executed {
return nil
}
for !chk.IsFull() {
e.finalInputCh <- chk
result, ok := <-e.finalOutputCh
if !ok { // all finalWorkers exited
e.executed = true
if chk.NumRows() > 0 { // but there are some data left
return nil
}
Expand Down
46 changes: 45 additions & 1 deletion executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,50 @@ func (s *testSuite1) TestIssue10608(c *C) {
tk.MustExec("create table s(a int, b int)")
tk.MustExec("insert into s values(100292, 508931), (120002, 508932)")
tk.MustExec("insert into t values(508931), (508932)")
tk.MustQuery("select (select group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-"))
tk.MustQuery("select (select /*+ stream_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-"))
tk.MustQuery("select (select /*+ hash_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-"))

}

func (s *testSuite1) TestIssue12759HashAggCalledByApply(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.Se.GetSessionVars().HashAggFinalConcurrency = 4
tk.MustExec(`insert into mysql.opt_rule_blacklist value("decorrelate");`)
defer func() {
tk.MustExec(`delete from mysql.opt_rule_blacklist where name = "decorrelate";`)
tk.MustExec(`admin reload opt_rule_blacklist;`)
}()
tk.MustExec(`drop table if exists test;`)
tk.MustExec("create table test (a int);")
tk.MustExec("insert into test value(1);")
tk.MustQuery("select /*+ hash_agg() */ sum(a), (select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1) from test tt;").Check(testkit.Rows("1 <nil> <nil> <nil>"))

// make sure the plan is Apply -> Apply -> Apply -> HashAgg, and the count of Apply is equal to HashAggFinalConcurrency-1.
tk.MustQuery("explain select /*+ hash_agg() */ sum(a), (select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1),(select NULL from test where tt.a = test.a limit 1) from test tt;").Check(testkit.Rows("" +
"Projection_28 1.00 root 2_col_0, null, null, null]\n" +
"[└─Apply_30 1.00 root CARTESIAN left outer join, inner:Projection_65]\n" +
"[ ├─Apply_32 1.00 root CARTESIAN left outer join, inner:Projection_54]\n" +
"[ │ ├─Apply_34 1.00 root CARTESIAN left outer join, inner:Projection_43]\n" +
"[ │ │ ├─HashAgg_39 1.00 root funcs:sum(col_0), firstrow(col_1)]\n" +
"[ │ │ │ └─TableReader_40 1.00 root data:HashAgg_35]\n" +
"[ │ │ │ └─HashAgg_35 1.00 cop funcs:sum(test.tt.a), firstrow(test.tt.a)]\n" +
"[ │ │ │ └─TableScan_38 10000.00 cop table:tt, range:[-inf,+inf], keep order:false, stats:pseudo]\n" +
"[ │ │ └─Projection_43 1.00 root NULL]\n" +
"[ │ │ └─Limit_44 1.00 root offset:0, count:1]\n" +
"[ │ │ └─TableReader_50 1.00 root data:Limit_49]\n" +
"[ │ │ └─Limit_49 1.00 cop offset:0, count:1]\n" +
"[ │ │ └─Selection_48 1.00 cop eq(test.tt.a, test.test.a)]\n" +
"[ │ │ └─TableScan_47 1000.00 cop table:test, range:[-inf,+inf], keep order:false, stats:pseudo]\n" +
"[ │ └─Projection_54 1.00 root NULL]\n" +
"[ │ └─Limit_55 1.00 root offset:0, count:1]\n" +
"[ │ └─TableReader_61 1.00 root data:Limit_60]\n" +
"[ │ └─Limit_60 1.00 cop offset:0, count:1]\n" +
"[ │ └─Selection_59 1.00 cop eq(test.tt.a, test.test.a)]\n" +
"[ │ └─TableScan_58 1000.00 cop table:test, range:[-inf,+inf], keep order:false, stats:pseudo]\n" +
"[ └─Projection_65 1.00 root NULL]\n" +
"[ └─Limit_66 1.00 root offset:0, count:1]\n" +
"[ └─TableReader_72 1.00 root data:Limit_71]\n" +
"[ └─Limit_71 1.00 cop offset:0, count:1]\n" +
"[ └─Selection_70 1.00 cop eq(test.tt.a, test.test.a)]\n" +
"[ └─TableScan_69 1000.00 cop table:test, range:[-inf,+inf], keep order:false, stats:pseudo"))
}