Skip to content

Commit

Permalink
feat: add recurse operator and start of CTE planning
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Jul 19, 2024
1 parent 270b719 commit 6864482
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 11 deletions.
43 changes: 40 additions & 3 deletions go/vt/vtgate/planbuilder/operators/ast_to_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr

switch tableInfo := tableInfo.(type) {
case *semantics.VindexTable:
solves := tableID
return &Vindex{
Table: VindexTable{
TableID: tableID,
Expand All @@ -257,10 +256,14 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr
VTable: tableInfo.Table.GetVindexTable(),
},
Vindex: tableInfo.Vindex,
Solved: solves,
Solved: tableID,
}
case *semantics.CTETable:
panic(vterrors.VT12001("recursive common table expression"))
current := ctx.ActiveCTE()
if current != nil && current.CTEDef == tableInfo.CTEDef {
return createDualTable(ctx, tableID, tableInfo)
}
return createRecursiveCTE(ctx, tableInfo)
case *semantics.RealTable:
qg := newQueryGraph()
isInfSchema := tableInfo.IsInfSchema()
Expand Down Expand Up @@ -290,6 +293,40 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr
}
}

func createDualTable(ctx *plancontext.PlanningContext, tableID semantics.TableSet, tableInfo *semantics.CTETable) Operator {
vschemaTable, _, _, _, _, err := ctx.VSchema.FindTableOrVindex(sqlparser.NewTableName("dual"))
if err != nil {
panic(err)
}
qtbl := &QueryTable{
ID: tableID,
Alias: tableInfo.ASTNode,
Table: sqlparser.NewTableName("dual"),
}
return createRouteFromVSchemaTable(ctx, qtbl, vschemaTable, false, nil)
}

func createRecursiveCTE(ctx *plancontext.PlanningContext, def *semantics.CTETable) Operator {
union, ok := def.CTEDef.Query.(*sqlparser.Union)
if !ok {
panic(vterrors.VT13001("expected UNION in recursive CTE"))
}

init := translateQueryToOp(ctx, union.Left)

// Push the CTE definition to the stack so that it can be used in the recursive part of the query
ctx.PushCTE(def)
tail := translateQueryToOp(ctx, union.Right)
if err := ctx.PopCTE(); err != nil {
panic(err)
}

return &Recurse{
Init: init,
Tail: tail,
}
}

func crossJoin(ctx *plancontext.PlanningContext, exprs sqlparser.TableExprs) Operator {
var output Operator
for _, tableExpr := range exprs {
Expand Down
77 changes: 77 additions & 0 deletions go/vt/vtgate/planbuilder/operators/recurse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operators

import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

// Recurse is used to represent a recursive CTE
type Recurse struct {
Init, Tail Operator
}

var _ Operator = (*Recurse)(nil)

func (r *Recurse) Clone(inputs []Operator) Operator {
return &Recurse{
Init: inputs[0],
Tail: inputs[1],
}
}

func (r *Recurse) Inputs() []Operator {
return []Operator{r.Init, r.Tail}
}

func (r *Recurse) SetInputs(operators []Operator) {
r.Init = operators[0]
r.Tail = operators[1]
}

func (r *Recurse) AddPredicate(_ *plancontext.PlanningContext, e sqlparser.Expr) Operator {
r.Tail = newFilter(r, e)
return r
}

func (r *Recurse) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int {
return r.Init.AddColumn(ctx, reuseExisting, addToGroupBy, expr)
}

func (r *Recurse) AddWSColumn(*plancontext.PlanningContext, int, bool) int {
panic("implement me")
}

func (r *Recurse) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return r.Init.FindCol(ctx, expr, underRoute)
}

func (r *Recurse) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return r.Init.GetColumns(ctx)
}

func (r *Recurse) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return r.Init.GetSelectExprs(ctx)
}

func (r *Recurse) ShortDescription() string { return "" }

