Skip to content

Commit

Permalink
opt,sql: use paired joins for left outer spatial joins
Browse files Browse the repository at this point in the history
The GenerateInvertedJoins rule now fires for left outer joins.
GenerateInvertedJoins in custom_funcs.go builds the two
RelExprs and the continuation col ID. execbuilder.Builder
makes the adjustments for outputting the continuation column
for the inverted join (the first join in the pair). There
are similar changes in execFactory.ConstructInvertedJoin and in
DistSQLPlanner.createPlanForInvertedJoin.

I could not figure out a simpler way that did not require
changes in all these places since the continuation column
is synthesized, and not part of the input or the scan of
the right side.

To prevent a sort from being interposed between the first
and second join, there is a change to
lookupOrIndexJoinCanProvideOrdering. This is currently
the only known case where the optimizer can interpose
an operation that would break the behavior of the
continuation column. DistSQLPlanner always uses
PhysicalPlan.AddNoGroupingStage when planning the second
join, which ensures that the second join processors are
on the same node as the first join processors, so there is
no danger of breaking the one-to-one relationship needed
between the two processors.

In addition to being more efficient than the current
transformation for left outer joins, I noticed in the
output in inverted_join_geospatial_dist that the current
transformation was not distributed while the paired joins
are distributed.

This PR does not include left semi and left anti joins, which
will be in later PRs.

Release note (performance improvement): more efficient plan
for execution of left outer spatial joins.
  • Loading branch information
sumeerbhola committed Oct 10, 2020
1 parent 357c20c commit 051263f
Show file tree
Hide file tree
Showing 21 changed files with 926 additions and 917 deletions.
62 changes: 45 additions & 17 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ func initTableReaderSpec(
return s, post, nil
}

// scanNodeOrdinal returns the index of a column with the given ID.
// tableOrdinal returns the index of a column with the given ID.
func tableOrdinal(
desc *tabledesc.Immutable, colID descpb.ColumnID, visibility execinfrapb.ScanVisibility,
) int {
Expand Down Expand Up @@ -983,6 +983,14 @@ func tableOrdinal(
panic(errors.AssertionFailedf("column %d not in desc.Columns", colID))
}

// highestTableOrdinal returns the largest column ordinal for desc under the
// given visibility: the index of the last entry in desc.Columns, extended by
// the number of mutation columns when visibility is
// ScanVisibilityPublicAndNotPublic.
func highestTableOrdinal(desc *tabledesc.Immutable, visibility execinfrapb.ScanVisibility) int {
	numCols := len(desc.Columns)
	if visibility == execinfra.ScanVisibilityPublicAndNotPublic {
		// Mutation columns are counted after the regular columns.
		numCols += len(desc.MutationColumns())
	}
	// Ordinals are zero-based, so the highest one is the count minus one.
	return numCols - 1
}

// toTableOrdinals returns a mapping from column ordinals in cols to table
// reader column ordinals.
func toTableOrdinals(
Expand Down Expand Up @@ -1984,13 +1992,14 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
}

joinReaderSpec := execinfrapb.JoinReaderSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
HasSystemColumns: n.table.containsSystemColumns,
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
}
joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
Expand All @@ -2006,7 +2015,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
joinReaderSpec.LookupColumnsAreKey = n.eqColsAreKey

numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table)
mappingHelperForLookupJoins(plan, n.input, n.table, false /* addContinuationCol */)

// Set the ON condition.
if n.onCond != nil {
Expand All @@ -2026,7 +2035,10 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
numInputNodeCols, planToStreamColMap, post.OutputColumns, types)
}

