Skip to content

Commit

Permalink
opt: support recursive CTEs
Browse files Browse the repository at this point in the history
A recursive CTE is of the form:
```
WITH RECURSIVE cte AS (
    <initial query>
  UNION ALL
    <recursive query>
)
```

Recursive CTE evaluation (paraphrased from postgres docs):
 1. Evaluate the initial query; emit the results and also save them in
    a "working" table.
 2. So long as the working table is not empty:
    - evaluate the recursive query, substituting the current contents of
      the working table for the recursive self-reference. Emit all
      resulting rows, and save them into the next iteration's working
      table.

This change adds optimizer and execution support for recursive CTEs.

Some notes for the various components:

 - optbuilder:  We build the recursive query using a `cteSource` that
   corresponds to the "working table". This code turned out to be
   tricky, in part because we want to allow non-recursive CTEs even if
   RECURSIVE is used (which is necessary when defining multiple CTEs,
   some of which are not recursive).

 - execution: the execution operator is somewhat similar to
   `applyJoinNode` (but simpler). The execbuilder provides a
   callback that can be used to regenerate the right-hand-side plan
   each time; this callback takes a reference to the working table (as
   a `bufferNode`).

 - execbuilder: to implement the callback mentioned above, we create
   an "inner" builder inside the callback which uses the same factory
   but is otherwise independent from the "outer" builder.

Fixes #21085.

Release note (sql change): WITH RECURSIVE is now supported.
  • Loading branch information
RaduBerinde committed Oct 8, 2019
1 parent 84d9fa5 commit 1c414a8
Show file tree
Hide file tree
Showing 35 changed files with 1,307 additions and 126 deletions.
11 changes: 9 additions & 2 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,15 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error {
a.run.curRightRow = 0
a.run.rightRows.Clear(params.ctx)
rowResultWriter := NewRowResultWriter(a.run.rightRows)
return runPlanInsidePlan(params, plan, a.run.rightRows)
}

// runPlanInsidePlan is used to run a plan and gather the results in a row
// container, as part of the execution of an "outer" plan.
func runPlanInsidePlan(
params runParams, plan *planTop, rowContainer *rowcontainer.RowContainer,
) error {
rowResultWriter := NewRowResultWriter(rowContainer)
recv := MakeDistSQLReceiver(
params.ctx, rowResultWriter, tree.Rows,
params.extendedEvalCtx.ExecCfg.RangeDescriptorCache,
Expand Down Expand Up @@ -339,7 +347,6 @@ func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error
return recv.commErr
}
return rowResultWriter.err

}

