diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 4f20b9f3d0ff..6a82134fafc8 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2096,6 +2096,14 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( return plan, nil } +func getTypesFromResultColumns(cols sqlbase.ResultColumns) []*types.T { + typs := make([]*types.T, len(cols)) + for i, col := range cols { + typs[i] = col.Typ + } + return typs +} + // getTypesForPlanResult returns the types of the elements in the result streams // of a plan that corresponds to a given planNode. If planToStreamColMap is nil, // a 1-1 mapping is assumed. @@ -2103,11 +2111,7 @@ func getTypesForPlanResult(node planNode, planToStreamColMap []int) ([]*types.T, nodeColumns := planColumns(node) if planToStreamColMap == nil { // No remapping. - types := make([]*types.T, len(nodeColumns)) - for i := range nodeColumns { - types[i] = nodeColumns[i].Typ - } - return types, nil + return getTypesFromResultColumns(nodeColumns), nil } numCols := 0 for _, streamCol := range planToStreamColMap { diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4acd789d252a..e2bf7da725d8 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -12,22 +12,28 @@ package sql import ( "github.com/cockroachdb/cockroach/pkg/geo/geoindex" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/span" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" ) type distSQLSpecExecFactory struct { + planner *planner + dsp *DistSQLPlanner } var _ exec.Factory = &distSQLSpecExecFactory{} -func newDistSQLSpecExecFactory() exec.Factory { - return &distSQLSpecExecFactory{} +func newDistSQLSpecExecFactory(p *planner, dsp *DistSQLPlanner) exec.Factory { + return &distSQLSpecExecFactory{planner: p, dsp: dsp} } func (e *distSQLSpecExecFactory) ConstructValues( @@ -36,6 +42,9 @@ func (e *distSQLSpecExecFactory) ConstructValues( return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } +// ConstructScan implements exec.Factory interface by combining the logic that +// performs scanNode creation of execFactory.ConstructScan and physical +// planning of table readers of DistSQLPlanner.createTableReaders. func (e *distSQLSpecExecFactory) ConstructScan( table cat.Table, index cat.Index, @@ -49,12 +58,131 @@ func (e *distSQLSpecExecFactory) ConstructScan( rowCount float64, locking *tree.LockingItem, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + if table.IsVirtualTable() { + return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + } + + var p PhysicalPlan + // Although we don't yet recommend distributing plans where soft limits + // propagate to scan nodes because we don't have infrastructure to only + // plan for a few ranges at a time, the propagation of the soft limits + // to scan nodes has been added in 20.1 release, so to keep the + // previous behavior we continue to ignore the soft limits for now. + // TODO(yuzefovich): pay attention to the soft limits. + recommendation := canDistribute + + // ------- begin execFactory.ConstructScan ------- + + tabDesc := table.(*optTable).desc + indexDesc := index.(*optIndex).desc + colCfg := makeScanColumnsConfig(table, needed) + sb := span.MakeBuilder(e.planner.ExecCfg().Codec, tabDesc.TableDesc(), indexDesc) + + // Note that initColsForScan and setting ResultColumns below are equivalent + // to what scan.initTable call does in execFactory.ConstructScan. + cols, err := initColsForScan(tabDesc, colCfg) + if err != nil { + return nil, err + } + p.ResultColumns = sqlbase.ResultColumnsFromColDescs(tabDesc.GetID(), cols) + + if indexConstraint != nil && indexConstraint.IsContradiction() { + // TODO(yuzefovich): once ConstructValues is implemented, consider + // calling it here. + physPlan, err := e.dsp.createValuesPlan( + getTypesFromResultColumns(p.ResultColumns), 0 /* numRows */, nil, /* rawBytes */ + ) + return planMaybePhysical{physPlan: physPlan, recommendation: canDistribute}, err + } + + // TODO(yuzefovich): scanNode adds "parallel" attribute in walk.go when + // scanNode.canParallelize() returns true. We should plumb that info from + // here somehow as well. + var spans roachpb.Spans + spans, err = sb.SpansFromConstraint(indexConstraint, needed, false /* forDelete */) + if err != nil { + return nil, err + } + isFullTableScan := len(spans) == 1 && spans[0].EqualValue( + tabDesc.IndexSpan(e.planner.ExecCfg().Codec, indexDesc.ID), + ) + if err = colCfg.assertValidReqOrdering(reqOrdering); err != nil { + return nil, err + } + + // ------- end execFactory.ConstructScan ------- + + // Check if we are doing a full scan. + if isFullTableScan { + recommendation = recommendation.compose(shouldDistribute) + } + + // ------- begin DistSQLPlanner.createTableReaders ------- + + colsForScanToTableOrdinalMap := getColsForScanToTableOrdinalMap(cols, tabDesc, colCfg.visibility) + + // ------- begin DistSQLPlanner.initTableReaderSpec ------- + + trSpec := physicalplan.NewTableReaderSpec() + *trSpec = execinfrapb.TableReaderSpec{ + Table: *tabDesc.TableDesc(), + Reverse: reverse, + IsCheck: false, + Visibility: colCfg.visibility, + // Retain the capacity of the spans slice. + Spans: trSpec.Spans[:0], + } + trSpec.IndexIdx, err = getIndexIdx(indexDesc, tabDesc) + if err != nil { + return nil, err + } + if locking != nil { + trSpec.LockingStrength = sqlbase.ToScanLockingStrength(locking.Strength) + trSpec.LockingWaitPolicy = sqlbase.ToScanLockingWaitPolicy(locking.WaitPolicy) + if trSpec.LockingStrength != sqlbase.ScanLockingStrength_FOR_NONE { + // Scans that are performing row-level locking cannot currently be + // distributed because their locks would not be propagated back to + // the root transaction coordinator. + // TODO(nvanbenschoten): lift this restriction. + recommendation = cannotDistribute + } + } + + // Note that we don't do anything about the possible filter here since we + // don't know yet whether we will have it. ConstructFilter is responsible + // for pushing the filter down into the post-processing stage of this scan. + post := execinfrapb.PostProcessSpec{} + if hardLimit != 0 { + post.Limit = uint64(hardLimit) + } else if softLimit != 0 { + trSpec.LimitHint = softLimit + } + + // ------- end DistSQLPlanner.initTableReaderSpec ------- + + distribute := shouldDistributeGivenRecAndMode(recommendation, e.planner.extendedEvalCtx.SessionData.DistSQLMode) + if _, singleTenant := e.planner.execCfg.NodeID.OptionalNodeID(); !singleTenant { + distribute = false + } + + evalCtx := e.planner.ExtendedEvalContext() + planCtx := e.dsp.NewPlanningCtx(evalCtx.Context, evalCtx, e.planner.txn, distribute) + err = e.dsp.planTableReaders( + planCtx, &p, trSpec, post, tabDesc, spans, reverse, colCfg, maxResults, + uint64(rowCount), ReqOrdering(reqOrdering), cols, colsForScanToTableOrdinalMap, + ) + + // ------- end DistSQLPlanner.createTableReaders ------- + + return planMaybePhysical{physPlan: &p, recommendation: recommendation}, err } func (e *distSQLSpecExecFactory) ConstructFilter( n exec.Node, filter tree.TypedExpr, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { + // TODO(yuzefovich): figure out how to push the filter into the table + // reader when it already doesn't have a filter and it doesn't have a hard + // limit. return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } @@ -229,7 +357,11 @@ func (e *distSQLSpecExecFactory) ConstructWindow( func (e *distSQLSpecExecFactory) RenameColumns( input exec.Node, colNames []string, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + inputCols := input.(planMaybePhysical).physPlan.ResultColumns + for i := range inputCols { + inputCols[i].Name = colNames[i] + } + return input, nil } func (e *distSQLSpecExecFactory) ConstructPlan( diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning index c91d1626643b..10fcc5ba215e 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning @@ -14,9 +14,41 @@ SET CLUSTER SETTING sql.defaults.experimental_distsql_planning = on statement ok SET experimental_distsql_planning = always -# Test that a SELECT query fails but others don't. statement ok -CREATE TABLE kv (k INT PRIMARY KEY, v INT); INSERT INTO kv VALUES (1, 1), (2, 1) +CREATE TABLE kv (k INT PRIMARY KEY, v INT); INSERT INTO kv VALUES (1, 1), (2, 1), (3, 2) -statement error pq: unimplemented: experimental opt-driven distsql planning +query II colnames,rowsort SELECT * FROM kv +---- +k v +1 1 +2 1 +3 2 + +query I colnames,rowsort +SELECT k FROM kv +---- +k +1 +2 +3 + +query I colnames,rowsort +SELECT v FROM kv +---- +v +1 +1 +2 + +# Projections are not yet supported. +statement error pq: unimplemented: experimental opt-driven distsql planning +SELECT v, k FROM kv + +# Renders are not yet supported. +statement error pq: unimplemented: experimental opt-driven distsql planning +SELECT k + v FROM kv + +# Filters are not yet supported. +statement error pq: unimplemented: experimental opt-driven distsql planning +SELECT * FROM kv WHERE k > v diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index 553b45d2cd03..fae6264e3d67 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -77,7 +77,7 @@ type PhysicalPlan struct { LocalProcessors []execinfra.LocalProcessor // LocalProcessorIndexes contains pointers to all of the RowSourceIdx fields - // of the LocalPlanNodeSpecs that were created. This list is in the same + // of the LocalPlanNodeSpecs that were created. This list is in the same // order as LocalProcessors, and is kept up-to-date so that LocalPlanNodeSpecs // always have the correct index into the LocalProcessors slice. LocalProcessorIndexes []*uint32 @@ -105,6 +105,10 @@ type PhysicalPlan struct { // in-place during planning. ResultTypes []*types.T + // ResultColumns is the schema (result columns) of the rows produced by the + // ResultRouters. + ResultColumns sqlbase.ResultColumns + // MergeOrdering is the ordering guarantee for the result streams that must be // maintained when the streams eventually merge. The column indexes refer to // columns for the rows produced by ResultRouters. diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index c4d63faa3e66..95dbc7092000 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -329,9 +329,7 @@ func (p planMaybePhysical) isPhysicalPlan() bool { func (p planMaybePhysical) planColumns() sqlbase.ResultColumns { if p.isPhysicalPlan() { - // TODO(yuzefovich): update this once we support creating table reader - // specs directly in the optimizer (see #47474). - return nil + return p.physPlan.ResultColumns } return planColumns(p.planNode) } diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index 977b7d766915..aadcb904ff1c 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -179,8 +179,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { plan exec.Plan bld *execbuilder.Builder ) - if mode := p.SessionData().ExperimentalDistSQLPlanningMode; mode != sessiondata.ExperimentalDistSQLPlanningOff { - bld = execbuilder.New(newDistSQLSpecExecFactory(), execMemo, &opc.catalog, root, p.EvalContext()) + dsp := p.DistSQLPlanner() + if mode := p.SessionData().ExperimentalDistSQLPlanningMode; dsp != nil && mode != sessiondata.ExperimentalDistSQLPlanningOff { + bld = execbuilder.New(newDistSQLSpecExecFactory(p, dsp), execMemo, &opc.catalog, root, p.EvalContext()) plan, err = bld.Build() if err != nil { if mode == sessiondata.ExperimentalDistSQLPlanningAlways &&