// Instantiate one join reader for every stream.
// Instantiate one join reader for every stream. This is also necessary for
// correctness of paired-joins where this join is the second join -- it is
// necessary to have a one-to-one relationship between the first and second
// join processor.
plan.AddNoGroupingStage(
execinfrapb.ProcessorCoreUnion{JoinReader: &joinReaderSpec},
post,
Expand All @@ -2041,7 +2053,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
// lookup-style joins (that involve an input that is used to lookup from a
// table).
func mappingHelperForLookupJoins(
plan *PhysicalPlan, input planNode, table *scanNode,
plan *PhysicalPlan, input planNode, table *scanNode, addContinuationCol bool,
) (
numInputNodeCols int,
planToStreamColMap []int,
Expand All @@ -2051,9 +2063,12 @@ func mappingHelperForLookupJoins(
// The n.table node can be configured with an arbitrary set of columns. Apply
// the corresponding projection.
// The internal schema of the join reader is:
// <input columns>... <table columns>...
// <input columns>... <table columns>...[continuation col]
numLeftCols := len(plan.ResultTypes)
numOutCols := numLeftCols + len(table.cols)
if addContinuationCol {
numOutCols++
}
post = execinfrapb.PostProcessSpec{Projection: true}

post.OutputColumns = make([]uint32, numOutCols)
Expand All @@ -2068,15 +2083,27 @@ func mappingHelperForLookupJoins(
ord := tableOrdinal(table.desc, table.cols[i].ID, table.colCfg.visibility)
post.OutputColumns[numLeftCols+i] = uint32(numLeftCols + ord)
}
if addContinuationCol {
outTypes[numOutCols-1] = types.Bool
post.OutputColumns[numOutCols-1] =
uint32(numLeftCols + highestTableOrdinal(table.desc, table.colCfg.visibility) + 1)
}

// Map the columns of the lookupJoinNode to the result streams of the
// JoinReader.
numInputNodeCols = len(planColumns(input))
planToStreamColMap = makePlanToStreamColMap(numInputNodeCols + len(table.cols))
lenPlanToStreamColMap := numInputNodeCols + len(table.cols)
if addContinuationCol {
lenPlanToStreamColMap++
}
planToStreamColMap = makePlanToStreamColMap(lenPlanToStreamColMap)
copy(planToStreamColMap, plan.PlanToStreamColMap)
for i := range table.cols {
planToStreamColMap[numInputNodeCols+i] = numLeftCols + i
}
if addContinuationCol {
planToStreamColMap[lenPlanToStreamColMap-1] = numLeftCols + len(table.cols)
}
return numInputNodeCols, planToStreamColMap, post, outTypes
}

Expand Down Expand Up @@ -2114,17 +2141,18 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
}

invertedJoinerSpec := execinfrapb.InvertedJoinerSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
MaintainOrdering: len(n.reqOrdering) > 0,
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
MaintainOrdering: len(n.reqOrdering) > 0,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
}
invertedJoinerSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
return nil, err
}

numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table)
mappingHelperForLookupJoins(plan, n.input, n.table, n.isFirstJoinInPairedJoiner)

