Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: classify logical cte logic into a separate file for later pkg move #54515

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"initialize.go",
"logical_aggregation.go",
"logical_apply.go",
"logical_cte.go",
"logical_datasource.go",
"logical_expand.go",
"logical_index_scan.go",
Expand Down
3 changes: 1 addition & 2 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -3636,8 +3636,7 @@ func exhaustPhysicalPlans4LogicalMaxOneRow(p *LogicalMaxOneRow, prop *property.P
return []base.PhysicalPlan{mor}, true, nil
}

// ExhaustPhysicalPlans implements LogicalPlan interface.
func (p *LogicalCTE) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
func exhaustPhysicalPlans4LogicalCTE(p *LogicalCTE, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
pcte := PhysicalCTE{CTE: p.cte}.Init(p.SCtx(), p.StatsInfo())
if prop.IsFlashProp() {
pcte.storageSender = PhysicalExchangeSender{
Expand Down
3 changes: 1 addition & 2 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2882,8 +2882,7 @@ func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProper
return is
}

// FindBestTask implements the LogicalPlan interface.
func (p *LogicalCTE) FindBestTask(prop *property.PhysicalProperty, counter *base.PlanCounterTp, pop *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) {
func findBestTask4LogicalCTE(p *LogicalCTE, prop *property.PhysicalProperty, counter *base.PlanCounterTp, pop *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) {
if p.ChildLen() > 0 {
return p.BaseLogicalPlan.FindBestTask(prop, counter, pop)
}
Expand Down
213 changes: 213 additions & 0 deletions pkg/planner/core/logical_cte.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util/coreusage"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/util/plancodec"
)

// LogicalCTE is for CTE.
type LogicalCTE struct {
logicalop.LogicalSchemaProducer

cte *CTEClass
cteAsName model.CIStr
cteName model.CIStr
seedStat *property.StatsInfo

onlyUsedAsStorage bool
}

// Init only assigns type and context.
func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTE, &p, offset)
return &p
}

// *************************** start implementation of logicalPlan interface ***************************

// HashCode inherits the BaseLogicalPlan.<0th> implementation.

// PredicatePushDown implements base.LogicalPlan.<1st> interface.
func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
if p.cte.recursivePartLogicalPlan != nil {
// Doesn't support recursive CTE yet.
return predicates, p.Self()
}
if !p.cte.isOuterMostCTE {
return predicates, p.Self()
}
pushedPredicates := make([]expression.Expression, len(predicates))
copy(pushedPredicates, predicates)
// The filter might change the correlated status of the cte.
// We forbid the push down that makes the change for now.
// Will support it later.
if !p.cte.IsInApply {
for i := len(pushedPredicates) - 1; i >= 0; i-- {
if len(expression.ExtractCorColumns(pushedPredicates[i])) == 0 {
continue
}
pushedPredicates = append(pushedPredicates[0:i], pushedPredicates[i+1:]...)
}
}
if len(pushedPredicates) == 0 {
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.NewOne())
return predicates, p.Self()
}
newPred := make([]expression.Expression, 0, len(predicates))
for i := range pushedPredicates {
newPred = append(newPred, pushedPredicates[i].Clone())
ResolveExprAndReplace(newPred[i], p.cte.ColumnMap)
}
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.ComposeCNFCondition(p.SCtx().GetExprCtx(), newPred...))
return predicates, p.Self()
}

// PruneColumns implements the base.LogicalPlan.<2nd> interface.
// LogicalCTE just do an empty function call. It's logical optimize is indivisual phase.
func (p *LogicalCTE) PruneColumns(_ []*expression.Column, _ *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
return p, nil
}

// FindBestTask implements the base.LogicalPlan.<3rd> interface.
func (p *LogicalCTE) FindBestTask(prop *property.PhysicalProperty, counter *base.PlanCounterTp, pop *optimizetrace.PhysicalOptimizeOp) (t base.Task, cntPlan int64, err error) {
return findBestTask4LogicalCTE(p, prop, counter, pop)
}

// BuildKeyInfo inherits the BaseLogicalPlan.<4th> implementation.

// PushDownTopN implements the base.LogicalPlan.<5th> interface.
func (p *LogicalCTE) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
var topN *LogicalTopN
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
if topN != nil {
return topN.AttachChild(p, opt)
}
return p
}

// DeriveTopN inherits BaseLogicalPlan.LogicalPlan.<6th> implementation.

// PredicateSimplification inherits BaseLogicalPlan.LogicalPlan.<7th> implementation.

// ConstantPropagation inherits BaseLogicalPlan.LogicalPlan.<8th> implementation.

