From cc6eda28406f1a658ce02ec612d6f1b88e301591 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Wed, 7 Jun 2023 16:13:59 -0600 Subject: [PATCH] allocator2: change rate limiter etc. storeChangeRateLimiter and storeEnactedHistory are introduced to ensure that N distributed allocators operating concurrently do not add or remove more than 50% of the cluster mean load at a store. storeState.computePendingChangesReflectedInLatestLoad is a heuristic introduced to decide when a pendingReplicaChange that has been enacted, can be assumed to be reflected in the StoreCapacity that is gossiped. Informs #103320 Epic: CRDB-25222 Release note: None --- .../allocator/allocator2/allocator.go | 4 +- .../allocator/allocator2/allocator_state.go | 4 + .../allocator/allocator2/cluster_state.go | 204 ++++++++++++++++++ pkg/kv/kvserver/allocator/allocator2/load.go | 24 +++ 4 files changed, 235 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/allocator/allocator2/allocator.go b/pkg/kv/kvserver/allocator/allocator2/allocator.go index 105acc7a055a..f1f30d784bf6 100644 --- a/pkg/kv/kvserver/allocator/allocator2/allocator.go +++ b/pkg/kv/kvserver/allocator/allocator2/allocator.go @@ -60,7 +60,9 @@ type Allocator interface { // of integration, we could relax this and synthesize this list as a // function of (a) timestamp of the gossiped load, (b) timestamp of the // proposed change, (c) timestamp of the latest leaseholderStores slice - // that shows the change as being enacted. + // that shows the change as being enacted. See + // storeState.computePendingChangesReflectedInLatestLoad for such an + // attempt. 
// // - storeLoadMsg.storeRanges does not need to be the complete list of // ranges -- it can be filtered down to only the ranges for which the diff --git a/pkg/kv/kvserver/allocator/allocator2/allocator_state.go b/pkg/kv/kvserver/allocator/allocator2/allocator_state.go index e70d0e33bc57..dda8429ad0c4 100644 --- a/pkg/kv/kvserver/allocator/allocator2/allocator_state.go +++ b/pkg/kv/kvserver/allocator/allocator2/allocator_state.go @@ -27,6 +27,9 @@ type allocatorState struct { meansMemo *meansMemo diversityScoringMemo *diversityScoringMemo + + // TODO(kvoli,sumeer): initialize and use. + changeRangeLimiter *storeChangeRateLimiter } func newAllocatorState() *allocatorState { @@ -345,6 +348,7 @@ func (erl *existingReplicaLocalities) getScoreSum(replica localityTiers) float64 var _ = newAllocatorState var _ = (&allocatorState{}).computeChanges var _ = (&allocatorState{}).computeCandidatesForRange +var _ = allocatorState{}.changeRangeLimiter var _ = (&existingReplicaLocalities{}).clear var _ = replicasLocalityTiers{}.hash var _ = replicasLocalityTiers{}.isEqual diff --git a/pkg/kv/kvserver/allocator/allocator2/cluster_state.go b/pkg/kv/kvserver/allocator/allocator2/cluster_state.go index 27e6f7114117..d48aebcb6e8c 100644 --- a/pkg/kv/kvserver/allocator/allocator2/cluster_state.go +++ b/pkg/kv/kvserver/allocator/allocator2/cluster_state.go @@ -11,6 +11,7 @@ package allocator2 import ( + "sort" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -128,6 +129,168 @@ const ( removed ) +// storeChangeRateLimiter and helpers. +// +// Usage: +// - construct a single storeChangeRateLimiter for the allocator using +// newStoreChangeRateLimiter. If all allocators are operating at 10s +// intervals, a gcThreshold of 15s should be enough. +// - storeEnactedHistory is a member of storeState, so one per store. +// - storeEnactedHistory.addEnactedChange must be called every time a change +// is known to be enacted. 
+// - Before every rebalancing pass, call
+//   storeChangeRateLimiter.initForRebalancePass. Then iterate over each store
+//   and call storeChangeRateLimiter.updateForRebalancePass. The latter step
+//   updates the rate limiting state for the store -- this "rate limiting" is
+//   simply a bool, which represents whether changes are allowed to the store
+//   for load reasons (changes for failure reasons, to shed load, are always
+//   allowed), in this rebalancing pass. Note, changes made during the
+//   rebalancing pass, and rate limiting those, is not in scope of this rate
+//   limiter -- that happens via storeState.adjusted.loadPendingChanges.
+// - During the rebalancing pass, use
+//   storeEnactedHistory.allowLoadBasedChanges, to decide whether a store can
+//   be the source or target for load based rebalancing.
+//
+// TODO(sumeer): unit test.
+//
+// TODO(sumeer,kvoli): integrate this into the rest of allocator2.
+
+// enactedReplicaChange is information about a change at a store that was
+// enacted. It is an internal implementation detail of storeEnactedHistory and
+// storeChangeRateLimiter.
+type enactedReplicaChange struct {
+	// When the change is known to be enacted, based on the authoritative
+	// information received from the leaseholder. storeEnactedHistory keeps its
+	// history ordered by this time, and gcHistory drops an entry once it is
+	// older than the gcThreshold it is called with.
+	enactedAtTime time.Time
+	// The load this change adds to a store. The values are always positive,
+	// even for load subtraction: addEnactedChange stores the absolute value of
+	// each dimension of pendingReplicaChange.loadDelta.
+	loadDelta loadVector
+	// The secondary load of this change. addEnactedChange sets the leaseCount
+	// dimension to 1 when the change flips isLeaseholder, and leaves it at 0
+	// otherwise.
+	secondaryDelta secondaryLoadVector
+}
+
+// storeEnactedHistory is a member of storeState. Users should only call
+// addEnactedChange and allowLoadBasedChanges. The rest of its state is
+// maintained by storeChangeRateLimiter.updateForRebalancePass, which garbage
+// collects old history and recomputes the value returned by
+// allowLoadBasedChanges.
+type storeEnactedHistory struct {
+	// changes holds the enacted changes that have not yet been garbage
+	// collected, in non-decreasing enactedAtTime order. gcHistory relies on
+	// this order to stop at the first entry that is recent enough to keep.
+	changes []enactedReplicaChange
+	// totalDelta and totalSecondary are the sums of loadDelta and
+	// secondaryDelta over the entries in changes.
+	totalDelta     loadVector
+	totalSecondary secondaryLoadVector
+	// allowChanges is the value returned by allowLoadBasedChanges. It is set
+	// by storeChangeRateLimiter.updateForRebalancePass.
+	allowChanges bool
+}
+
+// addEnactedChange records an enacted change in the history and adds its
+// load to the running totals. The recorded load is the absolute value of
+// change.loadDelta in every dimension, and the leaseCount secondary dimension
+// is 1 iff the change flips isLeaseholder.
+func (h *storeEnactedHistory) addEnactedChange(change *pendingReplicaChange) {
+	c := enactedReplicaChange{
+		enactedAtTime: change.enactedAtTime,
+		loadDelta:     change.loadDelta,
+	}
+	for i, l := range c.loadDelta {
+		if l < 0 {
+			c.loadDelta[i] = -l
+		}
+	}
+	if change.prev.isLeaseholder != change.next.isLeaseholder {
+		c.secondaryDelta[leaseCount] = 1
+	}
+	// h.changes is already sorted by enactedAtTime, so insert c at its sorted
+	// position instead of re-sorting the whole history on every call. The new
+	// entry goes after any existing entries with the same timestamp, which
+	// also makes the resulting order deterministic. In the common case of
+	// changes arriving in time order this is a plain append.
+	idx := sort.Search(len(h.changes), func(i int) bool {
+		return h.changes[i].enactedAtTime.After(c.enactedAtTime)
+	})
+	h.changes = append(h.changes, enactedReplicaChange{})
+	copy(h.changes[idx+1:], h.changes[idx:])
+	h.changes[idx] = c
+	h.totalDelta.add(c.loadDelta)
+	h.totalSecondary.add(c.secondaryDelta)
+}
+
+// allowLoadBasedChanges returns the decision computed by the most recent
+// storeChangeRateLimiter.updateForRebalancePass call for this history.
+func (h *storeEnactedHistory) allowLoadBasedChanges() bool {
+	return h.allowChanges
+}
+
+// gcHistory removes the changes that were enacted more than gcThreshold
+// before now, and subtracts their load from the running totals.
+func (h *storeEnactedHistory) gcHistory(now time.Time, gcThreshold time.Duration) {
+	i := 0
+	for ; i < len(h.changes); i++ {
+		c := h.changes[i]
+		if now.Sub(c.enactedAtTime) <= gcThreshold {
+			// The history is ordered by enactedAtTime, so every later entry is
+			// recent enough to keep too.
+			break
+		}
+		h.totalDelta.subtract(c.loadDelta)
+		h.totalSecondary.subtract(c.secondaryDelta)
+	}
+	h.changes = h.changes[i:]
+}
+
+// storeChangeRateLimiter is a rate limiter for rebalancing to/from this
+// store, for this allocator, under the assumption that the cluster uses many
+// (distributed) allocators. The rate limiting heuristic assumes that the
+// other allocators are also running with such a rate limiter, and is intended
+// to prevent a thundering herd problem where all allocators add/remove too
+// much load from a store, resulting in perpetual rebalancing without
+// improving the cluster.
+type storeChangeRateLimiter struct {
+	// gcThreshold is the age beyond which updateForRebalancePass drops enacted
+	// changes from a store's history (via storeEnactedHistory.gcHistory).
+	gcThreshold time.Duration
+	// numAllocators and clusterMean are the inputs for the current rebalancing
+	// pass, as set by initForRebalancePass.
+	numAllocators int
+	clusterMean   meanStoreLoad
+}
+
+// newStoreChangeRateLimiter returns a rate limiter that uses gcThreshold as
+// the maximum age of the enacted changes it considers. numAllocators and
+// clusterMean stay at their zero values until initForRebalancePass is called.
+func newStoreChangeRateLimiter(gcThreshold time.Duration) *storeChangeRateLimiter {
+	return &storeChangeRateLimiter{
+		gcThreshold: gcThreshold,
+	}
+}
+
+// initForRebalancePass records the number of allocators and the cluster mean
+// load that subsequent updateForRebalancePass calls will use.
+func (crl *storeChangeRateLimiter) initForRebalancePass(
+	numAllocators int, clusterMean meanStoreLoad,
+) {
+	crl.numAllocators = numAllocators
+	crl.clusterMean = clusterMean
+}
+
+// TODO(sumeer): also consider utilization in the cluster and capacity of this
+// store, since that is necessary to make this reasonable for heterogeneous
+// clusters.
+
+// updateForRebalancePass garbage collects h using now and crl.gcThreshold,
+// and then sets h.allowChanges, which is the value later returned by
+// h.allowLoadBasedChanges. Changes are allowed when the remaining history is
+// empty. Otherwise they are disallowed if, in any load or secondary load
+// dimension, the history's total multiplied by numAllocators exceeds half of
+// the cluster mean for that dimension.
+func (crl *storeChangeRateLimiter) updateForRebalancePass(h *storeEnactedHistory, now time.Time) {
+	h.gcHistory(now, crl.gcThreshold)
+	if len(h.changes) == 0 {
+		// If there is nothing in the history, changes are allowed. This is the
+		// path we will fall into for underloaded clusters where the mean is very
+		// low and the projectedDelta will become too high even with a single
+		// change.
+		//
+		// Additionally, if the actual number of allocators that can add or remove
+		// load from a store is much lower than numAllocators (because of
+		// constraints) we may often need to use this fallback. We do assume that
+		// typical clusters will be configured in a manner that each store will be
+		// able to satisfy some constraint for a substantial fraction of the
+		// ranges in the cluster (say > 50%), so the pessimistic behavior encoded
+		// in the code below will not significantly slow down allocator changes.
+		//
+		// Another situation that can present a problem is cluster configurations
+		// where a significant number of nodes can only have follower replicas. Some options:
+		// - Add a cluster setting effective_fraction_allocators, that defaults to
+		//   1.0 that can be adjusted by the operator to the fraction of nodes
+		//   that can do allocation.
+		// - Estimate the number of allocators based on the gossiped lease count
+		//   by each store.
+		//
+		// We currently prefer the second option.
+		h.allowChanges = true
+		return
+	}
+	// Allowed, unless some dimension below exceeds the threshold.
+	h.allowChanges = true
+	for i := range h.totalDelta {
+		// Project this allocator's enacted total onto all numAllocators
+		// allocators, and compare against half the cluster mean.
+		projectedDelta := h.totalDelta[i] * loadValue(crl.numAllocators)
+		if float64(projectedDelta) > 0.5*float64(crl.clusterMean.load[i]) {
+			h.allowChanges = false
+			return
+		}
+	}
+	// Same check for the secondary load dimensions.
+	for i := range h.totalSecondary {
+		projectedDelta := h.totalSecondary[i] * loadValue(crl.numAllocators)
+		if float64(projectedDelta) > 0.5*float64(crl.clusterMean.secondaryLoad[i]) {
+			h.allowChanges = false
+			return
+		}
+	}
+}
+
 // storeState maintains the complete state about a store as known to the
 // allocator.
 type storeState struct {
@@ -167,6 +330,8 @@ type storeState struct {
 	// to always remove it.
 	loadPendingChanges map[changeID]*pendingReplicaChange
 
+	// enactedHistory is the history of enacted changes at this store that
+	// storeChangeRateLimiter consults.
+	enactedHistory storeEnactedHistory
+
 	secondaryLoad secondaryLoadVector
 
 	// replicas is computed from the authoritative information provided by
@@ -192,6 +357,37 @@ type storeState struct {
 	localityTiers
 }
 
+// lagForChangeReflectedInLoad is the time duration between a change happening
+// at a store, and when the effect of that change is seen in the load
+// information computed by that store.
+//
+// TODO(sumeer): set this based on an understanding of ReplicaStats etc.
+const lagForChangeReflectedInLoad = 5 * time.Second
+
+// computePendingChangesReflectedInLatestLoad returns the changes in
+// ss.adjusted.loadPendingChanges that have a non-zero enactedAtTime that
+// precedes latestLoadTime by more than lagForChangeReflectedInLoad. These are
+// the enacted changes that are assumed to already be reflected in the load
+// reported as of latestLoadTime.
+//
+// NB: this is a heuristic.
+//
+// TODO(sumeer): this interface is just a placeholder. We should integrate
+// this into the storeState update function, in which case we can directly
+// remove from the loadPendingChanges map.
+func (ss *storeState) computePendingChangesReflectedInLatestLoad(
+	latestLoadTime time.Time,
+) []*pendingReplicaChange {
+	var changes []*pendingReplicaChange
+	for _, change := range ss.adjusted.loadPendingChanges {
+		if change.enactedAtTime.IsZero() {
+			// Not yet enacted, based on the information provided by the
+			// leaseholder, which is always considered most up-to-date (because of
+			// AdjustPendingChangesDisposition).
+			continue
+		}
+		// Is enacted.
+		if latestLoadTime.Sub(change.enactedAtTime) > lagForChangeReflectedInLoad {
+			changes = append(changes, change)
+		}
+	}
+	return changes
+}
+
 // failureDetectionSummary is provided by an external entity and never
 // computed inside the allocator.
 type failureDetectionSummary uint8
@@ -450,3 +646,11 @@ var _ = clusterState{}.ranges
 var _ = clusterState{}.pendingChanges
 var _ = clusterState{}.constraintMatcher
 var _ = clusterState{}.localityTierInterner
+var _ = (&storeChangeRateLimiter{}).initForRebalancePass
+var _ = (&storeChangeRateLimiter{}).updateForRebalancePass
+var _ = newStoreChangeRateLimiter
+var _ = (&storeEnactedHistory{}).addEnactedChange
+var _ = (&storeEnactedHistory{}).allowLoadBasedChanges
+var _ = (&storeEnactedHistory{}).gcHistory
+var _ = enactedReplicaChange{}
+var _ = (&storeState{}).computePendingChangesReflectedInLatestLoad
diff --git a/pkg/kv/kvserver/allocator/allocator2/load.go b/pkg/kv/kvserver/allocator/allocator2/load.go
index 77ca375f6684..4544c1bd0ba1 100644
--- a/pkg/kv/kvserver/allocator/allocator2/load.go
+++ b/pkg/kv/kvserver/allocator/allocator2/load.go
@@ -42,6 +42,18 @@ type loadValue int64
 // dimension.
 type loadVector [numLoadDimensions]loadValue
 
+// add adds other to lv in place, dimension by dimension.
+func (lv *loadVector) add(other loadVector) {
+	for i := range other {
+		(*lv)[i] += other[i]
+	}
+}
+
+// subtract subtracts other from lv in place, dimension by dimension.
+func (lv *loadVector) subtract(other loadVector) {
+	for i := range other {
+		(*lv)[i] -= other[i]
+	}
+}
+
 // A resource can have a capacity, which is also expressed using loadValue.
 // There are some special case capacity values, enumerated here.
 const (
@@ -76,6 +88,18 @@ const (
 
 type secondaryLoadVector [numSecondaryLoadDimensions]loadValue
 
+// add adds other to lv in place, dimension by dimension.
+func (lv *secondaryLoadVector) add(other secondaryLoadVector) {
+	for i := range other {
+		(*lv)[i] += other[i]
+	}
+}
+
+// subtract subtracts other from lv in place, dimension by dimension.
+func (lv *secondaryLoadVector) subtract(other secondaryLoadVector) {
+	for i := range other {
+		(*lv)[i] -= other[i]
+	}
+}
+
 type rangeLoad struct {
 	load loadVector
 	// Nanos per second. raftCPU <= load[cpu].
Handling this as a special case,