diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index 297551431773..0a5ed95a6ec7 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -147,6 +147,22 @@ func (ro *routerOutput) addRowLocked(ctx context.Context, row rowenc.EncDatumRow func (ro *routerOutput) popRowsLocked(ctx context.Context) ([]rowenc.EncDatumRow, error) { n := 0 + // addToRowBufToPushFrom adds row to nth position in rowBufToPushFrom. row + // *must* be safe from further modifications. + addToRowBufToPushFrom := func(row rowenc.EncDatumRow) error { + // We're reusing the same rowBufToPushFrom slice, so we can only + // release the memory under the "old" row once we overwrite it in + // rowBufToPushFrom which we're about to do for rowBufToPushFrom[n]. + prevSize := uintptr(0) + if ro.rowBufToPushFrom[n] != nil { + prevSize = ro.rowBufToPushFrom[n].Size() + } + if err := ro.rowBufToPushFromAcc.Grow(ctx, int64(row.Size()-prevSize)); err != nil { + return err + } + ro.rowBufToPushFrom[n] = row + return nil + } // First try to get rows from the row container. if ro.mu.rowContainer.Len() > 0 { if err := func() error { @@ -162,18 +178,9 @@ func (ro *routerOutput) popRowsLocked(ctx context.Context) ([]rowenc.EncDatumRow if err != nil { return err } - // We're reusing the same rowBufToPushFrom slice, so we can - // only release the memory under the "old" row once we - // overwrite it in rowBufToPushFrom which we're about to do for - // rowBufToPushFrom[n]. - prevSize := uintptr(0) - if ro.rowBufToPushFrom[n] != nil { - prevSize = ro.rowBufToPushFrom[n].Size() - } - if err := ro.rowBufToPushFromAcc.Grow(ctx, int64(row.Size()-prevSize)); err != nil { + if err = addToRowBufToPushFrom(ro.rowAlloc.CopyRow(row)); err != nil { return err } - ro.rowBufToPushFrom[n] = ro.rowAlloc.CopyRow(row) n++ } return nil @@ -184,7 +191,9 @@ func (ro *routerOutput) popRowsLocked(ctx context.Context) ([]rowenc.EncDatumRow // If the row container is empty, get more rows from the row buffer. for ; n < len(ro.rowBufToPushFrom) && ro.mu.rowBufLen > 0; n++ { - ro.rowBufToPushFrom[n] = ro.mu.rowBuf[ro.mu.rowBufLeft] + if err := addToRowBufToPushFrom(ro.mu.rowBuf[ro.mu.rowBufLeft]); err != nil { + return nil, err + } ro.mu.rowBufLeft = (ro.mu.rowBufLeft + 1) % routerRowBufSize ro.mu.rowBufLen-- }