Skip to content

Commit

Permalink
kv,storage: use a BytesMonitor to track memory allocations for scans
Browse files Browse the repository at this point in the history
The goal is to track memory allocations for:
- non-local SQL=>KV requests: this can happen with joins,
  multi-tenant clusters, and if ranges move between DistSQL
  planning and execution.
- local SQL=>KV requests for the first request by a fetcher:
  in this case the fetcher reserves a modest 1KB which can
  be significantly exceeded by KV allocations.

Only allocations in pebbleMVCCScanner for kv pairs and intents are
tracked. The memory is released when returning from
executeReadOnlyBatchWithServersideRefreshes since the chain
of returns will end up in gRPC response processing and we can't
hook into where that memory is released. This should still help
for some cases of OOMs, and give some signal of memory overload
that we can use elsewhere (e.g. admission control).

The BytesMonitor is used to construct a BoundAccount that is
passed via the EvalContext interface. The other
alternative would be for the engine to have a BytesMonitor at
initialization time that it can use to construct a BoundAccount
for each MVCC scan, and pass it back via MVCCScanResult. This
would mean multiple BoundAccounts for a batch (since we don't
want to release memory until all the requests in the batch are
processed), and would be harder to extend to track additional
request types compared to embedding in EvalContext.

The rootSQLMonitor is reused for this memory allocation tracking.
This tracking is always done for non-local requests, and for the
first request by a fetcher for a local request. This is to
avoid double-counting: the first request issued by a SQL fetcher
only reserves 1KB, but subsequent ones have already reserved
what was returned in the first response. So there is room to
tighten this if we knew what had been reserved on the local
client (there are complications because the batch may have
been split to send to different nodes, only one of which was
local).

The AdmissionHeader.SourceLocation field is used to mark local
requests and is set in rpc.internalClientAdapter. The first
request is marked using the
AdmissionHeader.NoMemoryReservedAtSource bit.

Informs cockroachdb#19721

Release note (ops change): The memory pool used for SQL is now
also used to cover KV memory used for scans.
  • Loading branch information
