Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move logical optimizing trace logic out of core pkg #52161

Merged
merged 10 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ go_library(
"plan_cost_detail.go",
"plan_cost_ver1.go",
"plan_cost_ver2.go",
"plan_stats.go",
"plan_to_pb.go",
"planbuilder.go",
"point_get_plan.go",
Expand All @@ -50,6 +49,7 @@ go_library(
"rule_aggregation_push_down.go",
"rule_aggregation_skew_rewrite.go",
"rule_build_key_info.go",
"rule_collect_plan_stats.go",
"rule_column_pruning.go",
"rule_constant_propagation.go",
"rule_decorrelate.go",
Expand Down
47 changes: 6 additions & 41 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,41 +128,6 @@ var optRuleList = []logicalOptRule{
*/
var optInteractionRuleList = map[logicalOptRule]logicalOptRule{}

type logicalOptimizeOp struct {
// tracer is goring to track optimize steps during rule optimizing
tracer *tracing.LogicalOptimizeTracer
}

func defaultLogicalOptimizeOption() *logicalOptimizeOp {
return &logicalOptimizeOp{}
}

func (op *logicalOptimizeOp) withEnableOptimizeTracer(tracer *tracing.LogicalOptimizeTracer) *logicalOptimizeOp {
op.tracer = tracer
return op
}

func (op *logicalOptimizeOp) appendBeforeRuleOptimize(index int, name string, before LogicalPlan) {
if op == nil || op.tracer == nil {
return
}
op.tracer.AppendRuleTracerBeforeRuleOptimize(index, name, before.BuildPlanTrace())
}

func (op *logicalOptimizeOp) appendStepToCurrent(id int, tp string, reason, action func() string) {
if op == nil || op.tracer == nil {
return
}
op.tracer.AppendRuleTracerStepToCurrent(id, tp, reason(), action())
}

func (op *logicalOptimizeOp) recordFinalLogicalPlan(final LogicalPlan) {
if op == nil || op.tracer == nil {
return
}
op.tracer.RecordFinalLogicalPlan(final.BuildPlanTrace())
}

// logicalOptRule means a logical optimizing rule, which contains decorrelate, ppd, column pruning, etc.
type logicalOptRule interface {
/* Return Parameters:
Expand All @@ -172,7 +137,7 @@ type logicalOptRule interface {
The default value is false. It means that no interaction rule will be triggered.
3. error: If there is error during the rule optimizer, it will be thrown
*/
optimize(context.Context, LogicalPlan, *logicalOptimizeOp) (LogicalPlan, bool, error)
optimize(context.Context, LogicalPlan, *plannerutil.LogicalOptimizeOp) (LogicalPlan, bool, error)
name() string
}

Expand Down Expand Up @@ -1157,14 +1122,14 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic
debugtrace.EnterContextCommon(logic.SCtx())
defer debugtrace.LeaveContextCommon(logic.SCtx())
}
opt := defaultLogicalOptimizeOption()
opt := plannerutil.DefaultLogicalOptimizeOption()
vars := logic.SCtx().GetSessionVars()
if vars.StmtCtx.EnableOptimizeTrace {
vars.StmtCtx.OptimizeTracer = &tracing.OptimizeTracer{}
tracer := &tracing.LogicalOptimizeTracer{
Steps: make([]*tracing.LogicalRuleOptimizeTracer, 0),
}
opt = opt.withEnableOptimizeTracer(tracer)
opt = opt.WithEnableOptimizeTracer(tracer)
defer func() {
vars.StmtCtx.OptimizeTracer.Logical = tracer
}()
Expand All @@ -1178,7 +1143,7 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic
if flag&(1<<uint(i)) == 0 || isLogicalRuleDisabled(rule) {
continue
}
opt.appendBeforeRuleOptimize(i, rule.name(), logic)
opt.AppendBeforeRuleOptimize(i, rule.name(), logic.BuildPlanTrace)
var planChanged bool
logic, planChanged, err = rule.optimize(ctx, logic, opt)
if err != nil {
Expand All @@ -1193,14 +1158,14 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic

// Trigger the interaction rule
for i, rule := range againRuleList {
opt.appendBeforeRuleOptimize(i, rule.name(), logic)
opt.AppendBeforeRuleOptimize(i, rule.name(), logic.BuildPlanTrace)
logic, _, err = rule.optimize(ctx, logic, opt)
if err != nil {
return nil, err
}
}

opt.recordFinalLogicalPlan(logic)
opt.RecordFinalLogicalPlan(logic.BuildPlanTrace)
return logic, err
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ type LogicalPlan interface {
// PredicatePushDown pushes down the predicates in the where/on/having clauses as deeply as possible.
// It will accept a predicate that is an expression slice, and return the expressions that can't be pushed.
// Because it might change the root if the having clause exists, we need to return a plan that represents a new root.
PredicatePushDown([]expression.Expression, *logicalOptimizeOp) ([]expression.Expression, LogicalPlan)
PredicatePushDown([]expression.Expression, *util.LogicalOptimizeOp) ([]expression.Expression, LogicalPlan)

// PruneColumns prunes the unused columns, and return the new logical plan if changed, otherwise it's same.
PruneColumns([]*expression.Column, *logicalOptimizeOp) (LogicalPlan, error)
PruneColumns([]*expression.Column, *util.LogicalOptimizeOp) (LogicalPlan, error)

// findBestTask converts the logical plan to the physical plan. It's a new interface.
// It is called recursively from the parent to the children to create the result physical plan.
Expand All @@ -292,16 +292,16 @@ type LogicalPlan interface {
BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema)

// pushDownTopN will push down the topN or limit operator during logical optimization.
pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan
pushDownTopN(topN *LogicalTopN, opt *util.LogicalOptimizeOp) LogicalPlan

// deriveTopN derives an implicit TopN from a filter on row_number window function..
deriveTopN(opt *logicalOptimizeOp) LogicalPlan
deriveTopN(opt *util.LogicalOptimizeOp) LogicalPlan

// predicateSimplification consolidates different predcicates on a column and its equivalence classes.
predicateSimplification(opt *logicalOptimizeOp) LogicalPlan
predicateSimplification(opt *util.LogicalOptimizeOp) LogicalPlan

// constantPropagation generate new constant predicate according to column equivalence relation
constantPropagation(parentPlan LogicalPlan, currentChildIdx int, opt *logicalOptimizeOp) (newRoot LogicalPlan)
constantPropagation(parentPlan LogicalPlan, currentChildIdx int, opt *util.LogicalOptimizeOp) (newRoot LogicalPlan)

// pullUpConstantPredicates recursive find constant predicate, used for the constant propagation rule
pullUpConstantPredicates() []expression.Expression
Expand Down Expand Up @@ -775,7 +775,7 @@ func (*baseLogicalPlan) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
}

// PruneColumns implements LogicalPlan interface.
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp) (LogicalPlan, error) {
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *util.LogicalOptimizeOp) (LogicalPlan, error) {
if len(p.children) == 0 {
return p.self, nil
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/planner/core/rule_aggregation_elimination.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/types"
)

Expand All @@ -47,7 +48,7 @@ type aggregationEliminateChecker struct {
// e.g. select min(b) from t group by a. If a is a unique key, then this sql is equal to `select b from t group by a`.
// For count(expr), sum(expr), avg(expr), count(distinct expr, [expr...]) we may need to rewrite the expr. Details are shown below.
// If we can eliminate agg successful, we return a projection. Else we return a nil pointer.
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *logicalOptimizeOp) *LogicalProjection {
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *util.LogicalOptimizeOp) *LogicalProjection {
for _, af := range agg.AggFuncs {
// TODO(issue #9968): Actually, we can rewrite GROUP_CONCAT when all the
// arguments it accepts are promised to be NOT-NULL.
Expand Down Expand Up @@ -88,7 +89,7 @@ func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggr

// tryToEliminateDistinct will eliminate distinct in the aggregation function if the aggregation args
// have unique key column. see detail example in https://github.com/pingcap/tidb/issues/23436
func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *logicalOptimizeOp) {
func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *util.LogicalOptimizeOp) {
for _, af := range agg.AggFuncs {
if af.HasDistinct {
cols := make([]*expression.Column, 0, len(af.Args))
Expand Down Expand Up @@ -128,26 +129,26 @@ func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregati
}
}

func appendAggregationEliminateTraceStep(agg *LogicalAggregation, proj *LogicalProjection, uniqueKey expression.KeyInfo, opt *logicalOptimizeOp) {
func appendAggregationEliminateTraceStep(agg *LogicalAggregation, proj *LogicalProjection, uniqueKey expression.KeyInfo, opt *util.LogicalOptimizeOp) {
reason := func() string {
return fmt.Sprintf("%s is a unique key", uniqueKey.String())
}
action := func() string {
return fmt.Sprintf("%v_%v is simplified to a %v_%v", agg.TP(), agg.ID(), proj.TP(), proj.ID())
}

opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func appendDistinctEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, af *aggregation.AggFuncDesc,
opt *logicalOptimizeOp) {
opt *util.LogicalOptimizeOp) {
reason := func() string {
return fmt.Sprintf("%s is a unique key", uniqueKey.String())
}
action := func() string {
return fmt.Sprintf("%s(distinct ...) is simplified to %s(...)", af.Name, af.Name)
}
opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

// CheckCanConvertAggToProj check whether a special old aggregation (which has already been pushed down) to projection.
Expand Down Expand Up @@ -253,7 +254,7 @@ func wrapCastFunction(ctx expression.BuildContext, arg expression.Expression, ta
return expression.BuildCastFunction(ctx, arg, targetTp)
}

func (a *aggregationEliminator) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (a *aggregationEliminator) optimize(ctx context.Context, p LogicalPlan, opt *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
Expand Down
20 changes: 10 additions & 10 deletions pkg/planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (*aggregationPushDownSolver) decompose(ctx PlanContext, aggFunc *aggregatio
// process it temporarily. If not, We will add additional group by columns and first row functions. We make a new aggregation operator.
// If the pushed aggregation is grouped by unique key, it's no need to push it down.
func (a *aggregationPushDownSolver) tryToPushDownAgg(oldAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column,
join *LogicalJoin, childIdx int, blockOffset int, opt *logicalOptimizeOp) (_ LogicalPlan, err error) {
join *LogicalJoin, childIdx int, blockOffset int, opt *util.LogicalOptimizeOp) (_ LogicalPlan, err error) {
child := join.children[childIdx]
if aggregation.IsAllFirstRow(aggFuncs) {
return child, nil
Expand Down Expand Up @@ -433,13 +433,13 @@ func (*aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, uni
return newAgg, nil
}

func (a *aggregationPushDownSolver) optimize(_ context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (a *aggregationPushDownSolver) optimize(_ context.Context, p LogicalPlan, opt *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
newLogicalPlan, err := a.aggPushDown(p, opt)
return newLogicalPlan, planChanged, err
}

func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll, agg *LogicalAggregation, opt *logicalOptimizeOp) error {
func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll, agg *LogicalAggregation, opt *util.LogicalOptimizeOp) error {
for _, aggFunc := range agg.AggFuncs {
if !a.isDecomposableWithUnion(aggFunc) {
return nil
Expand Down Expand Up @@ -474,7 +474,7 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl
}

// aggPushDown tries to push down aggregate functions to join paths.
func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptimizeOp) (_ LogicalPlan, err error) {
func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *util.LogicalOptimizeOp) (_ LogicalPlan, err error) {
if agg, ok := p.(*LogicalAggregation); ok {
proj := a.tryToEliminateAggregation(agg, opt)
if proj != nil {
Expand Down Expand Up @@ -683,7 +683,7 @@ func (*aggregationPushDownSolver) name() string {
}

func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, join *LogicalJoin,
childIdx int, opt *logicalOptimizeOp) {
childIdx int, opt *util.LogicalOptimizeOp) {
reason := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v's functions[", oldAgg.TP(), oldAgg.ID()))
for i, aggFunc := range aggFuncs {
Expand All @@ -705,10 +705,10 @@ func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *LogicalAggregation, ag
}(), newAgg.TP(), newAgg.ID())
return buffer.String()
}
opt.appendStepToCurrent(join.ID(), join.TP(), reason, action)
opt.AppendStepToCurrent(join.ID(), join.TP(), reason, action)
}

func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *LogicalProjection, opt *logicalOptimizeOp) {
func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *LogicalProjection, opt *util.LogicalOptimizeOp) {
action := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v is eliminated, and %v_%v's functions changed into[", proj.TP(), proj.ID(), agg.TP(), agg.ID()))
for i, aggFunc := range agg.AggFuncs {
Expand All @@ -723,10 +723,10 @@ func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *Logical
reason := func() string {
return fmt.Sprintf("%v_%v is directly below an %v_%v and has no side effects", proj.TP(), proj.ID(), agg.TP(), agg.ID())
}
opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalAggregation, opt *logicalOptimizeOp) {
func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalAggregation, opt *util.LogicalOptimizeOp) {
reason := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v functions[", agg.TP(), agg.ID()))
for i, aggFunc := range agg.AggFuncs {
Expand All @@ -749,5 +749,5 @@ func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalA
buffer.WriteString("]")
return buffer.String()
}
opt.appendStepToCurrent(union.ID(), union.TP(), reason, action)
opt.AppendStepToCurrent(union.ID(), union.TP(), reason, action)
}
9 changes: 5 additions & 4 deletions pkg/planner/core/rule_aggregation_skew_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/util/intset"
)

Expand All @@ -46,7 +47,7 @@ type skewDistinctAggRewriter struct {
// - The aggregate has 1 and only 1 distinct aggregate function (limited to count, avg, sum)
//
// This rule is disabled by default. Use tidb_opt_skew_distinct_agg to enable the rule.
func (a *skewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation, opt *logicalOptimizeOp) LogicalPlan {
func (a *skewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation, opt *util.LogicalOptimizeOp) LogicalPlan {
// only group aggregate is applicable
if len(agg.GroupByItems) == 0 {
return nil
Expand Down Expand Up @@ -262,18 +263,18 @@ func (*skewDistinctAggRewriter) isQualifiedAgg(aggFunc *aggregation.AggFuncDesc)
}
}

func appendSkewDistinctAggRewriteTraceStep(agg *LogicalAggregation, result LogicalPlan, opt *logicalOptimizeOp) {
func appendSkewDistinctAggRewriteTraceStep(agg *LogicalAggregation, result LogicalPlan, opt *util.LogicalOptimizeOp) {
reason := func() string {
return fmt.Sprintf("%v_%v has a distinct agg function", agg.TP(), agg.ID())
}
action := func() string {
return fmt.Sprintf("%v_%v is rewritten to a %v_%v", agg.TP(), agg.ID(), result.TP(), result.ID())
}

opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func (a *skewDistinctAggRewriter) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (a *skewDistinctAggRewriter) optimize(ctx context.Context, p LogicalPlan, opt *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/rule_build_key_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/util"
)

type buildKeySolver struct{}

func (*buildKeySolver) optimize(_ context.Context, p LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (*buildKeySolver) optimize(_ context.Context, p LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
buildKeyInfo(p)
return p, planChanged, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/table"
Expand All @@ -31,7 +32,7 @@ import (

type collectPredicateColumnsPoint struct{}

func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, planChanged, nil
Expand Down Expand Up @@ -77,7 +78,7 @@ func (collectPredicateColumnsPoint) name() string {

type syncWaitStatsLoadPoint struct{}

func (syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, planChanged, nil
Expand Down
Loading