Skip to content

Commit

Permalink
sql: implement ConstructScan in distsql spec factory
Browse files Browse the repository at this point in the history
This commit adds an implementation of
`distSQLSpecExecFactory.ConstructScan` which combines the logic that
performs `scanNode` creation of `execFactory.ConstructScan` and physical
planning of table readers of `DistSQLPlanner.createTableReaders`.
I tried to refactor the code so that there is not much duplication going
on.

Notably, simple projections, renders, and filters are not yet
implemented.

Release note: None
  • Loading branch information
yuzefovich committed Jun 2, 2020
1 parent 1071544 commit 5112560
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 18 deletions.
14 changes: 9 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2096,18 +2096,22 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
return plan, nil
}

func getTypesFromResultColumns(cols sqlbase.ResultColumns) []*types.T {
typs := make([]*types.T, len(cols))
for i, col := range cols {
typs[i] = col.Typ
}
return typs
}

// getTypesForPlanResult returns the types of the elements in the result streams
// of a plan that corresponds to a given planNode. If planToStreamColMap is nil,
// a 1-1 mapping is assumed.
func getTypesForPlanResult(node planNode, planToStreamColMap []int) ([]*types.T, error) {
nodeColumns := planColumns(node)
if planToStreamColMap == nil {
// No remapping.
types := make([]*types.T, len(nodeColumns))
for i := range nodeColumns {
types[i] = nodeColumns[i].Typ
}
return types, nil
return getTypesFromResultColumns(nodeColumns), nil
}
numCols := 0
for _, streamCol := range planToStreamColMap {
Expand Down
140 changes: 136 additions & 4 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@ package sql

import (
"github.com/cockroachdb/cockroach/pkg/geo/geoindex"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/opt/constraint"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/span"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
)

type distSQLSpecExecFactory struct {
planner *planner
dsp *DistSQLPlanner
}

var _ exec.Factory = &distSQLSpecExecFactory{}

func newDistSQLSpecExecFactory() exec.Factory {
return &distSQLSpecExecFactory{}
func newDistSQLSpecExecFactory(p *planner, dsp *DistSQLPlanner) exec.Factory {
return &distSQLSpecExecFactory{planner: p, dsp: dsp}
}

func (e *distSQLSpecExecFactory) ConstructValues(
Expand All @@ -36,6 +42,9 @@ func (e *distSQLSpecExecFactory) ConstructValues(
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
}

// ConstructScan implements exec.Factory interface by combining the logic that
// performs scanNode creation of execFactory.ConstructScan and physical
// planning of table readers of DistSQLPlanner.createTableReaders.
func (e *distSQLSpecExecFactory) ConstructScan(
table cat.Table,
index cat.Index,
Expand All @@ -49,12 +58,131 @@ func (e *distSQLSpecExecFactory) ConstructScan(
rowCount float64,
locking *tree.LockingItem,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
if table.IsVirtualTable() {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
}

var p PhysicalPlan
// Although we don't yet recommend distributing plans where soft limits
// propagate to scan nodes because we don't have infrastructure to only
// plan for a few ranges at a time, the propagation of the soft limits
// to scan nodes has been added in 20.1 release, so to keep the
// previous behavior we continue to ignore the soft limits for now.
// TODO(yuzefovich): pay attention to the soft limits.
recommendation := canDistribute

// ------- begin execFactory.ConstructScan -------

tabDesc := table.(*optTable).desc
indexDesc := index.(*optIndex).desc
colCfg := makeScanColumnsConfig(table, needed)
sb := span.MakeBuilder(e.planner.ExecCfg().Codec, tabDesc.TableDesc(), indexDesc)

// Note that initColsForScan and setting ResultColumns below are equivalent
// to what scan.initTable call does in execFactory.ConstructScan.
cols, err := initColsForScan(tabDesc, colCfg)
if err != nil {
return nil, err
}
p.ResultColumns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), cols)

if indexConstraint != nil && indexConstraint.IsContradiction() {
// TODO(yuzefovich): once ConstructValues is implemented, consider
// calling it here.
physPlan, err := e.dsp.createValuesPlan(
getTypesFromResultColumns(p.ResultColumns), 0 /* numRows */, nil, /* rawBytes */
)
return planMaybePhysical{physPlan: physPlan, recommendation: canDistribute}, err
}

// TODO(yuzefovich): scanNode adds "parallel" attribute in walk.go when
// scanNode.canParallelize() returns true. We should plumb that info from
// here somehow as well.
var spans roachpb.Spans
spans, err = sb.SpansFromConstraint(indexConstraint, needed, false /* forDelete */)
if err != nil {
return nil, err
}
isFullTableScan := len(spans) == 1 && spans[0].EqualValue(
tabDesc.IndexSpan(e.planner.ExecCfg().Codec, indexDesc.ID),
)
if err = colCfg.assertValidReqOrdering(reqOrdering); err != nil {
return nil, err
}

// ------- end execFactory.ConstructScan -------

// Check if we are doing a full scan.
if isFullTableScan {
recommendation = recommendation.compose(shouldDistribute)
}

// ------- begin DistSQLPlanner.createTableReaders -------

colsForScanToTableOrdinalMap := getColsForScanToTableOrdinalMap(cols, tabDesc, colCfg.visibility)

// ------- begin DistSQLPlanner.initTableReaderSpec -------

trSpec := physicalplan.NewTableReaderSpec()
*trSpec = execinfrapb.TableReaderSpec{
Table: *tabDesc.TableDesc(),
Reverse: reverse,
IsCheck: false,
Visibility: colCfg.visibility,
// Retain the capacity of the spans slice.
Spans: trSpec.Spans[:0],
}
trSpec.IndexIdx, err = getIndexIdx(indexDesc, tabDesc)
if err != nil {
return nil, err
}
if locking != nil {
trSpec.LockingStrength = sqlbase.ToScanLockingStrength(locking.Strength)
trSpec.LockingWaitPolicy = sqlbase.ToScanLockingWaitPolicy(locking.WaitPolicy)
if trSpec.LockingStrength != sqlbase.ScanLockingStrength_FOR_NONE {
// Scans that are performing row-level locking cannot currently be
// distributed because their locks would not be propagated back to
// the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
recommendation = cannotDistribute
}
}

// Note that we don't do anything about the possible filter here since we
// don't know yet whether we will have it. ConstructFilter is responsible
// for pushing the filter down into the post-processing stage of this scan.
post := execinfrapb.PostProcessSpec{}
if hardLimit != 0 {
post.Limit = uint64(hardLimit)
} else if softLimit != 0 {
trSpec.LimitHint = softLimit
}

// ------- end DistSQLPlanner.initTableReaderSpec -------

distribute := shouldDistributeGivenRecAndMode(recommendation, e.planner.extendedEvalCtx.SessionData.DistSQLMode)
if _, singleTenant := e.planner.execCfg.NodeID.OptionalNodeID(); !singleTenant {
distribute = false
}

evalCtx := e.planner.ExtendedEvalContext()
planCtx := e.dsp.NewPlanningCtx(evalCtx.Context, evalCtx, e.planner.txn, distribute)
err = e.dsp.planTableReaders(
planCtx, &p, trSpec, post, tabDesc, spans, reverse, colCfg, maxResults,
uint64(rowCount), ReqOrdering(reqOrdering), cols, colsForScanToTableOrdinalMap,
)

// ------- end DistSQLPlanner.createTableReaders -------

return planMaybePhysical{physPlan: &p, recommendation: recommendation}, err
}

func (e *distSQLSpecExecFactory) ConstructFilter(
n exec.Node, filter tree.TypedExpr, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
// TODO(yuzefovich): figure out how to push the filter into the table
// reader when it already doesn't have a filter and it doesn't have a hard
// limit.
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
}

Expand Down Expand Up @@ -229,7 +357,11 @@ func (e *distSQLSpecExecFactory) ConstructWindow(
func (e *distSQLSpecExecFactory) RenameColumns(
input exec.Node, colNames []string,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
inputCols := input.(planMaybePhysical).physPlan.ResultColumns
for i := range inputCols {
inputCols[i].Name = colNames[i]
}
return input, nil
}

func (e *distSQLSpecExecFactory) ConstructPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,41 @@ SET CLUSTER SETTING sql.defaults.experimental_distsql_planning = on
statement ok
SET experimental_distsql_planning = always

# Test that a SELECT query fails but others don't.
statement ok
CREATE TABLE kv (k INT PRIMARY KEY, v INT); INSERT INTO kv VALUES (1, 1), (2, 1)
CREATE TABLE kv (k INT PRIMARY KEY, v INT); INSERT INTO kv VALUES (1, 1), (2, 1), (3, 2)

statement error pq: unimplemented: experimental opt-driven distsql planning
query II colnames,rowsort
SELECT * FROM kv
----
k v
1 1
2 1
3 2

query I colnames,rowsort
SELECT k FROM kv
----
k
1
2
3

query I colnames,rowsort
SELECT v FROM kv
----
v
1
1
2

# Projections are not yet supported.
statement error pq: unimplemented: experimental opt-driven distsql planning
SELECT v, k FROM kv

# Renders are not yet supported.
statement error pq: unimplemented: experimental opt-driven distsql planning
SELECT k + v FROM kv

# Filters are not yet supported.
statement error pq: unimplemented: experimental opt-driven distsql planning
SELECT * FROM kv WHERE k > v
6 changes: 5 additions & 1 deletion pkg/sql/physicalplan/physical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type PhysicalPlan struct {
LocalProcessors []execinfra.LocalProcessor

// LocalProcessorIndexes contains pointers to all of the RowSourceIdx fields
// of the LocalPlanNodeSpecs that were created. This list is in the same
// of the LocalPlanNodeSpecs that were created. This list is in the same
// order as LocalProcessors, and is kept up-to-date so that LocalPlanNodeSpecs
// always have the correct index into the LocalProcessors slice.
LocalProcessorIndexes []*uint32
Expand Down Expand Up @@ -105,6 +105,10 @@ type PhysicalPlan struct {
// in-place during planning.
ResultTypes []*types.T

// ResultColumns is the schema (result columns) of the rows produced by the
// ResultRouters.
ResultColumns sqlbase.ResultColumns

// MergeOrdering is the ordering guarantee for the result streams that must be
// maintained when the streams eventually merge. The column indexes refer to
// columns for the rows produced by ResultRouters.
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,7 @@ func (p planMaybePhysical) isPhysicalPlan() bool {

func (p planMaybePhysical) planColumns() sqlbase.ResultColumns {
if p.isPhysicalPlan() {
// TODO(yuzefovich): update this once we support creating table reader
// specs directly in the optimizer (see #47474).
return nil
return p.physPlan.ResultColumns
}
return planColumns(p.planNode)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/plan_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error {
plan exec.Plan
bld *execbuilder.Builder
)
if mode := p.SessionData().ExperimentalDistSQLPlanningMode; mode != sessiondata.ExperimentalDistSQLPlanningOff {
bld = execbuilder.New(newDistSQLSpecExecFactory(), execMemo, &opc.catalog, root, p.EvalContext())
dsp := p.DistSQLPlanner()
if mode := p.SessionData().ExperimentalDistSQLPlanningMode; dsp != nil && mode != sessiondata.ExperimentalDistSQLPlanningOff {
bld = execbuilder.New(newDistSQLSpecExecFactory(p, dsp), execMemo, &opc.catalog, root, p.EvalContext())
plan, err = bld.Build()
if err != nil {
if mode == sessiondata.ExperimentalDistSQLPlanningAlways &&
Expand Down

0 comments on commit 5112560

Please sign in to comment.