indexVarMap := makeIndexVarMapForLookupJoins(numInputNodeCols, n.table, plan, &post)
if invertedJoinerSpec.InvertedExpr, err = physicalplan.MakeExpression(
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
eqColsAreKey bool,
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
isSecondJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
locking *tree.LockingItem,
) (exec.Node, error) {
Expand All @@ -640,6 +641,7 @@ func (e *distSQLSpecExecFactory) ConstructInvertedJoin(
index cat.Index,
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
isFirstJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: geo lookup join")
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (tr *TableReaderSpec) summary() (string, []string) {

// summary implements the diagramCellType interface.
func (jr *JoinReaderSpec) summary() (string, []string) {
details := make([]string, 0, 4)
details := make([]string, 0, 5)
if jr.Type != descpb.InnerJoin {
details = append(details, joinTypeDetail(jr.Type))
}
Expand All @@ -167,6 +167,9 @@ func (jr *JoinReaderSpec) summary() (string, []string) {
if !jr.OnExpr.Empty() {
details = append(details, fmt.Sprintf("ON %s", jr.OnExpr))
}
if jr.LeftJoinWithPairedJoiner {
details = append(details, "second join in paired-join")
}
return "JoinReader", details
}

Expand Down Expand Up @@ -269,7 +272,7 @@ func (zj *ZigzagJoinerSpec) summary() (string, []string) {

// summary implements the diagramCellType interface.
func (ij *InvertedJoinerSpec) summary() (string, []string) {
details := make([]string, 0, 4)
details := make([]string, 0, 5)
if ij.Type != descpb.InnerJoin {
details = append(details, joinTypeDetail(ij.Type))
}
Expand All @@ -278,6 +281,9 @@ func (ij *InvertedJoinerSpec) summary() (string, []string) {
if !ij.OnExpr.Empty() {
details = append(details, fmt.Sprintf("ON %s", ij.OnExpr))
}
if ij.OutputGroupContinuationForLeftRow {
details = append(details, "first join in paired-join")
}
return "InvertedJoiner", details
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/inverted_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ type invertedJoinNode struct {
invertedExpr tree.TypedExpr

// columns are the produced columns, namely the input columns and (unless the
// join type is semi or anti join) the columns in the table scanNode.
// join type is semi or anti join) the columns in the table scanNode. It can
// include an additional continuation column for paired joins.
columns colinfo.ResultColumns

// onExpr is any ON condition to be used in conjunction with the inverted
// expression.
onExpr tree.TypedExpr

isFirstJoinInPairedJoiner bool

reqOrdering ReqOrdering
}

Expand Down
83 changes: 73 additions & 10 deletions pkg/sql/logictest/testdata/logic_test/inverted_join_geospatial
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ ORDER BY rk
13
16

# Left join is supported by having the optimizer convert it to an inner join.
# Left join is supported by having the optimizer convert it to a pair of joins.
query II
SELECT lk, rk FROM ltable LEFT JOIN rtable ON ST_Intersects(ltable.geom1, rtable.geom) ORDER BY (lk, rk)
SELECT lk, rk FROM ltable LEFT JOIN rtable@geom_index ON ST_Intersects(ltable.geom1, rtable.geom) ORDER BY (lk, rk)
----
1 13
1 16
Expand All @@ -199,7 +199,7 @@ SELECT lk, rk FROM ltable LEFT JOIN rtable ON ST_Intersects(ltable.geom1, rtable
6 NULL

query II
SELECT lk, rk FROM ltable LEFT JOIN rtable ON ST_DWithin(ltable.geom1, rtable.geom, 2) ORDER BY (lk, rk)
SELECT lk, rk FROM ltable LEFT JOIN rtable@geom_index ON ST_DWithin(ltable.geom1, rtable.geom, 2) ORDER BY (lk, rk)
----
1 12
1 13
Expand All @@ -217,7 +217,7 @@ SELECT lk, rk FROM ltable LEFT JOIN rtable ON ST_DWithin(ltable.geom1, rtable.ge
6 NULL

query II
SELECT lk, rk FROM ltable LEFT JOIN rtable
SELECT lk, rk FROM ltable LEFT JOIN rtable@geom_index
ON ST_Intersects(rtable.geom, ltable.geom1) OR ST_DWithin(ltable.geom1, rtable.geom, 2) ORDER BY (lk, rk)
----
1 12
Expand All @@ -236,7 +236,7 @@ ON ST_Intersects(rtable.geom, ltable.geom1) OR ST_DWithin(ltable.geom1, rtable.g
6 NULL

query II
SELECT lk, rk FROM ltable LEFT JOIN rtable
SELECT lk, rk FROM ltable LEFT JOIN rtable@geom_index
ON ST_Intersects(ltable.geom1, rtable.geom) AND ST_DWithin(rtable.geom, ltable.geom1, 2) ORDER BY (lk, rk)
----
1 13
Expand All @@ -251,7 +251,7 @@ ON ST_Intersects(ltable.geom1, rtable.geom) AND ST_DWithin(rtable.geom, ltable.g
6 NULL

query II
SELECT lk, rk FROM ltable LEFT JOIN rtable
SELECT lk, rk FROM ltable LEFT JOIN rtable@geom_index
ON ST_Intersects(ltable.geom1, rtable.geom) AND ST_DWithin(rtable.geom, ltable.geom2, 2) ORDER BY (lk, rk)
----
1 13
Expand All @@ -265,7 +265,7 @@ ON ST_Intersects(ltable.geom1, rtable.geom) AND ST_DWithin(rtable.geom, ltable.g
6 NULL

query II
SELECT lk, rk FROM ltable LEFT JOIN rtable
SELECT lk, rk FROM ltable LEFT JOIN rtable@geom_index
ON ST_Intersects(ltable.geom1, rtable.geom) OR ST_DWithin(rtable.geom, ltable.geom2, 2) ORDER BY (lk, rk)
----
1 12
Expand Down Expand Up @@ -295,7 +295,7 @@ WITH q AS (
SELECT lk, count(*), (SELECT count(*) FROM q) FROM (
SELECT lk, rk
FROM q
LEFT JOIN rtable ON ST_Intersects(q.geom1, rtable.geom)
LEFT JOIN rtable@geom_index ON ST_Intersects(q.geom1, rtable.geom)
) GROUP BY lk ORDER BY lk
----
3 2 4
Expand All @@ -306,7 +306,7 @@ SELECT lk, count(*), (SELECT count(*) FROM q) FROM (
# Anti-join is supported by having the optimizer convert it to a left join,
# which is then converted to a pair of joins.
query I
SELECT lk FROM ltable WHERE NOT EXISTS (SELECT * FROM rtable WHERE ST_Intersects(ltable.geom2, rtable.geom))
SELECT lk FROM ltable WHERE NOT EXISTS (SELECT * FROM rtable@geom_index WHERE ST_Intersects(ltable.geom2, rtable.geom))
ORDER BY lk
----
5
Expand All @@ -324,9 +324,72 @@ ORDER BY rk
query I
SELECT lk FROM ltable
WHERE NOT EXISTS (
SELECT * FROM rtable WHERE ST_Covers(ltable.geom2, rtable.geom) AND lk > 1 AND rk > 12
SELECT * FROM rtable@geom_index WHERE ST_Covers(ltable.geom2, rtable.geom) AND lk > 1 AND rk > 12
) ORDER BY lk
----
1
5
6

# Tests where the table with the inverted index has multiple columns in the primary
# key.
statement ok
CREATE TABLE rtable2(
rk1 int,
geom geometry,
rk2 int,
primary key (rk1, rk2),
INVERTED INDEX geom_index(geom)
)

statement ok
INSERT INTO rtable2 VALUES
(11, 'POINT(1.0 1.0)', 22),
(12, 'LINESTRING(1.0 1.0, 2.0 2.0)', 24),
(13, 'POINT(3.0 3.0)', 26),
(14, 'LINESTRING(4.0 4.0, 5.0 5.0)', 28),
(15, 'LINESTRING(40.0 40.0, 41.0 41.0)', 30),
(16, 'POLYGON((1.0 1.0, 5.0 1.0, 5.0 5.0, 1.0 5.0, 1.0 1.0))', 32)

query III
SELECT lk, rk1, rk2 FROM ltable JOIN rtable2@geom_index ON ST_Intersects(ltable.geom1, rtable2.geom) ORDER BY (lk, rk1, rk2)
----
1 13 26
1 16 32
2 14 28
2 16 32
3 12 24
3 16 32
5 12 24
5 16 32

query III
SELECT lk, rk1, rk2 FROM ltable LEFT JOIN rtable2@geom_index
ON ST_Intersects(ltable.geom1, rtable2.geom) ORDER BY (lk, rk1, rk2)
----
1 13 26
1 16 32
2 14 28
2 16 32
3 12 24
3 16 32
4 NULL NULL
5 12 24
5 16 32
6 NULL NULL

query I
SELECT lk FROM ltable WHERE EXISTS (SELECT * FROM rtable2@geom_index
WHERE ST_Intersects(ltable.geom1, rtable2.geom)) ORDER BY lk
----
1
2
3
5

query I
SELECT lk FROM ltable WHERE NOT EXISTS (SELECT * FROM rtable2@geom_index
WHERE ST_Intersects(ltable.geom1, rtable2.geom)) ORDER BY lk
----
4
6
Loading

0 comments on commit 051263f

Please sign in to comment.