Skip to content

Commit

Permalink
planner: fix CTE hang or wrong result when multiple Apply is used (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Aug 22, 2024
1 parent f2c0e70 commit d04ec9d
Show file tree
Hide file tree
Showing 3 changed files with 863 additions and 24 deletions.
60 changes: 36 additions & 24 deletions pkg/planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,24 @@ import (
// |_ outerSide
// |_ innerSide(cor_col_3)
func ExtractOuterApplyCorrelatedCols(p base.PhysicalPlan) []*expression.CorrelatedColumn {
return extractOuterApplyCorrelatedColsHelper(p, []*expression.Schema{})
corCols, _ := extractOuterApplyCorrelatedColsHelper(p)
return corCols
}

func extractOuterApplyCorrelatedColsHelper(p base.PhysicalPlan, outerSchemas []*expression.Schema) []*expression.CorrelatedColumn {
func extractOuterApplyCorrelatedColsHelper(p base.PhysicalPlan) ([]*expression.CorrelatedColumn, []*expression.Schema) {
if p == nil {
return nil
return nil, nil
}
curCorCols := p.ExtractCorrelatedCols()
newCorCols := make([]*expression.CorrelatedColumn, 0, len(curCorCols))

// If a corresponding Apply is found inside this PhysicalPlan, ignore it.
for _, corCol := range curCorCols {
var found bool
for _, outerSchema := range outerSchemas {
if outerSchema.ColumnIndex(&corCol.Column) != -1 {
found = true
break
}
}
if !found {
newCorCols = append(newCorCols, corCol)
}
// allCorCols store all sub plan's correlated columns.
// allOuterSchemas store all child Apply's outer side schemas.
allCorCols := p.ExtractCorrelatedCols()
allOuterSchemas := []*expression.Schema{}

handler := func(child base.PhysicalPlan) {
childCorCols, childOuterSchemas := extractOuterApplyCorrelatedColsHelper(child)
allCorCols = append(allCorCols, childCorCols...)
allOuterSchemas = append(allOuterSchemas, childOuterSchemas...)
}

switch v := p.(type) {
Expand All @@ -90,19 +86,35 @@ func extractOuterApplyCorrelatedColsHelper(p base.PhysicalPlan, outerSchemas []*
} else {
outerPlan = v.Children()[0]
}
outerSchemas = append(outerSchemas, outerPlan.Schema())
newCorCols = append(newCorCols, extractOuterApplyCorrelatedColsHelper(v.Children()[0], outerSchemas)...)
newCorCols = append(newCorCols, extractOuterApplyCorrelatedColsHelper(v.Children()[1], outerSchemas)...)
allOuterSchemas = append(allOuterSchemas, outerPlan.Schema())
handler(v.Children()[0])
handler(v.Children()[1])
case *PhysicalCTE:
newCorCols = append(newCorCols, extractOuterApplyCorrelatedColsHelper(v.SeedPlan, outerSchemas)...)
newCorCols = append(newCorCols, extractOuterApplyCorrelatedColsHelper(v.RecurPlan, outerSchemas)...)
handler(v.SeedPlan)
handler(v.RecurPlan)
default:
for _, child := range p.Children() {
newCorCols = append(newCorCols, extractOuterApplyCorrelatedColsHelper(child, outerSchemas)...)
handler(child)
}
}

return newCorCols
resCorCols := make([]*expression.CorrelatedColumn, 0, len(allCorCols))

// If one correlated column is found in allOuterSchemas, it means this correlated column is corresponding to an Apply inside `p`.
// However, we only need the correlated columns that correspond to the Apply of the parent node of `p`.
for _, corCol := range allCorCols {
var found bool
for _, outerSchema := range allOuterSchemas {
if outerSchema.ColumnIndex(&corCol.Column) != -1 {
found = true
break
}
}
if !found {
resCorCols = append(resCorCols, corCol)
}
}
return resCorCols, allOuterSchemas
}

// DecorrelateSolver tries to convert apply plan to join plan.
Expand Down
Loading

0 comments on commit d04ec9d

Please sign in to comment.