Skip to content

Commit

Permalink
sql: implement ConstructLimit in the new factory
Browse files Browse the repository at this point in the history
This commit additionally removes the requirement that `limitExpr` and
`offsetExpr` must be distributable in order for the whole plan to be
distributable in the old path because those expressions are evaluated
during the physical planning, locally.

Release note: None
  • Loading branch information
yuzefovich committed Jun 25, 2020
1 parent df9b561 commit d20fb18
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 33 deletions.
14 changes: 6 additions & 8 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,9 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return rec, nil

case *limitNode:
if err := checkExpr(n.countExpr); err != nil {
return cannotDistribute, err
}
if err := checkExpr(n.offsetExpr); err != nil {
return cannotDistribute, err
}
// Note that we don't need to check whether we support distribution of
// n.countExpr or n.offsetExpr because those expressions are evaluated
// locally, during the physical planning.
return checkSupportForPlanNode(n.plan)

case *lookupJoinNode:
Expand Down Expand Up @@ -2353,10 +2350,11 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
if err != nil {
return nil, err
}
if err := n.evalLimit(planCtx.EvalContext()); err != nil {
var count, offset int64
if count, offset, err = evalLimit(planCtx.EvalContext(), n.countExpr, n.offsetExpr); err != nil {
return nil, err
}
if err := plan.AddLimit(n.count, n.offset, planCtx); err != nil {
if err := plan.AddLimit(count, offset, planCtx); err != nil {
return nil, err
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,23 @@ func (e *distSQLSpecExecFactory) ConstructZigzagJoin(
}

func (e *distSQLSpecExecFactory) ConstructLimit(
input exec.Node, limit, offset tree.TypedExpr,
input exec.Node, limitExpr, offsetExpr tree.TypedExpr,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: limit")
physPlan, plan := getPhysPlan(input)
// Note that we pass in nil slice for exprs because we will evaluate both
// expressions below, locally.
recommendation := e.checkExprsAndMaybeMergeLastStage(nil /* exprs */, physPlan)
count, offset, err := evalLimit(e.planner.EvalContext(), limitExpr, offsetExpr)
if err != nil {
return nil, err
}
if err = physPlan.AddLimit(count, offset, e.getPlanCtx(recommendation)); err != nil {
return nil, err
}
// Since addition of limit and/or offset doesn't change any properties of
// the physical plan, we don't need to update any of those (like
// PlanToStreamColMap, etc).
return plan, nil
}

func (e *distSQLSpecExecFactory) ConstructMax1Row(
Expand Down
22 changes: 10 additions & 12 deletions pkg/sql/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ type limitNode struct {
plan planNode
countExpr tree.TypedExpr
offsetExpr tree.TypedExpr
evaluated bool
count int64
offset int64
}

func (n *limitNode) startExec(params runParams) error {
Expand All @@ -47,24 +44,26 @@ func (n *limitNode) Close(ctx context.Context) {

// evalLimit evaluates the Count and Offset fields. If Count is missing, the
// value is MaxInt64. If Offset is missing, the value is 0
func (n *limitNode) evalLimit(evalCtx *tree.EvalContext) error {
n.count = math.MaxInt64
n.offset = 0
func evalLimit(
evalCtx *tree.EvalContext, countExpr, offsetExpr tree.TypedExpr,
) (count, offset int64, err error) {
count = math.MaxInt64
offset = 0

data := []struct {
name string
src tree.TypedExpr
dst *int64
}{
{"LIMIT", n.countExpr, &n.count},
{"OFFSET", n.offsetExpr, &n.offset},
{"LIMIT", countExpr, &count},
{"OFFSET", offsetExpr, &offset},
}

for _, datum := range data {
if datum.src != nil {
dstDatum, err := datum.src.Eval(evalCtx)
if err != nil {
return err
return count, offset, err
}

if dstDatum == tree.DNull {
Expand All @@ -75,11 +74,10 @@ func (n *limitNode) evalLimit(evalCtx *tree.EvalContext) error {
dstDInt := tree.MustBeDInt(dstDatum)
val := int64(dstDInt)
if val < 0 {
return fmt.Errorf("negative value for %s", datum.name)
return count, offset, fmt.Errorf("negative value for %s", datum.name)
}
*datum.dst = val
}
}
n.evaluated = true
return nil
return count, offset, nil
}
24 changes: 13 additions & 11 deletions pkg/sql/physicalplan/physical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,28 +753,30 @@ func (p *PhysicalPlan) AddFilter(
return nil
}

// emptyPlan creates a plan with a single processor on the gateway that
// generates no rows; the output stream has the given types.
func emptyPlan(types []*types.T, gatewayNodeID roachpb.NodeID) PhysicalPlan {
// emptyPlan updates p in-place with a plan consisting of a single processor on
// the gateway that generates no rows; the output stream has the same types as
// p produces.
func (p *PhysicalPlan) emptyPlan() {
s := execinfrapb.ValuesCoreSpec{
Columns: make([]execinfrapb.DatumInfo, len(types)),
Columns: make([]execinfrapb.DatumInfo, len(p.ResultTypes)),
}
for i, t := range types {
for i, t := range p.ResultTypes {
s.Columns[i].Encoding = sqlbase.DatumEncoding_VALUE
s.Columns[i].Type = t
}

return PhysicalPlan{
*p = PhysicalPlan{
Processors: []Processor{{
Node: gatewayNodeID,
Node: p.GatewayNodeID,
Spec: execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{Values: &s},
Output: make([]execinfrapb.OutputRouterSpec, 1),
},
}},
ResultRouters: []ProcessorIdx{0},
ResultTypes: types,
GatewayNodeID: gatewayNodeID,
ResultTypes: p.ResultTypes,
ResultColumns: p.ResultColumns,
GatewayNodeID: p.GatewayNodeID,
Distribution: LocalPlan,
}
}
Expand All @@ -801,7 +803,7 @@ func (p *PhysicalPlan) AddLimit(count int64, offset int64, exprCtx ExprContext)
limitZero := false
if count == 0 {
if len(p.LocalProcessors) == 0 {
*p = emptyPlan(p.ResultTypes, p.GatewayNodeID)
p.emptyPlan()
return nil
}
count = 1
Expand All @@ -824,7 +826,7 @@ func (p *PhysicalPlan) AddLimit(count int64, offset int64, exprCtx ExprContext)
// Even though we know there will be no results, we don't elide the
// plan if there are local processors. See comment above limitZero
// for why.
*p = emptyPlan(p.ResultTypes, p.GatewayNodeID)
p.emptyPlan()
return nil
}
count = 1
Expand Down

0 comments on commit d20fb18

Please sign in to comment.