Merge #104559
104559: allocator2: change rate limiter etc. r=kvoli a=sumeerbhola

storeChangeRateLimiter and storeEnactedHistory are introduced to ensure
that N distributed allocators operating concurrently do not add or remove
more than 50% of the cluster mean load at a store.

storeState.computePendingChangesReflectedInLatestLoad is a heuristic
introduced to decide when a pendingReplicaChange that has been enacted
can be assumed to be reflected in the gossiped StoreCapacity.

Informs #103320

Epic: CRDB-25222

Release note: None


Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
craig[bot] and sumeerbhola committed Jun 15, 2023
2 parents 8d9cbc4 + cc6eda2 commit 5dc031e
Showing 4 changed files with 235 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/allocator/allocator2/allocator.go
@@ -60,7 +60,9 @@ type Allocator interface {
	// of integration, we could relax this and synthesize this list as a
	// function of (a) timestamp of the gossiped load, (b) timestamp of the
	// proposed change, (c) timestamp of the latest leaseholderStores slice
	// that shows the change as being enacted. See
	// storeState.computePendingChangesReflectedInLatestLoad for such an
	// attempt.
	//
	// - storeLoadMsg.storeRanges does not need to be the complete list of
	//   ranges -- it can be filtered down to only the ranges for which the
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/allocator/allocator2/allocator_state.go
@@ -27,6 +27,9 @@ type allocatorState struct {

	meansMemo            *meansMemo
	diversityScoringMemo *diversityScoringMemo

	// TODO(kvoli,sumeer): initialize and use.
	changeRangeLimiter *storeChangeRateLimiter
}

func newAllocatorState() *allocatorState {
@@ -345,6 +348,7 @@ func (erl *existingReplicaLocalities) getScoreSum(replica localityTiers) float64
var _ = newAllocatorState
var _ = (&allocatorState{}).computeChanges
var _ = (&allocatorState{}).computeCandidatesForRange
var _ = allocatorState{}.changeRangeLimiter
var _ = (&existingReplicaLocalities{}).clear
var _ = replicasLocalityTiers{}.hash
var _ = replicasLocalityTiers{}.isEqual
204 changes: 204 additions & 0 deletions pkg/kv/kvserver/allocator/allocator2/cluster_state.go
@@ -11,6 +11,7 @@
package allocator2

import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -128,6 +129,168 @@
	removed
)

// storeChangeRateLimiter and helpers.
//
// Usage:
// - construct a single storeChangeRateLimiter for the allocator using
//   newStoreChangeRateLimiter. If all allocators are operating at 10s
//   intervals, a gcThreshold of 15s should be enough.
// - storeEnactedHistory is a member of storeState, so there is one per store.
// - storeEnactedHistory.addEnactedChange must be called every time a change
//   is known to be enacted.
// - Before every rebalancing pass, call
//   storeChangeRateLimiter.initForRebalancePass. Then iterate over each store
//   and call storeChangeRateLimiter.updateForRebalancePass. The latter step
//   updates the rate limiting state for the store -- this "rate limiting" is
//   simply a bool representing whether changes to the store are allowed for
//   load reasons in this rebalancing pass (changes made for failure reasons,
//   i.e. to shed load, are always allowed). Note that rate limiting the
//   changes made during the rebalancing pass itself is not in scope of this
//   rate limiter -- that happens via storeState.adjusted.loadPendingChanges.
// - During the rebalancing pass, use
//   storeEnactedHistory.allowLoadBasedChanges to decide whether a store can
//   be the source or target for load-based rebalancing (see the sketch below).
//
// TODO(sumeer): unit test.
//
// TODO(sumeer,kvoli): integrate this into the rest of allocator2.
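
For concreteness, here is a minimal sketch of how a rebalancing pass might wire these pieces together. It is illustrative only and not part of this commit: the function name rebalancePassSketch, the stores map, the numAllocators/clusterMean/now inputs, and the placement of the enacted history at ss.adjusted.enactedHistory are assumptions about the surrounding integration, which is still a TODO.

// Illustrative sketch, not part of this commit. Assumes the caller can supply
// the stores map, numAllocators, clusterMean and now, and that the enacted
// history lives at ss.adjusted.enactedHistory (per the storeState change
// later in this diff).
func rebalancePassSketch(
	crl *storeChangeRateLimiter,
	stores map[roachpb.StoreID]*storeState,
	numAllocators int,
	clusterMean meanStoreLoad,
	now time.Time,
) {
	// crl would be constructed once, e.g. newStoreChangeRateLimiter(15 * time.Second).
	crl.initForRebalancePass(numAllocators, clusterMean)
	for _, ss := range stores {
		// GC stale history and recompute the per-store bool that gates
		// load-based changes for this pass.
		crl.updateForRebalancePass(&ss.adjusted.enactedHistory, now)
	}
	for _, ss := range stores {
		if !ss.adjusted.enactedHistory.allowLoadBasedChanges() {
			// Skip this store as a load-based source or target; changes made
			// to shed load for failure reasons remain allowed.
			continue
		}
		// ... consider ss as a source or target for load-based rebalancing ...
	}
	// Separately, whenever a pending change is learned to be enacted (via
	// AdjustPendingChangesDisposition), the corresponding store's history is
	// updated with ss.adjusted.enactedHistory.addEnactedChange(change).
}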

// enactedReplicaChange is information about a change at a store that was
// enacted. It is an internal implementation detail of storeEnactedHistory and
// storeChangeRateLimiter.
type enactedReplicaChange struct {
	// When the change is known to be enacted, based on the authoritative
	// information received from the leaseholder.
	enactedAtTime time.Time
	// The load this change adds to a store. The values are always positive,
	// even for load subtraction.
	loadDelta      loadVector
	secondaryDelta secondaryLoadVector
}

// storeEnactedHistory is a member of storeState. Users should only call
// addEnactedChange and allowLoadBasedChanges.
type storeEnactedHistory struct {
	changes        []enactedReplicaChange
	totalDelta     loadVector
	totalSecondary secondaryLoadVector
	allowChanges   bool
}

func (h *storeEnactedHistory) addEnactedChange(change *pendingReplicaChange) {
	c := enactedReplicaChange{
		enactedAtTime: change.enactedAtTime,
		loadDelta:     change.loadDelta,
	}
	for i, l := range c.loadDelta {
		if l < 0 {
			c.loadDelta[i] = -l
		}
	}
	if change.prev.isLeaseholder != change.next.isLeaseholder {
		c.secondaryDelta[leaseCount] = 1
	}
	h.changes = append(h.changes, c)
	sort.Slice(h.changes, func(i, j int) bool {
		return h.changes[i].enactedAtTime.Before(h.changes[j].enactedAtTime)
	})
	h.totalDelta.add(c.loadDelta)
	h.totalSecondary.add(c.secondaryDelta)
}

func (h *storeEnactedHistory) allowLoadBasedChanges() bool {
	return h.allowChanges
}

func (h *storeEnactedHistory) gcHistory(now time.Time, gcThreshold time.Duration) {
	i := 0
	for ; i < len(h.changes); i++ {
		c := h.changes[i]
		if now.Sub(c.enactedAtTime) > gcThreshold {
			h.totalDelta.subtract(c.loadDelta)
			h.totalSecondary.subtract(c.secondaryDelta)
		} else {
			break
		}
	}
	h.changes = h.changes[i:]
}

// storeChangeRateLimiter is a rate limiter for rebalancing to/from this
// store, for this allocator, under the assumption that the cluster uses many
// (distributed) allocators. The rate limiting heuristic assumes that the
// other allocators are also running with such a rate limiter, and is intended
// to prevent a thundering herd problem where all allocators add/remove too
// much load from a store, resulting in perpetual rebalancing without
// improving the cluster.
type storeChangeRateLimiter struct {
	gcThreshold   time.Duration
	numAllocators int
	clusterMean   meanStoreLoad
}

func newStoreChangeRateLimiter(gcThreshold time.Duration) *storeChangeRateLimiter {
	return &storeChangeRateLimiter{
		gcThreshold: gcThreshold,
	}
}

func (crl *storeChangeRateLimiter) initForRebalancePass(
	numAllocators int, clusterMean meanStoreLoad,
) {
	crl.numAllocators = numAllocators
	crl.clusterMean = clusterMean
}

// TODO(sumeer): also consider utilization in the cluster and capacity of this
// store, since that is necessary to make this reasonable for heterogeneous
// clusters.

func (crl *storeChangeRateLimiter) updateForRebalancePass(h *storeEnactedHistory, now time.Time) {
	h.gcHistory(now, crl.gcThreshold)
	if len(h.changes) == 0 {
		// If there is nothing in the history, changes are allowed. This is the
		// path we will fall into for underloaded clusters where the mean is very
		// low and the projectedDelta will become too high even with a single
		// change.
		//
		// Additionally, if the actual number of allocators that can add or remove
		// load from a store is much lower than numAllocators (because of
		// constraints) we may often need to use this fallback. We do assume that
		// typical clusters will be configured in a manner that each store will be
		// able to satisfy some constraint for a substantial fraction of the
		// ranges in the cluster (say > 50%), so the pessimistic behavior encoded
		// in the code below will not significantly slow down allocator changes.
		//
		// Another situation that can present a problem is cluster configurations
		// where a significant number of nodes can only have follower replicas.
		// Some options:
		// - Add a cluster setting effective_fraction_allocators, defaulting to
		//   1.0, that can be adjusted by the operator to the fraction of nodes
		//   that can do allocation.
		// - Estimate the number of allocators based on the gossiped lease count
		//   by each store.
		//
		// We currently prefer the second option.
		h.allowChanges = true
		return
	}
	h.allowChanges = true
	for i := range h.totalDelta {
		projectedDelta := h.totalDelta[i] * loadValue(crl.numAllocators)
		if float64(projectedDelta) > 0.5*float64(crl.clusterMean.load[i]) {
			h.allowChanges = false
			return
		}
	}
	for i := range h.totalSecondary {
		projectedDelta := h.totalSecondary[i] * loadValue(crl.numAllocators)
		if float64(projectedDelta) > 0.5*float64(crl.clusterMean.secondaryLoad[i]) {
			h.allowChanges = false
			return
		}
	}
}
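
To make the 50% threshold concrete, a small worked example with made-up numbers (illustrative only, not from the patch): suppose numAllocators is 8 and this allocator's un-GC'd enacted history at a store sums to a CPU delta of 200. The projected cluster-wide delta is 200 * 8 = 1600; with a cluster mean CPU load of 2000 per store, 1600 > 0.5 * 2000 = 1000, so h.allowChanges is set to false and the store is skipped as a load-based source or target for this pass. With only 100 units of enacted delta, the projection would be 800, which is within the threshold, and changes would remain allowed.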

// storeState maintains the complete state about a store as known to the
// allocator.
type storeState struct {
@@ -167,6 +330,8 @@ type storeState struct {
		// to always remove it.
		loadPendingChanges map[changeID]*pendingReplicaChange

		enactedHistory storeEnactedHistory

		secondaryLoad secondaryLoadVector

		// replicas is computed from the authoritative information provided by
@@ -192,6 +357,37 @@ type storeState struct {
	localityTiers
}

// The time duration between a change happening at a store, and when the
// effect of that change is seen in the load information computed by that
// store.
//
// TODO(sumeer): set this based on an understanding of ReplicaStats etc.
const lagForChangeReflectedInLoad = 5 * time.Second

// NB: this is a heuristic.
//
// TODO(sumeer): this interface is just a placeholder. We should integrate
// this into the storeState update function, in which case we can directly
// remove from the loadPendingChanges map.
func (ss *storeState) computePendingChangesReflectedInLatestLoad(
	latestLoadTime time.Time,
) []*pendingReplicaChange {
	var changes []*pendingReplicaChange
	for _, change := range ss.adjusted.loadPendingChanges {
		if change.enactedAtTime.IsZero() {
			// Not yet enacted, based on the information provided by the
			// leaseholder, which is always considered most up-to-date (because of
			// AdjustPendingChangesDisposition)
			continue
		}
		// Is enacted.
		if latestLoadTime.Sub(change.enactedAtTime) > lagForChangeReflectedInLoad {
			changes = append(changes, change)
		}
	}
	return changes
}
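
Per the TODO above, the real integration would fold this into the storeState update path; the following is a hedged sketch of how a caller might consume the returned changes when a new gossiped load arrives. The function name applyGossipedLoadSketch and the assumption that pendingReplicaChange exposes its changeID are illustrative, not part of this commit.

// Hypothetical caller sketch, not part of this commit: e.g. when applying a
// gossiped storeLoadMsg whose load was computed at loadTime. Changes whose
// effect is assumed to already be reflected in that load are dropped from the
// pending map so they are not double counted. Assumes pendingReplicaChange
// carries a changeID field identifying it in loadPendingChanges.
func applyGossipedLoadSketch(ss *storeState, loadTime time.Time) {
	for _, change := range ss.computePendingChangesReflectedInLatestLoad(loadTime) {
		delete(ss.adjusted.loadPendingChanges, change.changeID)
	}
	// ... then overwrite the adjusted load from the gossiped StoreCapacity and
	// re-apply the remaining pending changes ...
}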

// failureDetectionSummary is provided by an external entity and never
// computed inside the allocator.
type failureDetectionSummary uint8
@@ -450,3 +646,11 @@ var _ = clusterState{}.ranges
var _ = clusterState{}.pendingChanges
var _ = clusterState{}.constraintMatcher
var _ = clusterState{}.localityTierInterner
var _ = (&storeChangeRateLimiter{}).initForRebalancePass
var _ = (&storeChangeRateLimiter{}).updateForRebalancePass
var _ = newStoreChangeRateLimiter
var _ = (&storeEnactedHistory{}).addEnactedChange
var _ = (&storeEnactedHistory{}).allowLoadBasedChanges
var _ = (&storeEnactedHistory{}).gcHistory
var _ = enactedReplicaChange{}
var _ = (&storeState{}).computePendingChangesReflectedInLatestLoad
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/allocator/allocator2/load.go
@@ -42,6 +42,18 @@ type loadValue int64
// dimension.
type loadVector [numLoadDimensions]loadValue

func (lv *loadVector) add(other loadVector) {
	for i := range other {
		(*lv)[i] += other[i]
	}
}

func (lv *loadVector) subtract(other loadVector) {
	for i := range other {
		(*lv)[i] -= other[i]
	}
}

// A resource can have a capacity, which is also expressed using loadValue.
// There are some special case capacity values, enumerated here.
const (
@@ -76,6 +88,18 @@

type secondaryLoadVector [numSecondaryLoadDimensions]loadValue

func (lv *secondaryLoadVector) add(other secondaryLoadVector) {
	for i := range other {
		(*lv)[i] += other[i]
	}
}

func (lv *secondaryLoadVector) subtract(other secondaryLoadVector) {
	for i := range other {
		(*lv)[i] -= other[i]
	}
}

type rangeLoad struct {
	load loadVector
	// Nanos per second. raftCPU <= load[cpu]. Handling this as a special case,
