Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowcontainer,rowexec: switch joinReader to use DiskBackedNumberedRowContainer #49669

Merged
merged 1 commit into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 83 additions & 27 deletions pkg/sql/rowcontainer/numbered_row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -38,7 +39,10 @@ type DiskBackedNumberedRowContainer struct {
storedTypes []*types.T
idx int // the index of the next row to be added into the container

rowIter *numberedDiskRowIterator
rowIter *numberedDiskRowIterator
// cacheMap is a map used in the implementation of rowIter that is kept
// in the container to avoid repeated allocation.
cacheMap map[int]*cacheElement
rowIterMemAcc mon.BoundAccount
DisableCache bool
}
Expand Down Expand Up @@ -95,6 +99,12 @@ func (d *DiskBackedNumberedRowContainer) UsingDisk() bool {
return d.rc.UsingDisk()
}

// Spilled reports whether the primary container has ever spilled to disk
// during its lifetime, delegating the check to the underlying container.
func (d *DiskBackedNumberedRowContainer) Spilled() bool {
	return d.rc.Spilled()
}

// testingSpillToDisk is for tests to spill the container(s)
// to disk.
func (d *DiskBackedNumberedRowContainer) testingSpillToDisk(ctx context.Context) error {
Expand Down Expand Up @@ -161,8 +171,11 @@ func (d *DiskBackedNumberedRowContainer) SetupForRead(ctx context.Context, acces
// This is not an efficient way to disable the cache, but ok for tests.
cacheSize = 0
}
if d.cacheMap == nil {
d.cacheMap = make(map[int]*cacheElement)
}
d.rowIter = newNumberedDiskRowIterator(
ctx, rowIter, accesses, meanRowsPerSSBlock, cacheSize, &d.rowIterMemAcc)
ctx, rowIter, accesses, meanRowsPerSSBlock, cacheSize, d.cacheMap, &d.rowIterMemAcc)
}

