kv,storage: use a BytesMonitor to track memory allocations for scans
The goal is to track memory allocations for:
- non-local SQL=>KV requests: this can happen with joins,
  multi-tenant clusters, and if ranges move between DistSQL
  planning and execution.
- local SQL=>KV requests for the first request by a fetcher:
  in this case the fetcher reserves a modest 1KB which can
  be significantly exceeded by KV allocations.

Only allocations in pebbleMVCCScanner for kv pairs and intents are
tracked. The memory is released when returning from
executeReadOnlyBatchWithServersideRefreshes, since the chain of
returns ends up in gRPC response processing and we cannot hook into
the point where that memory is actually released. This should still
help with some OOM cases, and gives a signal of memory overload that
we can use elsewhere (e.g. admission control).
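
As a rough illustration of what the scanner-side tracking amounts
to, the sketch below grows a BoundAccount by the size of each
buffered key/value pair; the helper name and exact sizing are
assumptions, not the pebbleMVCCScanner code.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// accountForKV is a hypothetical helper: it reserves the size of a
// buffered key/value pair against a BoundAccount. Grow returns an
// error once the monitor's budget is exhausted, failing the scan
// instead of letting memory grow unchecked.
func accountForKV(ctx context.Context, acct *mon.BoundAccount, key, value []byte) error {
	return acct.Grow(ctx, int64(len(key)+len(value)))
}
```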

The BytesMonitor is used to construct a BoundAccount, which is
wrapped in a narrower ScannerMemoryMonitor and passed via the
EvalContext interface. The alternative would be for the engine to
hold a BytesMonitor at initialization time that it could use to
construct a BoundAccount for each MVCC scan and pass back via
MVCCScanResult. That would mean multiple BoundAccounts per batch
(since we don't want to release memory until all the requests in
the batch are processed), and would be harder to extend to
additional request types than embedding the account in EvalContext.

The rootSQLMonitor is reused for this memory allocation tracking.
Tracking is always done for non-local requests, and for local
requests only when it is the first request issued by a fetcher.
This avoids double-counting: the first request issued by a SQL
fetcher reserves only 1KB, but subsequent ones have already
reserved what was returned in the first response. There is room to
tighten this if we knew what had been reserved on the local client
(there are complications because the batch may have been split
across several nodes, only one of which was local).

The AdmissionHeader.SourceLocation field is used to mark local
requests and is set in rpc.internalClientAdapter. The first
request is marked using the
AdmissionHeader.NoMemoryReservedAtSource bit.
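
Server-side, this marking reduces to the condition added in
pkg/kv/kvserver/replica_read.go below; restated as a small helper
(the wrapper function is illustrative, the fields and enum value
come from the diff):

```go
package example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// shouldAccountOnServer: account when the request did not originate
// locally, or when it is local but the client reserved no memory for
// the response.
func shouldAccountOnServer(h roachpb.AdmissionHeader) bool {
	return h.SourceLocation != roachpb.AdmissionHeader_LOCAL ||
		h.NoMemoryReservedAtSource
}
```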

Informs #19721

Release note (ops change): The memory pool used for SQL is now
also used to cover KV memory used for scans.
sumeerbhola committed Jul 14, 2021
1 parent 81e8ce0 commit a690006
Showing 22 changed files with 1,005 additions and 563 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_get.go
@@ -52,6 +52,7 @@ func Get(
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
LocalUncertaintyLimit: cArgs.LocalUncertaintyLimit,
MemoryMonitor: cArgs.EvalCtx.GetResponseMemoryAccount(),
})
if err != nil {
return result.Result{}, err
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_get_test.go
@@ -33,7 +33,8 @@ func TestGetResumeSpan(t *testing.T) {
defer db.Close()

_, err := Put(ctx, db, CommandArgs{
Header: roachpb.Header{TargetBytes: -1},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
@@ -45,7 +46,8 @@

// Case 1: Check that a negative TargetBytes causes a resume span.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{TargetBytes: -1},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
@@ -62,7 +64,8 @@
resp = &roachpb.GetResponse{}
// Case 2: Check that a negative MaxSpanRequestKeys causes a resume span.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{MaxSpanRequestKeys: -1},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{MaxSpanRequestKeys: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
@@ -79,7 +82,8 @@
resp = &roachpb.GetResponse{}
// Case 3: Check that a positive limit causes a normal return.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -47,6 +47,7 @@ func ReverseScan(
TargetBytes: h.TargetBytes,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryMonitor: cArgs.EvalCtx.GetResponseMemoryAccount(),
}

switch args.ScanFormat {
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -48,6 +48,7 @@ func Scan(
TargetBytes: h.TargetBytes,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryMonitor: cArgs.EvalCtx.GetResponseMemoryAccount(),
}

switch args.ScanFormat {
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -127,6 +128,11 @@ type EvalContext interface {
// WatchForMerge arranges to block all requests until the in-progress merge
// completes. Returns an error if no in-progress merge is detected.
WatchForMerge(ctx context.Context) error

// GetResponseMemoryAccount returns a memory account to be used when
// generating BatchResponses. Currently only used for MVCC scans, and only
// initialized to be a real account on those paths.
GetResponseMemoryAccount() storage.ResponseMemoryAccount
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
@@ -257,3 +263,6 @@ func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequen
func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetResponseMemoryAccount() storage.ResponseMemoryAccount {
return storage.ResponseMemoryAccount{}
}
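
The storage.ResponseMemoryAccount type itself is not part of this diff.
A minimal sketch consistent with how it is constructed in
replica_read.go below (storage.ResponseMemoryAccount{B: &boundAccount})
could look as follows; the Grow method name and the nil no-op behavior
are assumptions:

```go
// Sketch only; not the actual pkg/storage implementation.
package storage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// ResponseMemoryAccount is the narrow account handed to MVCC scans.
// The zero value (B == nil) is the empty account returned by
// Replica.GetResponseMemoryAccount and by the mock above, and performs
// no accounting.
type ResponseMemoryAccount struct {
	B *mon.BoundAccount
}

// Grow (hypothetical name) reserves n bytes when a real account is
// present and is a no-op otherwise.
func (a ResponseMemoryAccount) Grow(ctx context.Context, n int64) error {
	if a.B == nil {
		return nil
	}
	return a.B.Grow(ctx, n)
}
```
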
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -1866,6 +1866,13 @@ func (r *Replica) markSystemConfigGossipFailed() {
r.mu.failureToGossipSystemConfig = true
}

// GetResponseMemoryAccount implements the batcheval.EvalContext interface.
func (r *Replica) GetResponseMemoryAccount() storage.ResponseMemoryAccount {
// Return an empty account. Places where a real account is needed use a
// wrapper for Replica as the EvalContext.
return storage.ResponseMemoryAccount{}
}

func init() {
tracing.RegisterTagRemapping("r", "range")
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -249,3 +250,8 @@ func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roach
func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
return rec.i.WatchForMerge(ctx)
}

// GetResponseMemoryAccount implements the batcheval.EvalContext interface.
func (rec *SpanSetReplicaEvalContext) GetResponseMemoryAccount() storage.ResponseMemoryAccount {
return rec.i.GetResponseMemoryAccount()
}
62 changes: 62 additions & 0 deletions pkg/kv/kvserver/replica_read.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/kr/pretty"
)

@@ -180,6 +181,25 @@ func (r *Replica) executeReadOnlyBatch(
return br, nil, pErr
}

// evalContextWithAccount wraps an EvalContext to provide a real
// ResponseMemoryAccount. This wrapping is conditional on various factors, and
// specific to a request (see executeReadOnlyBatchWithServersideRefreshes),
// which is why the implementation of EvalContext by Replica does not by
// default provide a real ResponseMemoryAccount.
//
// If we start using evalContextWithAccount on more code paths we should
// consider using it everywhere and lift it to an earlier point in the code.
// Then the code that decides whether or not to use a real
// ResponseMemoryAccount can set a field instead of wrapping.
type evalContextWithAccount struct {
batcheval.EvalContext
memMonitor storage.ResponseMemoryAccount
}

func (e evalContextWithAccount) GetResponseMemoryAccount() storage.ResponseMemoryAccount {
return e.memMonitor
}

// executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries
// at a higher timestamp in the event of some retriable errors if allowed by the
// batch/txn.
@@ -193,8 +213,50 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
log.Event(ctx, "executing read-only batch")

var rootMonitor *mon.BytesMonitor
// Only do memory allocation accounting if the request did not originate
// locally, or for a local request that has reserved no memory. Local
// requests (originating in DistSQL) do memory accounting before issuing the
// request. Even though the accounting for the first request in the caller
// is small (the NoMemoryReservedAtSource=true case), subsequent requests
// reserve based on the size of earlier responses (see row.txnKVFetcher).
// Note that we could additionally add an OR-clause with
// ba.AdmissionHeader.Source != FROM_SQL for the if-block that does memory
// accounting. We don't do that currently since there are some SQL requests
// that are not marked as FROM_SQL.
//
// This whole scheme could be tightened, both in terms of marking, and
// compensating for the amount of memory reserved at the source.
//
// TODO(sumeer): for multi-tenant KV we should be accounting on a per-tenant
// basis and not letting a single tenant consume all the memory (we could
// place a limit equal to total/2).
if ba.AdmissionHeader.SourceLocation != roachpb.AdmissionHeader_LOCAL ||
ba.AdmissionHeader.NoMemoryReservedAtSource {
// rootMonitor will never be nil in production settings, but it can be nil
// for tests that do not have a monitor.
rootMonitor = r.store.getRootMemoryMonitorForKV()
}
var boundAccount mon.BoundAccount
if rootMonitor != nil {
boundAccount = rootMonitor.MakeBoundAccount()
// Memory is not actually released when this function returns, but at
// least the batch is fully evaluated. Ideally we would like to release
// after grpc has sent the response, but there are no interceptors at that
// stage. The interceptors execute before the response is marshaled in
// Server.processUnaryRPC by calling sendResponse.
// We are intentionally not using finalizers because they delay GC and
// because they have had bugs in the past (and can prevent GC of objects
// with cyclic references).
defer boundAccount.Close(ctx)
rec = evalContextWithAccount{
EvalContext: rec, memMonitor: storage.ResponseMemoryAccount{B: &boundAccount}}
}

for retries := 0; ; retries++ {
if retries > 0 {
// It is safe to call Clear on an uninitialized BoundAccount.
boundAccount.Clear(ctx)
log.VEventf(ctx, 2, "server-side retry of batch")
}
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, lul, true /* readOnly */)
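
The interplay between the bound account and the retry loop above can
be condensed into the following sketch (assuming the mon semantics
used in the diff: Clear drops reservations but keeps the account
usable, Close releases it for good; evalOnce stands in for
evaluateBatch):

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// evaluateWithRetries mirrors the shape of the loop above: one account
// for the whole batch, cleared before each server-side retry, and
// closed only once the batch is fully evaluated.
func evaluateWithRetries(
	ctx context.Context,
	root *mon.BytesMonitor,
	evalOnce func(context.Context, *mon.BoundAccount) (retry bool, err error),
) error {
	acct := root.MakeBoundAccount()
	defer acct.Close(ctx)
	for {
		retry, err := evalOnce(ctx, &acct)
		if err != nil || !retry {
			return err
		}
		// Drop reservations made by the failed attempt before retrying.
		acct.Clear(ctx)
	}
}
```
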
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -723,6 +724,10 @@ type StoreConfig struct {
// subsystem. It is queried during the GC process and in the handling of
// AdminVerifyProtectedTimestampRequest.
ProtectedTimestampCache protectedts.Cache

// KV Memory Monitor. Must be non-nil for production, and can be nil in some
// tests.
KVMemoryMonitor *mon.BytesMonitor
}

// ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
@@ -2897,6 +2902,12 @@ func (s *Store) unregisterLeaseholderByID(ctx context.Context, rangeID roachpb.R
}
}

// getRootMemoryMonitorForKV returns a BytesMonitor to use for KV memory
// tracking.
func (s *Store) getRootMemoryMonitorForKV() *mon.BytesMonitor {
return s.cfg.KVMemoryMonitor
}

// WriteClusterVersion writes the given cluster version to the store-local
// cluster version key. We only accept a raw engine to ensure we're persisting
// the write durably.
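
With the new StoreConfig field, wiring the monitor at server startup
reduces to passing the existing SQL root monitor through. The snippet
below is a sketch: rootSQLMonitor stands for the monitor backing the
SQL memory pool, which the commit message says is reused; the
surrounding server-construction code is not part of this excerpt.

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// makeStoreConfig shows only the new field; every other StoreConfig
// field is omitted for brevity.
func makeStoreConfig(rootSQLMonitor *mon.BytesMonitor) kvserver.StoreConfig {
	return kvserver.StoreConfig{
		KVMemoryMonitor: rootSQLMonitor,
	}
}
```
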
4 changes: 3 additions & 1 deletion pkg/kv/txn.go
@@ -972,8 +972,10 @@ func (txn *Txn) Send(
}

// Some callers have not initialized ba using a Batch constructed using
// Txn.NewBatch. So we fallback to initializing here.
// Txn.NewBatch. So we fallback to partially overwriting here.
noMem := ba.AdmissionHeader.NoMemoryReservedAtSource
ba.AdmissionHeader = txn.admissionHeader
ba.AdmissionHeader.NoMemoryReservedAtSource = noMem

txn.mu.Lock()
requestTxnID := txn.mu.ID