Skip to content

Commit

Permalink
rowexec: paired joiners to accomplish left joins
Browse files Browse the repository at this point in the history
The paired joiners are used to accomplish left {outer,semi,anti}
joins when the first joiner will produce false positives (as
known to the optimizer).
Currently, only the invertedJoiner can function as this first
joiner but there is wording in the spec describing how this
could also be useful for join expressions that can't be fully
evaluated on one non-inverted index.

The first joiner outputs an additional bool column representing
a continuation value. This is used to demarcate groups of
consecutive rows output by the first joiner that represent
the same original left row. The first joiner needs to preserve
input row order (the invertedJoiner always does this). The
second joiner is a lookup join and handles these groups in
a special manner. The second join does not need to be order
preserving.

Informs cockroachdb#53576

Prior to this, the way to do:
- a left outer join with an inverted join is to
  do an inner join with the same ON condition, which is a pair
  of inverted join and lookup join, and then wrap the expression
  in another left join with the original left side.
- a left anti join with an inverted join is to map it to a left
  outer join (previous bullet).
- a left semi join is to map it to an inner join with the same
  ON condition, which is a pair of inverted join and lookup
  join, and then project, sort (or make the lookup join order
  preserving) and dedup.

We expect that the alternative outlined in this PR (it excludes
the optimizer changes) will be more efficient since it is
simply a pairing of inverted joiner and lookup join (and the
latter does not need to be order preserving).

Release note: None
  • Loading branch information
sumeerbhola committed Sep 28, 2020
1 parent 00a95b8 commit cbdadf2
Show file tree
Hide file tree
Showing 8 changed files with 1,055 additions and 389 deletions.
8 changes: 6 additions & 2 deletions pkg/sql/execinfra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
const Version execinfrapb.DistSQLVersion = 37
const Version execinfrapb.DistSQLVersion = 38

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
const MinAcceptedVersion execinfrapb.DistSQLVersion = 37
const MinAcceptedVersion execinfrapb.DistSQLVersion = 38

