Skip to content

Commit

Permalink
rowflow: account for more memory usage of the row buffer
Browse files Browse the repository at this point in the history
Previously, we accounted for the memory usage of the row buffer from
which the rows are pushed from in the routers only when copying the rows
from the row container, but we also have another in-memory row buffer
that we can move the rows from which was missing the memory accounting.
This is now fixed which also deflakes the router disk spill test.

Release note: None
  • Loading branch information
yuzefovich committed Oct 27, 2020
1 parent e1709ec commit 3557b32
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions pkg/sql/rowflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ func (ro *routerOutput) addRowLocked(ctx context.Context, row rowenc.EncDatumRow

func (ro *routerOutput) popRowsLocked(ctx context.Context) ([]rowenc.EncDatumRow, error) {
n := 0
// addToRowBufToPushFrom adds row to nth position in rowBufToPushFrom. row
// *must* be safe from further modifications.
addToRowBufToPushFrom := func(row rowenc.EncDatumRow) error {
// We're reusing the same rowBufToPushFrom slice, so we can only
// release the memory under the "old" row once we overwrite it in
// rowBufToPushFrom which we're about to do for rowBufToPushFrom[n].
prevSize := uintptr(0)
if ro.rowBufToPushFrom[n] != nil {
prevSize = ro.rowBufToPushFrom[n].Size()
}
if err := ro.rowBufToPushFromAcc.Grow(ctx, int64(row.Size()-prevSize)); err != nil {
return err
}
ro.rowBufToPushFrom[n] = row
return nil
}
// First try to get rows from the row container.
if ro.mu.rowContainer.Len() > 0 {
if err := func() error {
Expand All @@ -162,18 +178,9 @@ func (ro *routerOutput) popRowsLocked(ctx context.Context) ([]rowenc.EncDatumRow
if err != nil {
return err
}
// We're reusing the same rowBufToPushFrom slice, so we can
// only release the memory under the "old" row once we
// overwrite it in rowBufToPushFrom which we're about to do for
// rowBufToPushFrom[n].
prevSize := uintptr(0)
if ro.rowBufToPushFrom[n] != nil {
prevSize = ro.rowBufToPushFrom[n].Size()
}
if err := ro.rowBufToPushFromAcc.Grow(ctx, int64(row.Size()-prevSize)); err != nil {
if err = addToRowBufToPushFrom(ro.rowAlloc.CopyRow(row)); err != nil {
return err
}
ro.rowBufToPushFrom[n] = ro.rowAlloc.CopyRow(row)
n++
}
return nil
Expand All @@ -184,7 +191,9 @@ func (ro *routerOutput) popRowsLocked(ctx context.Context) ([]rowenc.EncDatumRow

// If the row container is empty, get more rows from the row buffer.
for ; n < len(ro.rowBufToPushFrom) && ro.mu.rowBufLen > 0; n++ {
ro.rowBufToPushFrom[n] = ro.mu.rowBuf[ro.mu.rowBufLeft]
if err := addToRowBufToPushFrom(ro.mu.rowBuf[ro.mu.rowBufLeft]); err != nil {
return nil, err
}
ro.mu.rowBufLeft = (ro.mu.rowBufLeft + 1) % routerRowBufSize
ro.mu.rowBufLen--
}
Expand Down

0 comments on commit 3557b32

Please sign in to comment.