Skip to content

Commit

Permalink
row: release memory more eagerly in txnKVFetcher
Browse files Browse the repository at this point in the history
Previously, we could leave some dangling references to batch responses
around in the txnKVFetcher when we were fetching more than one batch at
a time. This would cause a delay in reclamation of memory for the
lifetime of a given query.

Release note (bug fix): use less memory in some queries, primarily
lookup joins.
  • Loading branch information
jordanlewis committed Jun 3, 2021
1 parent 5c00812 commit d6d394b
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions pkg/sql/row/kv_batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,35 +516,42 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
return nil
}

// popBatch returns the 0th byte slice in a slice of byte slices, as well as
// the rest of the slice of the byte slices. It nils the pointer to the 0th
// element before reslicing the outer slice.
func popBatch(batches [][]byte) (batch []byte, remainingBatches [][]byte) {
batch, remainingBatches = batches[0], batches[1:]
batches[0] = nil
return batch, remainingBatches
}

// nextBatch returns the next batch of key/value pairs. If there are none
// available, a fetch is initiated. When there are no more keys, ok is false.
// origSpan returns the span that batch was fetched from, and bounds all of the
// keys returned.
func (f *txnKVFetcher) nextBatch(
ctx context.Context,
) (ok bool, kvs []roachpb.KeyValue, batchResponse []byte, origSpan roachpb.Span, err error) {
) (ok bool, kvs []roachpb.KeyValue, batchResp []byte, origSpan roachpb.Span, err error) {
if len(f.remainingBatches) > 0 {
batch := f.remainingBatches[0]
f.remainingBatches = f.remainingBatches[1:]
return true, nil, batch, f.origSpan, nil
batchResp, f.remainingBatches = popBatch(f.remainingBatches)
return true, nil, batchResp, f.origSpan, nil
}
for len(f.responses) > 0 {
reply := f.responses[0].GetInner()
f.responses[0] = roachpb.ResponseUnion{}
f.responses = f.responses[1:]
origSpan := f.requestSpans[0]
f.requestSpans[0] = roachpb.Span{}
f.requestSpans = f.requestSpans[1:]
var batchResp []byte
switch t := reply.(type) {
case *roachpb.ScanResponse:
if len(t.BatchResponses) > 0 {
batchResp = t.BatchResponses[0]
f.remainingBatches = t.BatchResponses[1:]
batchResp, f.remainingBatches = popBatch(t.BatchResponses)
}
return true, t.Rows, batchResp, origSpan, nil
case *roachpb.ReverseScanResponse:
if len(t.BatchResponses) > 0 {
batchResp = t.BatchResponses[0]
f.remainingBatches = t.BatchResponses[1:]
batchResp, f.remainingBatches = popBatch(t.BatchResponses)
}
return true, t.Rows, batchResp, origSpan, nil
case *roachpb.GetResponse:
Expand All @@ -571,5 +578,7 @@ func (f *txnKVFetcher) nextBatch(

// close releases the resources of this txnKVFetcher.
func (f *txnKVFetcher) close(ctx context.Context) {
f.responses = nil
f.remainingBatches = nil
f.acc.Close(ctx)
}

0 comments on commit d6d394b

Please sign in to comment.