admission,server: scope store overload admission control to write operations on the overloaded store

Previously, KVWork was all admitted through a single WorkQueue, and this
WorkQueue was paired with a GrantCoordinator that took into account
both CPU overload and storage overload. This meant a single overloaded
store (in a multi-store setting) would slow down all KV operations,
including reads and operations on stores that were healthy.

We now have multiple WorkQueues, one per store, that KV writes go
through before reaching the shared WorkQueue that all operations go
through. The per-store WorkQueues are associated with their own
GrantCoordinators that listen to the health state of their respective
stores. The per-store queues do not care about CPU and therefore do not
do grant chaining. Since admission through these per-store queues
happens before the shared queue, a store bottleneck will not cause KV
slots in the shared queue to be taken by requests that are still waiting
for admission elsewhere. The reverse can happen, and is considered
acceptable -- per-store tokens, when a store is overloaded, can be used
by requests that are now waiting for admission in the shared WorkQueue
because of a CPU bottleneck.
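
Condensed, the flow for a single request looks like the sketch below, which
mirrors the Node.Batch hunk further down in this diff. The helper name
admitAndRun is purely illustrative, the WorkInfo type name is assumed from
the admission package, and error handling plus the fast path taken when
admission control is disabled are elided:

    package sketch

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/util/admission"
    )

    // admitAndRun sketches the ordering: a KV write clears its per-store
    // queue before competing for a slot in the shared KV WorkQueue, and
    // both queues are credited back once the batch has executed.
    func admitAndRun(
        ctx context.Context,
        isWrite bool,
        storeID int32,
        tenantID roachpb.TenantID,
        info admission.WorkInfo,
        kvQ *admission.WorkQueue,
        storeCoords *admission.StoreGrantCoordinators,
        run func(context.Context) error,
    ) error {
        var storeQ *admission.WorkQueue
        if isWrite {
            // Only writes are subject to per-store admission.
            storeQ = storeCoords.TryGetQueueForStore(storeID)
        }
        storeAdmitted := false
        if storeQ != nil {
            admitted, err := storeQ.Admit(ctx, info)
            if err != nil {
                return err
            }
            storeAdmitted = admitted
        }
        // The shared queue gates on CPU, so a write that already holds
        // store tokens may still wait here -- the acceptable "reverse"
        // case described above.
        kvAdmitted, err := kvQ.Admit(ctx, info)
        if err != nil {
            return err
        }
        runErr := run(ctx)
        if kvAdmitted {
            kvQ.AdmittedWorkDone(tenantID)
        }
        if storeAdmitted {
            storeQ.AdmittedWorkDone(tenantID)
        }
        return runErr
    }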

The code is significantly refactored for the above:
NewGrantCoordinators returns, in addition to the shared
GrantCoordinator, a container called StoreGrantCoordinators, which
lazily initializes the relevant per-store GrantCoordinators when it
first fetches Pebble metrics. The server code now integrates with both
types, and the code in Node.Batch will sometimes subject a request to
two WorkQueues. The PebbleMetricsProvider now includes StoreIDs, and
the periodic ticking that fetches these metrics at 1min intervals, and
ticks every 1s, is moved to StoreGrantCoordinators. This simplifies the
ioLoadListener, which no longer does the periodic ticking, and
eliminates some testing-only abstractions. The per-store WorkQueues
share the same metrics, which represent an aggregate across these
queues.
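
The admission-package side of this sits in hunks that are collapsed in this
diff, so the following is a rough illustration only -- the type and field
names (storeCoordsSketch, perStore, closeCh, and so on) are toy placeholders
that do not match the real identifiers -- of the ticking behavior described
above:

    package sketch

    import "time"

    // Toy stand-ins for the real admission types; the actual identifiers
    // and fields in pkg/util/admission differ.
    type storeMetrics struct{ StoreID int32 }

    type metricsProvider interface {
        GetPebbleMetrics() []storeMetrics
    }

    type storeCoordsSketch struct {
        perStore map[int32]bool // placeholder for per-store GrantCoordinators
        closeCh  chan struct{}
    }

    // startTicking mimics the described loop: a 1s ticker, with a Pebble
    // metrics fetch (and lazy per-store initialization) once per minute.
    func (s *storeCoordsSketch) startTicking(p metricsProvider) {
        go func() {
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()
            for tick := 0; ; tick++ {
                select {
                case <-ticker.C:
                    if tick%60 == 0 {
                        for _, m := range p.GetPebbleMetrics() {
                            if !s.perStore[m.StoreID] {
                                // First fetch mentioning this StoreID:
                                // lazily set up its coordinator.
                                s.perStore[m.StoreID] = true
                            }
                            // ...hand m to the store's ioLoadListener...
                        }
                    }
                    // On every 1s tick the per-store coordinators re-adjust
                    // their IO token grants (omitted here).
                case <-s.closeCh:
                    return
                }
            }
        }()
    }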

Informs #65957

Release note: None
sumeerbhola committed Jul 16, 2021
1 parent 31cdc0e commit 0aee0ba
Showing 8 changed files with 392 additions and 334 deletions.
44 changes: 32 additions & 12 deletions pkg/server/node.go
@@ -53,7 +53,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/redact"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
@@ -173,7 +172,9 @@ type Node struct {

perReplicaServer kvserver.Server

admissionQ *admission.WorkQueue
// Admission control queues and coordinators. Both should be nil or non-nil.
kvAdmissionQ *admission.WorkQueue
storeGrantCoords *admission.StoreGrantCoordinators
}

var _ roachpb.InternalServer = &Node{}
@@ -685,11 +686,12 @@ func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []*pebble.Metrics {
var metrics []*pebble.Metrics
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
metrics = append(metrics, m.Metrics)
metrics = append(
metrics, admission.StoreMetrics{StoreID: int32(store.StoreID()), Metrics: m.Metrics})
return nil
})
return metrics
@@ -913,9 +915,10 @@ func (n *Node) Batch(
// log tags more expensive and makes local calls differ from remote calls.
ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx)

var callAdmittedWorkDone bool
var callAdmittedWorkDoneOnKVAdmissionQ bool
var tenantID roachpb.TenantID
if n.admissionQ != nil {
var storeAdmissionQ *admission.WorkQueue
if n.kvAdmissionQ != nil {
var ok bool
tenantID, ok = roachpb.TenantFromContext(ctx)
if !ok {
@@ -944,14 +947,31 @@ func (n *Node) Batch(
BypassAdmission: bypassAdmission,
}
var err error
callAdmittedWorkDone, err = n.admissionQ.Admit(ctx, admissionInfo)
if err != nil {
return nil, err
if args.IsWrite() {
storeAdmissionQ = n.storeGrantCoords.TryGetQueueForStore(int32(args.Replica.StoreID))
}
continueToSecondQ := true
if storeAdmissionQ != nil {
if continueToSecondQ, err = storeAdmissionQ.Admit(ctx, admissionInfo); err != nil {
return nil, err
}
if !continueToSecondQ {
storeAdmissionQ = nil
}
}
if continueToSecondQ {
callAdmittedWorkDoneOnKVAdmissionQ, err = n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return nil, err
}
}
}
br, err := n.batchInternal(ctx, args)
if callAdmittedWorkDone {
n.admissionQ.AdmittedWorkDone(tenantID)
if callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(tenantID)
}
if storeAdmissionQ != nil {
storeAdmissionQ.AdmittedWorkDone(tenantID)
}

// We always return errors via BatchResponse.Error so structure is
17 changes: 9 additions & 8 deletions pkg/server/server.go
@@ -177,7 +177,7 @@ type Server struct {
// Created in NewServer but initialized (made usable) in `(*Server).Start`.
externalStorageBuilder *externalStorageBuilder

gcoord *admission.GrantCoordinator
storeGrantCoords *admission.StoreGrantCoordinators

// The following fields are populated at start time, i.e. in `(*Server).Start`.
startTime time.Time
@@ -430,7 +430,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender)

gcoord, metrics := admission.NewGrantCoordinator(admission.Options{
gcoords, metrics := admission.NewGrantCoordinators(admission.Options{
MinCPUSlots: 1,
MaxCPUSlots: 100000, /* TODO(sumeer): add cluster setting */
SQLKVResponseBurstTokens: 100000, /* TODO(sumeer): add cluster setting */
@@ -442,17 +442,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
for i := range metrics {
registry.AddMetricStruct(metrics[i])
}
cbID := goschedstats.RegisterRunnableCountCallback(gcoord.CPULoad)
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
stopper.AddCloser(stop.CloserFn(func() {
goschedstats.UnregisterRunnableCountCallback(cbID)
}))
stopper.AddCloser(gcoord)
stopper.AddCloser(gcoords)

dbCtx := kv.DefaultDBContext(stopper)
dbCtx.NodeID = idContainer
dbCtx.Stopper = stopper
db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx)
db.SQLKVResponseAdmissionQ = gcoord.GetWorkQueue(admission.SQLKVResponseWork)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)

nlActive, nlRenewal := cfg.NodeLivenessDurations()
if knobs := cfg.TestingKnobs.NodeLiveness; knobs != nil {
@@ -632,7 +632,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
node := NewNode(
storeCfg, recorder, registry, stopper,
txnMetrics, stores, nil /* execCfg */, &rpcContext.ClusterID)
node.admissionQ = gcoord.GetWorkQueue(admission.KVWork)
node.kvAdmissionQ = gcoords.Regular.GetWorkQueue(admission.KVWork)
node.storeGrantCoords = gcoords.Stores
lateBoundNode = node
roachpb.RegisterInternalServer(grpcServer.Server, node)
kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
@@ -783,7 +784,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sqlServer: sqlServer,
drainSleepFn: drainSleepFn,
externalStorageBuilder: externalStorageBuilder,
gcoord: gcoord,
storeGrantCoords: gcoords.Stores,
}
return lateBoundServer, err
}
@@ -1652,7 +1653,7 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}
// Stores have been initialized, so Node can now provide Pebble metrics.
s.gcoord.SetPebbleMetricsProvider(s.node)
s.storeGrantCoords.SetPebbleMetricsProvider(s.node)

log.Event(ctx, "started node")
if err := s.startPersistingHLCUpperBound(
1 change: 0 additions & 1 deletion pkg/util/admission/BUILD.bazel
@@ -38,7 +38,6 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_stretchr_testify//require",