func (a *applyJoinNode) Values() tree.Datums {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ func (n *bufferNode) Values() tree.Datums {

func (n *bufferNode) Close(ctx context.Context) {
n.plan.Close(ctx)
// It's valid to be Closed without startExec having been called, in which
// case n.bufferedRows will be nil.
if n.bufferedRows != nil {
n.bufferedRows.Close(ctx)
}
n.bufferedRows.Close(ctx)
}

// scanBufferNode behaves like an iterator into the bufferNode it is
Expand Down
181 changes: 181 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/with-opt

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/sql/opt/bench/stub_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ func (f *stubFactory) ConstructScanBuffer(ref exec.Node, label string) (exec.Nod
return struct{}{}, nil
}

func (f *stubFactory) ConstructRecursiveCTE(
initial exec.Node, fn exec.RecursiveCTEIterationFn, label string,
) (exec.Node, error) {
return struct{}{}, nil
}

func (f *stubFactory) ConstructControlJobs(
command tree.JobCommand, input exec.Node,
) (exec.Node, error) {
Expand Down
27 changes: 12 additions & 15 deletions pkg/sql/opt/exec/execbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func (b *Builder) DisableTelemetry() {
// Build constructs the execution node tree and returns its root node if no
// error occurred.
func (b *Builder) Build() (_ exec.Plan, err error) {
plan, err := b.build(b.e)
if err != nil {
return nil, err
}
return b.factory.ConstructPlan(plan.root, b.subqueries, b.postqueries)
}

func (b *Builder) build(e opt.Expr) (_ execPlan, err error) {
defer func() {
if r := recover(); r != nil {
// This code allows us to propagate errors without adding lots of checks
Expand All @@ -107,27 +115,16 @@ func (b *Builder) Build() (_ exec.Plan, err error) {
}
}()

root, err := b.build(b.e)
if err != nil {
return nil, err
}
return b.factory.ConstructPlan(root, b.subqueries, b.postqueries)
}

func (b *Builder) build(e opt.Expr) (exec.Node, error) {
rel, ok := e.(memo.RelExpr)
if !ok {
return nil, errors.AssertionFailedf("building execution for non-relational operator %s", log.Safe(e.Op()))
return execPlan{}, errors.AssertionFailedf(
"building execution for non-relational operator %s", log.Safe(e.Op()),
)
}

b.autoCommit = b.canAutoCommit(rel)

plan, err := b.buildRelational(rel)
if err != nil {
return nil, err
}

return plan.root, nil
return b.buildRelational(rel)
}

// BuildScalar converts a scalar expression to a TypedExpr. Variables are mapped
Expand Down
65 changes: 65 additions & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {
case *memo.WithScanExpr:
ep, err = b.buildWithScan(t)

case *memo.RecursiveCTEExpr:
ep, err = b.buildRecursiveCTE(t)

case *memo.ExplainExpr:
ep, err = b.buildExplain(t)

Expand Down Expand Up @@ -1424,6 +1427,68 @@ func (b *Builder) buildWith(with *memo.WithExpr) (execPlan, error) {
return b.buildRelational(with.Main)
}

func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error) {
initial, err := b.buildRelational(rec.Initial)
if err != nil {
return execPlan{}, err
}

// Make sure we have the columns in the correct order.
initial, err = b.ensureColumns(initial, rec.InitialCols, nil /* colNames */, nil /* ordering */)
if err != nil {
return execPlan{}, err
}

// Renumber the columns so they match the columns expected by recursive query.
initial.outputCols = util.FastIntMap{}
for i, col := range rec.OutCols {
initial.outputCols.Set(int(col), i)
}

// To implement exec.RecursiveCTEIterationFn, we create a special Builder.

innerBldTemplate := &Builder{
factory: b.factory,
mem: b.mem,
catalog: b.catalog,
evalCtx: b.evalCtx,
// If the recursive query itself contains CTEs, building it in the function
// below will add to withExprs. Cap the slice to force reallocation on any
// appends, so that they don't overwrite overwrite later appends by our
// original builder.
withExprs: b.withExprs[:len(b.withExprs):len(b.withExprs)],
}

fn := func(bufferRef exec.Node) (exec.Plan, error) {
// Use a separate builder each time.
innerBld := *innerBldTemplate
innerBld.addBuiltWithExpr(rec.WithID, initial.outputCols, bufferRef)
plan, err := innerBld.build(rec.Recursive)
if err != nil {
return nil, err
}
// Ensure columns are output in the same order.
plan, err = innerBld.ensureColumns(
plan, rec.RecursiveCols, nil /* colNames */, nil, /* ordering */
)
if err != nil {
return nil, err
}
return innerBld.factory.ConstructPlan(plan.root, innerBld.subqueries, innerBld.postqueries)
}

label := fmt.Sprintf("working buffer (%s)", rec.Name)
var ep execPlan
ep.root, err = b.factory.ConstructRecursiveCTE(initial.root, fn, label)
if err != nil {
return execPlan{}, err
}
for i, col := range rec.OutCols {
ep.outputCols.Set(int(col), i)
}
return ep, nil
}

func (b *Builder) buildWithScan(withScan *memo.WithScanExpr) (execPlan, error) {
id := withScan.ID
var e *builtWithExpr
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/with
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,25 @@ hash-join · · (col) ·
└── scan · · (col) ·
· table table39010@primary · ·
· spans ALL · ·

query TTTTT
EXPLAIN (VERBOSE)
WITH RECURSIVE t(n) AS (
VALUES (1)
UNION ALL
SELECT n+1 FROM t WHERE n < 100
)
SELECT sum(n) FROM t
----
· distributed false · ·
· vectorized false · ·
group · · (sum) ·
│ aggregate 0 sum(n) · ·
│ scalar · · ·
└── render · · (n) ·
│ render 0 column1 · ·
└── recursive cte node · · (column1) ·
│ label working buffer (t) · ·
└── values · · (column1) ·
· size 1 column, 1 row · ·
· row 0, expr 0 1 · ·
21 changes: 19 additions & 2 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,24 @@ type Factory interface {

// ConstructBuffer constructs a node whose input can be referenced from
// elsewhere in the query.
ConstructBuffer(value Node, label string) (Node, error)
ConstructBuffer(input Node, label string) (Node, error)

// ConstructScanBuffer constructs a node which refers to a node constructed by
// ConstructBuffer.
// ConstructBuffer or ConstructRecursiveCTEBuffer.
ConstructScanBuffer(ref Node, label string) (Node, error)

// ConstructRecursiveCTE constructs a node that executes a recursive CTE:
// * the initial plan is run first; the results are emitted and also saved
// in a buffer.
// * so long as the last buffer is not empty:
// - the RecursiveCTEIterationFn is used to create a plan for the
// recursive side; a reference to the last buffer is passed to this
// function. The returned plan uses this reference with a
// ConstructScanBuffer call.
// - the plan is executed; the results are emitted and also saved in a new
// buffer for the next iteration.
ConstructRecursiveCTE(initial Node, fn RecursiveCTEIterationFn, label string) (Node, error)

// ConstructControlJobs creates a node that implements PAUSE/CANCEL/RESUME
// JOBS.
ConstructControlJobs(command tree.JobCommand, input Node) (Node, error)
Expand Down Expand Up @@ -613,3 +625,8 @@ type KVOption struct {
// If there is no value, Value is DNull.
Value tree.TypedExpr
}

// RecursiveCTEIterationFn creates a plan for an iteration of WITH RECURSIVE,
// given the result of the last iteration (as a Buffer that can be used with
// ConstructScanBuffer).
type RecursiveCTEIterationFn func(bufferRef Node) (Plan, error)
21 changes: 13 additions & 8 deletions pkg/sql/opt/memo/expr_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,15 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) {
}

case *WithExpr:
w := e.(*WithExpr)
fmt.Fprintf(f.Buffer, "%v &%d", e.Op(), w.ID)
if w.Name != "" {
fmt.Fprintf(f.Buffer, " (%s)", w.Name)
fmt.Fprintf(f.Buffer, "%v &%d", e.Op(), t.ID)
if t.Name != "" {
fmt.Fprintf(f.Buffer, " (%s)", t.Name)
}

case *WithScanExpr:
ws := e.(*WithScanExpr)
fmt.Fprintf(f.Buffer, "%v &%d", e.Op(), ws.ID)
if ws.Name != "" {
fmt.Fprintf(f.Buffer, " (%s)", ws.Name)
fmt.Fprintf(f.Buffer, "%v &%d", e.Op(), t.ID)
if t.Name != "" {
fmt.Fprintf(f.Buffer, " (%s)", t.Name)
}

default:
Expand Down Expand Up @@ -469,6 +467,13 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) {
tp.Childf("mode: %s", m)
}

case *RecursiveCTEExpr:
if !f.HasFlags(ExprFmtHideColumns) {
tp.Childf("working table binding: &%d", t.WithID)
f.formatColList(e, tp, "initial columns:", t.InitialCols)
f.formatColList(e, tp, "recursive columns:", t.RecursiveCols)
}

default:
if opt.IsJoinOp(t) {
p := t.Private().(*JoinPrivate)
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/opt/memo/logical_props_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,37 @@ func (b *logicalPropsBuilder) buildWithScanProps(withScan *WithScanExpr, rel *pr
}
}

func (b *logicalPropsBuilder) buildRecursiveCTEProps(rec *RecursiveCTEExpr, rel *props.Relational) {
BuildSharedProps(b.mem, rec, &rel.Shared)

// Output Columns
// --------------
rel.OutputCols = rec.OutCols.ToSet()

// Not Null Columns
// ----------------
// All columns are assumed to be nullable.

// Outer Columns
// -------------
// No outer columns.

// Functional Dependencies
// -----------------------
// No known FDs.

// Cardinality
// -----------
// At least the cardinality of the initial buffer.
rel.Cardinality = props.AnyCardinality.AtLeast(rec.Initial.Relational().Cardinality)

// Statistics
// ----------
if !b.disableStats {
b.sb.buildUnknown(rel)
}
}

func (b *logicalPropsBuilder) buildExplainProps(explain *ExplainExpr, rel *props.Relational) {
b.buildBasicProps(explain, explain.ColList, rel)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/memo/statistics_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (sb *statisticsBuilder) colStat(colSet opt.ColSet, e RelExpr) *props.Column
return sb.colStatSequenceSelect(colSet, e.(*SequenceSelectExpr))

case opt.ExplainOp, opt.ShowTraceForSessionOp,
opt.OpaqueRelOp, opt.OpaqueMutationOp, opt.OpaqueDDLOp:
opt.OpaqueRelOp, opt.OpaqueMutationOp, opt.OpaqueDDLOp, opt.RecursiveCTEOp:
return sb.colStatUnknown(colSet, e.Relational())

case opt.WithOp:
Expand Down
37 changes: 37 additions & 0 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,43 @@ define WithScanPrivate {
OutCols ColList
}

# RecursiveCTE implements the logic of a recursive CTE:
# * the Initial query is evaluated; the results are emitted and also saved into
# a "working table".
# * so long as the working table is not empty:
# - the Recursive query (which refers to the working table using a specific
# WithID) is evaluated; the results are emitted and also saved into a new
# "working table" for the next iteration.
[Relational]
define RecursiveCTE {
Initial RelExpr
Recursive RelExpr

_ RecursiveCTEPrivate
}

[Private]
define RecursiveCTEPrivate {
# Name is used to identify the CTE being referenced for debugging purposes.
Name string

# WithID is the ID through which the Recursive expression refers to the
# current working table.
WithID WithID

# InitialCols are the columns produced by the initial expression.
InitialCols ColList


# RecursiveCols are the columns produced by the recursive expression, that
# map 1-1 to InitialCols.
RecursiveCols ColList

# OutCols are the columns produced by the RecursiveCTE operator; they map
# 1-1 to InitialCols and to RecursiveCols.
OutCols ColList
}

# FakeRel is a mock relational operator used for testing; its logical properties
# are pre-determined and stored in the private. It can be used as the child of
# an operator for which we are calculating properties or statistics.
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/opt/optbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ type Builder struct {
evalCtx *tree.EvalContext
catalog cat.Catalog
scopeAlloc []scope
ctes []cteSource

// If set, the planner will skip checking for the SELECT privilege when
// resolving data sources (tables, views, etc). This is used when compiling
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (b *Builder) buildDelete(del *tree.Delete, inScope *scope) (outScope *scope

var ctes []cteSource
if del.With != nil {
inScope, ctes = b.buildCTE(del.With.CTEList, inScope)
inScope, ctes = b.buildCTEs(del.With, inScope)
}

// DELETE FROM xx AS yy - we want to know about xx (tn) because
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func init() {
func (b *Builder) buildInsert(ins *tree.Insert, inScope *scope) (outScope *scope) {
var ctes []cteSource
if ins.With != nil {
inScope, ctes = b.buildCTE(ins.With.CTEList, inScope)
inScope, ctes = b.buildCTEs(ins.With, inScope)
}

// INSERT INTO xx AS yy - we want to know about xx (tn) because
Expand Down
Loading

0 comments on commit 1c414a8

Please sign in to comment.