Skip to content

Commit

Permalink
hint: refactor PlanHints (#50768)
Browse files Browse the repository at this point in the history
ref #48875
  • Loading branch information
hawkingrei authored Jan 29, 2024
1 parent dc54d6c commit 1760a26
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 63 deletions.
28 changes: 14 additions & 14 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -3101,7 +3101,7 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
if !la.canPushToCop(kv.TiKV) || !la.SCtx().GetSessionVars().AllowDistinctAggPushDown {
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.PreferAggToCop {
} else if !la.PreferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
for _, taskTp := range taskTypes {
Expand Down Expand Up @@ -3182,7 +3182,7 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
} else if !la.distinctArgsMeetsProperty() {
continue
}
} else if !la.aggHints.PreferAggToCop {
} else if !la.PreferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
if !la.canPushToCop(kv.TiKV) && !la.canPushToCop(kv.TiFlash) {
Expand All @@ -3208,7 +3208,7 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
}
// If the STREAM_AGG hint exists, we should consider enforcing stream aggregation,
// because we can't trust possibleChildProperty completely.
if (la.aggHints.PreferAggType & h.PreferStreamAgg) > 0 {
if (la.PreferAggType & h.PreferStreamAgg) > 0 {
streamAggs = append(streamAggs, la.getEnforcedStreamAggs(prop)...)
}
return streamAggs
Expand Down Expand Up @@ -3342,9 +3342,9 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
// handle MPP Agg hints
var preferMode AggMppRunMode
var prefer bool
if la.aggHints.PreferAggType&h.PreferMPP1PhaseAgg > 0 {
if la.PreferAggType&h.PreferMPP1PhaseAgg > 0 {
preferMode, prefer = Mpp1Phase, true
} else if la.aggHints.PreferAggType&h.PreferMPP2PhaseAgg > 0 {
} else if la.PreferAggType&h.PreferMPP2PhaseAgg > 0 {
preferMode, prefer = Mpp2Phase, true
}
if prefer {
Expand Down Expand Up @@ -3386,7 +3386,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
// If the variable allows DistinctAggPushDown but the operator itself can't be pushed down to TiKV, just produce the root task type.
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.PreferAggToCop {
} else if !la.PreferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
if !la.canPushToCop(kv.TiKV) && !canPushDownToTiFlash {
Expand All @@ -3397,11 +3397,11 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
} else {
hasMppHints := false
var errMsg string
if la.aggHints.PreferAggType&h.PreferMPP1PhaseAgg > 0 {
if la.PreferAggType&h.PreferMPP1PhaseAgg > 0 {
errMsg = "The agg can not push down to the MPP side, the MPP_1PHASE_AGG() hint is invalid"
hasMppHints = true
}
if la.aggHints.PreferAggType&h.PreferMPP2PhaseAgg > 0 {
if la.PreferAggType&h.PreferMPP2PhaseAgg > 0 {
errMsg = "The agg can not push down to the MPP side, the MPP_2PHASE_AGG() hint is invalid"
hasMppHints = true
}
Expand All @@ -3428,25 +3428,25 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
return hashAggs
}

// ResetHintIfConflicted resets the aggHints.PreferAggType if they are conflicted,
// ResetHintIfConflicted resets the PreferAggType if they are conflicted,
// and returns the two PreferAggType hints.
func (la *LogicalAggregation) ResetHintIfConflicted() (preferHash bool, preferStream bool) {
preferHash = (la.aggHints.PreferAggType & h.PreferHashAgg) > 0
preferStream = (la.aggHints.PreferAggType & h.PreferStreamAgg) > 0
preferHash = (la.PreferAggType & h.PreferHashAgg) > 0
preferStream = (la.PreferAggType & h.PreferStreamAgg) > 0
if preferHash && preferStream {
la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer aggregation hints are conflicted")
la.aggHints.PreferAggType = 0
la.PreferAggType = 0
preferHash, preferStream = false, false
}
return
}

func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
if la.aggHints.PreferAggToCop {
if la.PreferAggToCop {
if !la.canPushToCop(kv.TiKV) {
la.SCtx().GetSessionVars().StmtCtx.SetHintWarning(
"Optimizer Hint AGG_TO_COP is inapplicable")
la.aggHints.PreferAggToCop = false
la.PreferAggToCop = false
}
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,9 @@ func (er *expressionRewriter) handleCompareSubquery(ctx context.Context, planCtx
func (er *expressionRewriter) handleOtherComparableSubq(planCtx *exprRewriterPlanCtx, lexpr, rexpr expression.Expression, np LogicalPlan, useMin bool, cmpFunc string, all, markNoDecorrelate bool) {
intest.AssertNotNil(planCtx)
plan4Agg := LogicalAggregation{}.Init(planCtx.builder.ctx, planCtx.builder.getSelectOffset())
if hint := planCtx.builder.TableHints(); hint != nil {
plan4Agg.aggHints = hint.Agg
if hintinfo := planCtx.builder.TableHints(); hintinfo != nil {
plan4Agg.PreferAggType = hintinfo.PreferAggType
plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop
}
plan4Agg.SetChildren(np)

Expand Down Expand Up @@ -886,8 +887,9 @@ func (er *expressionRewriter) handleNEAny(planCtx *exprRewriterPlanCtx, lexpr, r
plan4Agg := LogicalAggregation{
AggFuncs: []*aggregation.AggFuncDesc{maxFunc, countFunc},
}.Init(sctx, planCtx.builder.getSelectOffset())
if hint := planCtx.builder.TableHints(); hint != nil {
plan4Agg.aggHints = hint.Agg
if hintinfo := planCtx.builder.TableHints(); hintinfo != nil {
plan4Agg.PreferAggType = hintinfo.PreferAggType
plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop
}
plan4Agg.SetChildren(np)
maxResultCol := &expression.Column{
Expand Down Expand Up @@ -925,8 +927,9 @@ func (er *expressionRewriter) handleEQAll(planCtx *exprRewriterPlanCtx, lexpr, r
plan4Agg := LogicalAggregation{
AggFuncs: []*aggregation.AggFuncDesc{firstRowFunc, countFunc},
}.Init(sctx, planCtx.builder.getSelectOffset())
if hint := planCtx.builder.TableHints(); hint != nil {
plan4Agg.aggHints = hint.Agg
if hintinfo := planCtx.builder.TableHints(); hintinfo != nil {
plan4Agg.PreferAggType = hintinfo.PreferAggType
plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop
}
plan4Agg.SetChildren(np)
plan4Agg.names = append(plan4Agg.names, types.EmptyName)
Expand Down
10 changes: 6 additions & 4 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
}

plan4Agg := LogicalAggregation{AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(aggFuncList))}.Init(b.ctx, b.getSelectOffset())
if hint := b.TableHints(); hint != nil {
plan4Agg.aggHints = hint.Agg
if hintinfo := b.TableHints(); hintinfo != nil {
plan4Agg.PreferAggType = hintinfo.PreferAggType
plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop
}
schema4Agg := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncList)+p.Schema().Len())...)
names := make(types.NameSlice, 0, len(aggFuncList)+p.Schema().Len())
Expand Down Expand Up @@ -1822,8 +1823,9 @@ func (b *PlanBuilder) buildDistinct(child LogicalPlan, length int) (*LogicalAggr
AggFuncs: make([]*aggregation.AggFuncDesc, 0, child.Schema().Len()),
GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length]),
}.Init(b.ctx, child.QueryBlockOffset())
if hint := b.TableHints(); hint != nil {
plan4Agg.aggHints = hint.Agg
if hintinfo := b.TableHints(); hintinfo != nil {
plan4Agg.PreferAggType = hintinfo.PreferAggType
plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop
}
for _, col := range child.Schema().Columns {
aggDesc, err := aggregation.NewAggFuncDesc(b.ctx, ast.AggFuncFirstRow, []expression.Expression{col}, false)
Expand Down
8 changes: 5 additions & 3 deletions pkg/planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,9 @@ type LogicalAggregation struct {
AggFuncs []*aggregation.AggFuncDesc
GroupByItems []expression.Expression

// aggHints stores aggregation hint information.
aggHints h.AggHints
// PreferAggType and PreferAggToCop store aggregation hint information.
PreferAggType uint
PreferAggToCop bool

possibleProperties [][]*expression.Column
inputCount float64 // inputCount is the input count of this plan.
Expand Down Expand Up @@ -1078,7 +1079,8 @@ func (la *LogicalAggregation) CopyAggHints(agg *LogicalAggregation) {
// `HaveThrownWarningMessage` to avoid this. Besides, finalAgg and
// partialAgg (in cascades planner) should share the same hint, instead
// of a copy.
la.aggHints = agg.aggHints
la.PreferAggType = agg.PreferAggType
la.PreferAggToCop = agg.PreferAggToCop
}

// IsPartialModeAgg returns if all of the AggFuncs are partialMode.
Expand Down
30 changes: 16 additions & 14 deletions pkg/planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
h "github.com/pingcap/tidb/pkg/util/hint"
)

type aggregationPushDownSolver struct {
Expand Down Expand Up @@ -250,7 +249,7 @@ func (*aggregationPushDownSolver) decompose(ctx sessionctx.Context, aggFunc *agg
// process it temporarily. If not, We will add additional group by columns and first row functions. We make a new aggregation operator.
// If the pushed aggregation is grouped by a unique key, there is no need to push it down.
func (a *aggregationPushDownSolver) tryToPushDownAgg(oldAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column,
join *LogicalJoin, childIdx int, aggHints h.AggHints, blockOffset int, opt *logicalOptimizeOp) (_ LogicalPlan, err error) {
join *LogicalJoin, childIdx int, blockOffset int, opt *logicalOptimizeOp) (_ LogicalPlan, err error) {
child := join.children[childIdx]
if aggregation.IsAllFirstRow(aggFuncs) {
return child, nil
Expand All @@ -267,7 +266,7 @@ func (a *aggregationPushDownSolver) tryToPushDownAgg(oldAgg *LogicalAggregation,
}
nullGenerating := (join.JoinType == LeftOuterJoin && childIdx == 1) ||
(join.JoinType == RightOuterJoin && childIdx == 0)
agg, err := a.makeNewAgg(join.SCtx(), aggFuncs, gbyCols, aggHints, blockOffset, nullGenerating)
agg, err := a.makeNewAgg(join.SCtx(), aggFuncs, gbyCols, oldAgg.PreferAggType, oldAgg.PreferAggToCop, blockOffset, nullGenerating)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -327,10 +326,11 @@ func (*aggregationPushDownSolver) checkAllArgsColumn(fun *aggregation.AggFuncDes
// 1. https://github.com/pingcap/tidb/issues/16355, push avg & distinct functions across join
// 2. remove this method and use splitPartialAgg instead for clean code.
func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc,
gbyCols []*expression.Column, aggHints h.AggHints, blockOffset int, nullGenerating bool) (*LogicalAggregation, error) {
gbyCols []*expression.Column, preferAggType uint, preferAggToCop bool, blockOffset int, nullGenerating bool) (*LogicalAggregation, error) {
agg := LogicalAggregation{
GroupByItems: expression.Column2Exprs(gbyCols),
aggHints: aggHints,
GroupByItems: expression.Column2Exprs(gbyCols),
PreferAggType: preferAggType,
PreferAggToCop: preferAggToCop,
}.Init(ctx, blockOffset)
aggLen := len(aggFuncs) + len(gbyCols)
newAggFuncDescs := make([]*aggregation.AggFuncDesc, 0, aggLen)
Expand Down Expand Up @@ -372,9 +372,10 @@ func (*aggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (push
agg.GroupByItems = final.GroupByItems

pushedAgg = LogicalAggregation{
AggFuncs: partial.AggFuncs,
GroupByItems: partial.GroupByItems,
aggHints: agg.aggHints,
AggFuncs: partial.AggFuncs,
GroupByItems: partial.GroupByItems,
PreferAggType: agg.PreferAggType,
PreferAggToCop: agg.PreferAggToCop,
}.Init(agg.SCtx(), agg.QueryBlockOffset())
pushedAgg.SetSchema(partial.Schema)
return
Expand All @@ -385,9 +386,10 @@ func (*aggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (push
func (*aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) (LogicalPlan, error) {
ctx := agg.SCtx()
newAgg := LogicalAggregation{
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)),
GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)),
aggHints: agg.aggHints,
AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)),
GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)),
PreferAggType: agg.PreferAggType,
PreferAggToCop: agg.PreferAggToCop,
}.Init(ctx, agg.QueryBlockOffset())
newAgg.SetSchema(agg.schema.Clone())
for _, aggFunc := range agg.AggFuncs {
Expand Down Expand Up @@ -493,15 +495,15 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptim
if rightInvalid {
rChild = join.children[1]
} else {
rChild, err = a.tryToPushDownAgg(agg, rightAggFuncs, rightGbyCols, join, 1, agg.aggHints, agg.QueryBlockOffset(), opt)
rChild, err = a.tryToPushDownAgg(agg, rightAggFuncs, rightGbyCols, join, 1, agg.QueryBlockOffset(), opt)
if err != nil {
return nil, err
}
}
if leftInvalid {
lChild = join.children[0]
} else {
lChild, err = a.tryToPushDownAgg(agg, leftAggFuncs, leftGbyCols, join, 0, agg.aggHints, agg.QueryBlockOffset(), opt)
lChild, err = a.tryToPushDownAgg(agg, leftAggFuncs, leftGbyCols, join, 0, agg.QueryBlockOffset(), opt)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/planner/core/rule_aggregation_skew_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,17 @@ func (a *skewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation

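// now create the bottom and top aggregate operators
// NOTE(review): the removed lines copied the whole agg.aggHints (PreferAggType and
// PreferAggToCop) into both operators; the added lines give bottomAgg only
// PreferAggType and topAgg only PreferAggToCop. Confirm this narrowing is intended,
// since the commit is described as a pure refactor.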
bottomAgg := LogicalAggregation{
AggFuncs: bottomAggFuncs,
GroupByItems: bottomAggGroupbyItems,
aggHints: agg.aggHints,
AggFuncs: bottomAggFuncs,
GroupByItems: bottomAggGroupbyItems,
PreferAggType: agg.PreferAggType,
}.Init(agg.SCtx(), agg.QueryBlockOffset())
bottomAgg.SetChildren(agg.children...)
bottomAgg.SetSchema(bottomAggSchema)

topAgg := LogicalAggregation{
AggFuncs: topAggFuncs,
GroupByItems: agg.GroupByItems,
aggHints: agg.aggHints,
AggFuncs: topAggFuncs,
GroupByItems: agg.GroupByItems,
PreferAggToCop: agg.PreferAggToCop,
}.Init(agg.SCtx(), agg.QueryBlockOffset())
topAgg.SetChildren(bottomAgg)
topAgg.SetSchema(topAggSchema)
Expand Down
29 changes: 13 additions & 16 deletions pkg/util/hint/hint.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,10 @@ type PlanHints struct {
HJProbe []HintedTable // hash_join_probe

// Hints below are not associated with any particular table.
Agg AggHints // hash_agg, merge_agg, agg_to_cop
PreferLimitToCop bool // limit_to_cop
CTEMerge bool // merge
PreferAggType uint // hash_agg, stream_agg, mpp_1phase_agg, mpp_2phase_agg
PreferAggToCop bool // agg_to_cop
PreferLimitToCop bool // limit_to_cop
CTEMerge bool // merge
TimeRangeHint ast.HintTimeRange
}

Expand Down Expand Up @@ -535,12 +536,6 @@ func (hint *HintedIndex) IndexString() string {
return fmt.Sprintf("%s.%s%s", hint.DBName, hint.TblName, indexListString)
}

// AggHints stores Agg hint information.
type AggHints struct {
PreferAggType uint
PreferAggToCop bool
}

// IfPreferMergeJoin checks whether the join hint is merge join.
func (pHints *PlanHints) IfPreferMergeJoin(tableNames ...*HintedTable) bool {
return pHints.MatchTableName(tableNames, pHints.SortMergeJoin)
Expand Down Expand Up @@ -674,7 +669,8 @@ func ParsePlanHints(hints []*ast.TableOptimizerHint,
shuffleJoinTables []HintedTable
indexHintList, indexMergeHintList []HintedIndex
tiflashTables, tikvTables []HintedTable
aggHints AggHints
preferAggType uint
preferAggToCop bool
timeRangeHint ast.HintTimeRange
preferLimitToCop bool
cteMerge bool
Expand Down Expand Up @@ -726,19 +722,19 @@ func ParsePlanHints(hints []*ast.TableOptimizerHint,
case HintNoIndexMergeJoin:
noIndexMergeJoinTables = append(noIndexMergeJoinTables, tableNames2HintTableInfo(currentDB, hint.HintName.L, hint.Tables, hintProcessor, currentLevel, warnHandler)...)
case HintMPP1PhaseAgg:
aggHints.PreferAggType |= PreferMPP1PhaseAgg
preferAggType |= PreferMPP1PhaseAgg
case HintMPP2PhaseAgg:
aggHints.PreferAggType |= PreferMPP2PhaseAgg
preferAggType |= PreferMPP2PhaseAgg
case HintHashJoinBuild:
hjBuildTables = append(hjBuildTables, tableNames2HintTableInfo(currentDB, hint.HintName.L, hint.Tables, hintProcessor, currentLevel, warnHandler)...)
case HintHashJoinProbe:
hjProbeTables = append(hjProbeTables, tableNames2HintTableInfo(currentDB, hint.HintName.L, hint.Tables, hintProcessor, currentLevel, warnHandler)...)
case HintHashAgg:
aggHints.PreferAggType |= PreferHashAgg
preferAggType |= PreferHashAgg
case HintStreamAgg:
aggHints.PreferAggType |= PreferStreamAgg
preferAggType |= PreferStreamAgg
case HintAggToCop:
aggHints.PreferAggToCop = true
preferAggToCop = true
case HintUseIndex, HintIgnoreIndex, HintForceIndex, HintOrderIndex, HintNoOrderIndex:
dbName := hint.Tables[0].DBName
if dbName.L == "" {
Expand Down Expand Up @@ -841,7 +837,8 @@ func ParsePlanHints(hints []*ast.TableOptimizerHint,
IndexHintList: indexHintList,
TiFlashTables: tiflashTables,
TiKVTables: tikvTables,
Agg: aggHints,
PreferAggToCop: preferAggToCop,
PreferAggType: preferAggType,
IndexMergeHintList: indexMergeHintList,
TimeRangeHint: timeRangeHint,
PreferLimitToCop: preferLimitToCop,
Expand Down

0 comments on commit 1760a26

Please sign in to comment.