Skip to content

Commit

Permalink
sql: add plumbing for phys plan creation directly in execbuilder
Browse files Browse the repository at this point in the history
This commit introduces `planMaybePhysical` utility struct that
represents a plan and uses either planNode ("logical) or DistSQL spec
("physical") representations. It will be removed once we are able to
create all processor spec in execbuilder directly. This struct has been
plumbed in all places that need to look at the plan, but in most of them
only the logical representation is supported. However, the main codepath
for executing a statement supports both, and if physical representation
is used, then we bypass distsql physical planner.

This commit also renames `checkSupportForNode` to
`checkSupportForPlanNode` and `createPlanForNode` to
`createPhysPlanForPlanNode` to make their purpose more clear.

At the moment, if experimental distsql planning is enabled, we will use
the execbuilder-driven DistSQL spec creation only for SELECT statements
(which is checked using a simple string-matching heuristic). We do it
this way because table reader is the first spec to be implemented in
execbuilder. We could introduce a fallback to "old" code path of
planNode-based factory, but I'm worried that will be hiding errors in
the "new" path.

Release note: None
  • Loading branch information
yuzefovich committed May 28, 2020
1 parent d103a98 commit 63ba4b3
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 121 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(

var cols sqlbase.ResultColumns
if stmt.AST.StatementType() == tree.Rows {
cols = planColumns(planner.curPlan.main)
cols = planner.curPlan.main.planColumns()
}
if err := ex.initStatementResult(ctx, res, stmt, cols); err != nil {
res.SetError(err)
Expand Down
82 changes: 41 additions & 41 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func newQueryNotSupportedErrorf(format string, args ...interface{}) error {
}

// planNodeNotSupportedErr is the catch-all error value returned from
// checkSupportForNode when a planNode type does not support distributed
// checkSupportForPlanNode when a planNode type does not support distributed
// execution.
var planNodeNotSupportedErr = newQueryNotSupportedError("unsupported node")

Expand All @@ -284,7 +284,7 @@ var cannotDistributeRowLevelLockingErr = newQueryNotSupportedError(
)

// mustWrapNode returns true if a node has no DistSQL-processor equivalent.
// This must be kept in sync with createPlanForNode.
// This must be kept in sync with createPhysPlanForPlanNode.
// TODO(jordan): refactor these to use the observer pattern to avoid duplication.
func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) bool {
switch n := node.(type) {
Expand All @@ -305,7 +305,7 @@ func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) boo
case *unaryNode:
case *unionNode:
case *valuesNode:
// This is unfortunately duplicated by createPlanForNode, and must be kept
// This is unfortunately duplicated by createPhysPlanForPlanNode, and must be kept
// in sync with its implementation.
if !n.specifiedInQuery || planCtx.isLocal || planCtx.noEvalSubqueries {
return true
Expand All @@ -320,28 +320,28 @@ func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) boo
return false
}

// checkSupportForNode returns a distRecommendation (as described above) or
// checkSupportForPlanNode returns a distRecommendation (as described above) or
// cannotDistribute and an error if the plan subtree is not distributable.
// The error doesn't indicate complete failure - it's instead the reason that
// this plan couldn't be distributed.
// TODO(radu): add tests for this.
func checkSupportForNode(node planNode) (distRecommendation, error) {
func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
case *distinctNode:
return checkSupportForNode(n.plan)
return checkSupportForPlanNode(n.plan)

case *exportNode:
return checkSupportForNode(n.source)
return checkSupportForPlanNode(n.source)

case *filterNode:
if err := checkExpr(n.filter); err != nil {
return cannotDistribute, err
}
return checkSupportForNode(n.source.plan)
return checkSupportForPlanNode(n.source.plan)

case *groupNode:
rec, err := checkSupportForNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -351,20 +351,20 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
case *indexJoinNode:
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
if _, err := checkSupportForNode(n.table); err != nil {
if _, err := checkSupportForPlanNode(n.table); err != nil {
return cannotDistribute, err
}
return checkSupportForNode(n.input)
return checkSupportForPlanNode(n.input)

case *joinNode:
if err := checkExpr(n.pred.onCond); err != nil {
return cannotDistribute, err
}
recLeft, err := checkSupportForNode(n.left.plan)
recLeft, err := checkSupportForPlanNode(n.left.plan)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForNode(n.right.plan)
recRight, err := checkSupportForPlanNode(n.right.plan)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -384,27 +384,27 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.offsetExpr); err != nil {
return cannotDistribute, err
}
return checkSupportForNode(n.plan)
return checkSupportForPlanNode(n.plan)

case *lookupJoinNode:
if err := checkExpr(n.onCond); err != nil {
return cannotDistribute, err
}
if _, err := checkSupportForNode(n.input); err != nil {
if _, err := checkSupportForPlanNode(n.input); err != nil {
return cannotDistribute, err
}
return shouldDistribute, nil

case *projectSetNode:
return checkSupportForNode(n.source)
return checkSupportForPlanNode(n.source)

case *renderNode:
for _, e := range n.render {
if err := checkExpr(e); err != nil {
return cannotDistribute, err
}
}
return checkSupportForNode(n.source.plan)
return checkSupportForPlanNode(n.source.plan)

case *scanNode:
if n.lockingStrength != sqlbase.ScanLockingStrength_FOR_NONE {
Expand Down Expand Up @@ -437,7 +437,7 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
return rec, nil

case *sortNode:
rec, err := checkSupportForNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -449,11 +449,11 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *unionNode:
recLeft, err := checkSupportForNode(n.left)
recLeft, err := checkSupportForPlanNode(n.left)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForNode(n.right)
recRight, err := checkSupportForPlanNode(n.right)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -477,7 +477,7 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *windowNode:
return checkSupportForNode(n.plan)
return checkSupportForPlanNode(n.plan)

case *zeroNode:
return canDistribute, nil
Expand Down Expand Up @@ -1804,7 +1804,7 @@ func (dsp *DistSQLPlanner) addAggregators(
func (dsp *DistSQLPlanner) createPlanForIndexJoin(
planCtx *PlanningCtx, n *indexJoinNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.input)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.input)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -1879,7 +1879,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
func (dsp *DistSQLPlanner) createPlanForLookupJoin(
planCtx *PlanningCtx, n *lookupJoinNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.input)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.input)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2161,11 +2161,11 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
//
// - The routers of the joiner processors are the result routers of the plan.

leftPlan, err := dsp.createPlanForNode(planCtx, n.left.plan)
leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left.plan)
if err != nil {
return PhysicalPlan{}, err
}
rightPlan, err := dsp.createPlanForNode(planCtx, n.right.plan)
rightPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.right.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2275,7 +2275,7 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
return p, nil
}

func (dsp *DistSQLPlanner) createPlanForNode(
func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
planCtx *PlanningCtx, node planNode,
) (plan PhysicalPlan, err error) {
planCtx.planDepth++
Expand All @@ -2289,7 +2289,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForExport(planCtx, n)

case *filterNode:
plan, err = dsp.createPlanForNode(planCtx, n.source.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.source.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2299,7 +2299,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
}

case *groupNode:
plan, err = dsp.createPlanForNode(planCtx, n.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2315,7 +2315,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForJoin(planCtx, n)

case *limitNode:
plan, err = dsp.createPlanForNode(planCtx, n.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2336,7 +2336,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForProjectSet(planCtx, n)

case *renderNode:
plan, err = dsp.createPlanForNode(planCtx, n.source.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.source.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2349,7 +2349,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createTableReaders(planCtx, n, nil)

case *sortNode:
plan, err = dsp.createPlanForNode(planCtx, n.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2363,7 +2363,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForSetOp(planCtx, n)

case *valuesNode:
// Just like in checkSupportForNode, if a valuesNode wasn't specified in
// Just like in checkSupportForPlanNode, if a valuesNode wasn't specified in
// the query, it means that it was autogenerated for things that we don't
// want to be distributing, like populating values from a virtual table. So,
// we wrap the plan instead.
Expand All @@ -2379,7 +2379,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
// evaluatable.
//
// NB: If you change this conditional, you must also change it in
// checkSupportForNode!
// checkSupportForPlanNode!
if !n.specifiedInQuery || planCtx.isLocal || planCtx.noEvalSubqueries {
plan, err = dsp.wrapPlan(planCtx, n)
} else {
Expand Down Expand Up @@ -2462,7 +2462,7 @@ func (dsp *DistSQLPlanner) wrapPlan(planCtx *PlanningCtx, n planNode) (PhysicalP
// control of planning back to the DistSQL physical planner.
if !dsp.mustWrapNode(planCtx, plan) {
firstNotWrapped = plan
p, err = dsp.createPlanForNode(planCtx, plan)
p, err = dsp.createPhysPlanForPlanNode(planCtx, plan)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -2685,7 +2685,7 @@ func createDistinctSpec(n *distinctNode, cols []int) *execinfrapb.DistinctSpec {
func (dsp *DistSQLPlanner) createPlanForDistinct(
planCtx *PlanningCtx, n *distinctNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2714,7 +2714,7 @@ func (dsp *DistSQLPlanner) createPlanForDistinct(
func (dsp *DistSQLPlanner) createPlanForOrdinality(
planCtx *PlanningCtx, n *ordinalityNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.source)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.source)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2760,7 +2760,7 @@ func createProjectSetSpec(
func (dsp *DistSQLPlanner) createPlanForProjectSet(
planCtx *PlanningCtx, n *projectSetNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.source)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.source)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2849,12 +2849,12 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
planCtx *PlanningCtx, n *unionNode,
) (PhysicalPlan, error) {
leftLogicalPlan := n.left
leftPlan, err := dsp.createPlanForNode(planCtx, n.left)
leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left)
if err != nil {
return PhysicalPlan{}, err
}
rightLogicalPlan := n.right
rightPlan, err := dsp.createPlanForNode(planCtx, n.right)
rightPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.right)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -3061,7 +3061,7 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
func (dsp *DistSQLPlanner) createPlanForWindow(
planCtx *PlanningCtx, n *windowNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -3207,7 +3207,7 @@ func (dsp *DistSQLPlanner) createPlanForWindow(
func (dsp *DistSQLPlanner) createPlanForExport(
planCtx *PlanningCtx, n *exportNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.source)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.source)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_plan_ctas.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func PlanAndRunCTAS(
planCtx.planner = planner
planCtx.stmtType = tree.Rows

p, err := dsp.createPlanForNode(planCtx, in)
p, err := dsp.createPhysPlanForPlanNode(planCtx, in)
if err != nil {
recv.SetError(errors.Wrapf(err, "constructing distSQL plan"))
return
Expand Down
Loading

0 comments on commit 63ba4b3

Please sign in to comment.