-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
joinreader_span_generator.go
681 lines (599 loc) · 23.1 KB
/
joinreader_span_generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package rowexec
import (
"fmt"
"sort"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/span"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)
// joinReaderSpanGenerator is used by the joinReader to generate spans for
// looking up into the index.
//
// The implementations in this file (defaultSpanGenerator, multiSpanGenerator
// and localityOptimizedSpanGenerator) keep per-batch state: each call to
// generateSpans discards the bookkeeping of the previous batch, and
// getMatchingRowIndices answers questions about the most recent batch only.
type joinReaderSpanGenerator interface {
// generateSpans generates spans for the given batch of input rows.
//
// The implementations in this file return a scratch slice that they reuse
// on the next call, so the result is only valid until generateSpans is
// called again.
generateSpans(rows []rowenc.EncDatumRow) (roachpb.Spans, error)
// getMatchingRowIndices returns the indices of the input rows that desire
// the given key (i.e., the indices of the rows passed to generateSpans that
// generated spans containing the given key).
getMatchingRowIndices(key roachpb.Key) []int
// maxLookupCols returns the maximum number of index columns used as the key
// for each lookup.
maxLookupCols() int
}
// Compile-time checks that every span generator in this file satisfies the
// joinReaderSpanGenerator interface through its pointer type.
var (
	_ joinReaderSpanGenerator = (*defaultSpanGenerator)(nil)
	_ joinReaderSpanGenerator = (*multiSpanGenerator)(nil)
	_ joinReaderSpanGenerator = (*localityOptimizedSpanGenerator)(nil)
)
// defaultSpanGenerator is the joinReaderSpanGenerator that produces one
// lookup span per input row, built from the input columns listed in
// lookupCols.
type defaultSpanGenerator struct {
// spanBuilder converts key rows into spans and optionally splits them into
// family-specific spans.
spanBuilder *span.Builder
// numKeyCols is the upper bound on len(lookupCols); generateSpan returns an
// error if more lookup columns than this are configured.
numKeyCols int
// lookupCols holds the ordinals, within an input row, of the columns whose
// values form the lookup key (in key order).
lookupCols []uint32
// indexKeyRow is a scratch row that generateSpan truncates and refills for
// every input row.
indexKeyRow rowenc.EncDatumRow
// keyToInputRowIndices maps a lookup span key to the input row indices that
// desire that span. This is used for joins other than index joins, for
// de-duping spans, and to map the fetched rows to the input rows that need
// to join with them. Index joins already have unique rows in the input that
// generate unique spans for fetch, and simply output the fetched rows, and
// so do not use this map (generateSpans treats a nil map as an index join).
keyToInputRowIndices map[string][]int
// scratchSpans is the output buffer of generateSpans; it is truncated and
// reused on every call.
scratchSpans roachpb.Spans
}
// generateSpan builds the lookup span for a single input row.
//
// The values of the input columns listed in g.lookupCols are copied, in
// order, into the reusable g.indexKeyRow buffer, and the span builder turns
// that key prefix into a span. An error is returned if more lookup columns
// are configured than g.numKeyCols allows.
//
// It additionally returns the containsNull flag reported by the span builder,
// which callers need in order to decide whether or not to split the generated
// span into separate family-specific spans.
func (g *defaultSpanGenerator) generateSpan(
	row rowenc.EncDatumRow,
) (_ roachpb.Span, containsNull bool, _ error) {
	if numLookupCols := len(g.lookupCols); numLookupCols > g.numKeyCols {
		return roachpb.Span{}, false, errors.Errorf(
			"%d lookup columns specified, expecting at most %d", numLookupCols, g.numKeyCols)
	}

	// Reuse the scratch row so that no allocation is needed per input row.
	g.indexKeyRow = g.indexKeyRow[:0]
	for i := range g.lookupCols {
		g.indexKeyRow = append(g.indexKeyRow, row[g.lookupCols[i]])
	}
	return g.spanBuilder.SpanFromEncDatums(g.indexKeyRow, len(g.lookupCols))
}
// hasNullLookupColumn reports whether any of the lookup columns of the given
// input row is NULL.
func (g *defaultSpanGenerator) hasNullLookupColumn(row rowenc.EncDatumRow) bool {
	for i := 0; i < len(g.lookupCols); i++ {
		if ord := g.lookupCols[i]; row[ord].IsNull() {
			return true
		}
	}
	return false
}
// generateSpans is part of the joinReaderSpanGenerator interface.
//
// Input rows with a NULL in any lookup column are skipped. For every other
// row one span is generated. When g.keyToInputRowIndices is non-nil the spans
// are de-duplicated by start key and the map records which input rows want
// each key; when it is nil (index join) every span is emitted as is. The
// returned slice is g.scratchSpans, which is reused by the next call.
func (g *defaultSpanGenerator) generateSpans(rows []rowenc.EncDatumRow) (roachpb.Spans, error) {
	// This loop gets optimized to a runtime.mapclear call.
	for k := range g.keyToInputRowIndices {
		delete(g.keyToInputRowIndices, k)
	}

	g.scratchSpans = g.scratchSpans[:0]
	for rowIdx, inputRow := range rows {
		if g.hasNullLookupColumn(inputRow) {
			continue
		}
		sp, containsNull, err := g.generateSpan(inputRow)
		if err != nil {
			return nil, err
		}

		// An index join (nil map) emits every span. Otherwise a span is only
		// emitted the first time its key is seen, and the map from index key to
		// input rows is updated so that the index results can later be joined
		// to the inputs.
		emit := true
		if g.keyToInputRowIndices != nil {
			key := string(sp.Key)
			seenBy := g.keyToInputRowIndices[key]
			emit = seenBy == nil
			g.keyToInputRowIndices[key] = append(seenBy, rowIdx)
		}
		if emit {
			g.scratchSpans = g.spanBuilder.MaybeSplitSpanIntoSeparateFamilies(
				g.scratchSpans, sp, len(g.lookupCols), containsNull)
		}
	}
	return g.scratchSpans, nil
}
// getMatchingRowIndices is part of the joinReaderSpanGenerator interface.
//
// It looks the key up in the map filled in by the last generateSpans call. A
// key that was not recorded, or a nil map, yields a nil slice.
func (g *defaultSpanGenerator) getMatchingRowIndices(key roachpb.Key) []int {
	matches := g.keyToInputRowIndices[string(key)]
	return matches
}
// maxLookupCols is part of the joinReaderSpanGenerator interface. Every lookup
// of the defaultSpanGenerator uses all of the configured lookup columns.
func (g *defaultSpanGenerator) maxLookupCols() int {
	numLookupCols := len(g.lookupCols)
	return numLookupCols
}
// spanRowIndex pairs a lookup span with the indices of the input rows that
// want the rows fetched for that span.
type spanRowIndex struct {
// span is the lookup span.
span roachpb.Span
// rowIndices are positions in the batch of rows passed to generateSpans.
rowIndices []int
}
// spanRowIndices is a list of spanRowIndex entries that implements
// sort.Interface, ordering the entries by the start key of their span.
type spanRowIndices []spanRowIndex

// Len is part of sort.Interface.
func (a spanRowIndices) Len() int {
	return len(a)
}

// Swap is part of sort.Interface.
func (a spanRowIndices) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}

