rowcontainer,rowexec: switch joinReader to use DiskBackedNumberedRowContainer

Additionally,
- added a randomized correctness test that compares the results of the indexed
  and numbered containers.
- added memory-limited cases to the joinReader benchmark. No workload reads
  the same right row twice, and all read the right rows in monotonically
  increasing order, so the difference between the two containers comes down
  to the numbered container avoiding the overhead of populating the cache.
- reduced the number of allocations in newNumberedDiskRowIterator (see the
  sketch after this list):
  - the per-element accesses slices are carved out of a single shared
    underlying slice.
  - the row copy made on a cache miss, when the row is not added to the
    cache, is eliminated. When a copy is needed and a row has just been
    evicted from the cache, the copy reuses the evicted row's memory.
  - the map and its elements are reused: the map is kept in the container,
    and map elements are recycled through a sync.Pool.
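
A concrete illustration of the shared-slice item above: the sketch below is a
minimal, standalone version of the two-pass pattern used in
newNumberedDiskRowIterator. It first counts how many accesses each key has,
then carves a capacity-limited sub-slice for each key out of one backing
allocation. The element type and buildAccesses function are hypothetical
stand-ins for cacheElement and the constructor logic in the diff below.

package main

import "fmt"

type element struct {
    accesses    []int // positions at which this key is accessed
    numAccesses int   // populated by the counting pass
}

func buildAccesses(accesses [][]int) map[int]*element {
    m := make(map[int]*element)
    // Pass 1: count accesses per key so we know each sub-slice's capacity.
    var total int
    for _, slice := range accesses {
        for _, key := range slice {
            e := m[key]
            if e == nil {
                e = &element{}
                m[key] = e
            }
            e.numAccesses++
            total++
        }
    }
    // Pass 2: one allocation backs every per-key slice. The three-index
    // slice expression caps each sub-slice so an append can never grow
    // into a neighbor's region.
    all := make([]int, total)
    idx := 0
    for _, slice := range accesses {
        for _, key := range slice {
            e := m[key]
            if e.accesses == nil {
                e.accesses = all[0:0:e.numAccesses]
                all = all[e.numAccesses:]
            }
            e.accesses = append(e.accesses, idx)
            idx++
        }
    }
    return m
}

func main() {
    m := buildAccesses([][]int{{5, 7, 5}, {7, 5}})
    fmt.Println(m[5].accesses) // [0 2 4]
    fmt.Println(m[7].accesses) // [1 3]
}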

Fixes #48118

Release note: None
sumeerbhola committed Jun 1, 2020
1 parent 948ce76 commit f39ae24
Showing 5 changed files with 336 additions and 132 deletions.
pkg/sql/rowcontainer/numbered_row_container.go (110 changes: 83 additions, 27 deletions)
@@ -15,6 +15,7 @@ import (
     "context"
     "fmt"
     "math"
+    "sync"
 
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -38,7 +39,10 @@ type DiskBackedNumberedRowContainer struct {
     storedTypes []*types.T
     idx         int // the index of the next row to be added into the container
 
-    rowIter       *numberedDiskRowIterator
+    rowIter *numberedDiskRowIterator
+    // cacheMap is a map used in the implementation of rowIter that is kept
+    // in the container to avoid repeated allocation.
+    cacheMap      map[int]*cacheElement
     rowIterMemAcc mon.BoundAccount
     DisableCache  bool
 }
@@ -95,6 +99,12 @@ func (d *DiskBackedNumberedRowContainer) UsingDisk() bool {
     return d.rc.UsingDisk()
 }
 
+// Spilled returns whether or not the primary container spilled to disk in its
+// lifetime.
+func (d *DiskBackedNumberedRowContainer) Spilled() bool {
+    return d.rc.Spilled()
+}
+
 // testingSpillToDisk is for tests to spill the container(s)
 // to disk.
 func (d *DiskBackedNumberedRowContainer) testingSpillToDisk(ctx context.Context) error {
@@ -161,8 +171,11 @@ func (d *DiskBackedNumberedRowContainer) SetupForRead(ctx context.Context, acces
         // This is not an efficient way to disable the cache, but ok for tests.
         cacheSize = 0
     }
+    if d.cacheMap == nil {
+        d.cacheMap = make(map[int]*cacheElement)
+    }
     d.rowIter = newNumberedDiskRowIterator(
-        ctx, rowIter, accesses, meanRowsPerSSBlock, cacheSize, &d.rowIterMemAcc)
+        ctx, rowIter, accesses, meanRowsPerSSBlock, cacheSize, d.cacheMap, &d.rowIterMemAcc)
 }
 
 // GetRow returns a row with the given index. If skip is true the row is not
@@ -261,7 +274,6 @@ func (d *DiskBackedNumberedRowContainer) Close(ctx context.Context) {
 // cost of a cache miss is high.
 //
 // TODO(sumeer):
-// - Before integrating with joinReader, try some realistic benchmarks.
 // - Use some realistic inverted index workloads (including geospatial) to
 //   measure the effect of this cache.
 type numberedDiskRowIterator struct {
@@ -305,6 +317,26 @@ type cacheElement struct {
     row sqlbase.EncDatumRow
     // When row is non-nil, this is the element in the heap.
     heapElement cacheRowHeapElement
+    // Used only when initializing accesses, so that we can allocate a single
+    // shared slice for accesses across all cacheElements.
+    numAccesses int
 }
 
+var cacheElementSyncPool = sync.Pool{
+    New: func() interface{} {
+        return &cacheElement{}
+    },
+}
+
+func freeCacheElement(elem *cacheElement) {
+    elem.accesses = nil
+    elem.row = nil
+    elem.numAccesses = 0
+    cacheElementSyncPool.Put(elem)
+}
+
+func newCacheElement() *cacheElement {
+    return cacheElementSyncPool.Get().(*cacheElement)
+}
+
 type cacheRowHeapElement struct {
@@ -349,24 +381,39 @@ func newNumberedDiskRowIterator(
     accesses [][]int,
     meanRowsPerSSBlock int,
     maxCacheSize int,
+    cache map[int]*cacheElement,
     memAcc *mon.BoundAccount,
 ) *numberedDiskRowIterator {
     n := &numberedDiskRowIterator{
         rowIter:            rowIter,
         meanRowsPerSSBlock: meanRowsPerSSBlock,
         maxCacheSize:       maxCacheSize,
         memAcc:             memAcc,
-        cache:              make(map[int]*cacheElement, maxCacheSize),
+        cache:              cache,
     }
-    var accessIdx int
+    var numAccesses int
     for _, accSlice := range accesses {
         for _, rowIdx := range accSlice {
             elem := n.cache[rowIdx]
             if elem == nil {
-                elem = &cacheElement{}
+                elem = newCacheElement()
                 elem.heapElement.rowIdx = rowIdx
                 n.cache[rowIdx] = elem
             }
+            elem.numAccesses++
+            numAccesses++
+        }
+    }
+    allAccesses := make([]int, numAccesses)
+    accessIdx := 0
+    for _, accSlice := range accesses {
+        for _, rowIdx := range accSlice {
+            elem := n.cache[rowIdx]
+            if elem.accesses == nil {
+                // Sub-slice that can grow up to elem.numAccesses
+                elem.accesses = allAccesses[0:0:elem.numAccesses]
+                allAccesses = allAccesses[elem.numAccesses:]
+            }
             elem.accesses = append(elem.accesses, accessIdx)
             accessIdx++
         }
@@ -376,6 +423,10 @@
 
 func (n *numberedDiskRowIterator) close() {
     n.rowIter.Close()
+    for k, v := range n.cache {
+        freeCacheElement(v)
+        delete(n.cache, k)
+    }
 }
 
 func (n *numberedDiskRowIterator) getRow(
@@ -471,15 +522,13 @@ func (n *numberedDiskRowIterator) getRow(
     return n.tryAddCacheAndReturnRow(ctx, elem)
 }
 
-func (n *numberedDiskRowIterator) ensureDecodedAndCopy(
-    row sqlbase.EncDatumRow,
-) (sqlbase.EncDatumRow, error) {
+func (n *numberedDiskRowIterator) ensureDecoded(row sqlbase.EncDatumRow) error {
     for i := range row {
         if err := row[i].EnsureDecoded(n.rowIter.rowContainer.types[i], &n.datumAlloc); err != nil {
-            return nil, err
+            return err
         }
     }
-    return n.rowAlloc.CopyRow(row), nil
+    return nil
 }
 
 func (n *numberedDiskRowIterator) tryAddCacheAndReturnRow(
@@ -489,11 +538,13 @@ func (n *numberedDiskRowIterator) tryAddCacheAndReturnRow(
     if err != nil {
         return nil, err
     }
-    row, err := n.ensureDecodedAndCopy(r)
-    if err != nil {
+    if err = n.ensureDecoded(r); err != nil {
         return nil, err
     }
-    return row, n.tryAddCacheHelper(ctx, elem, row, true)
+    if len(elem.accesses) == 0 {
+        return r, nil
+    }
+    return r, n.tryAddCacheHelper(ctx, elem, r, true)
 }
 
 func (n *numberedDiskRowIterator) tryAddCache(ctx context.Context, elem *cacheElement) error {
@@ -514,36 +565,36 @@
 }
 
 func (n *numberedDiskRowIterator) tryAddCacheHelper(
-    ctx context.Context, elem *cacheElement, row sqlbase.EncDatumRow, alreadyDecodedAndCopied bool,
+    ctx context.Context, elem *cacheElement, row sqlbase.EncDatumRow, alreadyDecoded bool,
 ) error {
-    if len(elem.accesses) == 0 {
-        return nil
-    }
     if elem.row != nil {
         log.Fatalf(ctx, "adding row to cache when it is already in cache")
     }
     nextAccess := elem.accesses[0]
-    evict := func() error {
+    evict := func() (sqlbase.EncDatumRow, error) {
         heapElem := heap.Pop(&n.cacheHeap).(*cacheRowHeapElement)
         evictElem, ok := n.cache[heapElem.rowIdx]
         if !ok {
-            return errors.Errorf("bug: element not in cache map")
+            return nil, errors.Errorf("bug: element not in cache map")
         }
         bytes := evictElem.row.Size()
         n.memAcc.Shrink(ctx, int64(bytes))
+        evictedRow := evictElem.row
         evictElem.row = nil
-        return nil
+        return evictedRow, nil
     }
     rowBytesUsage := -1
+    var rowToReuse sqlbase.EncDatumRow
     for {
         if n.maxCacheSize == 0 {
             return nil
         }
         if len(n.cacheHeap) == n.maxCacheSize && n.cacheHeap[0].nextAccess <= nextAccess {
             return nil
         }
+        var err error
         if len(n.cacheHeap) >= n.maxCacheSize {
-            if err := evict(); err != nil {
+            if rowToReuse, err = evict(); err != nil {
                 return err
             }
             continue
@@ -553,13 +604,12 @@
         // many rows memAcc will allow us to place in the cache. So it is likely
         // that this row can be added. Decode the row to get the correct
         // rowBytesUsage.
-        if !alreadyDecodedAndCopied {
-            var err error
-            row, err = n.ensureDecodedAndCopy(row)
+        if !alreadyDecoded {
+            err = n.ensureDecoded(row)
             if err != nil {
                 return err
             }
-            alreadyDecodedAndCopied = true
+            alreadyDecoded = true
         }
         if rowBytesUsage == -1 {
             rowBytesUsage = int(row.Size())
@@ -583,7 +633,13 @@
         }
         // Add to cache.
         elem.heapElement.nextAccess = nextAccess
-        elem.row = row
+        // Need to copy row, since its lifetime is less than the cached row.
+        if rowToReuse == nil {
+            elem.row = n.rowAlloc.CopyRow(row)
+        } else {
+            copy(rowToReuse, row)
+            elem.row = rowToReuse
+        }
         heap.Push(&n.cacheHeap, &elem.heapElement)
         return nil
     }
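
One orientation note on the hunks above: the cache maintained by
tryAddCacheHelper is, in effect, Belady-style. Each cached row carries the
position of its next access; when the cache is full, the row whose next access
is farthest in the future is the eviction candidate, and a row whose own next
access is even farther away is not admitted at all. Below is a minimal sketch
of just that admit/evict decision, with hypothetical names (futureCache,
Admit) and without the memory accounting and evicted-row reuse of the real
code.

package main

import (
    "container/heap"
    "fmt"
)

// entry tracks a cached key and the position of its next access.
type entry struct {
    key        int
    nextAccess int
}

// maxHeap keeps the entry whose next access is farthest in the future at
// the root, making it the cheapest entry to give up.
type maxHeap []*entry

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return h[i].nextAccess > h[j].nextAccess }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(*entry)) }
func (h *maxHeap) Pop() interface{} {
    old := *h
    e := old[len(old)-1]
    *h = old[:len(old)-1]
    return e
}

// futureCache admits a key only if doing so can reduce future misses.
type futureCache struct {
    maxSize int
    heap    maxHeap
}

// Admit decides whether to cache key, given the position of its next access,
// evicting the farthest-in-the-future entries to make room.
func (c *futureCache) Admit(key, nextAccess int) (cached bool, evicted []int) {
    if c.maxSize == 0 {
        return false, nil
    }
    for len(c.heap) >= c.maxSize {
        // If every cached entry is needed sooner than this key, caching
        // this key cannot help; skip it.
        if c.heap[0].nextAccess <= nextAccess {
            return false, evicted
        }
        e := heap.Pop(&c.heap).(*entry)
        evicted = append(evicted, e.key)
    }
    heap.Push(&c.heap, &entry{key: key, nextAccess: nextAccess})
    return true, evicted
}

func main() {
    c := &futureCache{maxSize: 2}
    fmt.Println(c.Admit(1, 10)) // true []
    fmt.Println(c.Admit(2, 5))  // true []
    fmt.Println(c.Admit(3, 7))  // true [1]: key 1 (next access 10) is evicted
    fmt.Println(c.Admit(4, 9))  // false []: everything cached is needed sooner
}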