Skip to content

Commit

Permalink
planner: add AGG_TO_COP hint (#12043)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreyes authored Sep 12, 2019
1 parent c153a5f commit c7518de
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 32 deletions.
45 changes: 38 additions & 7 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,16 @@ func (p *baseLogicalPlan) exhaustPhysicalPlans(_ *property.PhysicalProperty) []P
panic("baseLogicalPlan.exhaustPhysicalPlans() should never be called.")
}

func (la *LogicalAggregation) canPushToCop() bool {
// At present, only Aggregation, Limit, TopN can be pushed to cop task, and Projection will be supported in the future.
// When we push task to coprocessor, finishCopTask will close the cop task and create a root task in the current implementation.
// Thus, we can't push two different tasks to coprocessor now, and can only push task to coprocessor when the child is Datasource.

// TODO: develop this function after supporting push several tasks to coprecessor and supporting Projection to coprocessor.
_, ok := la.children[0].(*DataSource)
return ok
}

func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
_, desc := prop.AllSameOrder()
enforcedAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
Expand All @@ -1280,7 +1290,11 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
Items: property.ItemsFromCols(la.groupByCols, desc),
}

for _, taskTp := range wholeTaskTypes {
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
if !la.aggHints.preferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
for _, taskTp := range taskTypes {
copiedChildProperty := new(property.PhysicalProperty)
*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
copiedChildProperty.TaskTp = taskTp
Expand Down Expand Up @@ -1329,7 +1343,11 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P

// The table read of "CopDoubleReadTaskType" can't promises the sort
// property that the stream aggregation required, no need to consider.
for _, taskTp := range []property.TaskType{property.CopSingleReadTaskType, property.RootTaskType} {
taskTypes := []property.TaskType{property.CopSingleReadTaskType}
if !la.aggHints.preferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
for _, taskTp := range taskTypes {
copiedChildProperty := new(property.PhysicalProperty)
*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
copiedChildProperty.TaskTp = taskTp
Expand All @@ -1344,7 +1362,7 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
}
// If STREAM_AGG hint is existed, it should consider enforce stream aggregation,
// because we can't trust possibleChildProperty completely.
if (la.preferAggType & preferStreamAgg) > 0 {
if (la.aggHints.preferAggType & preferStreamAgg) > 0 {
streamAggs = append(streamAggs, la.getEnforcedStreamAggs(prop)...)
}
return streamAggs
Expand All @@ -1355,7 +1373,11 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
return nil
}
hashAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
for _, taskTp := range wholeTaskTypes {
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
if !la.aggHints.preferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
for _, taskTp := range taskTypes {
agg := basePhysicalAgg{
GroupByItems: la.GroupByItems,
AggFuncs: la.AggFuncs,
Expand All @@ -1367,13 +1389,22 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
}

func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
preferHash := (la.preferAggType & preferHashAgg) > 0
preferStream := (la.preferAggType & preferStreamAgg) > 0
if la.aggHints.preferAggToCop {
if !la.canPushToCop() {
errMsg := "Optimizer Hint AGG_TO_COP is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
la.aggHints.preferAggToCop = false
}
}

preferHash := (la.aggHints.preferAggType & preferHashAgg) > 0
preferStream := (la.aggHints.preferAggType & preferStreamAgg) > 0
if preferHash && preferStream {
errMsg := "Optimizer aggregation hints are conflicted"
warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
la.preferAggType = 0
la.aggHints.preferAggType = 0
preferHash, preferStream = false, false
}

Expand Down
6 changes: 3 additions & 3 deletions planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (er *expressionRewriter) handleCompareSubquery(ctx context.Context, v *ast.
func (er *expressionRewriter) handleOtherComparableSubq(lexpr, rexpr expression.Expression, np LogicalPlan, useMin bool, cmpFunc string, all bool) {
plan4Agg := LogicalAggregation{}.Init(er.sctx, er.b.getSelectOffset())
if hint := er.b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
plan4Agg.aggHints = hint.aggHints
}
plan4Agg.SetChildren(np)

Expand Down Expand Up @@ -571,7 +571,7 @@ func (er *expressionRewriter) handleNEAny(lexpr, rexpr expression.Expression, np
AggFuncs: []*aggregation.AggFuncDesc{firstRowFunc, countFunc},
}.Init(er.sctx, er.b.getSelectOffset())
if hint := er.b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
plan4Agg.aggHints = hint.aggHints
}
plan4Agg.SetChildren(np)
firstRowResultCol := &expression.Column{
Expand Down Expand Up @@ -608,7 +608,7 @@ func (er *expressionRewriter) handleEQAll(lexpr, rexpr expression.Expression, np
AggFuncs: []*aggregation.AggFuncDesc{firstRowFunc, countFunc},
}.Init(er.sctx, er.b.getSelectOffset())
if hint := er.b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
plan4Agg.aggHints = hint.aggHints
}
plan4Agg.SetChildren(np)
firstRowResultCol := &expression.Column{
Expand Down
18 changes: 11 additions & 7 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ const (
HintStreamAgg = "stream_agg"
// HintIndex is hint enforce using some indexes.
HintIndex = "index"
// HintAggToCop is hint enforce pushing aggregation to coprocessor.
HintAggToCop = "agg_to_cop"
)

const (
Expand Down Expand Up @@ -98,7 +100,7 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu

plan4Agg := LogicalAggregation{AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(aggFuncList))}.Init(b.ctx, b.getSelectOffset())
if hint := b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
plan4Agg.aggHints = hint.aggHints
}
schema4Agg := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncList)+p.Schema().Len())...)
// aggIdxMap maps the old index to new index after applying common aggregation functions elimination.
Expand Down Expand Up @@ -794,7 +796,7 @@ func (b *PlanBuilder) buildDistinct(child LogicalPlan, length int) (*LogicalAggr
GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length]),
}.Init(b.ctx, child.SelectBlockOffset())
if hint := b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
plan4Agg.aggHints = hint.aggHints
}
plan4Agg.collectGroupByColumns()
for _, col := range child.Schema().Columns {
Expand Down Expand Up @@ -1957,7 +1959,7 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType n
var (
sortMergeTables, INLJTables, hashJoinTables []hintTableInfo
indexHintList []indexHintInfo
preferAggType uint
aggHints aggHintInfo
)
for _, hint := range hints {
switch hint.HintName.L {
Expand All @@ -1968,9 +1970,11 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType n
case TiDBHashJoin, HintHJ:
hashJoinTables = append(hashJoinTables, tableNames2HintTableInfo(hint.Tables, b.hintProcessor, nodeType, currentLevel)...)
case HintHashAgg:
preferAggType |= preferHashAgg
aggHints.preferAggType |= preferHashAgg
case HintStreamAgg:
preferAggType |= preferStreamAgg
aggHints.preferAggType |= preferStreamAgg
case HintAggToCop:
aggHints.preferAggToCop = true
case HintIndex:
if len(hint.Tables) != 0 {
indexHintList = append(indexHintList, indexHintInfo{
Expand All @@ -1986,13 +1990,13 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType n
// ignore hints that not implemented
}
}
if len(sortMergeTables)+len(INLJTables)+len(hashJoinTables)+len(indexHintList) > 0 || preferAggType != 0 {
if len(sortMergeTables)+len(INLJTables)+len(hashJoinTables)+len(indexHintList) > 0 || aggHints.preferAggType != 0 || aggHints.preferAggToCop {
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{
sortMergeJoinTables: sortMergeTables,
indexNestedLoopJoinTables: INLJTables,
hashJoinTables: hashJoinTables,
indexHintList: indexHintList,
preferAggType: preferAggType,
aggHints: aggHints,
})
return true
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ type LogicalAggregation struct {
// groupByCols stores the columns that are group-by items.
groupByCols []*expression.Column

// preferAggType stores preferred aggregation algorithm type.
preferAggType uint
// aggHints stores aggregation hint information.
aggHints aggHintInfo

possibleProperties [][]*expression.Column
inputCount float64 // inputCount is the input count of this plan.
Expand Down
64 changes: 64 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,70 @@ func (s *testPlanSuite) TestAggregationHints(c *C) {
}
}

func (s *testPlanSuite) TestAggToCopHint(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "insert into mysql.opt_rule_blacklist values(\"aggregation_eliminate\")")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "admin reload opt_rule_blacklist")
c.Assert(err, IsNil)

tests := []struct {
sql string
best string
warning string
}{
{
sql: "select /*+ AGG_TO_COP(), HASH_AGG(), INDEX(t) */ sum(a) from t group by a",
best: "TableReader(Table(t)->HashAgg)->HashAgg",
},
{
sql: "select /*+ AGG_TO_COP(), INDEX(t) */ sum(b) from t group by b",
best: "TableReader(Table(t)->HashAgg)->HashAgg",
},
{
sql: "select /*+ AGG_TO_COP(), HASH_AGG(), INDEX(t) */ distinct a from t group by a",
best: "TableReader(Table(t)->HashAgg)->HashAgg->HashAgg",
warning: "[planner:1815]Optimizer Hint AGG_TO_COP is inapplicable",
},
{
sql: "select /*+ AGG_TO_COP(), HASH_AGG(), HASH_JOIN(t1), INDEX(t1), INDEX(t2) */ sum(t1.a) from t t1, t t2 where t1.a = t2.b group by t1.a",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t1.a,test.t2.b)->Projection->HashAgg",
warning: "[planner:1815]Optimizer Hint AGG_TO_COP is inapplicable",
},
}
ctx := context.Background()
for i, test := range tests {
comment := Commentf("case:%v sql:%s", i, test)
se.GetSessionVars().StmtCtx.SetWarnings(nil)

stmt, err := s.ParseOneStmt(test.sql, "", "")
c.Assert(err, IsNil, comment)

p, err := planner.Optimize(ctx, se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, test.best, comment)

warnings := se.GetSessionVars().StmtCtx.GetWarnings()
if test.warning == "" {
c.Assert(len(warnings), Equals, 0, comment)
} else {
c.Assert(len(warnings), Equals, 1, comment)
c.Assert(warnings[0].Level, Equals, stmtctx.WarnLevelWarning, comment)
c.Assert(warnings[0].Err.Error(), Equals, test.warning, comment)
}
}
}

func (s *testPlanSuite) TestHintAlias(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
Expand Down
7 changes: 6 additions & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type tableHintInfo struct {
sortMergeJoinTables []hintTableInfo
hashJoinTables []hintTableInfo
indexHintList []indexHintInfo
preferAggType uint
aggHints aggHintInfo
}

type hintTableInfo struct {
Expand All @@ -74,6 +74,11 @@ type indexHintInfo struct {
indexHint *ast.IndexHint
}

type aggHintInfo struct {
preferAggType uint
preferAggToCop bool
}

func tableNames2HintTableInfo(hintTables []ast.HintTable, p *BlockHintProcessor, nodeType nodeType, currentOffset int) []hintTableInfo {
if len(hintTables) == 0 {
return nil
Expand Down
24 changes: 12 additions & 12 deletions planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (a *aggregationPushDownSolver) decompose(ctx sessionctx.Context, aggFunc *a
// tryToPushDownAgg tries to push down an aggregate function into a join path. If all aggFuncs are first row, we won't
// process it temporarily. If not, We will add additional group by columns and first row functions. We make a new aggregation operator.
// If the pushed aggregation is grouped by unique key, it's no need to push it down.
func (a *aggregationPushDownSolver) tryToPushDownAgg(aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, join *LogicalJoin, childIdx int, preferAggType uint, blockOffset int) (_ LogicalPlan, err error) {
func (a *aggregationPushDownSolver) tryToPushDownAgg(aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, join *LogicalJoin, childIdx int, aggHints aggHintInfo, blockOffset int) (_ LogicalPlan, err error) {
child := join.children[childIdx]
if aggregation.IsAllFirstRow(aggFuncs) {
return child, nil
Expand All @@ -204,7 +204,7 @@ func (a *aggregationPushDownSolver) tryToPushDownAgg(aggFuncs []*aggregation.Agg
return child, nil
}
}
agg, err := a.makeNewAgg(join.ctx, aggFuncs, gbyCols, preferAggType, blockOffset)
agg, err := a.makeNewAgg(join.ctx, aggFuncs, gbyCols, aggHints, blockOffset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,11 +247,11 @@ func (a *aggregationPushDownSolver) checkAnyCountAndSum(aggFuncs []*aggregation.
return false
}

func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, preferAggType uint, blockOffset int) (*LogicalAggregation, error) {
func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, aggHints aggHintInfo, blockOffset int) (*LogicalAggregation, error) {
agg := LogicalAggregation{
GroupByItems: expression.Column2Exprs(gbyCols),
groupByCols: gbyCols,
preferAggType: preferAggType,
GroupByItems: expression.Column2Exprs(gbyCols),
groupByCols: gbyCols,
aggHints: aggHints,
}.Init(ctx, blockOffset)
aggLen := len(aggFuncs) + len(gbyCols)
newAggFuncDescs := make([]*aggregation.AggFuncDesc, 0, aggLen)
Expand Down Expand Up @@ -283,9 +283,9 @@ func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan {
ctx := agg.ctx
newAgg := LogicalAggregation{
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)),
GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)),
preferAggType: agg.preferAggType,
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)),
GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)),
aggHints: agg.aggHints,
}.Init(ctx, agg.blockOffset)
newAgg.SetSchema(agg.schema.Clone())
for _, aggFunc := range agg.AggFuncs {
Expand Down Expand Up @@ -342,15 +342,15 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e
if rightInvalid {
rChild = join.children[1]
} else {
rChild, err = a.tryToPushDownAgg(rightAggFuncs, rightGbyCols, join, 1, agg.preferAggType, agg.blockOffset)
rChild, err = a.tryToPushDownAgg(rightAggFuncs, rightGbyCols, join, 1, agg.aggHints, agg.blockOffset)
if err != nil {
return nil, err
}
}
if leftInvalid {
lChild = join.children[0]
} else {
lChild, err = a.tryToPushDownAgg(leftAggFuncs, leftGbyCols, join, 0, agg.preferAggType, agg.blockOffset)
lChild, err = a.tryToPushDownAgg(leftAggFuncs, leftGbyCols, join, 0, agg.aggHints, agg.blockOffset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e
} else if union, ok1 := child.(*LogicalUnionAll); ok1 {
var gbyCols []*expression.Column
gbyCols = expression.ExtractColumnsFromExpressions(gbyCols, agg.GroupByItems, nil)
pushedAgg, err := a.makeNewAgg(agg.ctx, agg.AggFuncs, gbyCols, agg.preferAggType, agg.blockOffset)
pushedAgg, err := a.makeNewAgg(agg.ctx, agg.AggFuncs, gbyCols, agg.aggHints, agg.blockOffset)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c7518de

Please sign in to comment.