Skip to content

Commit

Permalink
sql: add plumbing for phys plan creation directly in execbuilder
Browse files Browse the repository at this point in the history
This commit introduces `planMaybePhysical` utility struct that
represents a plan and uses either planNode ("logical) or DistSQL spec
("physical") representations. It will be removed once we are able to
create all processor spec in execbuilder directly. This struct has been
plumbed in all places that need to look at the plan, but in most of them
only the logical representation is supported. However, the main codepath
for executing a statement supports both, and if physical representation
is used, then we bypass distsql physical planner.

This commit also renames `checkSupportForNode` to
`checkSupportForPlanNode` and `createPlanForNode` to
`createPhysPlanForPlanNode` to make their purpose more clear.

Release note: None
  • Loading branch information
yuzefovich committed May 29, 2020
1 parent f613950 commit 2c1f991
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 112 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(

var cols sqlbase.ResultColumns
if stmt.AST.StatementType() == tree.Rows {
cols = planColumns(planner.curPlan.main)
cols = planner.curPlan.main.planColumns()
}
if err := ex.initStatementResult(ctx, res, stmt, cols); err != nil {
res.SetError(err)
Expand Down
91 changes: 50 additions & 41 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func newQueryNotSupportedErrorf(format string, args ...interface{}) error {
}

// planNodeNotSupportedErr is the catch-all error value returned from
// checkSupportForNode when a planNode type does not support distributed
// checkSupportForPlanNode when a planNode type does not support distributed
// execution.
var planNodeNotSupportedErr = newQueryNotSupportedError("unsupported node")

Expand All @@ -292,7 +292,7 @@ var cannotDistributeRowLevelLockingErr = newQueryNotSupportedError(
)

// mustWrapNode returns true if a node has no DistSQL-processor equivalent.
// This must be kept in sync with createPlanForNode.
// This must be kept in sync with createPhysPlanForPlanNode.
// TODO(jordan): refactor these to use the observer pattern to avoid duplication.
func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) bool {
switch n := node.(type) {
Expand All @@ -313,7 +313,7 @@ func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) boo
case *unaryNode:
case *unionNode:
case *valuesNode:
// This is unfortunately duplicated by createPlanForNode, and must be kept
// This is unfortunately duplicated by createPhysPlanForPlanNode, and must be kept
// in sync with its implementation.
if !n.specifiedInQuery || planCtx.isLocal || planCtx.noEvalSubqueries {
return true
Expand All @@ -328,28 +328,28 @@ func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) boo
return false
}

// checkSupportForNode returns a distRecommendation (as described above) or
// checkSupportForPlanNode returns a distRecommendation (as described above) or
// cannotDistribute and an error if the plan subtree is not distributable.
// The error doesn't indicate complete failure - it's instead the reason that
// this plan couldn't be distributed.
// TODO(radu): add tests for this.
func checkSupportForNode(node planNode) (distRecommendation, error) {
func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
case *distinctNode:
return checkSupportForNode(n.plan)
return checkSupportForPlanNode(n.plan)

case *exportNode:
return checkSupportForNode(n.source)
return checkSupportForPlanNode(n.source)

case *filterNode:
if err := checkExpr(n.filter); err != nil {
return cannotDistribute, err
}
return checkSupportForNode(n.source.plan)
return checkSupportForPlanNode(n.source.plan)

case *groupNode:
rec, err := checkSupportForNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -359,20 +359,20 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
case *indexJoinNode:
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
if _, err := checkSupportForNode(n.table); err != nil {
if _, err := checkSupportForPlanNode(n.table); err != nil {
return cannotDistribute, err
}
return checkSupportForNode(n.input)
return checkSupportForPlanNode(n.input)

case *joinNode:
if err := checkExpr(n.pred.onCond); err != nil {
return cannotDistribute, err
}
recLeft, err := checkSupportForNode(n.left.plan)
recLeft, err := checkSupportForPlanNode(n.left.plan)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForNode(n.right.plan)
recRight, err := checkSupportForPlanNode(n.right.plan)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -392,27 +392,27 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.offsetExpr); err != nil {
return cannotDistribute, err
}
return checkSupportForNode(n.plan)
return checkSupportForPlanNode(n.plan)

case *lookupJoinNode:
if err := checkExpr(n.onCond); err != nil {
return cannotDistribute, err
}
if _, err := checkSupportForNode(n.input); err != nil {
if _, err := checkSupportForPlanNode(n.input); err != nil {
return cannotDistribute, err
}
return shouldDistribute, nil

case *projectSetNode:
return checkSupportForNode(n.source)
return checkSupportForPlanNode(n.source)

case *renderNode:
for _, e := range n.render {
if err := checkExpr(e); err != nil {
return cannotDistribute, err
}
}
return checkSupportForNode(n.source.plan)
return checkSupportForPlanNode(n.source.plan)

case *scanNode:
if n.lockingStrength != sqlbase.ScanLockingStrength_FOR_NONE {
Expand Down Expand Up @@ -445,7 +445,7 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
return rec, nil

case *sortNode:
rec, err := checkSupportForNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -457,11 +457,11 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *unionNode:
recLeft, err := checkSupportForNode(n.left)
recLeft, err := checkSupportForPlanNode(n.left)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForNode(n.right)
recRight, err := checkSupportForPlanNode(n.right)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -485,7 +485,7 @@ func checkSupportForNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *windowNode:
return checkSupportForNode(n.plan)
return checkSupportForPlanNode(n.plan)

case *zeroNode:
return canDistribute, nil
Expand Down Expand Up @@ -1812,7 +1812,7 @@ func (dsp *DistSQLPlanner) addAggregators(
func (dsp *DistSQLPlanner) createPlanForIndexJoin(
planCtx *PlanningCtx, n *indexJoinNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.input)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.input)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -1892,7 +1892,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
func (dsp *DistSQLPlanner) createPlanForLookupJoin(
planCtx *PlanningCtx, n *lookupJoinNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.input)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.input)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2174,11 +2174,11 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
//
// - The routers of the joiner processors are the result routers of the plan.

leftPlan, err := dsp.createPlanForNode(planCtx, n.left.plan)
leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left.plan)
if err != nil {
return PhysicalPlan{}, err
}
rightPlan, err := dsp.createPlanForNode(planCtx, n.right.plan)
rightPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.right.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2288,7 +2288,16 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
return p, nil
}

func (dsp *DistSQLPlanner) createPlanForNode(
func (dsp *DistSQLPlanner) createPhysPlan(
planCtx *PlanningCtx, plan planMaybePhysical,
) (physPlan PhysicalPlan, err error) {
if plan.physPlan != nil {
return *plan.physPlan, nil
}
return dsp.createPhysPlanForPlanNode(planCtx, plan.planNode)
}

func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
planCtx *PlanningCtx, node planNode,
) (plan PhysicalPlan, err error) {
planCtx.planDepth++
Expand All @@ -2302,7 +2311,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForExport(planCtx, n)

case *filterNode:
plan, err = dsp.createPlanForNode(planCtx, n.source.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.source.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2312,7 +2321,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
}

case *groupNode:
plan, err = dsp.createPlanForNode(planCtx, n.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2328,7 +2337,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForJoin(planCtx, n)

case *limitNode:
plan, err = dsp.createPlanForNode(planCtx, n.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2349,7 +2358,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForProjectSet(planCtx, n)

case *renderNode:
plan, err = dsp.createPlanForNode(planCtx, n.source.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.source.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2362,7 +2371,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createTableReaders(planCtx, n, nil)

case *sortNode:
plan, err = dsp.createPlanForNode(planCtx, n.plan)
plan, err = dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand All @@ -2376,7 +2385,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
plan, err = dsp.createPlanForSetOp(planCtx, n)

case *valuesNode:
// Just like in checkSupportForNode, if a valuesNode wasn't specified in
// Just like in checkSupportForPlanNode, if a valuesNode wasn't specified in
// the query, it means that it was autogenerated for things that we don't
// want to be distributing, like populating values from a virtual table. So,
// we wrap the plan instead.
Expand All @@ -2392,7 +2401,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
// evaluatable.
//
// NB: If you change this conditional, you must also change it in
// checkSupportForNode!
// checkSupportForPlanNode!
if !n.specifiedInQuery || planCtx.isLocal || planCtx.noEvalSubqueries {
plan, err = dsp.wrapPlan(planCtx, n)
} else {
Expand Down Expand Up @@ -2475,7 +2484,7 @@ func (dsp *DistSQLPlanner) wrapPlan(planCtx *PlanningCtx, n planNode) (PhysicalP
// control of planning back to the DistSQL physical planner.
if !dsp.mustWrapNode(planCtx, plan) {
firstNotWrapped = plan
p, err = dsp.createPlanForNode(planCtx, plan)
p, err = dsp.createPhysPlanForPlanNode(planCtx, plan)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -2698,7 +2707,7 @@ func createDistinctSpec(n *distinctNode, cols []int) *execinfrapb.DistinctSpec {
func (dsp *DistSQLPlanner) createPlanForDistinct(
planCtx *PlanningCtx, n *distinctNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2727,7 +2736,7 @@ func (dsp *DistSQLPlanner) createPlanForDistinct(
func (dsp *DistSQLPlanner) createPlanForOrdinality(
planCtx *PlanningCtx, n *ordinalityNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.source)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.source)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2773,7 +2782,7 @@ func createProjectSetSpec(
func (dsp *DistSQLPlanner) createPlanForProjectSet(
planCtx *PlanningCtx, n *projectSetNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.source)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.source)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -2862,12 +2871,12 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
planCtx *PlanningCtx, n *unionNode,
) (PhysicalPlan, error) {
leftLogicalPlan := n.left
leftPlan, err := dsp.createPlanForNode(planCtx, n.left)
leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left)
if err != nil {
return PhysicalPlan{}, err
}
rightLogicalPlan := n.right
rightPlan, err := dsp.createPlanForNode(planCtx, n.right)
rightPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.right)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -3074,7 +3083,7 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
func (dsp *DistSQLPlanner) createPlanForWindow(
planCtx *PlanningCtx, n *windowNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.plan)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down Expand Up @@ -3220,7 +3229,7 @@ func (dsp *DistSQLPlanner) createPlanForWindow(
func (dsp *DistSQLPlanner) createPlanForExport(
planCtx *PlanningCtx, n *exportNode,
) (PhysicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.source)
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.source)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/distsql_plan_ctas.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func PlanAndRunCTAS(
planner *planner,
txn *kv.Txn,
isLocal bool,
in planNode,
in planMaybePhysical,
out execinfrapb.ProcessorCoreUnion,
recv *DistSQLReceiver,
) {
Expand All @@ -36,22 +36,21 @@ func PlanAndRunCTAS(
planCtx.planner = planner
planCtx.stmtType = tree.Rows

p, err := dsp.createPlanForNode(planCtx, in)
physPlan, err := dsp.createPhysPlan(planCtx, in)
if err != nil {
recv.SetError(errors.Wrapf(err, "constructing distSQL plan"))
return
}

p.AddNoGroupingStage(
physPlan.AddNoGroupingStage(
out, execinfrapb.PostProcessSpec{}, rowexec.CTASPlanResultTypes, execinfrapb.Ordering{},
)

// The bulk row writers will emit a binary encoded BulkOpSummary.
p.PlanToStreamColMap = []int{0}
p.ResultTypes = rowexec.CTASPlanResultTypes
physPlan.PlanToStreamColMap = []int{0}
physPlan.ResultTypes = rowexec.CTASPlanResultTypes

// Make copy of evalCtx as Run might modify it.
evalCtxCopy := planner.ExtendedEvalContextCopy()
dsp.FinalizePlan(planCtx, &p)
dsp.Run(planCtx, txn, &p, recv, evalCtxCopy, nil /* finishedSetupFn */)()
dsp.FinalizePlan(planCtx, &physPlan)
dsp.Run(planCtx, txn, &physPlan, recv, evalCtxCopy, nil /* finishedSetupFn */)()
}
Loading

0 comments on commit 2c1f991

Please sign in to comment.