kvserver: promote QPS convergence during load-based lease rebalancing
This commit augments `TransferLeaseTarget()` by adding a mode that picks
the best lease transfer target that would lead to QPS convergence across
the stores that have a replica for a given range.

The new mode predicates lease transfer decisions on whether they would
reduce the QPS delta between the stores that hold the range's existing
replicas.

Resolves cockroachdb#31135

Release justification: Fixes high priority bug

Release note (bug fix): Previously, the store rebalancer was unable to
rebalance leases for hot ranges that received a disproportionate amount
of traffic relative to the rest of the cluster. This often led to
prolonged single-node hotspots in certain workloads. This bug is now
fixed.
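
The heart of the new `qpsConvergence` mode is a small simulation: take the per-store QPS, tentatively shift the hot range's QPS from the current leaseholder's store onto the candidate store serving the least QPS, and only perform the transfer if that shrinks the spread between the hottest and coldest candidate stores. Below is a minimal, self-contained Go sketch of that rule; it uses plain maps and integer store IDs instead of the real allocator/roachpb types, and its helpers only mirror the `getQPSDelta` and `getCandidateWithMinQPS` functions added by this commit.

package main

import (
	"fmt"
	"math"
)

// qpsDelta returns the spread between the hottest and the coldest of the
// candidate stores.
func qpsDelta(storeQPS map[int]float64, stores []int) float64 {
	minQPS, maxQPS := math.MaxFloat64, 0.0
	for _, s := range stores {
		q := storeQPS[s]
		if q < minQPS {
			minQPS = q
		}
		if q > maxQPS {
			maxQPS = q
		}
	}
	return maxQPS - minQPS
}

// candidateWithMinQPS returns the candidate store currently serving the
// least QPS.
func candidateWithMinQPS(storeQPS map[int]float64, stores []int) int {
	best, bestQPS := -1, math.MaxFloat64
	for _, s := range stores {
		if q := storeQPS[s]; q < bestQPS {
			best, bestQPS = s, q
		}
	}
	return best
}

func main() {
	// Hypothetical cluster state: the hot range's leaseholder lives on store 1
	// and the range itself serves 300 QPS.
	storeQPS := map[int]float64{1: 1000, 2: 400, 3: 200}
	stores := []int{1, 2, 3}
	leaseholder, rangeQPS := 1, 300.0

	before := qpsDelta(storeQPS, stores)
	target := candidateWithMinQPS(storeQPS, stores)

	// Mirror the check in the commit: transfer only if the leaseholder's store,
	// after shedding the range's QPS, would still be hotter than the coldest
	// candidate store is today.
	if target != leaseholder && storeQPS[leaseholder]-rangeQPS > storeQPS[target] {
		storeQPS[leaseholder] -= rangeQPS
		storeQPS[target] += rangeQPS
		fmt.Printf("transfer to s%d: QPS delta %.0f -> %.0f\n",
			target, before, qpsDelta(storeQPS, stores))
	} else {
		fmt.Println("no transfer: it would not reduce the QPS delta")
	}
}

With these numbers the sketch prints `transfer to s3: QPS delta 800 -> 300`, which is the kind of convergence the `qpsConvergence` goal aims for; the real implementation additionally filters out suspect stores and replicas that are waiting for a Raft snapshot.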
aayushshah15 committed Sep 8, 2021
1 parent 5af9094 commit 590c527
Showing 8 changed files with 553 additions and 321 deletions.
262 changes: 189 additions & 73 deletions pkg/kv/kvserver/allocator.go
@@ -354,11 +354,15 @@ type Allocator struct {
storePool *StorePool
nodeLatencyFn func(addr string) (time.Duration, bool)
randGen allocatorRand

knobs *AllocatorTestingKnobs
}

// MakeAllocator creates a new allocator using the specified StorePool.
func MakeAllocator(
storePool *StorePool, nodeLatencyFn func(addr string) (time.Duration, bool),
storePool *StorePool,
nodeLatencyFn func(addr string) (time.Duration, bool),
knobs *AllocatorTestingKnobs,
) Allocator {
var randSource rand.Source
// There are a number of test cases that make a test store but don't add
@@ -373,6 +377,7 @@ func MakeAllocator(
storePool: storePool,
nodeLatencyFn: nodeLatencyFn,
randGen: makeAllocatorRand(randSource),
knobs: knobs,
}
}

@@ -1271,15 +1276,16 @@ func (a *Allocator) TransferLeaseTarget(
GetRangeID() roachpb.RangeID
},
stats *replicaStats,
checkTransferLeaseSource bool,
checkCandidateFullness bool,
alwaysAllowDecisionWithoutStats bool,
forceDecisionWithoutStats bool,
opts transferLeaseOptions,
) roachpb.ReplicaDescriptor {
allStoresList, _, _ := a.storePool.getStoreList(storeFilterNone)
storeDescMap := storeListToMap(allStoresList)

sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
sl = sl.excludeInvalid(conf.Constraints)
sl = sl.excludeInvalid(conf.VoterConstraints)
// The only thing we use the storeList for is the lease mean across the
// eligible stores; make that explicit here.

candidateLeasesMean := sl.candidateLeases.mean

source, ok := a.storePool.getStoreDescriptor(leaseRepl.StoreID())
@@ -1292,6 +1298,7 @@ func (a *Allocator) TransferLeaseTarget(
// store matches, it's where the lease should be (unless the preferred store
// is the current one and checkTransferLeaseSource is false).
var preferred []roachpb.ReplicaDescriptor
checkTransferLeaseSource := opts.checkTransferLeaseSource
if checkTransferLeaseSource {
preferred = a.preferredLeaseholders(conf, existing)
} else {
@@ -1328,90 +1335,192 @@
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */)

// Only proceed with the lease transfer if we are also the raft leader (we
// already know we are the leaseholder at this point), and only consider
// replicas that are in `StateReplicate` as potential candidates.
//
// NB: The RaftStatus() only returns a non-empty and non-nil result on the
// Raft leader (since Raft followers do not track the progress of other
// replicas, only the leader does).
//
// NB: On every Raft tick, we try to ensure that leadership is collocated with
// leaseholdership (see
// Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that
// on a range that is not already borked (i.e. can accept writes), periods of
// leader/leaseholder misalignment should be ephemeral and rare. We choose to
// be pessimistic here and choose to bail on the lease transfer, as opposed to
// potentially transferring the lease to a replica that may be waiting for a
// snapshot (which will wedge the range until the replica applies that
// snapshot).
existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing)
if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
// Only proceed with the lease transfer if we are also the raft leader (we
// already know we are the leaseholder at this point), and only consider
// replicas that are in `StateReplicate` as potential candidates.
//
// NB: The RaftStatus() only returns a non-empty and non-nil result on the
// Raft leader (since Raft followers do not track the progress of other
// replicas, only the leader does).
//
// NB: On every Raft tick, we try to ensure that leadership is collocated with
// leaseholdership (see
// Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that
// on a range that is not already borked (i.e. can accept writes), periods of
// leader/leaseholder misalignment should be ephemeral and rare. We choose to
// be pessimistic here and choose to bail on the lease transfer, as opposed to
// potentially transferring the lease to a replica that may be waiting for a
// snapshot (which will wedge the range until the replica applies that
// snapshot).
existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing)
}

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
log.VEventf(ctx, 2, "no lease transfer target found for r%d", leaseRepl.GetRangeID())
return roachpb.ReplicaDescriptor{}
}

// Try to pick a replica to transfer the lease to while also determining
// whether we actually should be transferring the lease. The transfer
// decision is only needed if we've been asked to check the source.
transferDec, repl := a.shouldTransferLeaseUsingStats(
ctx, source, existing, stats, nil, candidateLeasesMean,
)
if checkTransferLeaseSource {
switch transferDec {
case shouldNotTransfer:
if !alwaysAllowDecisionWithoutStats {
return roachpb.ReplicaDescriptor{}
switch g := opts.goal; g {
case followTheWorkload:
// Try to pick a replica to transfer the lease to while also determining
// whether we actually should be transferring the lease. The transfer
// decision is only needed if we've been asked to check the source.
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, source, existing, stats, nil, candidateLeasesMean,
)
if checkTransferLeaseSource {
switch transferDec {
case shouldNotTransfer:
if !forceDecisionWithoutStats {
return roachpb.ReplicaDescriptor{}
}
fallthrough
case decideWithoutStats:
if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) {
return roachpb.ReplicaDescriptor{}
}
case shouldTransfer:
default:
log.Fatalf(ctx, "unexpected transfer decision %d with replica %+v", transferDec, repl)
}
}
if repl != (roachpb.ReplicaDescriptor{}) {
return repl
}
fallthrough

case leaseCountConvergence:
// Fall back to logic that doesn't take request counts and latency into
// account if the counts/latency-based logic couldn't pick a best replica.
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
var bestOption roachpb.ReplicaDescriptor
bestOptionLeaseCount := int32(math.MaxInt32)
for _, repl := range existing {
if leaseRepl.StoreID() == repl.StoreID {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if !ok {
continue
}
fallthrough
case decideWithoutStats:
if !a.shouldTransferLeaseWithoutStats(ctx, sl, source, existing) {
return roachpb.ReplicaDescriptor{}
if !opts.checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 {
candidates = append(candidates, repl)
} else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount {
bestOption = repl
bestOptionLeaseCount = storeDesc.Capacity.LeaseCount
}
case shouldTransfer:
default:
log.Fatalf(ctx, "unexpected transfer decision %d with replica %+v", transferDec, repl)
}
}
if len(candidates) == 0 {
// If we aren't supposed to be considering the current leaseholder (e.g.
// because we need to remove this replica for some reason), return
// our best option if we otherwise wouldn't want to do anything.
if !checkTransferLeaseSource {
return bestOption
}
return roachpb.ReplicaDescriptor{}
}
a.randGen.Lock()
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]

case qpsConvergence:
// When the goal is to further QPS convergence across stores, we ensure that
// any lease transfer decision we make *reduces the delta between the store
// serving the highest QPS and the store serving the lowest QPS* among our
// list of candidates.

// Create a separate map of store_id -> qps that we can manipulate in order
// to simulate the resulting QPS distribution of various potential lease
// transfer decisions.
storeQPSMap := make(map[roachpb.StoreID]float64)
for _, storeDesc := range storeDescMap {
storeQPSMap[storeDesc.StoreID] = storeDesc.Capacity.QueriesPerSecond
}

if repl != (roachpb.ReplicaDescriptor{}) {
return repl
leaseholderStoreQPS, ok := storeQPSMap[leaseRepl.StoreID()]
if !ok {
log.VEventf(
ctx, 3, "cannot find store descriptor for leaseholder s%d;"+
" skipping this range", leaseRepl.StoreID(),
)
return roachpb.ReplicaDescriptor{}
}

leaseholderReplQPS, _ := stats.avgQPS()
currentDelta := getQPSDelta(storeQPSMap, existing)
bestOption := getCandidateWithMinQPS(storeQPSMap, existing)
if bestOption != (roachpb.ReplicaDescriptor{}) &&
(leaseholderStoreQPS-leaseholderReplQPS) > storeQPSMap[bestOption.StoreID] {
// TODO(aayush): We should think about whether we need any padding here.
// Not adding any sort of padding could make this a little sensitive, but
// there are some downsides to doing so. If the padding here is too high,
// we're going to keep ignoring opportunities for lease transfers for
// ranges with low QPS. This can add up and prevent us from achieving
// convergence in cases where we're dealing with a ton of very low-QPS
// ranges.
storeQPSMap[leaseRepl.StoreID()] -= leaseholderReplQPS
storeQPSMap[bestOption.StoreID] += leaseholderReplQPS
minDelta := getQPSDelta(storeQPSMap, existing)
log.VEventf(
ctx,
3,
"lease transfer to s%d would reduce the QPS delta between this ranges' stores from %.2f to %.2f",
bestOption.StoreID,
currentDelta,
minDelta,
)
return bestOption
}
return roachpb.ReplicaDescriptor{}
default:
log.Fatalf(ctx, "unexpected lease transfer goal %d", g)
}
panic("unreachable")
}

// Fall back to logic that doesn't take request counts and latency into
// account if the counts/latency-based logic couldn't pick a best replica.
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
var bestOption roachpb.ReplicaDescriptor
bestOptionLeaseCount := int32(math.MaxInt32)
// getCandidateWithMinQPS returns the `ReplicaDescriptor` that belongs to the
// store serving the lowest QPS among all the `existing` replicas.
func getCandidateWithMinQPS(
storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor,
) roachpb.ReplicaDescriptor {
minCandidateQPS := math.MaxFloat64
var candidateWithMin roachpb.ReplicaDescriptor
for _, repl := range existing {
if leaseRepl.StoreID() == repl.StoreID {
candidateQPS, ok := storeQPSMap[repl.StoreID]
if !ok {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
candidateWithMin = repl
}
}
return candidateWithMin
}

// getQPSDelta returns the difference between the store serving the highest QPS
// and the store serving the lowest QPS, among the set of stores that have an
// `existing` replica.
func getQPSDelta(
storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor,
) float64 {
maxCandidateQPS := float64(0)
minCandidateQPS := math.MaxFloat64
for _, repl := range existing {
candidateQPS, ok := storeQPSMap[repl.StoreID]
if !ok {
continue
}
if !checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 {
candidates = append(candidates, repl)
} else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount {
bestOption = repl
bestOptionLeaseCount = storeDesc.Capacity.LeaseCount
if maxCandidateQPS < candidateQPS {
maxCandidateQPS = candidateQPS
}
}
if len(candidates) == 0 {
// If we aren't supposed to be considering the current leaseholder (e.g.
// because we need to remove this replica for some reason), return
// our best option if we otherwise wouldn't want to do anything.
if !checkTransferLeaseSource {
return bestOption
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
}
return roachpb.ReplicaDescriptor{}
}
a.randGen.Lock()
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]
return maxCandidateQPS - minCandidateQPS
}

// ShouldTransferLease returns true if the specified store is overfull in terms
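
For readability, here is a rough reconstruction of the `transferLeaseOptions` value consumed by the switch above, inferred only from the fields and goal constants referenced in this hunk (`goal`, `checkTransferLeaseSource`, `checkCandidateFullness`, and the `followTheWorkload`/`leaseCountConvergence`/`qpsConvergence` goals). The actual definitions live in other files of this commit and may use different names or carry additional fields; the goal type name below is a guess.

package kvserver // hypothetical placement; shown only to keep the sketch self-contained

type transferLeaseGoal int

const (
	followTheWorkload transferLeaseGoal = iota
	leaseCountConvergence
	qpsConvergence
)

// transferLeaseOptions groups the knobs consulted by TransferLeaseTarget();
// only the fields referenced in the hunk above are sketched here.
type transferLeaseOptions struct {
	// goal selects the strategy used to pick (and justify) a transfer target.
	goal transferLeaseGoal
	// checkTransferLeaseSource controls whether keeping the lease on the
	// current leaseholder is considered a valid outcome.
	checkTransferLeaseSource bool
	// checkCandidateFullness, when true, restricts lease-count-based candidates
	// to stores below the mean lease count of the eligible stores.
	checkCandidateFullness bool
}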
@@ -1457,15 +1566,22 @@ func (a *Allocator) ShouldTransferLease(
return false
}

transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, source, existing, stats, nil, sl.candidateLeases.mean)
transferDec, _ := a.shouldTransferLeaseForAccessLocality(
ctx,
source,
existing,
stats,
nil,
sl.candidateLeases.mean,
)
var result bool
switch transferDec {
case shouldNotTransfer:
result = false
case shouldTransfer:
result = true
case decideWithoutStats:
result = a.shouldTransferLeaseWithoutStats(ctx, sl, source, existing)
result = a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing)
default:
log.Fatalf(ctx, "unexpected transfer decision %d", transferDec)
}
@@ -1483,7 +1599,7 @@ func (a Allocator) followTheWorkloadPrefersLocal(
stats *replicaStats,
) bool {
adjustments := make(map[roachpb.StoreID]float64)
decision, _ := a.shouldTransferLeaseUsingStats(ctx, source, existing, stats, adjustments, sl.candidateLeases.mean)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, stats, adjustments, sl.candidateLeases.mean)
if decision == decideWithoutStats {
return false
}
@@ -1497,7 +1613,7 @@ func (a Allocator) followTheWorkloadPrefersLocal(
return false
}

func (a Allocator) shouldTransferLeaseUsingStats(
func (a Allocator) shouldTransferLeaseForAccessLocality(
ctx context.Context,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
@@ -1681,7 +1797,7 @@ func loadBasedLeaseRebalanceScore(
return totalScore, rebalanceAdjustment
}

func (a Allocator) shouldTransferLeaseWithoutStats(
func (a Allocator) shouldTransferLeaseForLeaseCountConvergence(
ctx context.Context,
sl StoreList,
source roachpb.StoreDescriptor,
(Diffs for the remaining 7 changed files are not shown.)
