kv,storage: use a BytesMonitor to track memory allocations for scans
The goal is to track memory allocations for:
- non-local SQL=>KV requests: this can happen with joins,
  multi-tenant clusters, and if ranges move between DistSQL
  planning and execution.
- local SQL=>KV requests for the first request by a fetcher:
  in this case the fetcher reserves a modest 1KB which can
  be significantly exceeded by KV allocations.

Only allocations in pebbleMVCCScanner for kv pairs and intents are
tracked. The memory is released when returning from
executeReadOnlyBatchWithServersideRefreshes since the chain
of returns will end up in gRPC response processing and we can't
hook into where that memory is released. This should still help
for some cases of OOMs, and give some signal of memory overload
that we can use elsewhere (e.g. admission control).
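
As a sketch, the scanner-side accounting amounts to the following (hypothetical Go types standing in for mon.BoundAccount and pebbleMVCCScanner's result buffer; the real hooks live in those types):

package kvmemsketch

import (
	"context"
	"fmt"
)

// boundAccount stands in for util/mon's BoundAccount: it grows against a
// shared budget and errors once the budget is exhausted.
type boundAccount struct{ used, limit int64 }

func (a *boundAccount) Grow(_ context.Context, n int64) error {
	if a.used+n > a.limit {
		return fmt.Errorf("scan memory budget exhausted: %d + %d > %d", a.used, n, a.limit)
	}
	a.used += n
	return nil
}

type kvPair struct{ key, value []byte }

// addKV mirrors the idea in pebbleMVCCScanner: reserve memory for each kv
// pair (and intent) before buffering it in the results, so the scan fails
// fast instead of OOMing the node.
func addKV(ctx context.Context, acc *boundAccount, buf *[]kvPair, key, value []byte) error {
	if err := acc.Grow(ctx, int64(len(key)+len(value))); err != nil {
		return err
	}
	*buf = append(*buf, kvPair{key: key, value: value})
	return nil
}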

The BytesMonitor is used to construct a BoundAccount that is
wrapped in a narrower ScannerMemoryMonitor that is passed via the
EvalContext interface. The other alternative would be for the
engine to have a BytesMonitor at initialization time that it
can use to construct a BoundAccount for each MVCC scan, and
pass it back via MVCCScanResult. This would mean multiple
BoundAccounts for a batch (since we don't want to release memory
until all the requests in the batch are processed), and would
be harder to extend to track additional request types compared
to embedding in EvalContext.
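
A sketch of the chosen wiring (it mirrors the replica_read.go change in the diff below; ScannerMemoryMonitor is assumed to be the thin wrapper holding a *mon.BoundAccount, as the diff suggests):

package kvmemsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// withScanAccounting shows the shape of the chosen design: one BoundAccount
// per read-only batch, wrapped in a ScannerMemoryMonitor that every MVCC
// scan evaluated under the batch charges (cf. evalContextWithMonitor below).
func withScanAccounting(
	ctx context.Context,
	rootMonitor *mon.BytesMonitor,
	evalBatch func(storage.ScannerMemoryMonitor) error,
) error {
	boundAccount := rootMonitor.MakeBoundAccount()
	// Released only once the whole batch has been evaluated, since all the
	// requests in the batch share this one account.
	defer boundAccount.Close(ctx)
	return evalBatch(storage.ScannerMemoryMonitor{B: &boundAccount})
}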

The rootSQLMonitor is reused for this memory allocation tracking.
This tracking is always done for non-local requests and, for local
requests, only for the first request issued by a fetcher. This
avoids double-counting: the first request issued by a SQL fetcher
reserves only 1KB, but subsequent ones have already reserved
what was returned in the first response. There is room to
tighten this if we knew what had been reserved on the local
client (there are complications because the batch may have
been split and sent to different nodes, only one of which was
local).
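
Concretely, the decision reduces to this predicate on the admission header (mirroring the check added in replica_read.go below):

package kvmemsketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// shouldTrackScanMemory: always track non-local requests; for local
// requests, track only when the source reserved (almost) nothing, i.e. the
// fetcher's first request with its modest 1KB reservation.
func shouldTrackScanMemory(h roachpb.AdmissionHeader) bool {
	return h.SourceLocation != roachpb.AdmissionHeader_LOCAL ||
		h.NoMemoryReservedAtSource
}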

The AdmissionHeader.SourceLocation field is used to mark local
requests and is set in rpc.internalClientAdapter. The first
request is marked using the
AdmissionHeader.NoMemoryReservedAtSource bit.
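
A sketch of the shape of that marking (the real plumbing in pkg/rpc wraps the whole Internal API; markLocal is a hypothetical helper):

package kvmemsketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// markLocal is, conceptually, what rpc.internalClientAdapter does before
// handing a request to the in-process server: stamp it as locally
// originated so KV can avoid double-counting its memory.
func markLocal(ba *roachpb.BatchRequest) {
	ba.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL
}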

Informs cockroachdb#19721

Release note (ops change): The memory pool used for SQL is now
also used to cover KV memory used for scans.
sumeerbhola committed Jun 26, 2021
1 parent b6598d0 commit 3cdf06b
Showing 20 changed files with 876 additions and 535 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_get.go
@@ -52,6 +52,7 @@ func Get(
 		Txn:                   h.Txn,
 		FailOnMoreRecent:      args.KeyLocking != lock.None,
 		LocalUncertaintyLimit: cArgs.LocalUncertaintyLimit,
+		MemoryMonitor:         cArgs.EvalCtx.GetScannerMemoryMonitor(),
 	})
 	if err != nil {
 		return result.Result{}, err
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_get_test.go
@@ -33,7 +33,8 @@ func TestGetResumeSpan(t *testing.T) {
 	defer db.Close()

 	_, err := Put(ctx, db, CommandArgs{
-		Header: roachpb.Header{TargetBytes: -1},
+		EvalCtx: (&MockEvalCtx{}).EvalContext(),
+		Header:  roachpb.Header{TargetBytes: -1},
 		Args: &roachpb.PutRequest{
 			RequestHeader: roachpb.RequestHeader{
 				Key: key,
@@ -45,7 +46,8 @@

 	// Case 1: Check that a negative TargetBytes causes a resume span.
 	_, err = Get(ctx, db, CommandArgs{
-		Header: roachpb.Header{TargetBytes: -1},
+		EvalCtx: (&MockEvalCtx{}).EvalContext(),
+		Header:  roachpb.Header{TargetBytes: -1},
 		Args: &roachpb.GetRequest{
 			RequestHeader: roachpb.RequestHeader{
 				Key: key,
@@ -62,7 +64,8 @@
 	resp = &roachpb.GetResponse{}
 	// Case 2: Check that a negative MaxSpanRequestKeys causes a resume span.
 	_, err = Get(ctx, db, CommandArgs{
-		Header: roachpb.Header{MaxSpanRequestKeys: -1},
+		EvalCtx: (&MockEvalCtx{}).EvalContext(),
+		Header:  roachpb.Header{MaxSpanRequestKeys: -1},
 		Args: &roachpb.GetRequest{
 			RequestHeader: roachpb.RequestHeader{
 				Key: key,
@@ -79,7 +82,8 @@
 	resp = &roachpb.GetResponse{}
 	// Case 3: Check that a positive limit causes a normal return.
 	_, err = Get(ctx, db, CommandArgs{
-		Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
+		EvalCtx: (&MockEvalCtx{}).EvalContext(),
+		Header:  roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
 		Args: &roachpb.GetRequest{
 			RequestHeader: roachpb.RequestHeader{
 				Key: key,
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -47,6 +47,7 @@ func ReverseScan(
 		TargetBytes:      h.TargetBytes,
 		FailOnMoreRecent: args.KeyLocking != lock.None,
 		Reverse:          true,
+		MemoryMonitor:    cArgs.EvalCtx.GetScannerMemoryMonitor(),
 	}

 	switch args.ScanFormat {
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -48,6 +48,7 @@ func Scan(
 		TargetBytes:      h.TargetBytes,
 		FailOnMoreRecent: args.KeyLocking != lock.None,
 		Reverse:          false,
+		MemoryMonitor:    cArgs.EvalCtx.GetScannerMemoryMonitor(),
 	}

 	switch args.ScanFormat {
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -131,6 +131,10 @@ type EvalContext interface {
 	// WatchForMerge arranges to block all requests until the in-progress merge
 	// completes. Returns an error if no in-progress merge is detected.
 	WatchForMerge(ctx context.Context) error
+
+	// GetScannerMemoryMonitor returns a memory monitor to be used by MVCC
+	// scans.
+	GetScannerMemoryMonitor() storage.ScannerMemoryMonitor
 }

 // MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
@@ -267,3 +271,6 @@ func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequen
 func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
 	panic("unimplemented")
 }
+func (m *mockEvalCtxImpl) GetScannerMemoryMonitor() storage.ScannerMemoryMonitor {
+	return storage.ScannerMemoryMonitor{}
+}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -1871,6 +1871,13 @@ func (r *Replica) markSystemConfigGossipFailed() {
 	r.mu.failureToGossipSystemConfig = true
 }

+// GetScannerMemoryMonitor implements the batcheval.EvalContext interface.
+func (r *Replica) GetScannerMemoryMonitor() storage.ScannerMemoryMonitor {
+	// Return an empty monitor. Places where a real monitor is needed use a
+	// wrapper for Replica as the EvalContext.
+	return storage.ScannerMemoryMonitor{}
+}
+
 func init() {
 	tracing.RegisterTagRemapping("r", "range")
 }
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
@@ -261,3 +261,8 @@ func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roach
 func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
 	return rec.i.WatchForMerge(ctx)
 }
+
+// GetScannerMemoryMonitor implements the batcheval.EvalContext interface.
+func (rec *SpanSetReplicaEvalContext) GetScannerMemoryMonitor() storage.ScannerMemoryMonitor {
+	return rec.i.GetScannerMemoryMonitor()
+}
42 changes: 42 additions & 0 deletions pkg/kv/kvserver/replica_read.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/kr/pretty"
)

@@ -180,6 +181,15 @@ func (r *Replica) executeReadOnlyBatch(
 	return br, nil, pErr
 }

+type evalContextWithMonitor struct {
+	batcheval.EvalContext
+	memMonitor storage.ScannerMemoryMonitor
+}
+
+func (e evalContextWithMonitor) GetScannerMemoryMonitor() storage.ScannerMemoryMonitor {
+	return e.memMonitor
+}
+
 // executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries
 // at a higher timestamp in the event of some retriable errors if allowed by the
 // batch/txn.
@@ -193,8 +203,40 @@
 ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
 	log.Event(ctx, "executing read-only batch")

+	var rootMonitor *mon.BytesMonitor
+	// Only do memory allocation accounting if the request did not originate
+	// locally, or if it is local but reserved no memory at the source. Local
+	// requests (typically DistSQL, though some may flow in as OTHER since we
+	// may not have instrumented every source as SQL) do memory accounting
+	// before issuing the request. The accounting for a fetcher's first
+	// request is small (the NoMemoryReservedAtSource=true case), but each
+	// subsequent request reserves the size of the previous response (see
+	// https://github.com/cockroachdb/cockroach/pull/52496). This scheme
+	// could be tightened.
+	if ba.AdmissionHeader.SourceLocation != roachpb.AdmissionHeader_LOCAL ||
+		ba.AdmissionHeader.NoMemoryReservedAtSource {
+		rootMonitor = r.store.getRootMemoryMonitorForKV()
+	}
+	var boundAccount mon.BoundAccount
+	if rootMonitor != nil {
+		boundAccount = rootMonitor.MakeBoundAccount()
+		// Memory is not actually released when this function returns, but at
+		// least the batch is fully evaluated. Ideally we would release it
+		// after gRPC has sent the response, but there are no interceptors at
+		// that stage; the interceptors execute before the response is
+		// marshaled in Server.processUnaryRPC by calling sendResponse.
+		// We intentionally do not use finalizers because they delay GC and
+		// have had bugs in the past (and can prevent GC of objects with
+		// cyclic references).
+		defer boundAccount.Close(ctx)
+		rec = evalContextWithMonitor{
+			EvalContext: rec, memMonitor: storage.ScannerMemoryMonitor{B: &boundAccount}}
+	}
+
 	for retries := 0; ; retries++ {
 		if retries > 0 {
+			// It is safe to call Clear on an uninitialized BoundAccount.
+			boundAccount.Clear(ctx)
 			log.VEventf(ctx, 2, "server-side retry of batch")
 		}
 		br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, lul, true /* readOnly */)
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -723,6 +724,15 @@ type StoreConfig struct {
 	// subsystem. It is queried during the GC process and in the handling of
 	// AdminVerifyProtectedTimestampRequest.
 	ProtectedTimestampCache protectedts.Cache
+
+	// MemoryMonitorForKVProvider can be nil.
+	MemoryMonitorForKVProvider MemoryMonitorForKVProvider
 }

+// MemoryMonitorForKVProvider provides a long-lived memory monitor that can be
+// used for tracking memory usage in KV.
+type MemoryMonitorForKVProvider interface {
+	GetRootMemoryMonitor() *mon.BytesMonitor
+}
+
 // ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
@@ -2891,6 +2901,13 @@ func (s *Store) unregisterLeaseholderByID(ctx context.Context, rangeID roachpb.R
 	}
 }

+func (s *Store) getRootMemoryMonitorForKV() *mon.BytesMonitor {
+	if s.cfg.MemoryMonitorForKVProvider != nil {
+		return s.cfg.MemoryMonitorForKVProvider.GetRootMemoryMonitor()
+	}
+	return nil
+}
+
 // WriteClusterVersion writes the given cluster version to the store-local
 // cluster version key. We only accept a raw engine to ensure we're persisting
 // the write durably.
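
For illustration, a server could satisfy this interface with a trivial provider when assembling the StoreConfig (hypothetical wiring; the commit reuses rootSQLMonitor as described in the commit message):

package kvmemsketch

import "github.com/cockroachdb/cockroach/pkg/util/mon"

// rootSQLMonitorProvider is a hypothetical adapter handing the long-lived
// SQL root monitor to KV via the new StoreConfig field.
type rootSQLMonitorProvider struct {
	root *mon.BytesMonitor // e.g. the server's rootSQLMonitor
}

func (p rootSQLMonitorProvider) GetRootMemoryMonitor() *mon.BytesMonitor {
	return p.root
}

// At server startup (hypothetical):
//
//	storeCfg.MemoryMonitorForKVProvider = rootSQLMonitorProvider{root: rootSQLMonitor}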
4 changes: 3 additions & 1 deletion pkg/kv/txn.go
@@ -972,8 +972,10 @@ func (txn *Txn) Send(
 	}

 	// Some callers have not initialized ba using a Batch constructed using
-	// Txn.NewBatch. So we fallback to initializing here.
+	// Txn.NewBatch. So we fall back to partially overwriting here.
+	noMem := ba.AdmissionHeader.NoMemoryReservedAtSource
 	ba.AdmissionHeader = txn.admissionHeader
+	ba.AdmissionHeader.NoMemoryReservedAtSource = noMem

 	txn.mu.Lock()
 	requestTxnID := txn.mu.ID
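
On the client side, a fetcher would then set the preserved bit only on its first batch (hypothetical helper; the real flag-setting lives in the SQL fetcher):

package kvmemsketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// markFirstFetch sets the bit that the Send path above is careful to
// preserve: only the first batch, which has just the modest 1KB reserved
// at the source, asks KV to account for its scan memory.
func markFirstFetch(ba *roachpb.BatchRequest, firstBatchRequest bool) {
	ba.AdmissionHeader.NoMemoryReservedAtSource = firstBatchRequest
}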