// Less is part of sort.Interface. Entries are ordered by span start key.
func (a spanRowIndices) Less(i, j int) bool {
	left, right := a[i].span.Key, a[j].span.Key
	return left.Compare(right) < 0
}

var _ sort.Interface = (*spanRowIndices)(nil)
// multiSpanGenerator is the joinReaderSpanGenerator used when each lookup will
// scan multiple spans in the index. This is the case when some of the index
// columns can take on multiple constant values. For example, the
// multiSpanGenerator would be used for a left lookup join in the following
// case:
// - The index has key columns (region, id)
// - The input columns are (a, b, c)
// - The join condition is region IN ('east', 'west') AND id = a
// In this case, the multiSpanGenerator would generate two spans for each input
// row: [/'east'/<val_a> - /'east'/<val_a>] [/'west'/<val_a> - /'west'/<val_a>].
type multiSpanGenerator struct {
spanBuilder *span.Builder
// indexColInfos stores info about the values that each index column can
// take on in the spans produced by the multiSpanGenerator. See the comment
// above multiSpanGeneratorColInfo for more details.
indexColInfos []interface{ multiSpanGeneratorColInfo }
// indexKeyRows and indexKeySpans are used to generate the spans for a single
// input row. They are allocated once in init(), and then reused for every row.
indexKeyRows []rowenc.EncDatumRow
indexKeySpans roachpb.Spans
// spanToInputRowIndices maps a lookup span to the input row indices that
// desire that span. This is used for de-duping spans, and to map the fetched
// rows to the input rows that need to join with them. generateSpans sorts it
// by span start key once the whole batch has been processed.
spanToInputRowIndices spanRowIndices
// spansCount is the number of spans generated for each input row.
spansCount int
// indexOrds contains the ordinals (i.e., the positions in the index) of the
// index columns that are constrained by the spans produced by this
// multiSpanGenerator. indexOrds must be a prefix of the index columns.
indexOrds util.FastIntSet
// tableOrdToIndexOrd maps the ordinals of columns in the table to their
// ordinals in the index.
tableOrdToIndexOrd util.FastIntMap
// numInputCols is the number of columns in the input to the joinReader.
numInputCols int
// inequalityColIdx is the position in indexColInfos of the column that is
// constrained by an inequality (there can be only one), or -1 if there is
// no such column.
inequalityColIdx int
// scratchSpans is the output buffer of generateSpans; it is truncated and
// reused on every call.
scratchSpans roachpb.Spans
}
// multiSpanGeneratorColInfo contains info about the values that a specific
// index column can take on in the spans produced by the multiSpanGenerator. The
// column ordinal is not contained in this struct, but depends on the location
// of this struct in the indexColInfos slice; the position in the slice
// corresponds to the position in the index.
//
// The implementations in this file are multiSpanGeneratorValuesColInfo,
// multiSpanGeneratorIndexVarColInfo and multiSpanGeneratorInequalityColInfo.
type multiSpanGeneratorColInfo interface {
// String returns a human-readable description of the column info.
String() string
}
// multiSpanGeneratorValuesColInfo is used to represent a column constrained
// by a set of constants (i.e. '=' or 'IN' expressions).
type multiSpanGeneratorValuesColInfo struct {
	// constVals are the constant values the column may take on.
	constVals tree.Datums
}