/*
** VERSION HISTORY **
Please add new entries at the top.
- Version: 38 (MinAcceptedVersion: 38)
- A paired joiner approach for inverted joins was added, for left
outer/semi/anti joins involving the invertedJoiner and joinReader.
- Version: 37 (MinAcceptedVersion: 37)
- An InterleavedReaderJoiner processor was removed, and the old processor
spec would be unrecognized by a server running older versions, hence the
Expand Down
540 changes: 337 additions & 203 deletions pkg/sql/execinfrapb/processors_sql.pb.go

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,55 @@ message IndexSkipTableReaderSpec {
//
// If performing an index join (where a = c and b = d) (lookup columns is []):
// Internal columns: | c | d | e |
//
// There is a special case when a "join reader" is used as the second join in
// a pair of joins to accomplish a LEFT_OUTER, LEFT_SEMI or LEFT_ANTI join.
// The first join in this pair of joins is unable to precisely evaluate the
// join condition and produces false positives. This is typical when the first
// join is an inverted join (see InvertedJoinerSpec), but can also be the case
// when the first join is being evaluated over an index that does not have all
// the columns needed to evaluate the join condition. The first join outputs
// rows in sorted order of the original left columns. The input stream columns
// for the second join are a combination of the original left columns and the
// lookup columns. The first join additionally adds a continuation column that
// demarcates a group of successive rows that correspond to an original left
// row. The first row in a group contains false (since it is not a
// continuation of the group) and successive rows contain true.
//
// The mapping from the original join to the pair of joins is:
// LEFT_OUTER => LEFT_OUTER, LEFT_OUTER
// LEFT_SEMI => INNER, LEFT_SEMI (better than doing INNER, INNER, SORT, DISTINCT)
// LEFT_ANTI => LEFT_OUTER, LEFT_ANTI.
// where the first join always preserves order.
//
// More specifically, consider a lookup join example where the input stream
// columns are: | a | b | c | d | cont |.
// The lookup column is | d |. And the table columns are | e | f | with
// d = e.
// This join reader can see input of the form
// a1, b1, c1, d1, false
// a1, b1, c1, d2, true
// a1, b2, c1, null, false // when the first join is LEFT_OUTER
// a2, b1, c1, d3, false
// a2, b1, c1, d4, true
//
// Say both the results for (a1, b1, c1) are false positives, and the first
// of the (a2, b1, c1) result is a false positive.
// The output for LEFT_OUTER:
// a1, b1, c1, d1, false, null, null
// a1, b2, c1, null, false, null, null
// a2, b1, c1, d4, true, d4, f1
// The d, cont columns are not part of the original left row, so will be
// projected away after the join.
//
// The output for LEFT_ANTI:
// a1, b1, c1, d1, false
// a1, b2, c1, null, false
// Again, the d, cont columns will be projected away after the join.
//
// The output for LEFT_SEMI:
// a2, b1, c1, d4, true
// Again, the d, cont columns will be projected away after the join.
message JoinReaderSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

Expand Down Expand Up @@ -263,6 +312,10 @@ message JoinReaderSpec {
// result of the secondary index joined against the primary index is
// expected to contain the materialized system columns.
optional bool has_system_columns = 13 [(gogoproto.nullable) = false];

// LeftJoinWithPairedJoiner is used when a left {outer,anti,semi} join is
// being achieved by pairing two joins. See the comment above.
optional bool left_join_with_paired_joiner = 14 [(gogoproto.nullable) = false];
}

// SorterSpec is the specification for a "sorting aggregator". A sorting
Expand Down Expand Up @@ -484,6 +537,18 @@ message HashJoinerSpec {
// that was indexed). For LEFT_SEMI and LEFT_ANTI, the "internal columns" are
// the columns of the left input.
//
// In many cases, the inverted join will contain false positives wrt the
// original join condition. This is handled by pairing it with a lookup join.
// This pairing works naturally when the user query specified INNER, by
// running an INNER inverted join followed by INNER lookup join. For a user
// query with LEFT_OUTER/LEFT_ANTI, the inverted join is run as a LEFT_OUTER
// with a special mode that outputs an additional bool column that represents
// whether this row is a continuation of a group, where a group is defined as
// rows corresponding to the same original left row. This is paired with a
// lookup join that also knows about the semantics of this bool column. For a
// user query with LEFT_SEMI, the inverted join is run as an INNER join with
// the same special mode. See the JoinReaderSpec for an example.
//
// Example:
// Input stream columns: | a | b |
// Table columns: | c | d | e |
Expand All @@ -498,6 +563,9 @@ message HashJoinerSpec {
// Internal columns for INNER and LEFT_OUTER: | a | b | c | d | e |
// where d, e are not populated.
// Internal columns for LEFT_SEMI and LEFT_ANTI: | a | b |
//
// For INNER/LEFT_OUTER with OutputGroupContinuationForLeftRow = true, the
// internal columns include an additional bool column as the last column.
message InvertedJoinerSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

Expand Down Expand Up @@ -545,6 +613,11 @@ message InvertedJoinerSpec {
// Indicates that the inverted joiner should maintain the ordering of the
// input stream.
optional bool maintain_ordering = 7 [(gogoproto.nullable) = false];

// Indicates that the join should output a continuation column that
// indicates whether a row is a continuation of a group corresponding to a
// left row.
optional bool output_group_continuation_for_left_row = 8 [(gogoproto.nullable) = false];
}

// InvertedFiltererSpec is the specification of a processor that does filtering
Expand Down
58 changes: 51 additions & 7 deletions pkg/sql/rowexec/inverted_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type invertedJoiner struct {
// A row with one element, corresponding to an encoded inverted column
// value. Used to construct the span of the index for that value.
invertedColRow rowenc.EncDatumRow

outputContinuationCol bool
}

var _ execinfra.Processor = &invertedJoiner{}
Expand Down Expand Up @@ -201,12 +203,18 @@ func newInvertedJoiner(
if ij.joinType == descpb.InnerJoin || ij.joinType == descpb.LeftOuterJoin {
outputColCount += len(rightColTypes)
includeRightCols = true
if spec.OutputGroupContinuationForLeftRow {
outputColCount++
}
}
outputColTypes := make([]*types.T, 0, outputColCount)
outputColTypes = append(outputColTypes, ij.inputTypes...)
if includeRightCols {
outputColTypes = append(outputColTypes, rightColTypes...)
}
if spec.OutputGroupContinuationForLeftRow {
outputColTypes = append(outputColTypes, types.Bool)
}
if err := ij.ProcessorBase.Init(
ij, post, outputColTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand All @@ -227,7 +235,11 @@ func newInvertedJoiner(
if err := ij.onExprHelper.Init(spec.OnExpr, onExprColTypes, semaCtx, ij.EvalCtx); err != nil {
return nil, err
}
ij.combinedRow = make(rowenc.EncDatumRow, 0, len(onExprColTypes))
combinedRowLen := len(onExprColTypes)
if spec.OutputGroupContinuationForLeftRow {
combinedRowLen++
}
ij.combinedRow = make(rowenc.EncDatumRow, 0, combinedRowLen)

if ij.datumsToInvertedExpr == nil {
var invertedExprHelper execinfrapb.ExprHelper
Expand Down Expand Up @@ -298,6 +310,8 @@ func newInvertedJoiner(
ij.diskMonitor,
)

ij.outputContinuationCol = spec.OutputGroupContinuationForLeftRow

return ij, nil
}

Expand Down Expand Up @@ -496,6 +510,9 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ
return ijEmittingRows, nil
}

var trueEncDatum = rowenc.DatumToEncDatum(types.Bool, tree.DBoolTrue)
var falseEncDatum = rowenc.DatumToEncDatum(types.Bool, tree.DBoolFalse)

// emitRow returns the next row from ij.emitCursor, if present. Otherwise it
// prepares for another input batch.
func (ij *invertedJoiner) emitRow() (
Expand Down Expand Up @@ -532,7 +549,11 @@ func (ij *invertedJoiner) emitRow() (
if !seenMatch {
switch ij.joinType {
case descpb.LeftOuterJoin:
return ijEmittingRows, ij.renderUnmatchedRow(ij.inputRows[inputRowIdx]), nil
ij.renderUnmatchedRow(ij.inputRows[inputRowIdx])
if ij.outputContinuationCol {
ij.combinedRow = append(ij.combinedRow, falseEncDatum)
}
return ijEmittingRows, ij.combinedRow, nil
case descpb.LeftAntiJoin:
return ijEmittingRows, ij.inputRows[inputRowIdx], nil
}
Expand Down Expand Up @@ -564,9 +585,22 @@ func (ij *invertedJoiner) emitRow() (
return nil
}
if renderedRow != nil {
seenMatch := ij.emitCursor.seenMatch
ij.emitCursor.seenMatch = true
switch ij.joinType {
case descpb.InnerJoin, descpb.LeftOuterJoin:
if ij.outputContinuationCol {
if seenMatch {
// This is not the first row output for this left row, so set the
// group continuation to true.
ij.combinedRow = append(ij.combinedRow, trueEncDatum)
} else {
// This is the first row output for this left row, so set the group
// continuation to false.
ij.combinedRow = append(ij.combinedRow, falseEncDatum)
}
renderedRow = ij.combinedRow
}
return ijEmittingRows, renderedRow, nil
case descpb.LeftSemiJoin:
// Skip the rest of the joined rows.
Expand All @@ -588,7 +622,8 @@ func (ij *invertedJoiner) emitRow() (
}

// render constructs a row with columns from both sides. The ON condition is
// evaluated; if it fails, returns nil.
// evaluated; if it fails, returns nil. When it returns a non-nil row, it is
// identical to ij.combinedRow.
func (ij *invertedJoiner) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow, error) {
ij.combinedRow = append(ij.combinedRow[:0], lrow...)
ij.combinedRow = append(ij.combinedRow, rrow...)
Expand All @@ -601,14 +636,23 @@ func (ij *invertedJoiner) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatum
return ij.combinedRow, nil
}

// renderUnmatchedRow creates a result row given an unmatched row.
func (ij *invertedJoiner) renderUnmatchedRow(row rowenc.EncDatumRow) rowenc.EncDatumRow {
// renderUnmatchedRow creates a result row given an unmatched row and
// stores it in ij.combinedRow.
func (ij *invertedJoiner) renderUnmatchedRow(row rowenc.EncDatumRow) {
// Append the left row.
ij.combinedRow = append(ij.combinedRow[:0], row...)
ij.combinedRow = ij.combinedRow[:cap(ij.combinedRow)]
rowLen := cap(ij.combinedRow)
if ij.outputContinuationCol {
// The cap of combinedRow includes the continuation col, which is not
// appended by this function, so compensate for that.
rowLen--
}
// Slice to make the combined left + right row and set the right columns to
// NULL.
ij.combinedRow = ij.combinedRow[:rowLen]
for i := len(row); i < len(ij.combinedRow); i++ {
ij.combinedRow[i].Datum = tree.DNull
}
return ij.combinedRow
}

func (ij *invertedJoiner) transformToKeyRow(row rowenc.EncDatumRow) {
Expand Down
Loading

0 comments on commit cbdadf2

Please sign in to comment.