// PullUpConstantPredicates inherits BaseLogicalPlan.LogicalPlan.<9th> implementation.

// RecursiveDeriveStats inherits BaseLogicalPlan.LogicalPlan.<10th> implementation.

// DeriveStats implements the base.LogicalPlan.<11th> interface.
func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression.Schema, _ []*expression.Schema, _ [][]*expression.Column) (*property.StatsInfo, error) {
if p.StatsInfo() != nil {
return p.StatsInfo(), nil
}

var err error
if p.cte.seedPartPhysicalPlan == nil {
// Build push-downed predicates.
if len(p.cte.pushDownPredicates) > 0 {
newCond := expression.ComposeDNFCondition(p.SCtx().GetExprCtx(), p.cte.pushDownPredicates...)
newSel := LogicalSelection{Conditions: []expression.Expression{newCond}}.Init(p.SCtx(), p.cte.seedPartLogicalPlan.QueryBlockOffset())
newSel.SetChildren(p.cte.seedPartLogicalPlan)
p.cte.seedPartLogicalPlan = newSel
p.cte.optFlag |= flagPredicatePushDown
}
p.cte.seedPartLogicalPlan, p.cte.seedPartPhysicalPlan, _, err = doOptimize(context.TODO(), p.SCtx(), p.cte.optFlag, p.cte.seedPartLogicalPlan)
if err != nil {
return nil, err
}
}
if p.onlyUsedAsStorage {
p.SetChildren(p.cte.seedPartLogicalPlan)
}
resStat := p.cte.seedPartPhysicalPlan.StatsInfo()
// Changing the pointer so that seedStat in LogicalCTETable can get the new stat.
*p.seedStat = *resStat
p.SetStats(&property.StatsInfo{
RowCount: resStat.RowCount,
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
for i, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += resStat.ColNDVs[p.cte.seedPartLogicalPlan.Schema().Columns[i].UniqueID]
}
if p.cte.recursivePartLogicalPlan != nil {
if p.cte.recursivePartPhysicalPlan == nil {
p.cte.recursivePartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.SCtx(), p.cte.optFlag, p.cte.recursivePartLogicalPlan)
if err != nil {
return nil, err
}
}
recurStat := p.cte.recursivePartLogicalPlan.StatsInfo()
for i, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += recurStat.ColNDVs[p.cte.recursivePartLogicalPlan.Schema().Columns[i].UniqueID]
}
if p.cte.IsDistinct {
p.StatsInfo().RowCount, _ = cardinality.EstimateColsNDVWithMatchedLen(p.Schema().Columns, p.Schema(), p.StatsInfo())
} else {
p.StatsInfo().RowCount += recurStat.RowCount
}
}
return p.StatsInfo(), nil
}

// ExtractColGroups inherits BaseLogicalPlan.LogicalPlan.<12th> implementation.

// PreparePossibleProperties inherits BaseLogicalPlan.LogicalPlan.<13th> implementation.

// ExhaustPhysicalPlans implements the base.LogicalPlan.<14th> interface.
func (p *LogicalCTE) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
return exhaustPhysicalPlans4LogicalCTE(p, prop)
}

// ExtractCorrelatedCols implements the base.LogicalPlan.<15th> interface.
func (p *LogicalCTE) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := coreusage.ExtractCorrelatedCols4LogicalPlan(p.cte.seedPartLogicalPlan)
if p.cte.recursivePartLogicalPlan != nil {
corCols = append(corCols, coreusage.ExtractCorrelatedCols4LogicalPlan(p.cte.recursivePartLogicalPlan)...)
}
return corCols
}

// MaxOneRow inherits BaseLogicalPlan.LogicalPlan.<16th> implementation.

// Children inherits BaseLogicalPlan.LogicalPlan.<17th> implementation.

// SetChildren inherits BaseLogicalPlan.LogicalPlan.<18th> implementation.

// SetChild inherits BaseLogicalPlan.LogicalPlan.<19th> implementation.

// RollBackTaskMap inherits BaseLogicalPlan.LogicalPlan.<20th> implementation.

// CanPushToCop inherits BaseLogicalPlan.LogicalPlan.<21st> implementation.

// ExtractFD inherits BaseLogicalPlan.LogicalPlan.<22nd> implementation.

// GetBaseLogicalPlan inherits BaseLogicalPlan.LogicalPlan.<23rd> implementation.

// ConvertOuterToInnerJoin inherits BaseLogicalPlan.LogicalPlan.<24th> implementation.

// *************************** end implementation of logicalPlan interface ***************************
6 changes: 0 additions & 6 deletions pkg/planner/core/logical_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ func (p LogicalMemTable) Init(ctx base.PlanContext, offset int) *LogicalMemTable
return &p
}

