kvserver: rebalance ranges to minimize QPS delta among stores
This commit fixes the regression(s) introduced by
cockroachdb#65379 where we observed replica
thrashing in various workloads (cockroachdb#70396 and cockroachdb#71244).

The following is a description of the differences between the QPS-based
rebalancing scheme used by the previous implementation of the store rebalancer
(release-21.2 and before) and the scheme used on master after this commit.

** lease rebalancing **
*** release 21.2 and before ***
QPS-based lease rebalancing in CRDB 21.2 considers the overall cluster-level
average QPS and computes underfull and overfull thresholds based on this
average. For each range that the local store has a lease for, the store
rebalancer goroutine checks whether transferring said range's lease away will
bring the local store's QPS below the underfull threshold. If so, it ignores
the range and moves on to the next one. Otherwise, it iterates through the
stores of all the non-leaseholder voting replicas (in ascending order of their
QPS) and checks whether it would be reasonable to transfer the lease away to
such a store. It ensures that the receiving store would not become overfull
after the lease transfer. It checks that the receiving store doesn't have a
replica that's lagging behind the current leaseholder. It checks that the
receiving store is not in violation of lease preferences. Finally, it ensures
that the lease is not on the local store because of access locality
considerations (i.e. because of follow-the-workload).

All of this was bespoke logic that lived in the store rebalancer (using none of
the Allocator's machinery).
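
To make the 21.2 behavior concrete, the decision sequence can be sketched
roughly as follows. This is not the actual CRDB code: the helper and threshold
names are invented, and the lagging-replica, lease-preference, and
follow-the-workload checks are elided.

```go
// Rough sketch only: names are invented and several checks (lagging
// replicas, lease preferences, follow-the-workload) are omitted.
package main

import (
	"fmt"
	"sort"
)

type storeLoad struct {
	storeID int
	qps     float64
}

// pickLeaseTarget21 mimics the threshold-based decision: skip the range if
// shedding its QPS would push the local store below the underfull threshold,
// otherwise hand the lease to the coldest candidate store that would stay
// below the overfull threshold.
func pickLeaseTarget21(
	localQPS, rangeQPS, underfull, overfull float64, candidates []storeLoad,
) (target int, ok bool) {
	if localQPS-rangeQPS < underfull {
		// Transferring this lease would leave the local store underfull.
		return 0, false
	}
	// Consider candidates in ascending order of their current QPS.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].qps < candidates[j].qps
	})
	for _, c := range candidates {
		if c.qps+rangeQPS < overfull {
			return c.storeID, true
		}
	}
	return 0, false
}

func main() {
	candidates := []storeLoad{{storeID: 2, qps: 900}, {storeID: 3, qps: 400}}
	target, ok := pickLeaseTarget21(1500, 300, 800, 1200, candidates)
	fmt.Println(target, ok) // 3 true: store 3 is coldest and stays below overfull
}
```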

*** master and this commit ***
In cockroachdb#65379, we moved this decision-making into the Allocator by adding a new
mode in `Allocator.TransferLeaseTarget` that tries to determine whether
transferring the lease to another voting replica would reduce the QPS delta
between the hottest and the coldest stores in the replica set. This commit adds
some padding to this logic by ensuring that the QPS difference between the
store relinquishing the lease and the store receiving it is at least 200 QPS.
Furthermore, it ensures that the store receiving the lease won't become
significantly hotter than the current leaseholder.
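
A minimal sketch of the kind of guard described above, with invented names and
an illustrative 200 QPS constant (in the real code the floor comes from
`minQPSDifferenceForTransfers`, visible in the diff below):

```go
// Hypothetical guard mirroring the padding described above; the real logic is
// part of Allocator.TransferLeaseTarget and reads its thresholds from cluster
// settings.
package main

import "fmt"

const minQPSDifference = 200.0 // illustrative floor

func shouldTransferLease(srcStoreQPS, dstStoreQPS, leaseQPS float64) bool {
	// The source and destination must differ by a meaningful amount,
	// otherwise small fluctuations would cause lease thrashing.
	if srcStoreQPS-dstStoreQPS < minQPSDifference {
		return false
	}
	// The destination should not end up hotter than the source is today,
	// i.e. the transfer should narrow the delta, not overshoot it.
	if dstStoreQPS+leaseQPS > srcStoreQPS {
		return false
	}
	return true
}

func main() {
	fmt.Println(shouldTransferLease(1000, 850, 50))  // false: stores differ by only 150 QPS
	fmt.Println(shouldTransferLease(1000, 600, 100)) // true
	fmt.Println(shouldTransferLease(1000, 600, 500)) // false: receiver would overshoot the source
}
```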

** replica rebalancing **
*** release 21.2 and before ***
QPS-based replica rebalancing in CRDB <=21.2 works similarly to the lease
rebalancing logic. We first compute a cluster-level QPS average and, from it,
overfull and underfull thresholds. Based on these thresholds, we try to move
replicas away from overfull stores and onto underfull stores, all while
ensuring that the receiving stores would not become overfull after the
rebalance. A critical
assumption that the store rebalancer made (and still does, in the approach
implemented by this commit) is that follower replicas serve the same traffic as
the leaseholder.
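
For illustration, a rough sketch of how such cluster-mean-based thresholds can
be derived; the function name is invented, and the fractional threshold
corresponds to `qpsRebalanceThreshold` in the diff below:

```go
// Illustrative only: the function name is invented; cf. qpsRebalanceThreshold
// in the diff below for where the fractional threshold comes from.
package main

import "fmt"

// computeQPSThresholds derives overfull/underfull bounds from a cluster-wide
// mean QPS, padded by a fractional rebalance threshold.
func computeQPSThresholds(storeQPS []float64, threshold float64) (mean, underfull, overfull float64) {
	for _, qps := range storeQPS {
		mean += qps
	}
	mean /= float64(len(storeQPS))
	return mean, mean * (1 - threshold), mean * (1 + threshold)
}

func main() {
	// Two hot stores in one locality, two cold stores in another.
	mean, under, over := computeQPSThresholds([]float64{2000, 1800, 300, 200}, 0.25)
	fmt.Printf("mean=%.0f underfull=%.0f overfull=%.0f\n", mean, under, over)
	// mean=1075 underfull=806 overfull=1344: both hot stores are deemed
	// "overfull" no matter how their locality is actually meant to be loaded.
}
```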

*** master and this commit ***
The approach implemented by cockroachdb#65379 and refined by this commit tries to
leverage the machinery in the Allocator that makes rebalancing decisions which
converge load-based statistics within each equivalence class. Previously, this
machinery was only used for range-count-based replica rebalancing (performed by
the `replicateQueue`), not for QPS-based rebalancing. This commit implements an
approach similar to what we now do for lease rebalancing: determine whether a
rebalance action would reduce the QPS delta between the hottest and the coldest
store in the equivalence class. This commit adds some safeguards around this
logic by ensuring that the store relinquishing the replica and the store
receiving it differ by at least 200 QPS. Furthermore, it ensures that the
replica rebalance would not significantly switch the relative dispositions of
the two stores.
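
A rough sketch of the delta-minimizing check described above, with invented
names (cf. `bestStoreToMinimizeQPSDelta` in the lease-transfer diff below for
the real machinery):

```go
// Rough sketch with invented names; not the actual CRDB implementation.
package main

import (
	"fmt"
	"math"
)

// rebalanceTarget finds the coldest store in an equivalence class and only
// moves the replica off the hot store if the two differ by at least
// minQPSDiff and the move shrinks the gap rather than flipping the stores'
// relative dispositions.
func rebalanceTarget(
	storeQPS map[int]float64, srcStore int, replicaQPS, minQPSDiff float64,
) (dst int, ok bool) {
	coldest, coldestQPS := 0, math.MaxFloat64
	for id, qps := range storeQPS {
		if id != srcStore && qps < coldestQPS {
			coldest, coldestQPS = id, qps
		}
	}
	srcQPS, haveSrc := storeQPS[srcStore]
	if !haveSrc || coldest == 0 || srcQPS-coldestQPS < minQPSDiff {
		return 0, false // missing stats, no candidate, or insignificant delta
	}
	if coldestQPS+replicaQPS > srcQPS-replicaQPS {
		// The receiver would end up hotter than the source: the rebalance
		// would switch the two stores' relative dispositions.
		return 0, false
	}
	return coldest, true
}

func main() {
	stores := map[int]float64{1: 1500, 2: 1100, 3: 700}
	fmt.Println(rebalanceTarget(stores, 1, 100, 200)) // 3 true
	fmt.Println(rebalanceTarget(stores, 1, 500, 200)) // 0 false: would flip the stores' dispositions
}
```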

An important thing to note about the 21.2 implementation of the store
rebalancer is that it made all of its decisions based on cluster-level QPS
averages. This behaves poorly in heterogeneously sized / loaded clusters where
some localities are designed to receive more traffic than others. In such
clusters, heavily loaded localities can always be considered "overfull", which
usually means that all stores in those localities sit above the cluster-wide
"overfull" threshold. The logic described above would then effectively do
nothing, since there are no underfull stores to move replicas to.

Release note (performance improvement): A set of bugs that rendered QPS-based
lease and replica rebalancing in CRDB 21.2 and prior ineffective under
heterogeneously loaded cluster localities has been fixed. Additionally, a
limitation that prevented CRDB from effectively alleviating extreme QPS
hotspots on nodes has also been fixed.
aayushshah15 committed Jan 16, 2022
1 parent 3937557 commit 1f38a11
Showing 7 changed files with 1,154 additions and 358 deletions.
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/follower_reads.go
@@ -50,7 +50,6 @@ func registerFollowerReads(r registry.Registry) {
name = name + "/insufficient-quorum"
}
r.Add(registry.TestSpec{
Skip: "https://github.com/cockroachdb/cockroach/issues/69817",
Name: name,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(
122 changes: 62 additions & 60 deletions pkg/kv/kvserver/allocator.go
@@ -1425,97 +1425,99 @@ func (a *Allocator) TransferLeaseTarget(
return candidates[a.randGen.Intn(len(candidates))]

case qpsConvergence:
leaseReplQPS, _ := stats.avgQPS()
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
candidates = append(candidates, repl.StoreID)
}
}

// When the goal is to further QPS convergence across stores, we ensure that
// any lease transfer decision we make *reduces the delta between the store
// serving the highest QPS and the store serving the lowest QPS* among our
// list of candidates.
bestStore, noRebalanceReason := bestStoreToMinimizeQPSDelta(
leaseReplQPS,
qpsRebalanceThreshold.Get(&a.storePool.st.SV),
minQPSDifferenceForTransfers.Get(&a.storePool.st.SV),
leaseRepl.StoreID(),
candidates,
storeDescMap,
)

// Create a separate map of store_id -> qps that we can manipulate in order
// to simulate the resulting QPS distribution of various potential lease
// transfer decisions.
storeQPSMap := make(map[roachpb.StoreID]float64)
for _, storeDesc := range storeDescMap {
storeQPSMap[storeDesc.StoreID] = storeDesc.Capacity.QueriesPerSecond
}

leaseholderStoreQPS, ok := storeQPSMap[leaseRepl.StoreID()]
if !ok {
switch noRebalanceReason {
case noBetterCandidate:
log.VEventf(ctx, 5, "r%d: could not find a better target for lease", leaseRepl.GetRangeID())
return roachpb.ReplicaDescriptor{}
case existingNotOverfull:
log.VEventf(
ctx, 3, "cannot find store descriptor for leaseholder s%d;"+
" skipping this range", leaseRepl.StoreID(),
ctx, 5, "r%d: existing leaseholder s%d is not overfull",
leaseRepl.GetRangeID(), leaseRepl.StoreID(),
)
return roachpb.ReplicaDescriptor{}
}

leaseholderReplQPS, _ := stats.avgQPS()
currentDelta := getQPSDelta(storeQPSMap, existing)
bestOption := getCandidateWithMinQPS(storeQPSMap, existing)
if bestOption != (roachpb.ReplicaDescriptor{}) && bestOption.StoreID != leaseRepl.StoreID() &&
// It is always beneficial to transfer the lease to the coldest candidate
// if the range's own qps is smaller than the difference between the
// leaseholder store and the candidate store. This will always drive down
// the difference between those two stores, which should always drive down
// the difference between the store serving the highest QPS and the store
// serving the lowest QPS.
//
// TODO(aayush): We should think about whether we need any padding here.
// Not adding any sort of padding could make this a little sensitive, but
// there are some downsides to doing so. If the padding here is too high,
// we're going to keep ignoring opportunities for lease transfers for
// ranges with low QPS. This can add up and prevent us from achieving
// convergence in cases where we're dealing with a ton of very low-QPS
// ranges.
(leaseholderStoreQPS-leaseholderReplQPS) > storeQPSMap[bestOption.StoreID] {
storeQPSMap[leaseRepl.StoreID()] -= leaseholderReplQPS
storeQPSMap[bestOption.StoreID] += leaseholderReplQPS
minDelta := getQPSDelta(storeQPSMap, existing)
case deltaNotSignificant:
log.VEventf(
ctx,
3,
"lease transfer to s%d would reduce the QPS delta between this ranges' stores from %.2f to %.2f",
bestOption.StoreID,
currentDelta,
minDelta,
ctx, 5,
"r%d: delta between s%d and the coldest follower (ignoring r%d's lease) is not large enough",
leaseRepl.GetRangeID(), leaseRepl.StoreID(), leaseRepl.GetRangeID(),
)
return bestOption
return roachpb.ReplicaDescriptor{}
case significantlySwitchesRelativeDisposition:
log.VEventf(ctx, 5,
"r%d: lease transfer away from s%d would make it hotter than the coldest follower",
leaseRepl.GetRangeID(), leaseRepl.StoreID())
return roachpb.ReplicaDescriptor{}
case missingStatsForExistingStore:
log.VEventf(
ctx, 5, "r%d: missing stats for leaseholder s%d",
leaseRepl.GetRangeID(), leaseRepl.StoreID(),
)
return roachpb.ReplicaDescriptor{}
case none:
default:
log.Fatalf(ctx, "unknown declineReason: %v", noRebalanceReason)
}
return roachpb.ReplicaDescriptor{}

for _, repl := range existing {
if repl.StoreID == bestStore {
return repl
}
}
panic("unreachable")
default:
log.Fatalf(ctx, "unexpected lease transfer goal %d", g)
}
panic("unreachable")
}

// getCandidateWithMinQPS returns the `ReplicaDescriptor` that belongs to the
// store serving the lowest QPS among all the `existing` replicas.
// getCandidateWithMinQPS returns the StoreID that belongs to the store serving
// the lowest QPS among all the `candidates` stores.
func getCandidateWithMinQPS(
storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor,
) roachpb.ReplicaDescriptor {
storeQPSMap map[roachpb.StoreID]float64, candidates []roachpb.StoreID,
) (bestCandidate roachpb.StoreID) {
minCandidateQPS := math.MaxFloat64
var candidateWithMin roachpb.ReplicaDescriptor
for _, repl := range existing {
candidateQPS, ok := storeQPSMap[repl.StoreID]
for _, store := range candidates {
candidateQPS, ok := storeQPSMap[store]
if !ok {
continue
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
candidateWithMin = repl
bestCandidate = store
}
}
return candidateWithMin
return bestCandidate
}

// getQPSDelta returns the difference between the store serving the highest QPS
// and the store serving the lowest QPS, among the set of stores that have an
// `existing` replica.
func getQPSDelta(
storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor,
) float64 {
// and the store serving the lowest QPS, among the set of stores in the
// `domain`.
func getQPSDelta(storeQPSMap map[roachpb.StoreID]float64, domain []roachpb.StoreID) float64 {
maxCandidateQPS := float64(0)
minCandidateQPS := math.MaxFloat64
for _, repl := range existing {
candidateQPS, ok := storeQPSMap[repl.StoreID]
for _, cand := range domain {
candidateQPS, ok := storeQPSMap[cand]
if !ok {
continue
}