From f39ae242e802f13969c495592aecb1dbe40110e8 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 28 May 2020 14:34:02 -0400 Subject: [PATCH] rowcontainer,rowexec: switch joinReader to use DiskBackedNumberedRowContainer Additionally, - added a randomized correctness test that compares the results of the indexed and numbered containers. - added benchmark cases to the joinReader benchmark that limit memory. None of the workloads have repeated reads of the same right row and all access the right rows in monotonically increasing order so the difference between the two containers is due to the numbered container avoiding the overhead of populating the cache. - reduced the number of allocations in newNumberedDiskRowIterator. - the accesses slices share the same underlying slice. - a row copy, when there is a miss and the row is not added to the cache, is eliminated. When a copy is needed and we have evicted a row from the cache, the copying reuses that evicted row. - allocations of the map, map elements are reused. Fixes #48118 Release note: None --- .../rowcontainer/numbered_row_container.go | 110 ++++++++--- .../numbered_row_container_test.go | 153 +++++++++++++-- pkg/sql/rowexec/joinreader.go | 6 +- pkg/sql/rowexec/joinreader_strategies.go | 21 ++- pkg/sql/rowexec/joinreader_test.go | 178 ++++++++++-------- 5 files changed, 336 insertions(+), 132 deletions(-) diff --git a/pkg/sql/rowcontainer/numbered_row_container.go b/pkg/sql/rowcontainer/numbered_row_container.go index 71898292acb4..5fc117376ff3 100644 --- a/pkg/sql/rowcontainer/numbered_row_container.go +++ b/pkg/sql/rowcontainer/numbered_row_container.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math" + "sync" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -38,7 +39,10 @@ type DiskBackedNumberedRowContainer struct { storedTypes []*types.T idx int // the index of the next row to be added into the container - rowIter *numberedDiskRowIterator + rowIter *numberedDiskRowIterator + // cacheMap is a map used in the implementation of rowIter that is kept + // in the container to avoid repeated allocation. + cacheMap map[int]*cacheElement rowIterMemAcc mon.BoundAccount DisableCache bool } @@ -95,6 +99,12 @@ func (d *DiskBackedNumberedRowContainer) UsingDisk() bool { return d.rc.UsingDisk() } +// Spilled returns whether or not the primary container spilled to disk in its +// lifetime. +func (d *DiskBackedNumberedRowContainer) Spilled() bool { + return d.rc.Spilled() +} + // testingSpillToDisk is for tests to spill the container(s) // to disk. func (d *DiskBackedNumberedRowContainer) testingSpillToDisk(ctx context.Context) error { @@ -161,8 +171,11 @@ func (d *DiskBackedNumberedRowContainer) SetupForRead(ctx context.Context, acces // This is not an efficient way to disable the cache, but ok for tests. cacheSize = 0 } + if d.cacheMap == nil { + d.cacheMap = make(map[int]*cacheElement) + } d.rowIter = newNumberedDiskRowIterator( - ctx, rowIter, accesses, meanRowsPerSSBlock, cacheSize, &d.rowIterMemAcc) + ctx, rowIter, accesses, meanRowsPerSSBlock, cacheSize, d.cacheMap, &d.rowIterMemAcc) } // GetRow returns a row with the given index. If skip is true the row is not @@ -261,7 +274,6 @@ func (d *DiskBackedNumberedRowContainer) Close(ctx context.Context) { // cost of a cache miss is high. // // TODO(sumeer): -// - Before integrating with joinReader, try some realistic benchmarks. // - Use some realistic inverted index workloads (including geospatial) to // measure the effect of this cache. type numberedDiskRowIterator struct { @@ -305,6 +317,26 @@ type cacheElement struct { row sqlbase.EncDatumRow // When row is non-nil, this is the element in the heap. heapElement cacheRowHeapElement + // Used only when initializing accesses, so that we can allocate a single + // shared slice for accesses across all cacheElements. + numAccesses int +} + +var cacheElementSyncPool = sync.Pool{ + New: func() interface{} { + return &cacheElement{} + }, +} + +func freeCacheElement(elem *cacheElement) { + elem.accesses = nil + elem.row = nil + elem.numAccesses = 0 + cacheElementSyncPool.Put(elem) +} + +func newCacheElement() *cacheElement { + return cacheElementSyncPool.Get().(*cacheElement) } type cacheRowHeapElement struct { @@ -349,6 +381,7 @@ func newNumberedDiskRowIterator( accesses [][]int, meanRowsPerSSBlock int, maxCacheSize int, + cache map[int]*cacheElement, memAcc *mon.BoundAccount, ) *numberedDiskRowIterator { n := &numberedDiskRowIterator{ @@ -356,17 +389,31 @@ func newNumberedDiskRowIterator( meanRowsPerSSBlock: meanRowsPerSSBlock, maxCacheSize: maxCacheSize, memAcc: memAcc, - cache: make(map[int]*cacheElement, maxCacheSize), + cache: cache, } - var accessIdx int + var numAccesses int for _, accSlice := range accesses { for _, rowIdx := range accSlice { elem := n.cache[rowIdx] if elem == nil { - elem = &cacheElement{} + elem = newCacheElement() elem.heapElement.rowIdx = rowIdx n.cache[rowIdx] = elem } + elem.numAccesses++ + numAccesses++ + } + } + allAccesses := make([]int, numAccesses) + accessIdx := 0 + for _, accSlice := range accesses { + for _, rowIdx := range accSlice { + elem := n.cache[rowIdx] + if elem.accesses == nil { + // Sub-slice that can grow up to elem.numAccesses + elem.accesses = allAccesses[0:0:elem.numAccesses] + allAccesses = allAccesses[elem.numAccesses:] + } elem.accesses = append(elem.accesses, accessIdx) accessIdx++ } @@ -376,6 +423,10 @@ func newNumberedDiskRowIterator( func (n *numberedDiskRowIterator) close() { n.rowIter.Close() + for k, v := range n.cache { + freeCacheElement(v) + delete(n.cache, k) + } } func (n *numberedDiskRowIterator) getRow( @@ -471,15 +522,13 @@ func (n *numberedDiskRowIterator) getRow( return n.tryAddCacheAndReturnRow(ctx, elem) } -func (n *numberedDiskRowIterator) ensureDecodedAndCopy( - row sqlbase.EncDatumRow, -) (sqlbase.EncDatumRow, error) { +func (n *numberedDiskRowIterator) ensureDecoded(row sqlbase.EncDatumRow) error { for i := range row { if err := row[i].EnsureDecoded(n.rowIter.rowContainer.types[i], &n.datumAlloc); err != nil { - return nil, err + return err } } - return n.rowAlloc.CopyRow(row), nil + return nil } func (n *numberedDiskRowIterator) tryAddCacheAndReturnRow( @@ -489,11 +538,13 @@ func (n *numberedDiskRowIterator) tryAddCacheAndReturnRow( if err != nil { return nil, err } - row, err := n.ensureDecodedAndCopy(r) - if err != nil { + if err = n.ensureDecoded(r); err != nil { return nil, err } - return row, n.tryAddCacheHelper(ctx, elem, row, true) + if len(elem.accesses) == 0 { + return r, nil + } + return r, n.tryAddCacheHelper(ctx, elem, r, true) } func (n *numberedDiskRowIterator) tryAddCache(ctx context.Context, elem *cacheElement) error { @@ -514,27 +565,26 @@ func (n *numberedDiskRowIterator) tryAddCache(ctx context.Context, elem *cacheEl } func (n *numberedDiskRowIterator) tryAddCacheHelper( - ctx context.Context, elem *cacheElement, row sqlbase.EncDatumRow, alreadyDecodedAndCopied bool, + ctx context.Context, elem *cacheElement, row sqlbase.EncDatumRow, alreadyDecoded bool, ) error { - if len(elem.accesses) == 0 { - return nil - } if elem.row != nil { log.Fatalf(ctx, "adding row to cache when it is already in cache") } nextAccess := elem.accesses[0] - evict := func() error { + evict := func() (sqlbase.EncDatumRow, error) { heapElem := heap.Pop(&n.cacheHeap).(*cacheRowHeapElement) evictElem, ok := n.cache[heapElem.rowIdx] if !ok { - return errors.Errorf("bug: element not in cache map") + return nil, errors.Errorf("bug: element not in cache map") } bytes := evictElem.row.Size() n.memAcc.Shrink(ctx, int64(bytes)) + evictedRow := evictElem.row evictElem.row = nil - return nil + return evictedRow, nil } rowBytesUsage := -1 + var rowToReuse sqlbase.EncDatumRow for { if n.maxCacheSize == 0 { return nil @@ -542,8 +592,9 @@ func (n *numberedDiskRowIterator) tryAddCacheHelper( if len(n.cacheHeap) == n.maxCacheSize && n.cacheHeap[0].nextAccess <= nextAccess { return nil } + var err error if len(n.cacheHeap) >= n.maxCacheSize { - if err := evict(); err != nil { + if rowToReuse, err = evict(); err != nil { return err } continue @@ -553,13 +604,12 @@ func (n *numberedDiskRowIterator) tryAddCacheHelper( // many rows memAcc will allow us to place in the cache. So it is likely // that this row can be added. Decode the row to get the correct // rowBytesUsage. - if !alreadyDecodedAndCopied { - var err error - row, err = n.ensureDecodedAndCopy(row) + if !alreadyDecoded { + err = n.ensureDecoded(row) if err != nil { return err } - alreadyDecodedAndCopied = true + alreadyDecoded = true } if rowBytesUsage == -1 { rowBytesUsage = int(row.Size()) @@ -583,7 +633,13 @@ func (n *numberedDiskRowIterator) tryAddCacheHelper( } // Add to cache. elem.heapElement.nextAccess = nextAccess - elem.row = row + // Need to copy row, since its lifetime is less than the cached row. + if rowToReuse == nil { + elem.row = n.rowAlloc.CopyRow(row) + } else { + copy(rowToReuse, row) + elem.row = rowToReuse + } heap.Push(&n.cacheHeap, &elem.heapElement) return nil } diff --git a/pkg/sql/rowcontainer/numbered_row_container_test.go b/pkg/sql/rowcontainer/numbered_row_container_test.go index 479c959cdc76..5876f5ff4582 100644 --- a/pkg/sql/rowcontainer/numbered_row_container_test.go +++ b/pkg/sql/rowcontainer/numbered_row_container_test.go @@ -61,17 +61,15 @@ func TestNumberedRowContainerDeDuping(t *testing.T) { st, ) diskMonitor := execinfra.NewTestDiskMonitor(ctx, st) + defer diskMonitor.Stop(ctx) memoryBudget := math.MaxInt64 if rng.Intn(2) == 0 { fmt.Printf("using smallMemoryBudget to spill to disk\n") memoryBudget = smallMemoryBudget } - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(memoryBudget))) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) - defer diskMonitor.Stop(ctx) // Use random types and random rows. types := sqlbase.RandSortingTypes(rng, numCols) @@ -149,17 +147,15 @@ func TestNumberedRowContainerIteratorCaching(t *testing.T) { st, ) diskMonitor := execinfra.NewTestDiskMonitor(ctx, st) + defer diskMonitor.Stop(ctx) numRows := 200 const numCols = 2 // This memory budget allows for some caching, but typically cannot // cache all the rows. const memoryBudget = 12000 - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(memoryBudget)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) - defer diskMonitor.Stop(ctx) // Use random types and random rows. rng, _ := randutil.NewPseudoRand() @@ -223,12 +219,120 @@ func TestNumberedRowContainerIteratorCaching(t *testing.T) { } } +// Tests that the DiskBackedNumberedRowContainer and +// DiskBackedIndexedRowContainer return the same results. +func TestCompareNumberedAndIndexedRowContainers(t *testing.T) { + defer leaktest.AfterTest(t)() + + rng, _ := randutil.NewPseudoRand() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + evalCtx := tree.MakeTestingEvalContext(st) + tempEngine, _, err := storage.NewTempEngine(ctx, storage.DefaultStorageEngine, base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec) + if err != nil { + t.Fatal(err) + } + defer tempEngine.Close() + + diskMonitor := execinfra.NewTestDiskMonitor(ctx, st) + defer diskMonitor.Stop(ctx) + + numRows := 200 + const numCols = 2 + // This memory budget allows for some caching, but typically cannot + // cache all the rows. + var memoryBudget int64 = 12000 + if rng.Intn(2) == 0 { + memoryBudget = math.MaxInt64 + } + + // Use random types and random rows. + types := sqlbase.RandSortingTypes(rng, numCols) + ordering := sqlbase.ColumnOrdering{ + sqlbase.ColumnOrderInfo{ + ColIdx: 0, + Direction: encoding.Ascending, + }, + sqlbase.ColumnOrderInfo{ + ColIdx: 1, + Direction: encoding.Descending, + }, + } + numRows, rows := makeUniqueRows(t, &evalCtx, rng, numRows, types, ordering) + + var containers [2]numberedContainer + containers[0] = makeNumberedContainerUsingIRC( + ctx, t, types, &evalCtx, tempEngine, st, memoryBudget, diskMonitor) + containers[1] = makeNumberedContainerUsingNRC( + ctx, t, types, &evalCtx, tempEngine, st, memoryBudget, diskMonitor) + defer func() { + for _, rc := range containers { + rc.close(ctx) + } + }() + + // Each pass does an UnsafeReset at the end. + for passWithReset := 0; passWithReset < 2; passWithReset++ { + // Insert rows. + for i := 0; i < numRows; i++ { + for _, rc := range containers { + err := rc.addRow(ctx, rows[i]) + require.NoError(t, err) + } + } + // We want all the memory to be usable by the cache, so spill to disk. + if memoryBudget != math.MaxInt64 { + for _, rc := range containers { + require.NoError(t, rc.spillToDisk(ctx)) + } + } + + // Random access of the inserted rows. + var accesses [][]int + for i := 0; i < 2*numRows; i++ { + var access []int + for j := 0; j < 4; j++ { + access = append(access, rng.Intn(numRows)) + } + accesses = append(accesses, access) + } + for _, rc := range containers { + rc.setupForRead(ctx, accesses) + } + for _, access := range accesses { + for _, index := range access { + skip := rng.Intn(10) == 0 + var rows [2]sqlbase.EncDatumRow + for i, rc := range containers { + row, err := rc.getRow(ctx, index, skip) + require.NoError(t, err) + rows[i] = row + } + if skip { + continue + } + require.Equal(t, rows[0].String(types), rows[1].String(types)) + } + } + // Reset and reorder the rows for the next pass. + rand.Shuffle(numRows, func(i, j int) { + rows[i], rows[j] = rows[j], rows[i] + }) + for _, rc := range containers { + require.NoError(t, rc.unsafeReset(ctx)) + } + } +} + // Adapter interface that can be implemented using both DiskBackedNumberedRowContainer // and DiskBackedIndexedRowContainer. type numberedContainer interface { addRow(context.Context, sqlbase.EncDatumRow) error setupForRead(ctx context.Context, accesses [][]int) - getRow(ctx context.Context, idx int) (sqlbase.EncDatumRow, error) + getRow(ctx context.Context, idx int, skip bool) (sqlbase.EncDatumRow, error) + spillToDisk(context.Context) error + unsafeReset(context.Context) error close(context.Context) } @@ -245,17 +349,23 @@ func (d numberedContainerUsingNRC) setupForRead(ctx context.Context, accesses [] d.rc.SetupForRead(ctx, accesses) } func (d numberedContainerUsingNRC) getRow( - ctx context.Context, idx int, + ctx context.Context, idx int, skip bool, ) (sqlbase.EncDatumRow, error) { return d.rc.GetRow(ctx, idx, false) } +func (d numberedContainerUsingNRC) spillToDisk(ctx context.Context) error { + return d.rc.testingSpillToDisk(ctx) +} +func (d numberedContainerUsingNRC) unsafeReset(ctx context.Context) error { + return d.rc.UnsafeReset(ctx) +} func (d numberedContainerUsingNRC) close(ctx context.Context) { d.rc.Close(ctx) d.memoryMonitor.Stop(ctx) } func makeNumberedContainerUsingNRC( ctx context.Context, - b *testing.B, + t testing.TB, types []*types.T, evalCtx *tree.EvalContext, engine diskmap.Factory, @@ -266,7 +376,7 @@ func makeNumberedContainerUsingNRC( memoryMonitor := makeMemMonitorAndStart(ctx, st, memoryBudget) rc := NewDiskBackedNumberedRowContainer( false /* deDup */, types, evalCtx, engine, memoryMonitor, diskMonitor, 0 /* rowCapacity */) - require.NoError(b, rc.testingSpillToDisk(ctx)) + require.NoError(t, rc.testingSpillToDisk(ctx)) return numberedContainerUsingNRC{rc: rc, memoryMonitor: memoryMonitor} } @@ -280,21 +390,33 @@ func (d numberedContainerUsingIRC) addRow(ctx context.Context, row sqlbase.EncDa } func (d numberedContainerUsingIRC) setupForRead(context.Context, [][]int) {} func (d numberedContainerUsingIRC) getRow( - ctx context.Context, idx int, + ctx context.Context, idx int, skip bool, ) (sqlbase.EncDatumRow, error) { + if skip { + return nil, nil + } row, err := d.rc.GetRow(ctx, idx) if err != nil { return nil, err } return row.(IndexedRow).Row, nil } +func (d numberedContainerUsingIRC) spillToDisk(ctx context.Context) error { + if d.rc.UsingDisk() { + return nil + } + return d.rc.SpillToDisk(ctx) +} +func (d numberedContainerUsingIRC) unsafeReset(ctx context.Context) error { + return d.rc.UnsafeReset(ctx) +} func (d numberedContainerUsingIRC) close(ctx context.Context) { d.rc.Close(ctx) d.memoryMonitor.Stop(ctx) } func makeNumberedContainerUsingIRC( ctx context.Context, - b *testing.B, + t require.TestingT, types []*types.T, evalCtx *tree.EvalContext, engine diskmap.Factory, @@ -305,7 +427,7 @@ func makeNumberedContainerUsingIRC( memoryMonitor := makeMemMonitorAndStart(ctx, st, memoryBudget) rc := NewDiskBackedIndexedRowContainer( nil /* ordering */, types, evalCtx, engine, memoryMonitor, diskMonitor, 0 /* rowCapacity */) - require.NoError(b, rc.SpillToDisk(ctx)) + require.NoError(t, rc.SpillToDisk(ctx)) return numberedContainerUsingIRC{rc: rc, memoryMonitor: memoryMonitor} } @@ -518,7 +640,7 @@ func BenchmarkNumberedContainerIteratorCaching(b *testing.B) { nc.setupForRead(ctx, accesses) for i := 0; i < len(accesses); i++ { for j := 0; j < len(accesses[i]); j++ { - if _, err := nc.getRow(ctx, accesses[i][j]); err != nil { + if _, err := nc.getRow(ctx, accesses[i][j], false /* skip */); err != nil { b.Fatal(err) } } @@ -547,8 +669,5 @@ func BenchmarkNumberedContainerIteratorCaching(b *testing.B) { } // TODO(sumeer): -// - Randomized correctness test comparing the rows returned by -// DiskBacked{Numbered,Indexed}RowContainer. // - Benchmarks: // - de-duping with and without spilling. -// - different batch sizes for the left side. diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index f534ae7040c0..3452a5e6457d 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -215,9 +215,8 @@ func (jr *joinReader) initJoinReaderStrategy( // Initialize memory monitors and row container for looked up rows. jr.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "joiner-limited") jr.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk") - drc := rowcontainer.NewDiskBackedIndexedRowContainer( - // TODO(asubiotto): Does a nil ordering make sense in all cases? - nil, /* ordering */ + drc := rowcontainer.NewDiskBackedNumberedRowContainer( + false, /* deDup */ typs, jr.EvalCtx, jr.FlowCtx.Cfg.TempStorage, @@ -411,6 +410,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet } } log.VEvent(jr.Ctx, 1, "done joining rows") + jr.strategy.prepareToEmit(jr.Ctx) return jrEmittingRows, nil } diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 318152e3fdcb..1af73c06a509 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -104,6 +104,9 @@ type joinReaderStrategy interface { // unsupported, but if an error is returned, the joinReader will transition // to draining. processLookedUpRow(ctx context.Context, row sqlbase.EncDatumRow, key roachpb.Key) (joinReaderState, error) + // prepareToEmit informs the strategy implementation that all looked up rows + // have been read, and that it should prepare for calls to nextRowToEmit. + prepareToEmit(ctx context.Context) // nextRowToEmit gets the next row to emit from the strategy. An accompanying // joinReaderState is also returned, indicating a state to transition to after // emitting this row. A transition to jrStateUnknown is unsupported, but if an @@ -193,6 +196,8 @@ func (s *joinReaderNoOrderingStrategy) processLookedUpRow( return jrEmittingRows, nil } +func (s *joinReaderNoOrderingStrategy) prepareToEmit(ctx context.Context) {} + func (s *joinReaderNoOrderingStrategy) nextRowToEmit( _ context.Context, ) (sqlbase.EncDatumRow, joinReaderState, error) { @@ -291,7 +296,7 @@ type joinReaderOrderingStrategy struct { inputRowIdxToLookedUpRowIndices [][]int lookedUpRowIdx int - lookedUpRows rowcontainer.IndexedRowContainer + lookedUpRows *rowcontainer.DiskBackedNumberedRowContainer // emitCursor contains information about where the next row to emit is within // inputRowIdxToLookedUpRowIndices. @@ -345,7 +350,7 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( row[i].Datum = tree.DNull } } - if err := s.lookedUpRows.AddRow(ctx, row); err != nil { + if _, err := s.lookedUpRows.AddRow(ctx, row); err != nil { return jrStateUnknown, err } } @@ -379,6 +384,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( return jrPerformingLookup, nil } +func (s *joinReaderOrderingStrategy) prepareToEmit(ctx context.Context) { + if !s.isPartialJoin { + s.lookedUpRows.SetupForRead(ctx, s.inputRowIdxToLookedUpRowIndices) + } +} + func (s *joinReaderOrderingStrategy) nextRowToEmit( ctx context.Context, ) (sqlbase.EncDatumRow, joinReaderState, error) { @@ -433,11 +444,11 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( return nil, jrEmittingRows, nil } - lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx) + lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx, false /* skip */) if err != nil { return nil, jrStateUnknown, err } - outputRow, err := s.render(inputRow, lookedUpRow.(rowcontainer.IndexedRow).Row) + outputRow, err := s.render(inputRow, lookedUpRow) if err != nil { return nil, jrStateUnknown, err } @@ -448,7 +459,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( } func (s *joinReaderOrderingStrategy) spilled() bool { - return s.lookedUpRows.(*rowcontainer.DiskBackedIndexedRowContainer).Spilled() + return s.lookedUpRows.Spilled() } func (s *joinReaderOrderingStrategy) close(ctx context.Context) { diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index d728e6bf5333..f6d1864f540c 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -839,94 +839,112 @@ func BenchmarkJoinReader(b *testing.B) { for _, reqOrdering := range []bool{true, false} { for columnIdx, columnDef := range rightSideColumnDefs { for _, numLookupRows := range []int{1, 1 << 4 /* 16 */, 1 << 8 /* 256 */, 1 << 10 /* 1024 */, 1 << 12 /* 4096 */, 1 << 13 /* 8192 */, 1 << 14 /* 16384 */, 1 << 15 /* 32768 */, 1 << 16 /* 65,536 */, 1 << 19 /* 524,288 */} { - if rightSz/columnDef.matchesPerLookupRow < numLookupRows { - // This case does not make sense since we won't have distinct lookup - // rows. We don't currently merge spans which could make this an - // interesting case to benchmark, but we probably should. - continue - } + for _, memoryLimit := range []int64{100 << 10, math.MaxInt64} { + memoryLimitStr := "mem=unlimited" + if memoryLimit != math.MaxInt64 { + if !reqOrdering { + // Smaller memory limit is not relevant when there is no ordering. + continue + } + memoryLimitStr = fmt.Sprintf("mem=%dKB", memoryLimit/(1<<10)) + // The benchmark workloads are such that each right row never joins + // with more than one left row. And the access pattern of right rows + // accessed across all the left rows is monotonically increasing. So + // once spilled to disk, the reads will always need to get from disk + // (caching cannot improve performance). + // + // TODO(sumeer): add workload that can benefit from caching. + } + if rightSz/columnDef.matchesPerLookupRow < numLookupRows { + // This case does not make sense since we won't have distinct lookup + // rows. We don't currently merge spans which could make this an + // interesting case to benchmark, but we probably should. + continue + } - eqColsAreKey := []bool{false} - if numLookupRows == 1 { - // For this case, execute the parallel lookup case as well. - eqColsAreKey = []bool{true, false} - } - for _, parallel := range eqColsAreKey { - benchmarkName := fmt.Sprintf("reqOrdering=%t/matchratio=oneto%s/lookuprows=%d", reqOrdering, columnDef.name, numLookupRows) - if parallel { - benchmarkName += "/parallel=true" + eqColsAreKey := []bool{false} + if numLookupRows == 1 { + // For this case, execute the parallel lookup case as well. + eqColsAreKey = []bool{true, false} } - b.Run(benchmarkName, func(b *testing.B) { - tableName := tableSizeToName(rightSz) - - // Get the table descriptor and find the index that will provide us with - // the expected match ratio. - tableDesc := sqlbase.GetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", tableName) - indexIdx := uint32(0) - for i := range tableDesc.Indexes { - require.Equal(b, 1, len(tableDesc.Indexes[i].ColumnNames), "all indexes created in this benchmark should only contain one column") - if tableDesc.Indexes[i].ColumnNames[0] == columnDef.name { - // Found indexIdx. - indexIdx = uint32(i + 1) - break - } - } - if indexIdx == 0 { - b.Fatalf("failed to find secondary index for column %s", columnDef.name) - } - input := newRowGeneratingSource(sqlbase.OneIntCol, sqlutils.ToRowFn(func(rowIdx int) tree.Datum { - // Convert to 0-based. - return tree.NewDInt(tree.DInt(rowIdx - 1)) - }), numLookupRows) - output := rowDisposer{} - - spec := execinfrapb.JoinReaderSpec{ - Table: *tableDesc, - LookupColumns: []uint32{0}, - LookupColumnsAreKey: parallel, - IndexIdx: indexIdx, - MaintainOrdering: reqOrdering, + for _, parallel := range eqColsAreKey { + benchmarkName := fmt.Sprintf("reqOrdering=%t/matchratio=oneto%s/lookuprows=%d/%s", + reqOrdering, columnDef.name, numLookupRows, memoryLimitStr) + if parallel { + benchmarkName += "/parallel=true" } - // Post specifies that only the columns contained in the secondary index - // need to be output. - post := execinfrapb.PostProcessSpec{ - Projection: true, - OutputColumns: []uint32{uint32(columnIdx + 1)}, - } - - expectedNumOutputRows := numLookupRows * columnDef.matchesPerLookupRow - b.ResetTimer() - // The number of bytes processed in this benchmark is the number of - // lookup bytes processed + the number of result bytes. We only look - // up using a single int column and the request only a single int column - // contained in the index. - b.SetBytes(int64((numLookupRows * 8) + (expectedNumOutputRows * 8))) - - spilled := false - for i := 0; i < b.N; i++ { - jr, err := newJoinReader(&flowCtx, 0 /* processorID */, &spec, input, &post, &output) - if err != nil { - b.Fatal(err) + b.Run(benchmarkName, func(b *testing.B) { + tableName := tableSizeToName(rightSz) + + // Get the table descriptor and find the index that will provide us with + // the expected match ratio. + tableDesc := sqlbase.GetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", tableName) + indexIdx := uint32(0) + for i := range tableDesc.Indexes { + require.Equal(b, 1, len(tableDesc.Indexes[i].ColumnNames), "all indexes created in this benchmark should only contain one column") + if tableDesc.Indexes[i].ColumnNames[0] == columnDef.name { + // Found indexIdx. + indexIdx = uint32(i + 1) + break + } } - jr.Run(ctx) - if !spilled && jr.(*joinReader).Spilled() { - spilled = true + if indexIdx == 0 { + b.Fatalf("failed to find secondary index for column %s", columnDef.name) } - meta := output.DrainMeta(ctx) - if meta != nil { - b.Fatalf("unexpected metadata: %v", meta) + input := newRowGeneratingSource(sqlbase.OneIntCol, sqlutils.ToRowFn(func(rowIdx int) tree.Datum { + // Convert to 0-based. + return tree.NewDInt(tree.DInt(rowIdx - 1)) + }), numLookupRows) + output := rowDisposer{} + + spec := execinfrapb.JoinReaderSpec{ + Table: *tableDesc, + LookupColumns: []uint32{0}, + LookupColumnsAreKey: parallel, + IndexIdx: indexIdx, + MaintainOrdering: reqOrdering, } - if output.NumRowsDisposed() != expectedNumOutputRows { - b.Fatalf("got %d output rows, expected %d", output.NumRowsDisposed(), expectedNumOutputRows) + // Post specifies that only the columns contained in the secondary index + // need to be output. + post := execinfrapb.PostProcessSpec{ + Projection: true, + OutputColumns: []uint32{uint32(columnIdx + 1)}, } - output.ResetNumRowsDisposed() - input.Reset() - } - if spilled { - b.Log("joinReader spilled to disk in at least one of the benchmark iterations") - } - }) + expectedNumOutputRows := numLookupRows * columnDef.matchesPerLookupRow + b.ResetTimer() + // The number of bytes processed in this benchmark is the number of + // lookup bytes processed + the number of result bytes. We only look + // up using a single int column and the request only a single int column + // contained in the index. + b.SetBytes(int64((numLookupRows * 8) + (expectedNumOutputRows * 8))) + + spilled := false + for i := 0; i < b.N; i++ { + flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit + jr, err := newJoinReader(&flowCtx, 0 /* processorID */, &spec, input, &post, &output) + if err != nil { + b.Fatal(err) + } + jr.Run(ctx) + if !spilled && jr.(*joinReader).Spilled() { + spilled = true + } + meta := output.DrainMeta(ctx) + if meta != nil { + b.Fatalf("unexpected metadata: %v", meta) + } + if output.NumRowsDisposed() != expectedNumOutputRows { + b.Fatalf("got %d output rows, expected %d", output.NumRowsDisposed(), expectedNumOutputRows) + } + output.ResetNumRowsDisposed() + input.Reset() + } + if spilled { + b.Log("joinReader spilled to disk in at least one of the benchmark iterations") + } + }) + } } } }