func (r *Recurse) GetOrdering(*plancontext.PlanningContext) []OrderBy {
// Recurse is a special case. It never guarantees any ordering.
return nil
}
24 changes: 24 additions & 0 deletions go/vt/vtgate/planbuilder/plancontext/planning_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"
Expand Down Expand Up @@ -66,6 +67,10 @@ type PlanningContext struct {
// OuterTables contains the tables that are outer to the current query
// Used to set the nullable flag on the columns
OuterTables semantics.TableSet

// This is a stack of CTEs being built. It's used when we have CTEs inside CTEs,
// to remember which is the CTE currently being assembled
CurrentCTE []*semantics.CTETable
}

// CreatePlanningContext initializes a new PlanningContext with the given parameters.
Expand Down Expand Up @@ -376,3 +381,22 @@ func (ctx *PlanningContext) ContainsAggr(e sqlparser.SQLNode) (hasAggr bool) {
}, e)
return
}

func (ctx *PlanningContext) PushCTE(def *semantics.CTETable) {
ctx.CurrentCTE = append(ctx.CurrentCTE, def)
}

func (ctx *PlanningContext) PopCTE() error {
if len(ctx.CurrentCTE) == 0 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no CTE to pop")
}
ctx.CurrentCTE = ctx.CurrentCTE[:len(ctx.CurrentCTE)-1]
return nil
}

func (ctx *PlanningContext) ActiveCTE() *semantics.CTETable {
if len(ctx.CurrentCTE) == 0 {
return nil
}
return ctx.CurrentCTE[len(ctx.CurrentCTE)-1]
}
10 changes: 5 additions & 5 deletions go/vt/vtgate/semantics/cte_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

// CTETable contains the information about the CTE table.
type CTETable struct {
tableName string
TableName string
ASTNode *sqlparser.AliasedTableExpr
CTEDef
}
Expand All @@ -43,14 +43,14 @@ func newCTETable(node *sqlparser.AliasedTableExpr, t sqlparser.TableName, cteDef
name = node.As.String()
}
return &CTETable{
tableName: name,
TableName: name,
ASTNode: node,
CTEDef: cteDef,
}
}

func (cte *CTETable) Name() (sqlparser.TableName, error) {
return sqlparser.NewTableName(cte.tableName), nil
return sqlparser.NewTableName(cte.TableName), nil
}

func (cte *CTETable) GetVindexTable() *vindexes.Table {
Expand All @@ -62,7 +62,7 @@ func (cte *CTETable) IsInfSchema() bool {
}

func (cte *CTETable) matches(name sqlparser.TableName) bool {
return cte.tableName == name.Name.String() && name.Qualifier.IsEmpty()
return cte.TableName == name.Name.String() && name.Qualifier.IsEmpty()
}

func (cte *CTETable) authoritative() bool {
Expand All @@ -78,7 +78,7 @@ func (cte *CTETable) canShortCut() shortCut {
}

func (cte *CTETable) getColumns(bool) []ColumnInfo {
selExprs := cte.definition.GetColumns()
selExprs := cte.Query.GetColumns()
cols := make([]ColumnInfo, 0, len(selExprs))
for _, selExpr := range selExprs {
ae, isAe := selExpr.(*sqlparser.AliasedExpr)
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/semantics/table_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type (
}

CTEDef struct {
definition sqlparser.SelectStatement
Query sqlparser.SelectStatement
isAuthoritative bool
recursiveDeps *TableSet
}
Expand All @@ -67,7 +67,7 @@ func (cte *CTEDef) recursive(org originable) (id TableSet) {
}
id = id.Merge(org.tableSetFor(ate))
return true, nil
}, cte.definition)
}, cte.Query)
return
}

Expand All @@ -86,7 +86,7 @@ func (etc *earlyTableCollector) down(cursor *sqlparser.Cursor) bool {
return true
}
for _, cte := range with.CTEs {
etc.cte[cte.ID.String()] = CTEDef{definition: cte.Subquery.Select}
etc.cte[cte.ID.String()] = CTEDef{Query: cte.Subquery.Select}
}
return true
}
Expand Down

0 comments on commit 6864482

Please sign in to comment.