sumeerbhola committed Aug 4, 2021
1 parent 875b969 commit a5708ab
Show file tree
Hide file tree
Showing 24 changed files with 1,138 additions and 588 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func Get(
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
LocalUncertaintyLimit: cArgs.LocalUncertaintyLimit,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
})
if err != nil {
return result.Result{}, err
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestGetResumeSpan(t *testing.T) {
defer db.Close()

_, err := Put(ctx, db, CommandArgs{
Header: roachpb.Header{TargetBytes: -1},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
Expand All @@ -45,7 +46,8 @@ func TestGetResumeSpan(t *testing.T) {

// Case 1: Check that a negative TargetBytes causes a resume span.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{TargetBytes: -1},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
Expand All @@ -62,7 +64,8 @@ func TestGetResumeSpan(t *testing.T) {
resp = &roachpb.GetResponse{}
// Case 2: Check that a negative MaxSpanRequestKeys causes a resume span.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{MaxSpanRequestKeys: -1},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{MaxSpanRequestKeys: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
Expand All @@ -79,7 +82,8 @@ func TestGetResumeSpan(t *testing.T) {
resp = &roachpb.GetResponse{}
// Case 3: Check that a positive limit causes a normal return.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func ReverseScan(
TargetBytes: h.TargetBytes,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
}

switch args.ScanFormat {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func Scan(
TargetBytes: h.TargetBytes,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
}

switch args.ScanFormat {
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -126,6 +127,12 @@ type EvalContext interface {
// WatchForMerge arranges to block all requests until the in-progress merge
// completes. Returns an error if no in-progress merge is detected.
WatchForMerge(ctx context.Context) error

// GetResponseMemoryAccount returns a memory account to be used when
// generating BatchResponses. Currently only used for MVCC scans, and only
// non-nil on those paths (a nil account is safe to use since it functions
// as an unlimited account).
GetResponseMemoryAccount() *mon.BoundAccount
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
Expand All @@ -150,7 +157,7 @@ type MockEvalCtx struct {
// EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
// modifications to the underlying MockEvalCtx, because the returned wrapper
// holds the same pointer rather than a copy.
func (m *MockEvalCtx) EvalContext() EvalContext {
	// Use a keyed literal so the wrapper keeps compiling if mockEvalCtxImpl
	// has (or later gains) fields besides the embedded *MockEvalCtx.
	return &mockEvalCtxImpl{MockEvalCtx: m}
}

type mockEvalCtxImpl struct {
Expand Down Expand Up @@ -258,3 +265,7 @@ func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequen
// WatchForMerge is part of the EvalContext interface. The mock does not
// support it: calling it always panics with "unimplemented".
func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
panic("unimplemented")
}
// GetResponseMemoryAccount is part of the EvalContext interface. The mock
// always returns a nil account.
func (m *mockEvalCtxImpl) GetResponseMemoryAccount() *mon.BoundAccount {
// No limits: the EvalContext interface documents a nil account as safe to
// use and equivalent to an unlimited account.
return nil
}
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -1869,6 +1870,13 @@ func (r *Replica) markSystemConfigGossipFailed() {
r.mu.failureToGossipSystemConfig = true
}

// GetResponseMemoryAccount implements the batcheval.EvalContext interface.
// A bare Replica never supplies an account; it always returns nil.
func (r *Replica) GetResponseMemoryAccount() *mon.BoundAccount {
// Return a nil account, which places no limits. Places where a real
// account is needed use a wrapper for Replica as the EvalContext (see
// evalContextWithAccount).
return nil
}

func init() {
tracing.RegisterTagRemapping("r", "range")
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -252,3 +253,8 @@ func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roach
func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
return rec.i.WatchForMerge(ctx)
}

// GetResponseMemoryAccount implements the batcheval.EvalContext interface by
// forwarding to the wrapped rec.i, so the result (possibly nil) is whatever
// rec.i returns.
func (rec *SpanSetReplicaEvalContext) GetResponseMemoryAccount() *mon.BoundAccount {
return rec.i.GetResponseMemoryAccount()
}
92 changes: 92 additions & 0 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/kr/pretty"
)

Expand Down Expand Up @@ -179,6 +181,54 @@ func (r *Replica) executeReadOnlyBatch(
return br, nil, pErr
}

// evalContextWithAccount wraps an EvalContext to provide a non-nil
// mon.BoundAccount. This wrapping is conditional on various factors, and
// specific to a request (see executeReadOnlyBatchWithServersideRefreshes),
// which is why the implementation of EvalContext by Replica does not by
// default provide a non-nil mon.BoundAccount.
//
// If we start using evalContextWithAccount on more code paths we should
// consider using it everywhere and lift it to an earlier point in the code.
// Then code that decides that we need a non-nil BoundAccount can set a field
// instead of wrapping.
type evalContextWithAccount struct {
// EvalContext is the wrapped evaluation context. Its methods are promoted
// to the wrapper; GetResponseMemoryAccount is the one the wrapper defines
// itself.
batcheval.EvalContext
// memAccount is the account handed out by GetResponseMemoryAccount. It is
// set up in newEvalContextWithAccount and closed and zeroed in close.
memAccount *mon.BoundAccount
}

// evalContextWithAccountPool recycles evalContextWithAccount wrappers between
// requests. Entries created by New start out zero-valued, i.e. with a nil
// EvalContext and a nil memAccount.
var evalContextWithAccountPool = sync.Pool{
	New: func() interface{} { return new(evalContextWithAccount) },
}

// newEvalContextWithAccount fetches a wrapper from evalContextWithAccountPool,
// points it at evalCtx, and attaches a memory account connected to the given
// monitor. The caller is expected to call close on the result when done so
// the account is released and the wrapper goes back to the pool.
func newEvalContextWithAccount(
	ctx context.Context, evalCtx batcheval.EvalContext, mon *mon.BytesMonitor,
) *evalContextWithAccount {
	wrapper := evalContextWithAccountPool.Get().(*evalContextWithAccount)
	wrapper.EvalContext = evalCtx
	if wrapper.memAccount == nil {
		// Brand-new wrapper: there is no BoundAccount to reuse yet, so make one
		// from the monitor and keep a pointer to it.
		account := mon.MakeBoundAccount()
		wrapper.memAccount = &account
		return wrapper
	}
	// Recycled wrapper: re-initialize the existing BoundAccount in place
	// against the monitor for this request.
	wrapper.memAccount.Init(ctx, mon)
	return wrapper
}

// close returns the accounted memory and returns the wrapper to the
// sync.Pool. The wrapper must not be used after close: it may be handed to
// another request at any time.
func (e *evalContextWithAccount) close(ctx context.Context) {
	e.memAccount.Close(ctx)
	// Clear the BoundAccount struct, so it can be later reused.
	*e.memAccount = mon.BoundAccount{}
	// Drop the reference to the wrapped EvalContext so that an idle pooled
	// wrapper does not keep it reachable; newEvalContextWithAccount sets it
	// again on reuse.
	e.EvalContext = nil
	evalContextWithAccountPool.Put(e)
}
// GetResponseMemoryAccount implements the batcheval.EvalContext interface. It
// takes precedence over the method promoted from the embedded EvalContext and
// returns the wrapper's own account instead. It uses a pointer receiver to
// match close and to avoid copying the wrapper on each call.
func (e *evalContextWithAccount) GetResponseMemoryAccount() *mon.BoundAccount {
	return e.memAccount
}

// executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries
// at a higher timestamp in the event of some retriable errors if allowed by the
// batch/txn.
Expand All @@ -192,8 +242,50 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
log.Event(ctx, "executing read-only batch")

var rootMonitor *mon.BytesMonitor
// Only do memory allocation accounting if the request did not originate
// locally, or for a local request that has reserved no memory. Local
// requests (originating in DistSQL) do memory accounting before issuing the
// request. Even though the accounting for the first request in the caller
// is small (the NoMemoryReservedAtSource=true case), subsequent ones use
// the size of the response for subsequent requests (see row.txnKVFetcher).
// Note that we could additionally add an OR-clause with
// ba.AdmissionHeader.Source != FROM_SQL for the if-block that does memory
// accounting. We don't do that currently since there are some SQL requests
// that are not marked as FROM_SQL.
//
// This whole scheme could be tightened, both in terms of marking, and
// compensating for the amount of memory reserved at the source.
//
// TODO(sumeer): for multi-tenant KV we should be accounting on a per-tenant
// basis and not letting a single tenant consume all the memory (we could
// place a limit equal to total/2).
if ba.AdmissionHeader.SourceLocation != roachpb.AdmissionHeader_LOCAL ||
ba.AdmissionHeader.NoMemoryReservedAtSource {
// rootMonitor will never be nil in production settings, but it can be nil
// for tests that do not have a monitor.
rootMonitor = r.store.getRootMemoryMonitorForKV()
}
var boundAccount *mon.BoundAccount
if rootMonitor != nil {
evalCtx := newEvalContextWithAccount(ctx, rec, rootMonitor)
boundAccount = evalCtx.memAccount
// Closing evalCtx also closes boundAccount. Memory is not actually
// released when this function returns, but at least the batch is fully
// evaluated. Ideally we would like to release after grpc has sent the
// response, but there are no interceptors at that stage. The interceptors
// execute before the response is marshaled in Server.processUnaryRPC by
// calling sendResponse. We are intentionally not using finalizers because
// they delay GC and because they have had bugs in the past (and can
// prevent GC of objects with cyclic references).
defer evalCtx.close(ctx)
rec = evalCtx
}

for retries := 0; ; retries++ {
if retries > 0 {
// It is safe to call Clear on an uninitialized BoundAccount.
boundAccount.Clear(ctx)
log.VEventf(ctx, 2, "server-side retry of batch")
}
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, lul, true /* readOnly */)
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -723,6 +724,10 @@ type StoreConfig struct {
// subsystem. It is queried during the GC process and in the handling of
// AdminVerifyProtectedTimestampRequest.
ProtectedTimestampCache protectedts.Cache

// KV Memory Monitor. Must be non-nil for production, and can be nil in some
// tests.
KVMemoryMonitor *mon.BytesMonitor
}

// ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
Expand Down Expand Up @@ -2919,6 +2924,12 @@ func (s *Store) unregisterLeaseholderByID(ctx context.Context, rangeID roachpb.R
}
}

// getRootMemoryMonitorForKV returns a BytesMonitor to use for KV memory
// tracking. It returns the configured KVMemoryMonitor unchanged, so the
// result is nil when none was configured (which the config permits in some
// tests); callers must handle a nil monitor.
func (s *Store) getRootMemoryMonitorForKV() *mon.BytesMonitor {
return s.cfg.KVMemoryMonitor
}

// WriteClusterVersion writes the given cluster version to the store-local
// cluster version key. We only accept a raw engine to ensure we're persisting
// the write durably.
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,10 @@ func (txn *Txn) Send(
}

// Some callers have not initialized ba using a Batch constructed using
// Txn.NewBatch. So we fallback to initializing here.
// Txn.NewBatch. So we fallback to partially overwriting here.
noMem := ba.AdmissionHeader.NoMemoryReservedAtSource
ba.AdmissionHeader = txn.admissionHeader
ba.AdmissionHeader.NoMemoryReservedAtSource = noMem

txn.mu.Lock()
requestTxnID := txn.mu.ID
Expand Down
Loading

0 comments on commit a5708ab

Please sign in to comment.