Skip to content

Commit

Permalink
Merge #71109
Browse files Browse the repository at this point in the history
71109: kvserver,backupccl: make export, gc subject to admission control r=sumeerbhola a=sumeerbhola

They are now marked with AdmissionHeader_ROOT_KV, which stops
them from bypassing admission control. The priority is set to
admission.NormalPri since there are cases where they do need
to complete in a very timely manner. There are TODOs in the
integration code to make more sophisticated priority
assignments and use LowPri when possible.

Informs #65957

Release note: None

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
  • Loading branch information
craig[bot] and sumeerbhola committed Oct 13, 2021
2 parents 9968d34 + a814577 commit 319b708
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 98 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ go_library(
"//pkg/storage",
"//pkg/testutils",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand Down
19 changes: 16 additions & 3 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -335,7 +336,19 @@ func runBackupProcessor(
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
header.TargetBytes = 1

admissionHeader := roachpb.AdmissionHeader{
// Export requests are currently assigned NormalPri.
//
// TODO(bulkio): the priority should vary based on the urgency of
// these background requests. These exports should get LowPri,
// unless they are being retried and need to be completed in a
// timely manner for compliance with RPO and data retention
// policies. Consider deriving this from the UserPriority field.
Priority: int32(admission.NormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_ROOT_KV,
NoMemoryReservedAtSource: true,
}
log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawRes roachpb.Response
Expand All @@ -353,8 +366,8 @@ func runBackupProcessor(
ReqSentTime: reqSentTime.String(),
})

rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
header, req)
rawRes, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ go_library(
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
Expand Down
47 changes: 42 additions & 5 deletions pkg/kv/kvserver/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -404,8 +406,10 @@ func makeGCQueueScoreImpl(
}

type replicaGCer struct {
repl *Replica
count int32 // update atomically
repl *Replica
count int32 // update atomically
admissionController KVAdmissionController
storeID roachpb.StoreID
}

var _ gc.GCer = &replicaGCer{}
Expand All @@ -429,8 +433,37 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
ba.RangeID = r.repl.Desc().RangeID
ba.Timestamp = r.repl.Clock().Now()
ba.Add(&req)

if _, pErr := r.repl.Send(ctx, ba); pErr != nil {
// Since we are talking directly to the replica, we need to explicitly do
// admission control here, as we are bypassing server.Node.
var admissionHandle interface{}
if r.admissionController != nil {
ba.AdmissionHeader = roachpb.AdmissionHeader{
// GC is currently assigned NormalPri.
//
// TODO(kv): GC could be expected to be LowPri, so that it does not
// impact user-facing traffic when resources (e.g. CPU, write capacity
// of the store) are scarce. However long delays in GC can slow down
// user-facing traffic due to more versions in the store, and can
// increase write amplification of the store since there is more live
// data. Ideally, we should adjust this priority based on how far behind
// we are wrt GCing in this range.
Priority: int32(admission.NormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_ROOT_KV,
NoMemoryReservedAtSource: true,
}
ba.Replica.StoreID = r.storeID
var err error
admissionHandle, err = r.admissionController.AdmitKVWork(ctx, roachpb.SystemTenantID, &ba)
if err != nil {
return err
}
}
_, pErr := r.repl.Send(ctx, ba)
if r.admissionController != nil {
r.admissionController.AdmittedKVWorkDone(admissionHandle)
}
if pErr != nil {
log.VErrEventf(ctx, 2, "%v", pErr.String())
return pErr.GoError()
}
Expand Down Expand Up @@ -531,7 +564,11 @@ func (gcq *gcQueue) process(
IntentCleanupBatchTimeout: gcQueueIntentBatchTimeout,
},
conf.TTL(),
&replicaGCer{repl: repl},
&replicaGCer{
repl: repl,
admissionController: gcq.store.cfg.KVAdmissionController,
storeID: gcq.store.StoreID(),
},
func(ctx context.Context, intents []roachpb.Intent) error {
intentCount, err := repl.store.intentResolver.
CleanupIntents(ctx, intents, gcTimestamp, roachpb.PUSH_TOUCH)
Expand Down
125 changes: 125 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -758,6 +759,9 @@ type StoreConfig struct {
// SpanConfigsEnabled determines whether we're able to use the span configs
// infrastructure.
SpanConfigsEnabled bool

// KVAdmissionController is an optional field used for admission control.
KVAdmissionController KVAdmissionController
}

// ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
Expand Down Expand Up @@ -2957,3 +2961,124 @@ func min(a, b int) int {
}
return b
}

// KVAdmissionController provides admission control for the KV layer.
type KVAdmissionController interface {
// AdmitKVWork must be called before performing KV work.
// BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
// populated for admission to work correctly. If err is non-nil, the
// returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
// called after the KV work is done executing.
AdmitKVWork(
ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
) (handle interface{}, err error)
// AdmittedKVWorkDone is called after the admitted KV work is done
// executing.
AdmittedKVWorkDone(handle interface{})
}

// KVAdmissionControllerImpl implements KVAdmissionController interface.
type KVAdmissionControllerImpl struct {
// Admission control queues and coordinators. Both should be nil or non-nil.
kvAdmissionQ *admission.WorkQueue
storeGrantCoords *admission.StoreGrantCoordinators
}

var _ KVAdmissionController = KVAdmissionControllerImpl{}

type admissionHandle struct {
tenantID roachpb.TenantID
callAdmittedWorkDoneOnKVAdmissionQ bool
storeAdmissionQ *admission.WorkQueue
}

func isSingleHeartbeatTxnRequest(b *roachpb.BatchRequest) bool {
if len(b.Requests) != 1 {
return false
}
_, ok := b.Requests[0].GetInner().(*roachpb.HeartbeatTxnRequest)
return ok
}

// MakeKVAdmissionController returns a KVAdmissionController. Both parameters
// must together either be nil or non-nil.
func MakeKVAdmissionController(
kvAdmissionQ *admission.WorkQueue, storeGrantCoords *admission.StoreGrantCoordinators,
) KVAdmissionController {
return KVAdmissionControllerImpl{
kvAdmissionQ: kvAdmissionQ,
storeGrantCoords: storeGrantCoords,
}
}

// AdmitKVWork implements the KVAdmissionController interface.
func (n KVAdmissionControllerImpl) AdmitKVWork(
ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
) (handle interface{}, err error) {
ah := admissionHandle{tenantID: tenantID}
if n.kvAdmissionQ != nil {
bypassAdmission := ba.IsAdmin()
source := ba.AdmissionHeader.Source
if !roachpb.IsSystemTenantID(tenantID.ToUint64()) {
// Request is from a SQL node.
bypassAdmission = false
source = roachpb.AdmissionHeader_FROM_SQL
}
if source == roachpb.AdmissionHeader_OTHER {
bypassAdmission = true
}
createTime := ba.AdmissionHeader.CreateTime
if !bypassAdmission && createTime == 0 {
// TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
// of zero CreateTime needs to be revisited. It should use high priority.
createTime = timeutil.Now().UnixNano()
}
admissionInfo := admission.WorkInfo{
TenantID: tenantID,
Priority: admission.WorkPriority(ba.AdmissionHeader.Priority),
CreateTime: createTime,
BypassAdmission: bypassAdmission,
}
var err error
// Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
// it would bypass admission, it would consume a slot. When writes are
// throttled, we start generating more txn heartbeats, which then consume
// all the slots, causing no useful work to happen. We do want useful work
// to continue even when throttling since there are often significant
// number of tokens available.
if ba.IsWrite() && !isSingleHeartbeatTxnRequest(ba) {
ah.storeAdmissionQ = n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
}
admissionEnabled := true
if ah.storeAdmissionQ != nil {
if admissionEnabled, err = ah.storeAdmissionQ.Admit(ctx, admissionInfo); err != nil {
return admissionHandle{}, err
}
if !admissionEnabled {
// Set storeAdmissionQ to nil so that we don't call AdmittedWorkDone
// on it. Additionally, the code below will not call
// kvAdmissionQ.Admit, and so callAdmittedWorkDoneOnKVAdmissionQ will
// stay false.
ah.storeAdmissionQ = nil
}
}
if admissionEnabled {
ah.callAdmittedWorkDoneOnKVAdmissionQ, err = n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return admissionHandle{}, err
}
}
}
return ah, nil
}

// AdmittedKVWorkDone implement the KVAdmissionController interface.
func (n KVAdmissionControllerImpl) AdmittedKVWorkDone(handle interface{}) {
ah := handle.(admissionHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
if ah.storeAdmissionQ != nil {
ah.storeAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
}
15 changes: 15 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,24 @@ func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender {
// `nil` context; an empty one is used in that case.
func SendWrappedWith(
ctx context.Context, sender Sender, h roachpb.Header, args roachpb.Request,
) (roachpb.Response, *roachpb.Error) {
return SendWrappedWithAdmission(ctx, sender, h, roachpb.AdmissionHeader{}, args)
}

// SendWrappedWithAdmission is a convenience function which wraps the request
// in a batch and sends it via the provided Sender and headers. It returns the
// unwrapped response or an error. It's valid to pass a `nil` context; an
// empty one is used in that case.
func SendWrappedWithAdmission(
ctx context.Context,
sender Sender,
h roachpb.Header,
ah roachpb.AdmissionHeader,
args roachpb.Request,
) (roachpb.Response, *roachpb.Error) {
ba := roachpb.BatchRequest{}
ba.Header = h
ba.AdmissionHeader = ah
ba.Add(args)

br, pErr := sender.Send(ctx, ba)
Expand Down
Loading

0 comments on commit 319b708

Please sign in to comment.