From 721c381d6077e1481ea7748ef582f590dce88550 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Wed, 20 Oct 2021 11:57:31 -0400 Subject: [PATCH] sql: implement ConstructZigzagJoin in distsql_spec_exec_factory Informs #47473 Release note: None --- pkg/sql/distsql_physical_planner.go | 120 ++++++++++++------ pkg/sql/distsql_spec_exec_factory.go | 95 +++++++++++++- .../logic_test/experimental_distsql_planning | 26 ++++ .../opt/exec/execbuilder/testdata/zigzag_join | 2 +- 4 files changed, 201 insertions(+), 42 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index aa504a8972e9..d30e52beb5b7 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" @@ -2537,37 +2538,20 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin( func (dsp *DistSQLPlanner) createPlanForZigzagJoin( planCtx *PlanningCtx, n *zigzagJoinNode, ) (plan *PhysicalPlan, err error) { - plan = planCtx.NewPhysicalPlan() - tables := make([]descpb.TableDescriptor, len(n.sides)) - indexOrdinals := make([]uint32, len(n.sides)) - cols := make([]execinfrapb.Columns, len(n.sides)) - numStreamCols := 0 + pi := zigzagPlanningInfo{onCond: n.onCond, reqOrdering: exec.OutputOrdering(n.reqOrdering)} + pi.init(len(n.sides)) + for i, side := range n.sides { - tables[i] = *side.scan.desc.TableDesc() - indexOrdinals[i], err = getIndexIdx(side.scan.index, side.scan.desc) + err = pi.initSide(i, side.scan.desc, side.scan.index, len(side.eqCols)) if err != nil { return nil, err } - - cols[i].Columns = make([]uint32, len(side.eqCols)) for j, col := range side.eqCols { - cols[i].Columns[j] = uint32(col) + pi.eqCols[i].Columns[j] = uint32(col) } - - numStreamCols += len(side.scan.desc.PublicColumns()) } - // The zigzag join node only represents inner joins, so hardcode Type to - // InnerJoin. - zigzagJoinerSpec := execinfrapb.ZigzagJoinerSpec{ - Tables: tables, - IndexOrdinals: indexOrdinals, - EqColumns: cols, - Type: descpb.InnerJoin, - } - zigzagJoinerSpec.FixedValues = make([]*execinfrapb.ValuesCoreSpec, len(n.sides)) - // The fixed values are represented as a Values node with one tuple. for i := range n.sides { fixedVals := n.sides[i].fixedVals @@ -2576,7 +2560,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( if err != nil { return nil, err } - zigzagJoinerSpec.FixedValues[i] = valuesSpec + pi.fixedValues[i] = valuesSpec } // The internal schema of the zigzag joiner is: @@ -2587,12 +2571,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( // ... ... // so the planToStreamColMap has to basically map index ordinals // to table ordinals. - post := execinfrapb.PostProcessSpec{Projection: true} - numOutCols := len(n.columns) - - post.OutputColumns = make([]uint32, numOutCols) - types := make([]*types.T, numOutCols) - planToStreamColMap := makePlanToStreamColMap(numOutCols) + pi.initOutput(len(n.columns)) colOffset := 0 i := 0 @@ -2605,30 +2584,91 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( // opt/exec/execbuilder/relational_builder.go, similar to lookup joins. for _, col := range side.scan.cols { ord := tableOrdinal(side.scan.desc, col.GetID(), side.scan.colCfg.visibility) - post.OutputColumns[i] = uint32(colOffset + ord) - types[i] = col.GetType() - planToStreamColMap[i] = i - + pi.outputColumns[i] = uint32(colOffset + ord) + pi.outputTypes[i] = col.GetType() + pi.planToStreamColMap[i] = i i++ } colOffset += len(side.scan.desc.PublicColumns()) } + return dsp.constructZigzagJoin(planCtx, pi) +} + +type zigzagPlanningInfo struct { + tableDescriptors []descpb.TableDescriptor + indexOrdinals []uint32 + fixedValues []*execinfrapb.ValuesCoreSpec + eqCols []execinfrapb.Columns + outputColumns []uint32 + outputTypes []*types.T + planToStreamColMap []int + onCond tree.TypedExpr + reqOrdering exec.OutputOrdering +} + +func (pi *zigzagPlanningInfo) init(nsides int) { + pi.tableDescriptors = make([]descpb.TableDescriptor, nsides) + pi.indexOrdinals = make([]uint32, nsides) + pi.fixedValues = make([]*execinfrapb.ValuesCoreSpec, nsides) + pi.eqCols = make([]execinfrapb.Columns, nsides) +} + +func (pi *zigzagPlanningInfo) initSide( + i int, desc catalog.TableDescriptor, index catalog.Index, numEqCols int, +) error { + pi.tableDescriptors[i] = *desc.TableDesc() + idxOrd, err := getIndexIdx(index, desc) + if err != nil { + return err + } + pi.indexOrdinals[i] = idxOrd + pi.eqCols[i].Columns = make([]uint32, numEqCols) + + return nil +} + +func (pi *zigzagPlanningInfo) initOutput(numCols int) { + pi.outputColumns = make([]uint32, numCols) + pi.outputTypes = make([]*types.T, numCols) + pi.planToStreamColMap = makePlanToStreamColMap(numCols) +} + +func (dsp *DistSQLPlanner) constructZigzagJoin( + planCtx *PlanningCtx, pi zigzagPlanningInfo, +) (*PhysicalPlan, error) { + + plan := planCtx.NewPhysicalPlan() + + post := execinfrapb.PostProcessSpec{Projection: true} + post.OutputColumns = pi.outputColumns + + // The zigzag join node only represents inner joins, so hardcode Type to + // InnerJoin. + zigzagJoinerSpec := execinfrapb.ZigzagJoinerSpec{ + Tables: pi.tableDescriptors, + IndexOrdinals: pi.indexOrdinals, + EqColumns: pi.eqCols, + Type: descpb.InnerJoin, + FixedValues: pi.fixedValues, + } + // Set the ON condition. - if n.onCond != nil { + if pi.onCond != nil { // Note that the ON condition refers to the *internal* columns of the // processor (before the OutputColumns projection). - indexVarMap := makePlanToStreamColMap(len(n.columns)) - for i := range n.columns { + indexVarMap := makePlanToStreamColMap(len(pi.outputColumns)) + for i := 0; i < len(pi.outputColumns); i++ { indexVarMap[i] = int(post.OutputColumns[i]) } - zigzagJoinerSpec.OnExpr, err = physicalplan.MakeExpression( - n.onCond, planCtx, indexVarMap, + onExpr, err := physicalplan.MakeExpression( + pi.onCond, planCtx, indexVarMap, ) if err != nil { return nil, err } + zigzagJoinerSpec.OnExpr = onExpr } // Figure out the node where this zigzag joiner goes. @@ -2642,8 +2682,8 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( Core: execinfrapb.ProcessorCoreUnion{ZigzagJoiner: &zigzagJoinerSpec}, }} - plan.AddNoInputStage(corePlacement, post, types, execinfrapb.Ordering{}) - plan.PlanToStreamColMap = planToStreamColMap + plan.AddNoInputStage(corePlacement, post, pi.outputTypes, execinfrapb.Ordering{}) + plan.PlanToStreamColMap = pi.planToStreamColMap return plan, nil } diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index f3511d95e88a..66a9e53e9d7b 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/inverted" "github.com/cockroachdb/cockroach/pkg/sql/opt" @@ -695,7 +696,99 @@ func (e *distSQLSpecExecFactory) ConstructZigzagJoin( onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: zigzag join") + planCtx := e.getPlanCtx(cannotDistribute) + + // Maybe someday N will be greater than 2, code it that way. + nsides := 2 + pi := zigzagPlanningInfo{onCond: onCond, reqOrdering: reqOrdering} + pi.init(nsides) + + tables := []cat.Table{leftTable, rightTable} + indexes := []cat.Index{leftIndex, rightIndex} + cols := []exec.TableColumnOrdinalSet{leftCols, rightCols} + eqCols := [][]exec.TableColumnOrdinal{leftEqCols, rightEqCols} + fixedVals := [][]tree.TypedExpr{leftFixedVals, rightFixedVals} + allCols := exec.TableColumnOrdinalSet{} + + numOutputCols := 0 + + for s, table := range tables { + // RFC: does casting a cat.Table to an optTable ruin the ability to test this code from opttester? opt_exec_factory + // does the same thing, just curious. + tdesc := table.(*optTable).desc + err := pi.initSide(s, tdesc, indexes[s].(*optIndex).idx, len(eqCols[s])) + if err != nil { + return nil, err + } + for j, col := range eqCols[s] { + pi.eqCols[s].Columns[j] = uint32(col) + } + numOutputCols += cols[s].Len() + allCols.UnionWith(cols[s]) + } + + // The fixed values are represented as a Values node with one tuple. + for s, vals := range fixedVals { + typs := make([]*types.T, len(vals)) + for t := range typs { + col := indexes[s].Column(t) + typs[t] = col.DatumType() + } + valuesSpec, err := e.dsp.createValuesSpecFromTuples(planCtx, [][]tree.TypedExpr{vals}, typs) + if err != nil { + return nil, err + } + pi.fixedValues[s] = valuesSpec + } + + // The internal schema of the zigzag joiner is: + // ... ... + // with only the columns in the specified index populated. + // + // The schema of the zigzagJoinNode is: + // ... ... + // so the planToStreamColMap has to basically map index ordinals + // to table ordinals. + pi.initOutput(numOutputCols) + colOffset := 0 + i := 0 + + // Populate post.OutputColumns (the implicit projection), result types, + // and the planToStreamColMap for index columns from all sides. + for s, col := range cols { + tdesc := tables[s].(*optTable).desc + for c, ok := col.Next(0); ok; c, ok = col.Next(c + 1) { + column := tdesc.PublicColumns()[c] + // RFC: what's going on with visibility? can't find where its set I think its just + // defaulting to the Go default so use that. + ord := tableOrdinal(tdesc, column.GetID(), execinfra.ScanVisibilityPublic) + pi.outputColumns[i] = uint32(colOffset + ord) + pi.outputTypes[i] = column.GetType() + pi.planToStreamColMap[i] = i + i++ + } + + colOffset += len(tdesc.PublicColumns()) + } + + p, err := e.dsp.constructZigzagJoin(planCtx, pi) + if err != nil { + return nil, err + } + + // We don't support different tables currently. + table := tables[0] + colCfg := makeScanColumnsConfig(table, allCols) + // Note that initColsForScan and setting ResultColumns below are equivalent + // to what scan.initTable call does in execFactory.ConstructScan. + tabDesc := table.(*optTable).desc + catCols, err := initColsForScan(tabDesc, colCfg) + if err != nil { + return nil, err + } + p.ResultColumns = colinfo.ResultColumnsFromColumns(tabDesc.GetID(), catCols) + + return makePlanMaybePhysical(p, nil /* planNodesToClose */), nil } func (e *distSQLSpecExecFactory) ConstructLimit( diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning index af827a445c00..2199be54edc0 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning @@ -118,3 +118,29 @@ SELECT DISTINCT v FROM kv 2 NULL 3 + +# Check that zigzag join works in always mode. +statement ok +RESET experimental_distsql_planning; +CREATE TABLE a (n INT PRIMARY KEY, a INT, b INT, c STRING, INDEX a_idx(a), INDEX b_idx(b), INDEX bc_idx(b,c)); +INSERT INTO a SELECT a,a,a%3,'foo' FROM generate_series(1,10) AS g(a) ; +SET enable_zigzag_join = true; +SET experimental_distsql_planning = always + +query III rowsort +SELECT n,a,b FROM a WHERE a = 5 AND b = 2 +---- +5 5 2 + +query T +SELECT * FROM [EXPLAIN SELECT n,a,b FROM a WHERE a = 5 AND b = 2] OFFSET 2 +---- +· +• zigzag join + pred: (a = 5) AND (b = 2) + left table: a@a_idx + left columns: (n, a) + left fixed values: 1 column + right table: a@b_idx + right columns: (b) + right fixed values: 1 column diff --git a/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join b/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join index f790625ce748..b263a2771964 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/zigzag_join @@ -1,4 +1,4 @@ -# LogicTest: local +# LogicTest: local local-spec-planning # Make sure that the zigzag join is used in the regression tests for #71093. statement ok