Skip to content

Commit

Permalink
planner: consider agg func type in cost model (#12038)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 authored and sre-bot committed Sep 9, 2019
1 parent 15557df commit bb5bfa4
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 57 deletions.
53 changes: 23 additions & 30 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,13 @@ StreamAgg_12 1.00 root funcs:sum(col_0)
explain select c1 from t1 where c1 in (select c2 from t2);
id count task operator info
Projection_9 9990.00 root test.t1.c1
└─HashLeftJoin_17 9990.00 root inner join, inner:HashAgg_24, equal:[eq(test.t1.c1, test.t2.c2)]
└─HashLeftJoin_17 9990.00 root inner join, inner:HashAgg_21, equal:[eq(test.t1.c1, test.t2.c2)]
├─TableReader_30 10000.00 root data:TableScan_29
│ └─TableScan_29 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─HashAgg_24 7992.00 root group by:col_1, funcs:firstrow(col_1)
└─TableReader_25 7992.00 root data:HashAgg_19
└─HashAgg_19 7992.00 cop group by:test.t2.c2,
└─Selection_23 9990.00 cop not(isnull(test.t2.c2))
└─TableScan_22 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
└─HashAgg_21 7992.00 root group by:test.t2.c2, funcs:firstrow(test.t2.c2)
└─TableReader_28 9990.00 root data:Selection_27
└─Selection_27 9990.00 cop not(isnull(test.t2.c2))
└─TableScan_26 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
explain select (select count(1) k from t1 s where s.c1 = t1.c1 having k != 0) from t1;
id count task operator info
Projection_12 10000.00 root ifnull(5_col_0, 0)
Expand Down Expand Up @@ -165,32 +164,27 @@ id count task operator info
Union_17 26000.00 root
├─HashAgg_21 16000.00 root group by:c1, funcs:firstrow(join_agg_0)
│ └─Union_22 16000.00 root
│ ├─StreamAgg_34 8000.00 root group by:col_2, funcs:firstrow(col_2), firstrow(col_2)
│ │ └─IndexReader_35 8000.00 root index:StreamAgg_26
│ │ └─StreamAgg_26 8000.00 cop group by:test.t2.c1,
│ │ └─IndexScan_33 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
│ └─StreamAgg_49 8000.00 root group by:col_2, funcs:firstrow(col_2), firstrow(col_2)
│ └─IndexReader_50 8000.00 root index:StreamAgg_41
│ └─StreamAgg_41 8000.00 cop group by:test.t2.c1,
│ └─IndexScan_48 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ │ └─IndexReader_37 10000.00 root index:IndexScan_36
│ │ └─IndexScan_36 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
│ └─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexReader_52 10000.00 root index:IndexScan_51
│ └─IndexScan_51 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─TableReader_55 10000.00 root data:TableScan_54
└─TableScan_54 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
explain select c1 from t2 union all select c1 from t2 union select c1 from t2;
id count task operator info
HashAgg_18 24000.00 root group by:c1, funcs:firstrow(join_agg_0)
└─Union_19 24000.00 root
├─StreamAgg_31 8000.00 root group by:col_2, funcs:firstrow(col_2), firstrow(col_2)
│ └─IndexReader_32 8000.00 root index:StreamAgg_23
│ └─StreamAgg_23 8000.00 cop group by:test.t2.c1,
│ └─IndexScan_30 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
├─StreamAgg_46 8000.00 root group by:col_2, funcs:firstrow(col_2), firstrow(col_2)
│ └─IndexReader_47 8000.00 root index:StreamAgg_38
│ └─StreamAgg_38 8000.00 cop group by:test.t2.c1,
│ └─IndexScan_45 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_61 8000.00 root group by:col_2, funcs:firstrow(col_2), firstrow(col_2)
└─IndexReader_62 8000.00 root index:StreamAgg_53
└─StreamAgg_53 8000.00 cop group by:test.t2.c1,
└─IndexScan_60 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexReader_34 10000.00 root index:IndexScan_33
│ └─IndexScan_33 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
├─StreamAgg_39 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexReader_49 10000.00 root index:IndexScan_48
│ └─IndexScan_48 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_54 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
└─IndexReader_64 10000.00 root index:IndexScan_63
└─IndexScan_63 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
explain select count(1) from (select count(1) from (select * from t1 where c3 = 100) k) k2;
id count task operator info
StreamAgg_13 1.00 root funcs:count(1)
Expand All @@ -216,10 +210,9 @@ StreamAgg_11 1.00 root funcs:count(1)
explain select count(1) from (select count(c2) from t1 group by c3) k;
id count task operator info
StreamAgg_11 1.00 root funcs:count(1)
└─HashAgg_23 8000.00 root group by:col_1, funcs:firstrow(col_0)
└─TableReader_24 8000.00 root data:HashAgg_20
└─HashAgg_20 8000.00 cop group by:test.t1.c3, funcs:firstrow(1)
└─TableScan_15 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─HashAgg_22 8000.00 root group by:test.t1.c3, funcs:firstrow(1)
└─TableReader_19 10000.00 root data:TableScan_18
└─TableScan_18 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
set @@session.tidb_opt_insubq_to_join_and_agg=0;
explain select sum(t1.c1 in (select c1 from t2)) from t1;
id count task operator info
Expand Down
11 changes: 5 additions & 6 deletions cmd/explaintest/r/explain_easy_stats.result
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,13 @@ IndexLookUp_9 0.00 root
explain select count(b.c2) from t1 a, t2 b where a.c1 = b.c2 group by a.c1;
id count task operator info
Projection_11 1985.00 root cast(join_agg_0)
└─HashLeftJoin_19 1985.00 root inner join, inner:HashAgg_26, equal:[eq(test.a.c1, test.b.c2)]
└─HashLeftJoin_19 1985.00 root inner join, inner:HashAgg_23, equal:[eq(test.a.c1, test.b.c2)]
├─TableReader_32 1999.00 root data:TableScan_31
│ └─TableScan_31 1999.00 cop table:a, range:[-inf,+inf], keep order:false
└─HashAgg_26 1985.00 root group by:col_2, funcs:count(col_0), firstrow(col_2)
└─TableReader_27 1985.00 root data:HashAgg_21
└─HashAgg_21 1985.00 cop group by:test.b.c2, funcs:count(test.b.c2)
└─Selection_25 1985.00 cop not(isnull(test.b.c2))
└─TableScan_24 1985.00 cop table:b, range:[-inf,+inf], keep order:false
└─HashAgg_23 1985.00 root group by:test.b.c2, funcs:count(test.b.c2), firstrow(test.b.c2)
└─TableReader_30 1985.00 root data:Selection_29
└─Selection_29 1985.00 cop not(isnull(test.b.c2))
└─TableScan_28 1985.00 cop table:b, range:[-inf,+inf], keep order:false
explain select * from t2 order by t2.c2 limit 0, 1;
id count task operator info
TopN_7 1.00 root test.t2.c2:asc, offset:0, count:1
Expand Down
7 changes: 3 additions & 4 deletions cmd/explaintest/r/index_join.result
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ Projection_8 10000.00 root test.t1.a, test.t1.b
├─IndexLookUp_11 10.00 root
│ ├─IndexScan_9 10.00 cop table:t1, index:a, range: decided by [eq(test.t1.a, test.t2.a)], keep order:false, stats:pseudo
│ └─TableScan_10 10.00 cop table:t1, keep order:false, stats:pseudo
└─StreamAgg_29 8000.00 root group by:col_1, funcs:firstrow(col_1)
└─IndexReader_30 8000.00 root index:StreamAgg_21
└─StreamAgg_21 8000.00 cop group by:test.t2.a,
└─IndexScan_28 10000.00 cop table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_22 8000.00 root group by:test.t2.a, funcs:firstrow(test.t2.a)
└─IndexReader_32 10000.00 root index:IndexScan_31
└─IndexScan_31 10000.00 cop table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
2 changes: 1 addition & 1 deletion planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (s *testAnalyzeSuite) TestEmptyTable(c *C) {
},
{
sql: "select * from t where c1 in (select c1 from t1)",
best: "LeftHashJoin{TableReader(Table(t)->Sel([not(isnull(test.t.c1))]))->TableReader(Table(t1)->Sel([not(isnull(test.t1.c1))])->HashAgg)->HashAgg}(test.t.c1,test.t1.c1)->Projection",
best: "LeftHashJoin{TableReader(Table(t)->Sel([not(isnull(test.t.c1))]))->TableReader(Table(t1)->Sel([not(isnull(test.t1.c1))]))->HashAgg}(test.t.c1,test.t1.c1)->Projection",
},
{
sql: "select * from t, t1 where t.c1 = t1.c1",
Expand Down
19 changes: 19 additions & 0 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"math"

"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -43,6 +44,24 @@ const (
distinctFactor = 0.8
)

// aggFuncFactor maps an aggregate function name to the weight that function
// contributes to an aggregation operator's cost factor.
// getAggFuncCostFactor adds up one entry per aggregate function, and the
// GetCost methods of the physical aggregations multiply the CPU cost by that sum.
// In this table firstrow carries the smallest weight (0.1) and the
// variance/stddev family the largest (3.0).
var aggFuncFactor = map[string]float64{
ast.AggFuncCount: 1.0,
ast.AggFuncSum: 1.0,
ast.AggFuncAvg: 2.0,
ast.AggFuncFirstRow: 0.1,
ast.AggFuncMax: 1.0,
ast.AggFuncMin: 1.0,
ast.AggFuncGroupConcat: 1.0,
ast.AggFuncBitOr: 0.9,
ast.AggFuncBitXor: 0.9,
ast.AggFuncBitAnd: 0.9,
ast.AggFuncVarPop: 3.0,
ast.AggFuncVarSamp: 3.0,
ast.AggFuncStddevPop: 3.0,
ast.AggFuncStddevSamp: 3.0,
// Fallback weight used by getAggFuncCostFactor for any aggregate function
// name that has no entry of its own in this map.
"default": 1.5,
}

// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get
// these tasks one by one.
var wholeTaskTypes = [...]property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType, property.RootTaskType}
Expand Down
6 changes: 3 additions & 3 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderAgg(c *C) {
// Test distinct.
{
sql: "select distinct b from t",
best: "TableReader(Table(t)->HashAgg)->HashAgg",
best: "TableReader(Table(t))->HashAgg",
},
{
sql: "select count(*) from (select * from t order by b) t group by b",
Expand All @@ -868,7 +868,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderAgg(c *C) {
// Test agg + table.
{
sql: "select sum(a), avg(b + c) from t group by d",
best: "TableReader(Table(t))->Projection->HashAgg",
best: "TableReader(Table(t)->HashAgg)->HashAgg",
},
{
sql: "select sum(distinct a), avg(b + c) from t group by d",
Expand Down Expand Up @@ -1639,7 +1639,7 @@ func (s *testPlanSuite) TestAggregationHints(c *C) {
// additional test
{
sql: "select /*+ STREAM_AGG() */ distinct a from t",
best: "TableReader(Table(t)->StreamAgg)->StreamAgg",
best: "TableReader(Table(t))->StreamAgg",
},
{
sql: "select /*+ HASH_AGG() */ t1.a from t t1 where t1.a < any(select t2.b from t t2)",
Expand Down
15 changes: 15 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,21 @@ func (p *basePhysicalAgg) numDistinctFunc() (num int) {
return
}

// getAggFuncCostFactor returns the sum of the aggFuncFactor weights of every
// aggregate function in p.AggFuncs. A function whose name has no entry in
// aggFuncFactor contributes the "default" weight. When the sum is zero (for
// example, when there are no aggregate functions) 1.0 is returned instead.
func (p *basePhysicalAgg) getAggFuncCostFactor() (factor float64) {
	for _, aggFunc := range p.AggFuncs {
		weight, known := aggFuncFactor[aggFunc.Name]
		if !known {
			// Unlisted aggregate function: fall back to the generic weight.
			weight = aggFuncFactor["default"]
		}
		factor += weight
	}
	if factor == 0 {
		// Never hand back a zero multiplier.
		return 1.0
	}
	return factor
}

// PhysicalHashAgg is hash operator of aggregate.
type PhysicalHashAgg struct {
basePhysicalAgg
Expand Down
20 changes: 7 additions & 13 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,15 +877,12 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {

// GetCost computes cost of stream aggregation considering CPU/memory.
func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot bool) float64 {
numAggFunc := len(p.AggFuncs)
if numAggFunc == 0 {
numAggFunc = 1
}
aggFuncFactor := p.getAggFuncCostFactor()
var cpuCost float64
if isRoot {
cpuCost = inputRows * cpuFactor * float64(numAggFunc)
cpuCost = inputRows * cpuFactor * aggFuncFactor
} else {
cpuCost = inputRows * copCPUFactor * float64(numAggFunc)
cpuCost = inputRows * copCPUFactor * aggFuncFactor
}
rowsPerGroup := inputRows / p.statsInfo().RowCount
memoryCost := rowsPerGroup * distinctFactor * memoryFactor * float64(p.numDistinctFunc())
Expand Down Expand Up @@ -958,23 +955,20 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
func (p *PhysicalHashAgg) GetCost(inputRows float64, isRoot bool) float64 {
cardinality := p.statsInfo().RowCount
numDistinctFunc := p.numDistinctFunc()
numAggFunc := len(p.AggFuncs)
if numAggFunc == 0 {
numAggFunc = 1
}
aggFuncFactor := p.getAggFuncCostFactor()
var cpuCost float64
if isRoot {
cpuCost = inputRows * cpuFactor * float64(numAggFunc)
cpuCost = inputRows * cpuFactor * aggFuncFactor
divisor, con := p.cpuCostDivisor(numDistinctFunc > 0)
if divisor > 0 {
cpuCost /= divisor
// Cost of additional goroutines.
cpuCost += (con + 1) * concurrencyFactor
}
} else {
cpuCost = inputRows * copCPUFactor * float64(numAggFunc)
cpuCost = inputRows * copCPUFactor * aggFuncFactor
}
memoryCost := cardinality * memoryFactor * float64(numAggFunc)
memoryCost := cardinality * memoryFactor * float64(len(p.AggFuncs))
// When aggregation has distinct flag, we would allocate a map for each group to
// check duplication.
memoryCost += inputRows * distinctFactor * memoryFactor * float64(numDistinctFunc)
Expand Down

0 comments on commit bb5bfa4

Please sign in to comment.