Skip to content

Commit

Permalink
sql: implement ConstructZigzagJoin in distsql_spec_exec_factory
Browse files Browse the repository at this point in the history
Informs cockroachdb#47473

Release note: None
  • Loading branch information
cucaroach committed Nov 12, 2021
1 parent c308f5e commit 2b72a16
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 42 deletions.
120 changes: 80 additions & 40 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
Expand Down Expand Up @@ -2537,37 +2538,20 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
planCtx *PlanningCtx, n *zigzagJoinNode,
) (plan *PhysicalPlan, err error) {
plan = planCtx.NewPhysicalPlan()

tables := make([]descpb.TableDescriptor, len(n.sides))
indexOrdinals := make([]uint32, len(n.sides))
cols := make([]execinfrapb.Columns, len(n.sides))
numStreamCols := 0
pi := zigzagPlanningInfo{onCond: n.onCond, reqOrdering: exec.OutputOrdering(n.reqOrdering)}
pi.init(len(n.sides))

for i, side := range n.sides {
tables[i] = *side.scan.desc.TableDesc()
indexOrdinals[i], err = getIndexIdx(side.scan.index, side.scan.desc)
err = pi.initSide(i, side.scan.desc, side.scan.index, len(side.eqCols))
if err != nil {
return nil, err
}

cols[i].Columns = make([]uint32, len(side.eqCols))
for j, col := range side.eqCols {
cols[i].Columns[j] = uint32(col)
pi.eqCols[i].Columns[j] = uint32(col)
}

numStreamCols += len(side.scan.desc.PublicColumns())
}

// The zigzag join node only represents inner joins, so hardcode Type to
// InnerJoin.
zigzagJoinerSpec := execinfrapb.ZigzagJoinerSpec{
Tables: tables,
IndexOrdinals: indexOrdinals,
EqColumns: cols,
Type: descpb.InnerJoin,
}
zigzagJoinerSpec.FixedValues = make([]*execinfrapb.ValuesCoreSpec, len(n.sides))

// The fixed values are represented as a Values node with one tuple.
for i := range n.sides {
fixedVals := n.sides[i].fixedVals
Expand All @@ -2576,7 +2560,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
if err != nil {
return nil, err
}
zigzagJoinerSpec.FixedValues[i] = valuesSpec
pi.fixedValues[i] = valuesSpec
}

// The internal schema of the zigzag joiner is:
Expand All @@ -2587,12 +2571,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
// <side 1 index columns> ... <side 2 index columns> ...
// so the planToStreamColMap has to basically map index ordinals
// to table ordinals.
post := execinfrapb.PostProcessSpec{Projection: true}
numOutCols := len(n.columns)

post.OutputColumns = make([]uint32, numOutCols)
types := make([]*types.T, numOutCols)
planToStreamColMap := makePlanToStreamColMap(numOutCols)
pi.initOutput(len(n.columns))
colOffset := 0
i := 0

Expand All @@ -2605,30 +2584,91 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
// opt/exec/execbuilder/relational_builder.go, similar to lookup joins.
for _, col := range side.scan.cols {
ord := tableOrdinal(side.scan.desc, col.GetID(), side.scan.colCfg.visibility)
post.OutputColumns[i] = uint32(colOffset + ord)
types[i] = col.GetType()
planToStreamColMap[i] = i

pi.outputColumns[i] = uint32(colOffset + ord)
pi.outputTypes[i] = col.GetType()
pi.planToStreamColMap[i] = i
i++
}

colOffset += len(side.scan.desc.PublicColumns())
}

return dsp.constructZigzagJoin(planCtx, pi)
}

type zigzagPlanningInfo struct {
tableDescriptors []descpb.TableDescriptor
indexOrdinals []uint32
fixedValues []*execinfrapb.ValuesCoreSpec
eqCols []execinfrapb.Columns
outputColumns []uint32
outputTypes []*types.T
planToStreamColMap []int
onCond tree.TypedExpr
reqOrdering exec.OutputOrdering
}

