Skip to content

Commit

Permalink
*: support parallel hash agg (#6658)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and shenli committed Jun 29, 2018
1 parent 8c66635 commit dd37138
Show file tree
Hide file tree
Showing 13 changed files with 791 additions and 106 deletions.
574 changes: 564 additions & 10 deletions executor/aggregate.go

Large diffs are not rendered by default.

102 changes: 51 additions & 51 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,53 +39,53 @@ func (s *testSuite) TestAggregation(c *C) {
tk.MustQuery("select bit_xor(c) from t where NULL").Check(testkit.Rows("0"))
result := tk.MustQuery("select count(*) from t")
result.Check(testkit.Rows("7"))
result = tk.MustQuery("select count(*) from t group by d")
result = tk.MustQuery("select count(*) from t group by d order by c")
result.Check(testkit.Rows("3", "2", "2"))
result = tk.MustQuery("select distinct 99 from t group by d having d > 0")
result.Check(testkit.Rows("99"))
result = tk.MustQuery("select count(*) from t having 1 = 0")
result.Check(testkit.Rows())
result = tk.MustQuery("select c,d from t group by d")
result = tk.MustQuery("select c,d from t group by d order by d")
result.Check(testkit.Rows("<nil> 1", "1 2", "1 3"))
result = tk.MustQuery("select - c, c as d from t group by c having null not between c and avg(distinct d) - d")
result.Check(testkit.Rows())
result = tk.MustQuery("select - c as c from t group by c having t.c > 5")
result.Check(testkit.Rows())
result = tk.MustQuery("select t1.c from t t1, t t2 group by c having c > 5")
result.Check(testkit.Rows())
result = tk.MustQuery("select count(*) from (select d, c from t) k where d != 0 group by d")
result = tk.MustQuery("select count(*) from (select d, c from t) k where d != 0 group by d order by c")
result.Check(testkit.Rows("3", "2", "2"))
result = tk.MustQuery("select c as a from t group by d having a < 0")
result.Check(testkit.Rows())
result = tk.MustQuery("select c as a from t group by d having sum(a) = 2")
result.Check(testkit.Rows("<nil>"))
result = tk.MustQuery("select count(distinct c) from t group by d")
result = tk.MustQuery("select count(distinct c) from t group by d order by c")
result.Check(testkit.Rows("1", "2", "2"))
result = tk.MustQuery("select sum(c) from t group by d")
result = tk.MustQuery("select sum(c) as a from t group by d order by a")
result.Check(testkit.Rows("2", "4", "5"))
result = tk.MustQuery("select sum(c), sum(c+1), sum(c), sum(c+1) from t group by d")
result = tk.MustQuery("select sum(c) as a, sum(c+1), sum(c), sum(c+1) from t group by d order by a")
result.Check(testkit.Rows("2 4 2 4", "4 6 4 6", "5 7 5 7"))
result = tk.MustQuery("select count(distinct c,d) from t")
result.Check(testkit.Rows("5"))
_, err := tk.Exec("select count(c,d) from t")
c.Assert(err, NotNil)
result = tk.MustQuery("select d*2 as ee, sum(c) from t group by ee")
result = tk.MustQuery("select d*2 as ee, sum(c) from t group by ee order by ee")
result.Check(testkit.Rows("2 2", "4 4", "6 5"))
result = tk.MustQuery("select sum(distinct c) from t group by d")
result = tk.MustQuery("select sum(distinct c) as a from t group by d order by a")
result.Check(testkit.Rows("1", "4", "5"))
result = tk.MustQuery("select min(c) from t group by d")
result = tk.MustQuery("select min(c) as a from t group by d order by a")
result.Check(testkit.Rows("1", "1", "1"))
result = tk.MustQuery("select max(c) from t group by d")
result = tk.MustQuery("select max(c) as a from t group by d order by a")
result.Check(testkit.Rows("1", "3", "4"))
result = tk.MustQuery("select avg(c) from t group by d")
result = tk.MustQuery("select avg(c) as a from t group by d order by a")
result.Check(testkit.Rows("1.0000", "2.0000", "2.5000"))
result = tk.MustQuery("select d, d + 1 from t group by d")
result = tk.MustQuery("select d, d + 1 from t group by d order by d")
result.Check(testkit.Rows("1 2", "2 3", "3 4"))
result = tk.MustQuery("select count(*) from t")
result.Check(testkit.Rows("7"))
result = tk.MustQuery("select count(distinct d) from t")
result.Check(testkit.Rows("3"))
result = tk.MustQuery("select count(*) from t group by d having sum(c) > 3")
result = tk.MustQuery("select count(*) as a from t group by d having sum(c) > 3 order by a")
result.Check(testkit.Rows("2", "2"))
result = tk.MustQuery("select max(c) from t group by d having sum(c) > 3 order by avg(c) desc")
result.Check(testkit.Rows("4", "3"))
Expand All @@ -107,7 +107,7 @@ func (s *testSuite) TestAggregation(c *C) {
result.Check(testkit.Rows("343"))
result = tk.MustQuery("select count(*) from t a , t b where a.c = b.d")
result.Check(testkit.Rows("14"))
result = tk.MustQuery("select count(a.d), sum(b.c) from t a , t b where a.c = b.d")
result = tk.MustQuery("select count(a.d), sum(b.c) from t a , t b where a.c = b.d order by a.d")
result.Check(testkit.Rows("14 13"))
result = tk.MustQuery("select count(*) from t a , t b, t c where a.c = b.d and b.d = c.d")
result.Check(testkit.Rows("40"))
Expand Down Expand Up @@ -146,29 +146,29 @@ func (s *testSuite) TestAggregation(c *C) {
result.Check(testkit.Rows("1 0 1", "0 1 1", "-1 2 1"))
result = tk.MustQuery("select d, 1-d as d, c as d from t order by d+1")
result.Check(testkit.Rows("-1 2 1", "0 1 1", "1 0 1"))
result = tk.MustQuery("select d, 1-d as d, c as d from t group by d")
result.Check(testkit.Rows("-1 2 1", "0 1 1", "1 0 1"))
result = tk.MustQuery("select d as d1, t.d as d1, 1-d as d1, c as d1 from t having d1 < 10")
result = tk.MustQuery("select d, 1-d as d, c as d from t group by d order by d")
result.Check(testkit.Rows("1 0 1", "0 1 1", "-1 2 1"))
result = tk.MustQuery("select d as d1, t.d as d1, 1-d as d1, c as d1 from t having d1 < 10 order by d")
result.Check(testkit.Rows("-1 -1 2 1", "0 0 1 1", "1 1 0 1"))
result = tk.MustQuery("select d*d as d1, c as d1 from t group by d1")
result.Check(testkit.Rows("1 1", "0 1"))
result = tk.MustQuery("select d*d as d1, c as d1 from t group by d1 order by d1")
result.Check(testkit.Rows("0 1", "1 1"))
result = tk.MustQuery("select d*d as d1, c as d1 from t group by 2")
result.Check(testkit.Rows("1 1"))
result = tk.MustQuery("select * from t group by 2")
result = tk.MustQuery("select * from t group by 2 order by d")
result.Check(testkit.Rows("1 -1", "1 0", "1 1"))
result = tk.MustQuery("select * , sum(d) from t group by 1")
result = tk.MustQuery("select * , sum(d) from t group by 1 order by d")
result.Check(testkit.Rows("1 -1 0"))
result = tk.MustQuery("select sum(d), t.* from t group by 2")
result = tk.MustQuery("select sum(d), t.* from t group by 2 order by d")
result.Check(testkit.Rows("0 1 -1"))
result = tk.MustQuery("select d as d, c as d from t group by d + 1")
result = tk.MustQuery("select d as d, c as d from t group by d + 1 order by t.d")
result.Check(testkit.Rows("-1 1", "0 1", "1 1"))
result = tk.MustQuery("select c as d, c as d from t group by d")
result = tk.MustQuery("select c as d, c as d from t group by d order by d")
result.Check(testkit.Rows("1 1", "1 1", "1 1"))
_, err = tk.Exec("select d as d, c as d from t group by d")
c.Assert(err, NotNil)
_, err = tk.Exec("select t.d, c as d from t group by d")
c.Assert(err, NotNil)
result = tk.MustQuery("select *, c+1 as d from t group by 3")
result = tk.MustQuery("select *, c+1 as d from t group by 3 order by d")
result.Check(testkit.Rows("1 -1 2"))
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a float, b int default 3)")
Expand All @@ -188,41 +188,41 @@ func (s *testSuite) TestAggregation(c *C) {
result = tk.MustQuery("select sum(b) from (select * from t1) t")
result.Check(testkit.Rows("<nil>"))
tk.MustExec("insert into t1 (a, b) values (1, 1),(2, 2),(3, 3),(1, 4),(3, 5)")
result = tk.MustQuery("select avg(b) from (select * from t1) t group by a")
result = tk.MustQuery("select avg(b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("2.5000", "2.0000", "4.0000"))
result = tk.MustQuery("select sum(b) from (select * from t1) t group by a")
result = tk.MustQuery("select sum(b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("5", "2", "8"))
result = tk.MustQuery("select count(b) from (select * from t1) t group by a")
result = tk.MustQuery("select count(b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("2", "1", "2"))
result = tk.MustQuery("select max(b) from (select * from t1) t group by a")
result = tk.MustQuery("select max(b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("4", "2", "5"))
result = tk.MustQuery("select min(b) from (select * from t1) t group by a")
result = tk.MustQuery("select min(b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("1", "2", "3"))
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, index(a,b))")
tk.MustExec("insert into t1 (a, b) values (1, 1),(2, 2),(3, 3),(1, 4), (1,1),(3, 5), (2,2), (3,5), (3,3)")
result = tk.MustQuery("select avg(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select avg(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("2.5000", "2.0000", "4.0000"))
result = tk.MustQuery("select sum(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select sum(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("5", "2", "8"))
result = tk.MustQuery("select count(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select count(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("2", "1", "2"))
result = tk.MustQuery("select max(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select max(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("4", "2", "5"))
result = tk.MustQuery("select min(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select min(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("1", "2", "3"))
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, index(b, a))")
tk.MustExec("insert into t1 (a, b) values (1, 1),(2, 2),(3, 3),(1, 4), (1,1),(3, 5), (2,2), (3,5), (3,3)")
result = tk.MustQuery("select avg(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select avg(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("2.5000", "2.0000", "4.0000"))
result = tk.MustQuery("select sum(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select sum(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("5", "2", "8"))
result = tk.MustQuery("select count(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select count(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("2", "1", "2"))
result = tk.MustQuery("select max(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select max(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("4", "2", "5"))
result = tk.MustQuery("select min(distinct b) from (select * from t1) t group by a")
result = tk.MustQuery("select min(distinct b) from (select * from t1) t group by a order by a")
result.Check(testkit.Rows("1", "2", "3"))
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key, ds date)")
Expand Down Expand Up @@ -274,12 +274,12 @@ func (s *testSuite) TestAggregation(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int(11), b decimal(15,2))")
tk.MustExec("insert into t values(1,771.64),(2,378.49),(3,920.92),(4,113.97)")
tk.MustQuery("select a, max(b) from t group by a limit 2").Check(testkit.Rows("1 771.64", "2 378.49"))
tk.MustQuery("select a, max(b) from t group by a order by a limit 2").Check(testkit.Rows("1 771.64", "2 378.49"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int(11), b char(15))")
tk.MustExec("insert into t values(1,771.64),(2,378.49),(3,920.92),(4,113.97)")
tk.MustQuery("select a, max(b) from t group by a limit 2").Check(testkit.Rows("1 771.64", "2 378.49"))
tk.MustQuery("select a, max(b) from t group by a order by a limit 2").Check(testkit.Rows("1 771.64", "2 378.49"))

// for issue #6014
tk.MustExec("use test")
Expand Down Expand Up @@ -347,19 +347,19 @@ func (s *testSuite) TestGroupConcatAggr(c *C) {
tk.MustExec("insert into test values(2, 20);")
tk.MustExec("insert into test values(3, 200);")
tk.MustExec("insert into test values(3, 500);")
result := tk.MustQuery("select id, group_concat(name) from test group by id")
result := tk.MustQuery("select id, group_concat(name) from test group by id order by id")
result.Check(testkit.Rows("1 10,20,30", "2 20", "3 200,500"))

result = tk.MustQuery("select id, group_concat(name SEPARATOR ';') from test group by id")
result = tk.MustQuery("select id, group_concat(name SEPARATOR ';') from test group by id order by id")
result.Check(testkit.Rows("1 10;20;30", "2 20", "3 200;500"))

result = tk.MustQuery("select id, group_concat(name SEPARATOR ',') from test group by id")
result = tk.MustQuery("select id, group_concat(name SEPARATOR ',') from test group by id order by id")
result.Check(testkit.Rows("1 10,20,30", "2 20", "3 200,500"))

result = tk.MustQuery(`select id, group_concat(name SEPARATOR '%') from test group by id`)
result = tk.MustQuery(`select id, group_concat(name SEPARATOR '%') from test group by id order by id`)
result.Check(testkit.Rows("1 10%20%30", "2 20", `3 200%500`))

result = tk.MustQuery("select id, group_concat(name SEPARATOR '') from test group by id")
result = tk.MustQuery("select id, group_concat(name SEPARATOR '') from test group by id order by id")
result.Check(testkit.Rows("1 102030", "2 20", "3 200500"))
}

Expand All @@ -384,7 +384,7 @@ func (s *testSuite) TestAggPushDown(c *C) {
tk.MustExec("insert into t values(1, 1, 1), (2, 1, 1)")
tk.MustExec("insert into tt values(1, 2, 1)")
tk.MustQuery("select max(a.b), max(b.b) from t a join tt b on a.a = b.a group by a.c").Check(testkit.Rows("1 2"))
tk.MustQuery("select a, count(b) from (select * from t union all select * from tt) k group by a").Check(testkit.Rows("1 2", "2 1"))
tk.MustQuery("select a, count(b) from (select * from t union all select * from tt) k group by a order by a").Check(testkit.Rows("1 2", "2 1"))
}

func (s *testSuite) TestOnlyFullGroupBy(c *C) {
Expand Down Expand Up @@ -518,8 +518,8 @@ func (s *testSuite) TestHaving(c *C) {
tk.MustQuery("select c1 as a from t group by c3 having sum(a) + a = 2;").Check(testkit.Rows("1"))
tk.MustQuery("select a.c1 as c, a.c1 as d from t as a, t as b having c1 = 1 limit 1;").Check(testkit.Rows("1 1"))

tk.MustQuery("select sum(c1) from t group by c1 having sum(c1)").Check(testkit.Rows("1", "2", "3"))
tk.MustQuery("select sum(c1) - 1 from t group by c1 having sum(c1) - 1").Check(testkit.Rows("1", "2"))
tk.MustQuery("select sum(c1) as s from t group by c1 having sum(c1) order by s").Check(testkit.Rows("1", "2", "3"))
tk.MustQuery("select sum(c1) - 1 as s from t group by c1 having sum(c1) - 1 order by s").Check(testkit.Rows("1", "2"))
tk.MustQuery("select 1 from t group by c1 having sum(abs(c2 + c3)) = c1").Check(testkit.Rows("1"))
}

Expand Down Expand Up @@ -566,7 +566,7 @@ func (s *testSuite) TestBuildProjBelowAgg(c *C) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (i int);")
tk.MustExec("insert into t values (1), (1), (1),(2),(3),(2),(3),(2),(3);")
rs := tk.MustQuery("select i+1, count(i+2), sum(i+3), group_concat(i+4), bit_or(i+5) from t group by i, hex(i+6)")
rs := tk.MustQuery("select i+1 as a, count(i+2), sum(i+3), group_concat(i+4), bit_or(i+5) from t group by i, hex(i+6) order by a")
rs.Check(testkit.Rows(
"2 3 12 5,5,5 6",
"3 3 15 6,6,6 7",
Expand Down
50 changes: 50 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
Expand Down Expand Up @@ -900,9 +901,58 @@ func (b *executorBuilder) buildHashAgg(v *plan.PhysicalHashAgg) Executor {
AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)),
GroupByItems: v.GroupByItems,
}
// We take `create table t(a int, b int);` as example.
//
// 1. If all the aggregation functions are FIRST_ROW, we do not need to set the defaultVal for them:
// e.g.
// mysql> select distinct a, b from t;
// 0 rows in set (0.00 sec)
//
// 2. If there are group by items, we do not need to set the defaultVal either, because an empty input produces no groups:
// e.g.
// mysql> select avg(a) from t group by b;
// Empty set (0.00 sec)
//
// Otherwise, without group by items, an empty input still produces one row, so the defaultVal is needed:
// mysql> select avg(a) from t;
// +--------+
// | avg(a) |
// +--------+
// | NULL |
// +--------+
// 1 row in set (0.00 sec)
if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1)
}
for _, aggDesc := range v.AggFuncs {
if aggDesc.HasDistinct {
e.isUnparallelExec = true
}
}
// When tidb_hashagg_final_concurrency and tidb_hashagg_partial_concurrency are both set to 1
// (or either of them is not positive), hash agg is executed without parallelism.
// Setting both to 1 can therefore serve as a workaround if parallel execution hits an unexpected problem.
if finalCon, partialCon := sessionVars.HashAggFinalConcurrency, sessionVars.HashAggPartialConcurrency; finalCon <= 0 || partialCon <= 0 || finalCon == 1 && partialCon == 1 {
e.isUnparallelExec = true
}
for i, aggDesc := range v.AggFuncs {
if !e.isUnparallelExec {
if aggDesc.Mode == aggregation.CompleteMode {
aggDesc.Mode = aggregation.Partial1Mode
} else {
aggDesc.Mode = aggregation.Partial2Mode
}
}
e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc())
if e.defaultVal != nil {
value, existsDefaultValue := aggDesc.CalculateDefaultValue(e.ctx, e.children[0].Schema())
if existsDefaultValue {
e.defaultVal.AppendDatum(i, &value)
}
}
}

metrics.ExecutorCounter.WithLabelValues("HashAggExec").Inc()
return e
}
Expand Down
14 changes: 5 additions & 9 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,12 +822,8 @@ func (s *testSuite) TestUnion(c *C) {
testSQL = `insert union_test values (1),(2)`
tk.MustExec(testSQL)

testSQL = `select id from union_test union select id from union_test;`
r := tk.MustQuery(testSQL)
r.Check(testkit.Rows("1", "2"))

testSQL = `select * from (select id from union_test union select id from union_test) t order by id;`
r = tk.MustQuery(testSQL)
r := tk.MustQuery(testSQL)
r.Check(testkit.Rows("1", "2"))

r = tk.MustQuery("select 1 union all select 1")
Expand Down Expand Up @@ -895,7 +891,7 @@ func (s *testSuite) TestUnion(c *C) {
tk.MustExec("insert into t (c1, c2) values (1, 1)")
tk.MustExec("insert into t (c1, c2) values (1, 2)")
tk.MustExec("insert into t (c1, c2) values (2, 3)")
r = tk.MustQuery("select * from t where t.c1 = 1 union select * from t where t.id = 1")
r = tk.MustQuery("select * from (select * from t where t.c1 = 1 union select * from t where t.id = 1) s order by s.id")
r.Check(testkit.Rows("1 1 1", "2 1 2"))

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -1865,9 +1861,9 @@ func (s *testSuite) TestSimpleDAG(c *C) {
tk.MustQuery("select a from t where b > 1 and a < 3").Check(testkit.Rows())
tk.MustQuery("select count(*) from t where b > 1 and a < 3").Check(testkit.Rows("0"))
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
tk.MustQuery("select count(*), c from t group by c").Check(testkit.Rows("2 1", "1 2", "1 3"))
tk.MustQuery("select sum(c) from t group by b").Check(testkit.Rows("4", "3"))
tk.MustQuery("select avg(a) from t group by b").Check(testkit.Rows("2.0000", "4.0000"))
tk.MustQuery("select count(*), c from t group by c order by c").Check(testkit.Rows("2 1", "1 2", "1 3"))
tk.MustQuery("select sum(c) as s from t group by b order by s").Check(testkit.Rows("3", "4"))
tk.MustQuery("select avg(a) as s from t group by b order by s").Check(testkit.Rows("2.0000", "4.0000"))
tk.MustQuery("select sum(distinct c) from t group by b").Check(testkit.Rows("3", "3"))

tk.MustExec("create index i on t(c,b)")
Expand Down
Loading

0 comments on commit dd37138

Please sign in to comment.