// String implements the multiSpanGeneratorColInfo interface.
func (i multiSpanGeneratorValuesColInfo) String() string {
	return "[constVals: " + i.constVals.String() + "]"
}
// multiSpanGeneratorIndexVarColInfo represents an index column that is matched
// against a column of the input row. This is the case for join filters such as
// c = a, where c is a column in the index and a is a column in the input.
type multiSpanGeneratorIndexVarColInfo struct {
	// inputRowIdx is the position of the matching column within the input row.
	inputRowIdx int
}

// String implements the multiSpanGeneratorColInfo interface.
func (i multiSpanGeneratorIndexVarColInfo) String() string {
	const format = "[inputRowIdx: %d]"
	return fmt.Sprintf(format, i.inputRowIdx)
}
// multiSpanGeneratorInequalityColInfo represents a column that is bound by a
// range expression. The <, >, >= and <= inequalities on the column are
// distilled into a start and an end datum, each with a flag that says whether
// the boundary itself is included.
type multiSpanGeneratorInequalityColInfo struct {
	start          tree.Datum
	startInclusive bool
	end            tree.Datum
	endInclusive   bool
}

// String implements the multiSpanGeneratorColInfo interface. The range is
// printed in interval notation: square brackets mark inclusive boundaries and
// parentheses mark exclusive ones.
func (i multiSpanGeneratorInequalityColInfo) String() string {
	// Start from the exclusive delimiters and upgrade to the inclusive ones.
	opening, closing := '(', ')'
	if i.startInclusive {
		opening = '['
	}
	if i.endInclusive {
		closing = ']'
	}
	return fmt.Sprintf("%c%v - %v%c", opening, i.start, i.end, closing)
}
// Compile-time checks that every column info type satisfies the
// multiSpanGeneratorColInfo interface.
var (
	_ multiSpanGeneratorColInfo = (*multiSpanGeneratorValuesColInfo)(nil)
	_ multiSpanGeneratorColInfo = (*multiSpanGeneratorIndexVarColInfo)(nil)
	_ multiSpanGeneratorColInfo = (*multiSpanGeneratorInequalityColInfo)(nil)
)
// maxLookupCols is part of the joinReaderSpanGenerator interface. It is the
// number of index columns for which column info has been collected.
func (g *multiSpanGenerator) maxLookupCols() int {
	numConstrainedCols := len(g.indexColInfos)
	return numConstrainedCols
}
// init must be called before the multiSpanGenerator can be used to generate
// spans.
//
// It records the given configuration, walks exprHelper.Expr to collect the
// per-column constraints into g.indexColInfos, validates them, and
// pre-computes g.indexKeyRows: one key row template per combination of
// constant values, with placeholders for the columns whose values are not
// constants. An assertion error is returned if the constrained columns do not
// form a prefix of the index or if there are more of them than numKeyCols.
func (g *multiSpanGenerator) init(
spanBuilder *span.Builder,
numKeyCols int,
numInputCols int,
exprHelper *execinfrapb.ExprHelper,
tableOrdToIndexOrd util.FastIntMap,
) error {
g.spanBuilder = spanBuilder
g.numInputCols = numInputCols
g.tableOrdToIndexOrd = tableOrdToIndexOrd
g.inequalityColIdx = -1
// Initialize the spansCount to 1, since we'll always have at least one span.
// This number may increase when we call fillInIndexColInfos() below.
g.spansCount = 1
// Process the given expression to fill in g.indexColInfos with info from the
// join conditions. This info will be used later to generate the spans.
// The slice starts out empty with capacity numKeyCols; fillInIndexColInfos
// extends it by reslicing.
g.indexColInfos = make([]interface{ multiSpanGeneratorColInfo }, 0, numKeyCols)
if err := g.fillInIndexColInfos(exprHelper.Expr); err != nil {
return err
}
// Check that the results of fillInIndexColInfos can be used to generate valid
// spans.
lookupColsCount := len(g.indexColInfos)
if lookupColsCount != g.indexOrds.Len() {
return errors.AssertionFailedf(
"columns in the join condition do not form a prefix on the index",
)
}
if lookupColsCount > numKeyCols {
return errors.AssertionFailedf(
"%d lookup columns specified, expecting at most %d", lookupColsCount, numKeyCols,
)
}
// Fill in g.indexKeyRows with the cartesian product of the constant values
// collected above. This reduces the amount of work that is needed for each
// input row, since only columns depending on the input values will need
// to be filled in later.
//
// For example, suppose that we have index columns on
// (region, tenant, category, id), and the join conditions are:
//
// region IN ('east', 'west')
// AND tenant = input.tenant
// AND category IN (1, 2)
// AND id = input.id
//
// The following code would create the following rows, leaving spaces for
// tenant and id to be filled in later:
//
// [ 'east' - 1 - ]
// [ 'west' - 1 - ]
// [ 'east' - 2 - ]
// [ 'west' - 2 - ]
//
// Make a first pass fleshing out the structure with const values.
g.indexKeyRows = make([]rowenc.EncDatumRow, 1, g.spansCount)
g.indexKeyRows[0] = make(rowenc.EncDatumRow, 0, lookupColsCount)
for _, info := range g.indexColInfos {
if valuesInfo, ok := info.(multiSpanGeneratorValuesColInfo); ok {
// Only the rows that existed before this column was processed are
// expanded (n is captured up front); the rows appended below already
// carry a value for this column.
for i, n := 0, len(g.indexKeyRows); i < n; i++ {
indexKeyRow := g.indexKeyRows[i]
// Every constant but the first gets a copy of the existing row
// extended with that constant.
for j := 1; j < len(valuesInfo.constVals); j++ {
newIndexKeyRow := make(rowenc.EncDatumRow, len(indexKeyRow), lookupColsCount)
copy(newIndexKeyRow, indexKeyRow)
newIndexKeyRow = append(newIndexKeyRow, rowenc.EncDatum{Datum: valuesInfo.constVals[j]})
g.indexKeyRows = append(g.indexKeyRows, newIndexKeyRow)
}
// The existing row is extended in place with the first constant.
g.indexKeyRows[i] = append(indexKeyRow, rowenc.EncDatum{Datum: valuesInfo.constVals[0]})
}
} else {
for i := 0; i < len(g.indexKeyRows); i++ {
// Just fill in an empty EncDatum for now. For columns matched against
// an input column this is replaced inside generateNonNullSpans when we
// process each row; for an inequality column the placeholder stays and
// the bounds are passed to the span builder separately.
g.indexKeyRows[i] = append(g.indexKeyRows[i], rowenc.EncDatum{})
}
}
}
g.indexKeySpans = make(roachpb.Spans, 0, g.spansCount)
return nil
}
// fillInIndexColInfos recursively walks the expression tree to collect join
// conditions that are AND-ed together. It fills in g.indexColInfos with info
// from the join conditions, records the constrained index ordinals in
// g.indexOrds, multiplies g.spansCount by the number of constants of every
// '=' / IN condition, and sets g.inequalityColIdx when an inequality is found.
//
// The only acceptable join conditions are:
//  1. Equalities between input columns and index columns, such as c1 = c2.
//  2. Equalities or IN conditions between index columns and constants, such
//     as c = 5 or c IN ('a', 'b', 'c').
//  3. Inequalities from (possibly AND'd) <,>,<=,>= exprs. Inequalities on the
//     same column are merged into a single start/end range; if the same bound
//     is given more than once, the last one encountered wins.
//
// The optimizer should have ensured that all conditions fall into one of
// these categories. Any other expression types will return an error. An error
// is also returned if a condition refers to a table column that is not in the
// index, or to an index column beyond the capacity of g.indexColInfos.
// TODO: We should probably be doing this at compile time, see #65773
func (g *multiSpanGenerator) fillInIndexColInfos(expr tree.TypedExpr) error {
	switch t := expr.(type) {
	case *tree.AndExpr:
		if err := g.fillInIndexColInfos(t.Left.(tree.TypedExpr)); err != nil {
			return err
		}
		return g.fillInIndexColInfos(t.Right.(tree.TypedExpr))

	case *tree.ComparisonExpr:
		setOfVals := false
		inequality := false
		switch t.Operator {
		case tree.EQ, tree.In:
			setOfVals = true
		case tree.GE, tree.LE, tree.GT, tree.LT:
			inequality = true
		default:
			// This should never happen because of enforcement at opt time.
			return errors.AssertionFailedf("comparison operator not supported. Found %s", t.Operator)
		}

		// tabOrd stays -1 unless one side of the comparison is an index column.
		tabOrd := -1

		var info multiSpanGeneratorColInfo

		// For EQ and In, we just need to check the types of the arguments in order
		// to extract the info. For inequalities we return the const datums that
		// will form the span boundaries.
		getInfo := func(typedExpr tree.TypedExpr) (tree.Datum, error) {
			switch e := typedExpr.(type) {
			case *tree.IndexedVar:
				// IndexedVars can either be from the input or the index. If the
				// IndexedVar is from the index, shift it over by numInputCols to
				// find the corresponding ordinal in the base table.
				if e.Idx >= g.numInputCols {
					tabOrd = e.Idx - g.numInputCols
				} else {
					info = multiSpanGeneratorIndexVarColInfo{inputRowIdx: e.Idx}
				}

			case tree.Datum:
				if !setOfVals {
					// Inequality: hand the constant back as a span boundary.
					return e, nil
				}
				var values tree.Datums
				switch e.ResolvedType().Family() {
				case types.TupleFamily:
					values = e.(*tree.DTuple).D
				default:
					values = tree.Datums{e}
				}
				// Every time there are multiple possible values, we multiply the
				// spansCount by the number of possibilities. We will need to create
				// spans representing the cartesian product of possible values for
				// each column.
				info = multiSpanGeneratorValuesColInfo{constVals: values}
				g.spansCount *= len(values)

			default:
				return nil, errors.AssertionFailedf("unhandled comparison argument type %T", e)
			}
			return nil, nil
		}

		// NB: we make no attempt to deal with column direction here, that is sorted
		// out later in the span builder.
		var inequalityInfo multiSpanGeneratorInequalityColInfo
		if lval, err := getInfo(t.Left.(tree.TypedExpr)); lval != nil {
			// The constant is on the left: "const < col" is a lower bound on the
			// column, "const > col" is an upper bound.
			if t.Operator == tree.LT || t.Operator == tree.LE {
				inequalityInfo.start = lval
				inequalityInfo.startInclusive = t.Operator == tree.LE
			} else {
				inequalityInfo.end = lval
				inequalityInfo.endInclusive = t.Operator == tree.GE
			}
		} else if err != nil {
			return err
		}

		if rval, err := getInfo(t.Right.(tree.TypedExpr)); rval != nil {
			// The constant is on the right: "col < const" is an upper bound on the
			// column, "col > const" is a lower bound.
			if t.Operator == tree.LT || t.Operator == tree.LE {
				inequalityInfo.end = rval
				inequalityInfo.endInclusive = t.Operator == tree.LE
			} else {
				inequalityInfo.start = rval
				inequalityInfo.startInclusive = t.Operator == tree.GE
			}
		} else if err != nil {
			return err
		}

		idxOrd, ok := g.tableOrdToIndexOrd.Get(tabOrd)
		if !ok {
			return errors.AssertionFailedf("table column %d not found in index", tabOrd)
		}

		if len(g.indexColInfos) <= idxOrd {
			// Reslicing past the capacity would panic; report the broken
			// invariant as an error instead.
			if idxOrd >= cap(g.indexColInfos) {
				return errors.AssertionFailedf(
					"index column ordinal %d out of range, expecting at most %d lookup columns",
					idxOrd, cap(g.indexColInfos),
				)
			}
			g.indexColInfos = g.indexColInfos[:idxOrd+1]
		}

		if inequality {
			// A range such as "c >= 1 AND c <= 10" arrives as two separate
			// comparisons, each of which sets only one of the two bounds. Carry
			// over the bound collected earlier for this column so that it is
			// not lost when the entry is overwritten below.
			if prev, ok := g.indexColInfos[idxOrd].(multiSpanGeneratorInequalityColInfo); ok {
				if inequalityInfo.start == nil {
					inequalityInfo.start = prev.start
					inequalityInfo.startInclusive = prev.startInclusive
				}
				if inequalityInfo.end == nil {
					inequalityInfo.end = prev.end
					inequalityInfo.endInclusive = prev.endInclusive
				}
			}
			info = inequalityInfo
			g.inequalityColIdx = idxOrd
		}

		g.indexColInfos[idxOrd] = info
		g.indexOrds.Add(idxOrd)

	default:
		return errors.AssertionFailedf("unhandled expression type %T", t)
	}
	return nil
}
// generateNonNullSpans generates spans for a given row. Spans for which the
// span builder reports a NULL are left out, since those values would not match
// the lookup condition anyway.
//
// The returned slice is g.indexKeySpans, which is reused by the next call.
func (g *multiSpanGenerator) generateNonNullSpans(row rowenc.EncDatumRow) (roachpb.Spans, error) {
	// Fill in the holes in g.indexKeyRows that correspond to input row values.
	for colIdx, info := range g.indexColInfos {
		varInfo, isIndexVar := info.(multiSpanGeneratorIndexVarColInfo)
		if !isIndexVar {
			continue
		}
		for rowIdx := range g.indexKeyRows {
			g.indexKeyRows[rowIdx][colIdx] = row[varInfo.inputRowIdx]
		}
	}

	g.indexKeySpans = g.indexKeySpans[:0]

	// When one of the columns is bound by an inequality, its bounds are handed
	// to the span builder together with each key row.
	hasInequality := g.inequalityColIdx != -1
	var bounds multiSpanGeneratorInequalityColInfo
	if hasInequality {
		bounds = g.indexColInfos[g.inequalityColIdx].(multiSpanGeneratorInequalityColInfo)
	}

	// Convert every index key row to a span.
	for _, keyRow := range g.indexKeyRows {
		var (
			sp           roachpb.Span
			containsNull bool
			err          error
		)
		if hasInequality {
			sp, containsNull, err = g.spanBuilder.SpanFromEncDatumsWithRange(
				keyRow, len(g.indexColInfos),
				bounds.start, bounds.startInclusive, bounds.end, bounds.endInclusive,
			)
		} else {
			sp, containsNull, err = g.spanBuilder.SpanFromEncDatums(keyRow, len(g.indexColInfos))
		}
		if err != nil {
			return roachpb.Spans{}, err
		}
		if containsNull {
			continue
		}
		g.indexKeySpans = append(g.indexKeySpans, sp)
	}
	return g.indexKeySpans, nil
}
// findInputRowIndicesByKey does a binary search over g.spanToInputRowIndices
// (which generateSpans sorts by span start key) to find a span that contains
// the given key, and returns the input row indices recorded for that span. It
// returns nil if the search finds no such span.
func (g *multiSpanGenerator) findInputRowIndicesByKey(key roachpb.Key) []int {
	lo, hi := 0, len(g.spanToInputRowIndices)
	for lo < hi {
		mid := (lo + hi) / 2
		entry := g.spanToInputRowIndices[mid]
		switch {
		case entry.span.ContainsKey(key):
			return entry.rowIndices
		case key.Compare(entry.span.Key) < 0:
			// The key sorts before this span; continue in the lower half.
			hi = mid
		default:
			lo = mid + 1
		}
	}
	return nil
}
// findInputRowIndices does a linear search for an entry whose span equals the
// given span and returns its row indices, or nil if there is none. It is used
// by generateSpans while spanToInputRowIndices is still unsorted.
func (g *multiSpanGenerator) findInputRowIndices(span roachpb.Span) []int {
	for idx := range g.spanToInputRowIndices {
		if entry := &g.spanToInputRowIndices[idx]; span.Equal(entry.span) {
			return entry.rowIndices
		}
	}
	return nil
}
// setInputRowIndices installs a slice of row indices for a given span: the
// entry with an equal span is updated if there is one, otherwise a new entry
// is appended. It is used by generateSpans while spanToInputRowIndices is
// still unsorted, hence the linear search.
func (g *multiSpanGenerator) setInputRowIndices(span roachpb.Span, indices []int) {
	for idx := range g.spanToInputRowIndices {
		if !span.Equal(g.spanToInputRowIndices[idx].span) {
			continue
		}
		g.spanToInputRowIndices[idx].rowIndices = indices
		return
	}
	// No entry for this span yet.
	g.spanToInputRowIndices = append(
		g.spanToInputRowIndices, spanRowIndex{span: span, rowIndices: indices},
	)
}
// generateSpans is part of the joinReaderSpanGenerator interface.
//
// For every input row it generates the non-null lookup spans, emits each
// distinct span once, and records in g.spanToInputRowIndices which input rows
// want which span so that the index results can be joined to the inputs. Once
// the batch is processed, g.spanToInputRowIndices is sorted by span start key
// for findInputRowIndicesByKey. The returned slice is g.scratchSpans, which is
// reused by the next call.
func (g *multiSpanGenerator) generateSpans(rows []rowenc.EncDatumRow) (roachpb.Spans, error) {
	g.spanToInputRowIndices = g.spanToInputRowIndices[:0]
	g.scratchSpans = g.scratchSpans[:0]

	for rowIdx, inputRow := range rows {
		rowSpans, err := g.generateNonNullSpans(inputRow)
		if err != nil {
			return nil, err
		}
		for _, sp := range rowSpans {
			rowIndices := g.findInputRowIndices(sp)
			if rowIndices == nil {
				// First time this span is seen in the batch: emit it.
				switch {
				case g.inequalityColIdx != -1:
					// RFC: should doing range spans affect how/whether we should
					// split into families? It seems to break when splitting
					// inequality spans, so those are emitted unsplit.
					g.scratchSpans = append(g.scratchSpans, sp)
				default:
					g.scratchSpans = g.spanBuilder.MaybeSplitSpanIntoSeparateFamilies(
						g.scratchSpans, sp, len(g.indexColInfos), false /* containsNull */)
				}
			}
			g.setInputRowIndices(sp, append(rowIndices, rowIdx))
		}
	}

	if len(g.spanToInputRowIndices) > 1 {
		sort.Sort(g.spanToInputRowIndices)
	}
	return g.scratchSpans, nil
}
// getMatchingRowIndices is part of the joinReaderSpanGenerator interface. It
// delegates to the binary search over the spans of the last batch.
func (g *multiSpanGenerator) getMatchingRowIndices(key roachpb.Key) []int {
	matches := g.findInputRowIndicesByKey(key)
	return matches
}
// localityOptimizedSpanGenerator is the span generator for locality optimized
// lookup joins. The localSpanGen is used to generate spans targeting local
// nodes, and the remoteSpanGen is used to generate spans targeting remote
// nodes.
//
// generateSpans only produces the local spans; the remote spans are produced
// by generateRemoteSpans.
type localityOptimizedSpanGenerator struct {
localSpanGen multiSpanGenerator
remoteSpanGen multiSpanGenerator
}
// init must be called before the localityOptimizedSpanGenerator can be used to
// generate spans.
//
// It initializes the local span generator with localExprHelper and then the
// remote one with remoteExprHelper; everything else is shared. An assertion
// error is returned if the two generators end up with a different number of
// lookup columns.
func (g *localityOptimizedSpanGenerator) init(
	spanBuilder *span.Builder,
	numKeyCols int,
	numInputCols int,
	localExprHelper *execinfrapb.ExprHelper,
	remoteExprHelper *execinfrapb.ExprHelper,
	tableOrdToIndexOrd util.FastIntMap,
) error {
	// The local generator is initialized first, then the remote one.
	targets := [...]struct {
		gen        *multiSpanGenerator
		exprHelper *execinfrapb.ExprHelper
	}{
		{gen: &g.localSpanGen, exprHelper: localExprHelper},
		{gen: &g.remoteSpanGen, exprHelper: remoteExprHelper},
	}
	for _, target := range targets {
		err := target.gen.init(
			spanBuilder, numKeyCols, numInputCols, target.exprHelper, tableOrdToIndexOrd,
		)
		if err != nil {
			return err
		}
	}

	// Check that the resulting span generators have the same lookup columns.
	local, remote := g.localSpanGen.maxLookupCols(), g.remoteSpanGen.maxLookupCols()
	if local != remote {
		return errors.AssertionFailedf(
			"local lookup cols (%d) != remote lookup cols (%d)", local, remote,
		)
	}
	return nil
}
// maxLookupCols is part of the joinReaderSpanGenerator interface.
func (g *localityOptimizedSpanGenerator) maxLookupCols() int {
	// init asserts that the local and remote span generators agree on the
	// number of lookup columns, so either one can answer.
	numLookupCols := g.localSpanGen.maxLookupCols()
	return numLookupCols
}
// generateSpans is part of the joinReaderSpanGenerator interface. It generates
// the spans of the local span generator only; see generateRemoteSpans for the
// remote ones.
func (g *localityOptimizedSpanGenerator) generateSpans(
	rows []rowenc.EncDatumRow,
) (roachpb.Spans, error) {
	localSpans, err := g.localSpanGen.generateSpans(rows)
	return localSpans, err
}
// generateRemoteSpans generates spans targeting remote nodes for the given
// batch of input rows, using the remote span generator.
func (g *localityOptimizedSpanGenerator) generateRemoteSpans(
	rows []rowenc.EncDatumRow,
) (roachpb.Spans, error) {
	remoteSpans, err := g.remoteSpanGen.generateSpans(rows)
	return remoteSpans, err
}
// getMatchingRowIndices is part of the joinReaderSpanGenerator interface. The
// local span generator is consulted first; the remote one is only asked when
// the local one has no matching rows for the key.
func (g *localityOptimizedSpanGenerator) getMatchingRowIndices(key roachpb.Key) []int {
	localMatches := g.localSpanGen.getMatchingRowIndices(key)
	if len(localMatches) == 0 {
		return g.remoteSpanGen.getMatchingRowIndices(key)
	}
	return localMatches
}