// Init only assigns type and context.
func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTE, &p, offset)
return &p
}

// Init only assigns type and context.
func (p LogicalCTETable) Init(ctx base.PlanContext, offset int) *LogicalCTETable {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTETable, &p, offset)
Expand Down
22 changes: 0 additions & 22 deletions pkg/planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
fd "github.com/pingcap/tidb/pkg/planner/funcdep"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/coreusage"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/intset"
Expand Down Expand Up @@ -376,18 +375,6 @@ func (cc *CTEClass) MemoryUsage() (sum int64) {
return
}

// LogicalCTE is for CTE.
type LogicalCTE struct {
logicalop.LogicalSchemaProducer

cte *CTEClass
cteAsName model.CIStr
cteName model.CIStr
seedStat *property.StatsInfo

onlyUsedAsStorage bool
}

// LogicalCTETable is for CTE table
type LogicalCTETable struct {
logicalop.LogicalSchemaProducer
Expand All @@ -400,15 +387,6 @@ type LogicalCTETable struct {
seedSchema *expression.Schema
}

// ExtractCorrelatedCols implements LogicalPlan interface.
func (p *LogicalCTE) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := coreusage.ExtractCorrelatedCols4LogicalPlan(p.cte.seedPartLogicalPlan)
if p.cte.recursivePartLogicalPlan != nil {
corCols = append(corCols, coreusage.ExtractCorrelatedCols4LogicalPlan(p.cte.recursivePartLogicalPlan)...)
}
return corCols
}

// LogicalSequence is used to mark the CTE producer in the main query tree.
// Its last child is main query. The previous children are cte producers.
// And there might be dependencies between the CTE producers:
Expand Down
6 changes: 0 additions & 6 deletions pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,6 @@ func preferKeyColumnFromTable(dataSource *DataSource, originColumns []*expressio
return resultColumn, resultColumnInfo
}

// PruneColumns implements the interface of base.LogicalPlan.
// LogicalCTE just do an empty function call. It's logical optimize is indivisual phase.
func (p *LogicalCTE) PruneColumns(_ []*expression.Column, _ *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
return p, nil
}

// PruneColumns implements the interface of base.LogicalPlan.
func (p *LogicalSequence) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
var err error
Expand Down
35 changes: 0 additions & 35 deletions pkg/planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,41 +683,6 @@ func (adder *exprPrefixAdder) addExprPrefix4DNFCond(condition *expression.Scalar
return []expression.Expression{expression.ComposeDNFCondition(exprCtx, newAccessItems...)}, nil
}

// PredicatePushDown implements base.LogicalPlan PredicatePushDown interface.
func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
if p.cte.recursivePartLogicalPlan != nil {
// Doesn't support recursive CTE yet.
return predicates, p.Self()
}
if !p.cte.isOuterMostCTE {
return predicates, p.Self()
}
pushedPredicates := make([]expression.Expression, len(predicates))
copy(pushedPredicates, predicates)
// The filter might change the correlated status of the cte.
// We forbid the push down that makes the change for now.
// Will support it later.
if !p.cte.IsInApply {
for i := len(pushedPredicates) - 1; i >= 0; i-- {
if len(expression.ExtractCorColumns(pushedPredicates[i])) == 0 {
continue
}
pushedPredicates = append(pushedPredicates[0:i], pushedPredicates[i+1:]...)
}
}
if len(pushedPredicates) == 0 {
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.NewOne())
return predicates, p.Self()
}
newPred := make([]expression.Expression, 0, len(predicates))
for i := range pushedPredicates {
newPred = append(newPred, pushedPredicates[i].Clone())
ResolveExprAndReplace(newPred[i], p.cte.ColumnMap)
}
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.ComposeCNFCondition(p.SCtx().GetExprCtx(), newPred...))
return predicates, p.Self()
}

// PredicatePushDown implements the base.LogicalPlan interface.
// Currently, we only maintain the main query tree.
func (p *LogicalSequence) PredicatePushDown(predicates []expression.Expression, op *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
Expand Down
12 changes: 0 additions & 12 deletions pkg/planner/core/rule_topn_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,6 @@ func pushDownTopNForBaseLogicalPlan(lp base.LogicalPlan, topNLogicalPlan base.Lo
return p
}

// PushDownTopN implements the LogicalPlan interface.
func (p *LogicalCTE) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
var topN *LogicalTopN
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
if topN != nil {
return topN.AttachChild(p, opt)
}
return p
}

// pushDownTopNToChild will push a topN to one child of join. The idx stands for join child index. 0 is for left child.
func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
if topN == nil {
Expand Down
Loading