func (pi *zigzagPlanningInfo) init(nsides int) {
pi.tableDescriptors = make([]descpb.TableDescriptor, nsides)
pi.indexOrdinals = make([]uint32, nsides)
pi.fixedValues = make([]*execinfrapb.ValuesCoreSpec, nsides)
pi.eqCols = make([]execinfrapb.Columns, nsides)
}

func (pi *zigzagPlanningInfo) initSide(
i int, desc catalog.TableDescriptor, index catalog.Index, numEqCols int,
) error {
pi.tableDescriptors[i] = *desc.TableDesc()
idxOrd, err := getIndexIdx(index, desc)
if err != nil {
return err
}
pi.indexOrdinals[i] = idxOrd
pi.eqCols[i].Columns = make([]uint32, numEqCols)

return nil
}

func (pi *zigzagPlanningInfo) initOutput(numCols int) {
pi.outputColumns = make([]uint32, numCols)
pi.outputTypes = make([]*types.T, numCols)
pi.planToStreamColMap = makePlanToStreamColMap(numCols)
}

func (dsp *DistSQLPlanner) constructZigzagJoin(
planCtx *PlanningCtx, pi zigzagPlanningInfo,
) (*PhysicalPlan, error) {

plan := planCtx.NewPhysicalPlan()

post := execinfrapb.PostProcessSpec{Projection: true}
post.OutputColumns = pi.outputColumns

// The zigzag join node only represents inner joins, so hardcode Type to
// InnerJoin.
zigzagJoinerSpec := execinfrapb.ZigzagJoinerSpec{
Tables: pi.tableDescriptors,
IndexOrdinals: pi.indexOrdinals,
EqColumns: pi.eqCols,
Type: descpb.InnerJoin,
FixedValues: pi.fixedValues,
}

// Set the ON condition.
if n.onCond != nil {
if pi.onCond != nil {
// Note that the ON condition refers to the *internal* columns of the
// processor (before the OutputColumns projection).
indexVarMap := makePlanToStreamColMap(len(n.columns))
for i := range n.columns {
indexVarMap := makePlanToStreamColMap(len(pi.outputColumns))
for i := 0; i < len(pi.outputColumns); i++ {
indexVarMap[i] = int(post.OutputColumns[i])
}
zigzagJoinerSpec.OnExpr, err = physicalplan.MakeExpression(
n.onCond, planCtx, indexVarMap,
onExpr, err := physicalplan.MakeExpression(
pi.onCond, planCtx, indexVarMap,
)
if err != nil {
return nil, err
}
zigzagJoinerSpec.OnExpr = onExpr
}

// Figure out the node where this zigzag joiner goes.
Expand All @@ -2642,8 +2682,8 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
Core: execinfrapb.ProcessorCoreUnion{ZigzagJoiner: &zigzagJoinerSpec},
}}

plan.AddNoInputStage(corePlacement, post, types, execinfrapb.Ordering{})
plan.PlanToStreamColMap = planToStreamColMap
plan.AddNoInputStage(corePlacement, post, pi.outputTypes, execinfrapb.Ordering{})
plan.PlanToStreamColMap = pi.planToStreamColMap

return plan, nil
}
Expand Down
95 changes: 94 additions & 1 deletion pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/inverted"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
Expand Down Expand Up @@ -695,7 +696,99 @@ func (e *distSQLSpecExecFactory) ConstructZigzagJoin(
onCond tree.TypedExpr,
reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: zigzag join")
planCtx := e.getPlanCtx(cannotDistribute)

// Maybe someday N will be greater than 2, code it that way.
nsides := 2
pi := zigzagPlanningInfo{onCond: onCond, reqOrdering: reqOrdering}
pi.init(nsides)

tables := []cat.Table{leftTable, rightTable}
indexes := []cat.Index{leftIndex, rightIndex}
cols := []exec.TableColumnOrdinalSet{leftCols, rightCols}
eqCols := [][]exec.TableColumnOrdinal{leftEqCols, rightEqCols}
fixedVals := [][]tree.TypedExpr{leftFixedVals, rightFixedVals}
allCols := exec.TableColumnOrdinalSet{}

numOutputCols := 0

for s, table := range tables {
// RFC: does casting a cat.Table to an optTable ruin the ability to test this code from opttester? opt_exec_factory
// does the same thing, just curious.
tdesc := table.(*optTable).desc
err := pi.initSide(s, tdesc, indexes[s].(*optIndex).idx, len(eqCols[s]))
if err != nil {
return nil, err
}
for j, col := range eqCols[s] {
pi.eqCols[s].Columns[j] = uint32(col)
}
numOutputCols += cols[s].Len()
allCols.UnionWith(cols[s])
}

// The fixed values are represented as a Values node with one tuple.
for s, vals := range fixedVals {
typs := make([]*types.T, len(vals))
for t := range typs {
col := indexes[s].Column(t)
typs[t] = col.DatumType()
}
valuesSpec, err := e.dsp.createValuesSpecFromTuples(planCtx, [][]tree.TypedExpr{vals}, typs)
if err != nil {
return nil, err
}
pi.fixedValues[s] = valuesSpec
}

// The internal schema of the zigzag joiner is:
// <side 1 table columns> ... <side 2 table columns> ...
// with only the columns in the specified index populated.
//
// The schema of the zigzagJoinNode is:
// <side 1 index columns> ... <side 2 index columns> ...
// so the planToStreamColMap has to basically map index ordinals
// to table ordinals.
pi.initOutput(numOutputCols)
colOffset := 0
i := 0

// Populate post.OutputColumns (the implicit projection), result types,
// and the planToStreamColMap for index columns from all sides.
for s, col := range cols {
tdesc := tables[s].(*optTable).desc
for c, ok := col.Next(0); ok; c, ok = col.Next(c + 1) {
column := tdesc.PublicColumns()[c]
// RFC: what's going on with visibility? can't find where its set I think its just
// defaulting to the Go default so use that.
ord := tableOrdinal(tdesc, column.GetID(), execinfra.ScanVisibilityPublic)
pi.outputColumns[i] = uint32(colOffset + ord)
pi.outputTypes[i] = column.GetType()
pi.planToStreamColMap[i] = i
i++
}

colOffset += len(tdesc.PublicColumns())
}

p, err := e.dsp.constructZigzagJoin(planCtx, pi)
if err != nil {
return nil, err
}

// We don't support different tables currently.
table := tables[0]
colCfg := makeScanColumnsConfig(table, allCols)
// Note that initColsForScan and setting ResultColumns below are equivalent
// to what scan.initTable call does in execFactory.ConstructScan.
tabDesc := table.(*optTable).desc
catCols, err := initColsForScan(tabDesc, colCfg)
if err != nil {
return nil, err
}
p.ResultColumns = colinfo.ResultColumnsFromColumns(tabDesc.GetID(), catCols)

return makePlanMaybePhysical(p, nil /* planNodesToClose */), nil
}

func (e *distSQLSpecExecFactory) ConstructLimit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,31 @@ SELECT DISTINCT v FROM kv
2
NULL
3

# Check that zigzag join works in always mode.
statement ok
RESET experimental_distsql_planning;
CREATE TABLE a (n INT PRIMARY KEY, a INT, b INT, c STRING, INDEX a_idx(a), INDEX b_idx(b), INDEX bc_idx(b,c));
INSERT INTO a SELECT a,a,a%3,'foo' FROM generate_series(1,10) AS g(a) ;
SET enable_zigzag_join = true;
SET experimental_distsql_planning = always

query III rowsort
SELECT n,a,b FROM a WHERE a = 5 AND b = 2
----
5 5 2

query T
EXPLAIN SELECT n,a,b FROM a WHERE a = 5 AND b = 2
----
distribution: local
vectorized: true
·
• zigzag join
pred: (a = 5) AND (b = 2)
left table: a@a_idx
left columns: (n, a)
left fixed values: 1 column
right table: a@b_idx
right columns: (b)
right fixed values: 1 column
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/zigzag_join
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# LogicTest: local
# LogicTest: local local-spec-planning

# Make sure that the zigzag join is used in the regression tests for #71093.
statement ok
Expand Down

0 comments on commit 2b72a16

Please sign in to comment.