// GetRow returns a row with the given index. If skip is true the row is not
Expand Down Expand Up @@ -261,7 +274,6 @@ func (d *DiskBackedNumberedRowContainer) Close(ctx context.Context) {
// cost of a cache miss is high.
//
// TODO(sumeer):
// - Before integrating with joinReader, try some realistic benchmarks.
// - Use some realistic inverted index workloads (including geospatial) to
// measure the effect of this cache.
type numberedDiskRowIterator struct {
Expand Down Expand Up @@ -305,6 +317,26 @@ type cacheElement struct {
row sqlbase.EncDatumRow
// When row is non-nil, this is the element in the heap.
heapElement cacheRowHeapElement
// Used only when initializing accesses, so that we can allocate a single
// shared slice for accesses across all cacheElements.
numAccesses int
}

// cacheElementSyncPool recycles cacheElement allocations across read passes
// so repeated SetupForRead calls do not churn the allocator.
var cacheElementSyncPool = sync.Pool{
	New: func() interface{} {
		return &cacheElement{}
	},
}

// freeCacheElement drops the references held by e and returns it to the
// pool for reuse.
// NOTE(review): heapElement is deliberately not cleared here; callers of
// newCacheElement reinitialize heapElement.rowIdx before use — confirm no
// other heapElement field is read before being set.
func freeCacheElement(e *cacheElement) {
	e.row = nil
	e.accesses = nil
	e.numAccesses = 0
	cacheElementSyncPool.Put(e)
}

// newCacheElement returns a recycled (or freshly allocated) cacheElement.
func newCacheElement() *cacheElement {
	return cacheElementSyncPool.Get().(*cacheElement)
}

type cacheRowHeapElement struct {
Expand Down Expand Up @@ -349,24 +381,39 @@ func newNumberedDiskRowIterator(
accesses [][]int,
meanRowsPerSSBlock int,
maxCacheSize int,
cache map[int]*cacheElement,
memAcc *mon.BoundAccount,
) *numberedDiskRowIterator {
n := &numberedDiskRowIterator{
rowIter: rowIter,
meanRowsPerSSBlock: meanRowsPerSSBlock,
maxCacheSize: maxCacheSize,
memAcc: memAcc,
cache: make(map[int]*cacheElement, maxCacheSize),
cache: cache,
}
var accessIdx int
var numAccesses int
for _, accSlice := range accesses {
for _, rowIdx := range accSlice {
elem := n.cache[rowIdx]
if elem == nil {
elem = &cacheElement{}
elem = newCacheElement()
elem.heapElement.rowIdx = rowIdx
n.cache[rowIdx] = elem
}
elem.numAccesses++
numAccesses++
}
}
allAccesses := make([]int, numAccesses)
accessIdx := 0
for _, accSlice := range accesses {
for _, rowIdx := range accSlice {
elem := n.cache[rowIdx]
if elem.accesses == nil {
// Sub-slice that can grow up to elem.numAccesses
elem.accesses = allAccesses[0:0:elem.numAccesses]
allAccesses = allAccesses[elem.numAccesses:]
}
elem.accesses = append(elem.accesses, accessIdx)
accessIdx++
}
Expand All @@ -376,6 +423,10 @@ func newNumberedDiskRowIterator(

// close releases the iterator's resources: it closes the underlying disk row
// iterator and returns every cacheElement to the pool, emptying (but keeping)
// the cache map so its owner can reuse it for a later read pass.
func (n *numberedDiskRowIterator) close() {
	n.rowIter.Close()
	for rowIdx, elem := range n.cache {
		// Deleting during range iteration is well-defined in Go.
		delete(n.cache, rowIdx)
		freeCacheElement(elem)
	}
}

func (n *numberedDiskRowIterator) getRow(
Expand Down Expand Up @@ -471,15 +522,13 @@ func (n *numberedDiskRowIterator) getRow(
return n.tryAddCacheAndReturnRow(ctx, elem)
}

func (n *numberedDiskRowIterator) ensureDecodedAndCopy(
row sqlbase.EncDatumRow,
) (sqlbase.EncDatumRow, error) {
func (n *numberedDiskRowIterator) ensureDecoded(row sqlbase.EncDatumRow) error {
for i := range row {
if err := row[i].EnsureDecoded(n.rowIter.rowContainer.types[i], &n.datumAlloc); err != nil {
return nil, err
return err
}
}
return n.rowAlloc.CopyRow(row), nil
return nil
}

func (n *numberedDiskRowIterator) tryAddCacheAndReturnRow(
Expand All @@ -489,11 +538,13 @@ func (n *numberedDiskRowIterator) tryAddCacheAndReturnRow(
if err != nil {
return nil, err
}
row, err := n.ensureDecodedAndCopy(r)
if err != nil {
if err = n.ensureDecoded(r); err != nil {
return nil, err
}
return row, n.tryAddCacheHelper(ctx, elem, row, true)
if len(elem.accesses) == 0 {
return r, nil
}
return r, n.tryAddCacheHelper(ctx, elem, r, true)
}

func (n *numberedDiskRowIterator) tryAddCache(ctx context.Context, elem *cacheElement) error {
Expand All @@ -514,36 +565,36 @@ func (n *numberedDiskRowIterator) tryAddCache(ctx context.Context, elem *cacheEl
}

func (n *numberedDiskRowIterator) tryAddCacheHelper(
ctx context.Context, elem *cacheElement, row sqlbase.EncDatumRow, alreadyDecodedAndCopied bool,
ctx context.Context, elem *cacheElement, row sqlbase.EncDatumRow, alreadyDecoded bool,
) error {
if len(elem.accesses) == 0 {
return nil
}
if elem.row != nil {
log.Fatalf(ctx, "adding row to cache when it is already in cache")
}
nextAccess := elem.accesses[0]
evict := func() error {
evict := func() (sqlbase.EncDatumRow, error) {
heapElem := heap.Pop(&n.cacheHeap).(*cacheRowHeapElement)
evictElem, ok := n.cache[heapElem.rowIdx]
if !ok {
return errors.Errorf("bug: element not in cache map")
return nil, errors.Errorf("bug: element not in cache map")
}
bytes := evictElem.row.Size()
n.memAcc.Shrink(ctx, int64(bytes))
evictedRow := evictElem.row
evictElem.row = nil
return nil
return evictedRow, nil
}
rowBytesUsage := -1
var rowToReuse sqlbase.EncDatumRow
for {
if n.maxCacheSize == 0 {
return nil
}
if len(n.cacheHeap) == n.maxCacheSize && n.cacheHeap[0].nextAccess <= nextAccess {
return nil
}
var err error
if len(n.cacheHeap) >= n.maxCacheSize {
if err := evict(); err != nil {
if rowToReuse, err = evict(); err != nil {
return err
}
continue
Expand All @@ -553,13 +604,12 @@ func (n *numberedDiskRowIterator) tryAddCacheHelper(
// many rows memAcc will allow us to place in the cache. So it is likely
// that this row can be added. Decode the row to get the correct
// rowBytesUsage.
if !alreadyDecodedAndCopied {
var err error
row, err = n.ensureDecodedAndCopy(row)
if !alreadyDecoded {
err = n.ensureDecoded(row)
if err != nil {
return err
}
alreadyDecodedAndCopied = true
alreadyDecoded = true
}
if rowBytesUsage == -1 {
rowBytesUsage = int(row.Size())
Expand All @@ -583,7 +633,13 @@ func (n *numberedDiskRowIterator) tryAddCacheHelper(
}
// Add to cache.
elem.heapElement.nextAccess = nextAccess
elem.row = row
// Need to copy row, since its lifetime is less than the cached row.
if rowToReuse == nil {
elem.row = n.rowAlloc.CopyRow(row)
} else {
copy(rowToReuse, row)
elem.row = rowToReuse
}
heap.Push(&n.cacheHeap, &elem.heapElement)
return nil
}
Loading