From d61f474635f814bc420a61a11e6660b8882e05f0 Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Mon, 23 Aug 2021 22:10:41 -0400
Subject: [PATCH] kvserver: promote QPS convergence during load-based lease
 rebalancing

This commit augments `TransferLeaseTarget()` by adding a mode that picks
the lease transfer target expected to best promote QPS convergence
across the stores that hold a replica of a given range. In this mode,
lease transfer decisions are predicated on whether they would reduce the
QPS delta between the existing replicas' stores.

Resolves https://github.com/cockroachdb/cockroach/issues/31135

Release justification: Fixes a high-priority bug

Release note (bug fix): Previously, the store rebalancer was unable to
rebalance leases for hot ranges that received a disproportionate amount
of traffic relative to the rest of the cluster. Under workloads that
produce such hot ranges, this often led to prolonged single-node
hotspots. This bug is now fixed.
---
 pkg/kv/kvserver/allocator.go             | 269 +++++++++++++++++------
 pkg/kv/kvserver/allocator_test.go        | 156 ++++++++-----
 pkg/kv/kvserver/replica_stats.go         |  12 +
 pkg/kv/kvserver/replicate_queue.go       |  18 +-
 pkg/kv/kvserver/store.go                 |  15 +-
 pkg/kv/kvserver/store_rebalancer.go      | 207 ++++++-----------
 pkg/kv/kvserver/store_rebalancer_test.go | 195 ++++++++++++----
 pkg/kv/kvserver/testing_knobs.go         |   9 +
 8 files changed, 560 insertions(+), 321 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 25c1657ec1c1..694347335798 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -354,11 +354,15 @@ type Allocator struct {
 	storePool     *StorePool
 	nodeLatencyFn func(addr string) (time.Duration, bool)
 	randGen       allocatorRand
+
+	knobs *AllocatorTestingKnobs
 }
 
 // MakeAllocator creates a new allocator using the specified StorePool.
 func MakeAllocator(
-	storePool *StorePool, nodeLatencyFn func(addr string) (time.Duration, bool),
+	storePool *StorePool,
+	nodeLatencyFn func(addr string) (time.Duration, bool),
+	knobs *AllocatorTestingKnobs,
 ) Allocator {
 	var randSource rand.Source
 	// There are number of test cases that make a test store but don't add
@@ -373,6 +377,7 @@ func MakeAllocator(
 		storePool:     storePool,
 		nodeLatencyFn: nodeLatencyFn,
 		randGen:       makeAllocatorRand(randSource),
+		knobs:         knobs,
 	}
 }
 
@@ -1271,15 +1276,16 @@ func (a *Allocator) TransferLeaseTarget(
 		GetRangeID() roachpb.RangeID
 	},
 	stats *replicaStats,
-	checkTransferLeaseSource bool,
-	checkCandidateFullness bool,
-	alwaysAllowDecisionWithoutStats bool,
+	forceDecisionWithoutStats bool,
+	opts transferLeaseOptions,
 ) roachpb.ReplicaDescriptor {
+	allStoresList, _, _ := a.storePool.getStoreList(storeFilterNone)
+	storeDescMap := storeListToMap(allStoresList)
+
 	sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
 	sl = sl.excludeInvalid(conf.Constraints)
 	sl = sl.excludeInvalid(conf.VoterConstraints)
-	// The only thing we use the storeList for is for the lease mean across the
-	// eligible stores, make that explicit here.
+
 	candidateLeasesMean := sl.candidateLeases.mean
 
 	source, ok := a.storePool.getStoreDescriptor(leaseRepl.StoreID())
@@ -1292,6 +1298,7 @@ func (a *Allocator) TransferLeaseTarget(
 	// store matches, it's where the lease should be (unless the preferred store
 	// is the current one and checkTransferLeaseSource is false).
var preferred []roachpb.ReplicaDescriptor + checkTransferLeaseSource := opts.checkTransferLeaseSource if checkTransferLeaseSource { preferred = a.preferredLeaseholders(conf, existing) } else { @@ -1328,24 +1335,26 @@ func (a *Allocator) TransferLeaseTarget( // Only consider live, non-draining, non-suspect replicas. existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */) - // Only proceed with the lease transfer if we are also the raft leader (we - // already know we are the leaseholder at this point), and only consider - // replicas that are in `StateReplicate` as potential candidates. - // - // NB: The RaftStatus() only returns a non-empty and non-nil result on the - // Raft leader (since Raft followers do not track the progress of other - // replicas, only the leader does). - // - // NB: On every Raft tick, we try to ensure that leadership is collocated with - // leaseholdership (see - // Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that - // on a range that is not already borked (i.e. can accept writes), periods of - // leader/leaseholder misalignment should be ephemeral and rare. We choose to - // be pessimistic here and choose to bail on the lease transfer, as opposed to - // potentially transferring the lease to a replica that may be waiting for a - // snapshot (which will wedge the range until the replica applies that - // snapshot). - existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing) + if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots { + // Only proceed with the lease transfer if we are also the raft leader (we + // already know we are the leaseholder at this point), and only consider + // replicas that are in `StateReplicate` as potential candidates. + // + // NB: The RaftStatus() only returns a non-empty and non-nil result on the + // Raft leader (since Raft followers do not track the progress of other + // replicas, only the leader does). + // + // NB: On every Raft tick, we try to ensure that leadership is collocated with + // leaseholdership (see + // Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that + // on a range that is not already borked (i.e. can accept writes), periods of + // leader/leaseholder misalignment should be ephemeral and rare. We choose to + // be pessimistic here and choose to bail on the lease transfer, as opposed to + // potentially transferring the lease to a replica that may be waiting for a + // snapshot (which will wedge the range until the replica applies that + // snapshot). + existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing) + } // Short-circuit if there are no valid targets out there. if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) { @@ -1353,65 +1362,172 @@ func (a *Allocator) TransferLeaseTarget( return roachpb.ReplicaDescriptor{} } - // Try to pick a replica to transfer the lease to while also determining - // whether we actually should be transferring the lease. The transfer - // decision is only needed if we've been asked to check the source. 
- transferDec, repl := a.shouldTransferLeaseUsingStats( - ctx, source, existing, stats, nil, candidateLeasesMean, - ) - if checkTransferLeaseSource { - switch transferDec { - case shouldNotTransfer: - if !alwaysAllowDecisionWithoutStats { - return roachpb.ReplicaDescriptor{} + switch g := opts.goal; g { + case followTheWorkload: + // Try to pick a replica to transfer the lease to while also determining + // whether we actually should be transferring the lease. The transfer + // decision is only needed if we've been asked to check the source. + transferDec, repl := a.shouldTransferLeaseForAccessLocality( + ctx, source, existing, stats, nil, candidateLeasesMean, + ) + if checkTransferLeaseSource { + switch transferDec { + case shouldNotTransfer: + if !forceDecisionWithoutStats { + return roachpb.ReplicaDescriptor{} + } + fallthrough + case decideWithoutStats: + if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) { + return roachpb.ReplicaDescriptor{} + } + case shouldTransfer: + default: + log.Fatalf(ctx, "unexpected transfer decision %d with replica %+v", transferDec, repl) + } + } + if repl != (roachpb.ReplicaDescriptor{}) { + return repl + } + fallthrough + + case leaseCountConvergence: + // Fall back to logic that doesn't take request counts and latency into + // account if the counts/latency-based logic couldn't pick a best replica. + candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)) + var bestOption roachpb.ReplicaDescriptor + bestOptionLeaseCount := int32(math.MaxInt32) + for _, repl := range existing { + if leaseRepl.StoreID() == repl.StoreID { + continue + } + storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID) + if !ok { + continue } - fallthrough - case decideWithoutStats: - if !a.shouldTransferLeaseWithoutStats(ctx, sl, source, existing) { - return roachpb.ReplicaDescriptor{} + if !opts.checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 { + candidates = append(candidates, repl) + } else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount { + bestOption = repl + bestOptionLeaseCount = storeDesc.Capacity.LeaseCount } - case shouldTransfer: - default: - log.Fatalf(ctx, "unexpected transfer decision %d with replica %+v", transferDec, repl) } - } + if len(candidates) == 0 { + // If we aren't supposed to be considering the current leaseholder (e.g. + // because we need to remove this replica for some reason), return + // our best option if we otherwise wouldn't want to do anything. + if !checkTransferLeaseSource { + return bestOption + } + return roachpb.ReplicaDescriptor{} + } + a.randGen.Lock() + defer a.randGen.Unlock() + return candidates[a.randGen.Intn(len(candidates))] + + case qpsConvergence: + // When the goal is to further QPS convergence across stores, we ensure that + // any lease transfer decision we make *reduces the delta between the store + // serving the highest QPS and the store serving the lowest QPS* among our + // list of candidates. + + // Create a separate map of store_id -> qps that we can manipulate in order + // to simulate the resulting QPS distribution of various potential lease + // transfer decisions. 
+		storeQPSMap := make(map[roachpb.StoreID]float64)
+		for _, storeDesc := range storeDescMap {
+			storeQPSMap[storeDesc.StoreID] = storeDesc.Capacity.QueriesPerSecond
+		}
 
-	if repl != (roachpb.ReplicaDescriptor{}) {
-		return repl
+		leaseholderStoreQPS, ok := storeQPSMap[leaseRepl.StoreID()]
+		if !ok {
+			log.VEventf(
+				ctx, 3, "cannot find store descriptor for leaseholder s%d;"+
+					" skipping this range", leaseRepl.StoreID(),
+			)
+			return roachpb.ReplicaDescriptor{}
+		}
+
+		leaseholderReplQPS, _ := stats.avgQPS()
+		currentDelta := getQPSDelta(storeQPSMap, existing)
+		bestOption := getCandidateWithMinQPS(storeQPSMap, existing)
+		if bestOption != (roachpb.ReplicaDescriptor{}) &&
+			// It is always beneficial to transfer the lease to the coldest candidate
+			// if the range's own qps is smaller than the difference between the
+			// leaseholder store and the candidate store. This will always drive down
+			// the difference between those two stores, which should always drive down
+			// the difference between the store serving the highest QPS and the store
+			// serving the lowest QPS.
+			//
+			// TODO(aayush): We should think about whether we need any padding here.
+			// Not adding any sort of padding could make this a little sensitive, but
+			// there are some downsides to doing so. If the padding here is too high,
+			// we're going to keep ignoring opportunities for lease transfers for
+			// ranges with low QPS. This can add up and prevent us from achieving
+			// convergence in cases where we're dealing with a ton of very low-QPS
+			// ranges.
+			(leaseholderStoreQPS-leaseholderReplQPS) > storeQPSMap[bestOption.StoreID] {
+			storeQPSMap[leaseRepl.StoreID()] -= leaseholderReplQPS
+			storeQPSMap[bestOption.StoreID] += leaseholderReplQPS
+			minDelta := getQPSDelta(storeQPSMap, existing)
+			log.VEventf(
+				ctx,
+				3,
+				"lease transfer to s%d would reduce the QPS delta between this range's stores from %.2f to %.2f",
+				bestOption.StoreID,
+				currentDelta,
+				minDelta,
+			)
+			return bestOption
+		}
+		return roachpb.ReplicaDescriptor{}
+	default:
+		log.Fatalf(ctx, "unexpected lease transfer goal %d", g)
 	}
+	panic("unreachable")
+}
 
-	// Fall back to logic that doesn't take request counts and latency into
-	// account if the counts/latency-based logic couldn't pick a best replica.
-	candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
-	var bestOption roachpb.ReplicaDescriptor
-	bestOptionLeaseCount := int32(math.MaxInt32)
+// getCandidateWithMinQPS returns the `ReplicaDescriptor` that belongs to the
+// store serving the lowest QPS among all the `existing` replicas.
+func getCandidateWithMinQPS(
+	storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor,
+) roachpb.ReplicaDescriptor {
+	minCandidateQPS := math.MaxFloat64
+	var candidateWithMin roachpb.ReplicaDescriptor
 	for _, repl := range existing {
-		if leaseRepl.StoreID() == repl.StoreID {
+		candidateQPS, ok := storeQPSMap[repl.StoreID]
+		if !ok {
 			continue
 		}
-		storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
+		if minCandidateQPS > candidateQPS {
+			minCandidateQPS = candidateQPS
+			candidateWithMin = repl
+		}
+	}
+	return candidateWithMin
+}
+
+// getQPSDelta returns the difference between the store serving the highest QPS
+// and the store serving the lowest QPS, among the set of stores that have an
+// `existing` replica.
+func getQPSDelta( + storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor, +) float64 { + maxCandidateQPS := float64(0) + minCandidateQPS := math.MaxFloat64 + for _, repl := range existing { + candidateQPS, ok := storeQPSMap[repl.StoreID] if !ok { continue } - if !checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 { - candidates = append(candidates, repl) - } else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount { - bestOption = repl - bestOptionLeaseCount = storeDesc.Capacity.LeaseCount + if maxCandidateQPS < candidateQPS { + maxCandidateQPS = candidateQPS } - } - if len(candidates) == 0 { - // If we aren't supposed to be considering the current leaseholder (e.g. - // because we need to remove this replica for some reason), return - // our best option if we otherwise wouldn't want to do anything. - if !checkTransferLeaseSource { - return bestOption + if minCandidateQPS > candidateQPS { + minCandidateQPS = candidateQPS } - return roachpb.ReplicaDescriptor{} } - a.randGen.Lock() - defer a.randGen.Unlock() - return candidates[a.randGen.Intn(len(candidates))] + return maxCandidateQPS - minCandidateQPS } // ShouldTransferLease returns true if the specified store is overfull in terms @@ -1457,7 +1573,14 @@ func (a *Allocator) ShouldTransferLease( return false } - transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, source, existing, stats, nil, sl.candidateLeases.mean) + transferDec, _ := a.shouldTransferLeaseForAccessLocality( + ctx, + source, + existing, + stats, + nil, + sl.candidateLeases.mean, + ) var result bool switch transferDec { case shouldNotTransfer: @@ -1465,7 +1588,7 @@ func (a *Allocator) ShouldTransferLease( case shouldTransfer: result = true case decideWithoutStats: - result = a.shouldTransferLeaseWithoutStats(ctx, sl, source, existing) + result = a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) default: log.Fatalf(ctx, "unexpected transfer decision %d", transferDec) } @@ -1483,7 +1606,7 @@ func (a Allocator) followTheWorkloadPrefersLocal( stats *replicaStats, ) bool { adjustments := make(map[roachpb.StoreID]float64) - decision, _ := a.shouldTransferLeaseUsingStats(ctx, source, existing, stats, adjustments, sl.candidateLeases.mean) + decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, stats, adjustments, sl.candidateLeases.mean) if decision == decideWithoutStats { return false } @@ -1497,7 +1620,7 @@ func (a Allocator) followTheWorkloadPrefersLocal( return false } -func (a Allocator) shouldTransferLeaseUsingStats( +func (a Allocator) shouldTransferLeaseForAccessLocality( ctx context.Context, source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, @@ -1681,7 +1804,7 @@ func loadBasedLeaseRebalanceScore( return totalScore, rebalanceAdjustment } -func (a Allocator) shouldTransferLeaseWithoutStats( +func (a Allocator) shouldTransferLeaseForLeaseCountConvergence( ctx context.Context, sl StoreList, source roachpb.StoreDescriptor, diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 8d36cbb18a47..f3a991fe7fc5 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -373,6 +373,15 @@ func replicas(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor { // use in tests. Stopper must be stopped by the caller. 
func createTestAllocator( numNodes int, deterministic bool, +) (*stop.Stopper, *gossip.Gossip, *StorePool, Allocator, *hlc.ManualClock) { + return createTestAllocatorWithKnobs(numNodes, deterministic, nil /* knobs */) +} + +// createTestAllocatorWithKnobs is like `createTestAllocator`, but allows the +// caller to pass in custom AllocatorTestingKnobs. Stopper must be stopped by +// the caller. +func createTestAllocatorWithKnobs( + numNodes int, deterministic bool, knobs *AllocatorTestingKnobs, ) (*stop.Stopper, *gossip.Gossip, *StorePool, Allocator, *hlc.ManualClock) { stopper, g, manual, storePool, _ := createTestStorePool( TestTimeUntilStoreDeadOff, deterministic, @@ -381,7 +390,9 @@ func createTestAllocator( a := MakeAllocator( storePool, func(string) (time.Duration, bool) { return 0, true - }) + }, + knobs, + ) return stopper, g, storePool, a, manual } @@ -1700,10 +1711,12 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) { replicationFactor: 3, storeID: c.leaseholder, }, - nil, /* replicaStats */ - c.check, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: c.check, + checkCandidateFullness: true, + }, ) if c.expected != target.StoreID { t.Fatalf("expected %d, but found %d", c.expected, target.StoreID) @@ -1811,9 +1824,11 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) { c.existing, repl, nil, - c.checkSource, - true, /* checkCandidateFullness */ false, /* alwaysAllowDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: c.checkSource, + checkCandidateFullness: true, + }, ) if c.transferTarget != target.StoreID { t.Fatalf("expected %d, but found %d", c.transferTarget, target.StoreID) @@ -1901,10 +1916,12 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) { replicationFactor: 3, storeID: c.leaseholder, }, - nil, /* replicaStats */ - true, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: true, + checkCandidateFullness: true, + }, ) if c.expected != target.StoreID { t.Fatalf("expected %d, but found %d", c.expected, target.StoreID) @@ -1922,9 +1939,11 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + storePool, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) defer stopper.Stop(context.Background()) // 3 stores where the lease count for each store is equal to 100x the store @@ -2003,10 +2022,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { replicationFactor: 3, storeID: c.leaseholder, }, - nil, /* replicaStats */ - c.check, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: c.check, + checkCandidateFullness: true, + }, ) if c.expected != target.StoreID { t.Fatalf("expected %d, but found %d", c.expected, target.StoreID) @@ -2293,9 +2314,11 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* 
nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + storePool, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) defer stopper.Stop(context.Background()) // 4 stores where the lease count for each store is equal to 10x the store @@ -2355,9 +2378,11 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + storePool, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) defer stopper.Stop(context.Background()) var stores []*roachpb.StoreDescriptor @@ -2533,10 +2558,12 @@ func TestAllocatorLeasePreferences(t *testing.T) { replicationFactor: 5, storeID: c.leaseholder, }, - nil, /* replicaStats */ - true, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: true, + checkCandidateFullness: true, + }, ) if c.expectedCheckTrue != target.StoreID { t.Errorf("expected s%d for check=true, but found %v", c.expectedCheckTrue, target) @@ -2549,10 +2576,12 @@ func TestAllocatorLeasePreferences(t *testing.T) { replicationFactor: 5, storeID: c.leaseholder, }, - nil, /* replicaStats */ - false, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: false, + checkCandidateFullness: true, + }, ) if c.expectedCheckFalse != target.StoreID { t.Errorf("expected s%d for check=false, but found %v", c.expectedCheckFalse, target) @@ -2635,10 +2664,12 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { replicationFactor: 6, storeID: c.leaseholder, }, - nil, /* replicaStats */ - true, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: true, + checkCandidateFullness: true, + }, ) if c.expectedCheckTrue != target.StoreID { t.Errorf("expected s%d for check=true, but found %v", c.expectedCheckTrue, target) @@ -2651,10 +2682,12 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { replicationFactor: 6, storeID: c.leaseholder, }, - nil, /* replicaStats */ - false, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: false, + checkCandidateFullness: true, + }, ) if c.expectedCheckFalse != target.StoreID { t.Errorf("expected s%d for check=false, but found %v", c.expectedCheckFalse, target) @@ -5057,9 +5090,11 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { - a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) { - return c.latency[addr], true - }) + a := MakeAllocator( + storePool, func(addr string) (time.Duration, bool) { + return c.latency[addr], true + }, nil, /* knobs */ + ) target := 
a.TransferLeaseTarget( context.Background(), emptySpanConfig(), @@ -5069,9 +5104,12 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { storeID: c.leaseholder, }, c.stats, - c.check, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + false, + transferLeaseOptions{ + checkTransferLeaseSource: c.check, + checkCandidateFullness: true, + dryRun: false, + }, ) if c.expected != target.StoreID { t.Errorf("expected %d, got %d", c.expected, target.StoreID) @@ -6631,9 +6669,11 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(sp, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + sp, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) ctx := context.Background() defer stopper.Stop(ctx) @@ -6738,7 +6778,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - a := MakeAllocator(nil /* storePool */, nil /* rpcContext */) + a := MakeAllocator(nil /* storePool */, nil /* nodeLatencyFn */, nil /* knobs */) action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) @@ -7258,9 +7298,11 @@ func TestAllocatorFullDisks(t *testing.T) { mockNodeLiveness.nodeLivenessFunc, false, /* deterministic */ ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { - return 0, false - }) + alloc := MakeAllocator( + sp, func(string) (time.Duration, bool) { + return 0, false + }, nil, /* knobs */ + ) var wg sync.WaitGroup g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStorePrefix), @@ -7404,9 +7446,11 @@ func Example_rebalancing() { newMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).nodeLivenessFunc, /* deterministic */ true, ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { - return 0, false - }) + alloc := MakeAllocator( + sp, func(string) (time.Duration, bool) { + return 0, false + }, nil, /* knobs */ + ) var wg sync.WaitGroup g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStorePrefix), diff --git a/pkg/kv/kvserver/replica_stats.go b/pkg/kv/kvserver/replica_stats.go index 77f889d9fbba..51d09555eed0 100644 --- a/pkg/kv/kvserver/replica_stats.go +++ b/pkg/kv/kvserver/replica_stats.go @@ -57,6 +57,9 @@ type replicaStats struct { requests [6]perLocalityCounts lastRotate time.Time lastReset time.Time + + // Testing only. 
+ avgQPSForTesting float64 } } @@ -186,6 +189,9 @@ func (rs *replicaStats) avgQPS() (float64, time.Duration) { rs.mu.Lock() defer rs.mu.Unlock() + if rs.mu.avgQPSForTesting != 0 { + return rs.mu.avgQPSForTesting, 0 + } rs.maybeRotateLocked(now) @@ -224,3 +230,9 @@ func (rs *replicaStats) resetRequestCounts() { rs.mu.lastRotate = timeutil.Unix(0, rs.clock.PhysicalNow()) rs.mu.lastReset = rs.mu.lastRotate } + +func (rs *replicaStats) setAvgQPSForTesting(qps float64) { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.mu.avgQPSForTesting = qps +} diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 7d4042f2c8fe..734f514b0c8d 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -1164,6 +1164,7 @@ func (rq *replicateQueue) considerRebalance( desc, conf, transferLeaseOptions{ + goal: followTheWorkload, checkTransferLeaseSource: true, checkCandidateFullness: true, dryRun: dryRun, @@ -1271,7 +1272,19 @@ func replicationChangesForRebalance( return chgs, performingSwap, nil } +// transferLeaseGoal dictates whether a call to TransferLeaseTarget should +// improve locality of access, convergence of lease counts or convergence of +// QPS. +type transferLeaseGoal int + +const ( + followTheWorkload transferLeaseGoal = iota + leaseCountConvergence + qpsConvergence +) + type transferLeaseOptions struct { + goal transferLeaseGoal checkTransferLeaseSource bool checkCandidateFullness bool dryRun bool @@ -1320,9 +1333,8 @@ func (rq *replicateQueue) shedLease( desc.Replicas().VoterDescriptors(), repl, repl.leaseholderStats, - opts.checkTransferLeaseSource, - opts.checkCandidateFullness, - false, /* alwaysAllowDecisionWithoutStats */ + false, /* forceDecisionWithoutStats */ + opts, ) if target == (roachpb.ReplicaDescriptor{}) { return noSuitableTarget, nil diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 1c40e0d5b58c..5b476a2e7941 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -835,11 +835,18 @@ func NewStore( ctSender: cfg.ClosedTimestampSender, } if cfg.RPCContext != nil { - s.allocator = MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency) + s.allocator = MakeAllocator( + cfg.StorePool, + cfg.RPCContext.RemoteClocks.Latency, + cfg.TestingKnobs.AllocatorKnobs, + ) } else { - s.allocator = MakeAllocator(cfg.StorePool, func(string) (time.Duration, bool) { - return 0, false - }) + s.allocator = MakeAllocator( + cfg.StorePool, func(string) (time.Duration, bool) { + return 0, false + }, + cfg.TestingKnobs.AllocatorKnobs, + ) } s.replRankings = newReplicaRankings() diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 55ab046c8d55..d28f86450a8a 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -14,14 +14,12 @@ import ( "context" "math" "math/rand" - "sort" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/contextutil" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -233,7 +231,8 @@ func (sr *StoreRebalancer) rebalanceStore( if !ok { log.Fatalf(ctx, "expected the `StoreRebalancer` to be using a `qpsScorerOptions`") } - qpsMinThreshold := underfullQPSThreshold(options, 
allStoresList.candidateQueriesPerSecond.mean) + // We only bother rebalancing stores that are fielding more than the + // cluster-level overfull threshold of QPS. qpsMaxThreshold := overfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) var localDesc *roachpb.StoreDescriptor @@ -264,14 +263,17 @@ func (sr *StoreRebalancer) rebalanceStore( hottestRanges := sr.replRankings.topQPS() for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { replWithStats, target, considerForRebalance := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, localDesc, allStoresList, storeMap, qpsMinThreshold, qpsMaxThreshold) + ctx, + &hottestRanges, + localDesc, + allStoresList, + storeMap, + ) replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) if replWithStats.repl == nil { break } - log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", - replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl) if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error { return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps) @@ -381,16 +383,12 @@ func (sr *StoreRebalancer) rebalanceStore( localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, allStoresList.candidateQueriesPerSecond.mean, qpsMaxThreshold) } -// TODO(a-robinson): Should we take the number of leases on each store into -// account here or just continue to let that happen in allocator.go? func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, hottestRanges *[]replicaWithStats, localDesc *roachpb.StoreDescriptor, storeList StoreList, storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - minQPS float64, - maxQPS float64, ) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) { var considerForRebalance []replicaWithStats now := sr.rq.store.Clock().NowAsClockTimestamp() @@ -410,11 +408,6 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) continue } - if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { - log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", - replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) - continue - } // Don't bother moving leases whose QPS is below some small fraction of the // store's QPS (unless the store has extra leases to spare anyway). It's @@ -423,7 +416,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( const minQPSFraction = .001 if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction && float64(localDesc.Capacity.LeaseCount) <= storeList.candidateLeases.mean { - log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + log.VEventf(ctx, 3, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond) continue } @@ -436,74 +429,68 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( // Learners or non-voters aren't allowed to become leaseholders or raft // leaders, so only consider the `Voter` replicas. 
candidates := desc.Replicas().DeepCopy().VoterDescriptors() - sort.Slice(candidates, func(i, j int) bool { - var iQPS, jQPS float64 - if desc := storeMap[candidates[i].StoreID]; desc != nil { - iQPS = desc.Capacity.QueriesPerSecond - } - if desc := storeMap[candidates[j].StoreID]; desc != nil { - jQPS = desc.Capacity.QueriesPerSecond - } - return iQPS < jQPS - }) - - var raftStatus *raft.Status - - preferred := sr.rq.allocator.preferredLeaseholders(conf, candidates) - - // Filter both the list of preferred stores as well as the list of all - // candidate replicas to only consider live (non-suspect, non-draining) - // nodes. - const includeSuspectAndDrainingStores = false - preferred, _ = sr.rq.allocator.storePool.liveAndDeadReplicas(preferred, includeSuspectAndDrainingStores) - candidates, _ = sr.rq.allocator.storePool.liveAndDeadReplicas(candidates, includeSuspectAndDrainingStores) - - for _, candidate := range candidates { - if candidate.StoreID == localDesc.StoreID { - continue - } - - meanQPS := storeList.candidateQueriesPerSecond.mean - if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) { - continue - } - if raftStatus == nil { - raftStatus = sr.getRaftStatusFn(replWithStats.repl) - } - if replicaIsBehind(raftStatus, candidate.ReplicaID) { - log.VEventf(ctx, 3, "%v is behind or this store isn't the raft leader for r%d; raftStatus: %v", - candidate, desc.RangeID, raftStatus) - continue - } + // Only consider replicas that are not lagging behind the leader in order to + // avoid hurting QPS in the short term. This is a stronger check than what + // `TransferLeaseTarget` performs (it only excludes replicas that are + // waiting for a snapshot). + candidates = filterBehindReplicas(ctx, sr.getRaftStatusFn(replWithStats.repl), candidates) - if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, roachpb.MakeReplicaSet(preferred).ReplicationTargets()) { - log.VEventf(ctx, 3, "s%d not a preferred leaseholder for r%d; preferred: %v", - candidate.StoreID, desc.RangeID, preferred) - continue - } + candidate := sr.rq.allocator.TransferLeaseTarget( + ctx, + conf, + candidates, + replWithStats.repl, + replWithStats.repl.leaseholderStats, + true, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + goal: qpsConvergence, + checkTransferLeaseSource: true, + }, + ) - filteredStoreList := storeList.excludeInvalid(conf.Constraints) - filteredStoreList = storeList.excludeInvalid(conf.VoterConstraints) - if sr.rq.allocator.followTheWorkloadPrefersLocal( + if candidate == (roachpb.ReplicaDescriptor{}) { + log.VEventf( ctx, - filteredStoreList, - *localDesc, - candidate.StoreID, - candidates, - replWithStats.repl.leaseholderStats, - ) { - log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping", - desc.RangeID, localDesc.StoreID) - continue - } - - return replWithStats, candidate, considerForRebalance + 3, + "could not find a better lease transfer target for r%d; considering replica rebalance instead", + desc.RangeID, + ) + considerForRebalance = append(considerForRebalance, replWithStats) + continue } - // If none of the other replicas are valid lease transfer targets, consider - // this range for replica rebalancing. 
- considerForRebalance = append(considerForRebalance, replWithStats) + filteredStoreList := storeList.excludeInvalid(conf.Constraints) + filteredStoreList = storeList.excludeInvalid(conf.VoterConstraints) + if sr.rq.allocator.followTheWorkloadPrefersLocal( + ctx, + filteredStoreList, + *localDesc, + candidate.StoreID, + candidates, + replWithStats.repl.leaseholderStats, + ) { + log.VEventf( + ctx, 3, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead", + desc.RangeID, localDesc.StoreID, + ) + considerForRebalance = append(considerForRebalance, replWithStats) + continue + } + if targetStore, ok := storeMap[candidate.StoreID]; ok { + log.VEventf( + ctx, + 1, + "transferring lease for r%d (qps=%.2f) to store s%d (qps=%.2f) from local store s%d (qps=%.2f)", + desc.RangeID, + replWithStats.qps, + targetStore.StoreID, + targetStore.Capacity.QueriesPerSecond, + localDesc.StoreID, + localDesc.Capacity.QueriesPerSecond, + ) + } + return replWithStats, candidate, considerForRebalance } } @@ -514,8 +501,6 @@ type rangeRebalanceContext struct { replWithStats replicaWithStats rangeDesc *roachpb.RangeDescriptor conf roachpb.SpanConfig - - qpsThresholdFraction float64 } func (sr *StoreRebalancer) chooseRangeToRebalance( @@ -753,68 +738,6 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS( return finalVoterTargets, finalNonVoterTargets } -func shouldNotMoveAway( - ctx context.Context, - replWithStats replicaWithStats, - localDesc *roachpb.StoreDescriptor, - now hlc.ClockTimestamp, - minQPS float64, -) bool { - if !replWithStats.repl.OwnsValidLease(ctx, now) { - log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) - return true - } - if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { - log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", - replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) - return true - } - return false -} - -func (sr *StoreRebalancer) shouldNotMoveTo( - ctx context.Context, - storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - replWithStats replicaWithStats, - candidateStoreID roachpb.StoreID, - meanQPS float64, - minQPS float64, - maxQPS float64, -) bool { - candidateStore, ok := storeMap[candidateStoreID] - if !ok { - log.VEventf(ctx, 3, "missing store descriptor for s%d", candidateStoreID) - return true - } - - newCandidateQPS := candidateStore.Capacity.QueriesPerSecond + replWithStats.qps - if candidateStore.Capacity.QueriesPerSecond < minQPS { - if newCandidateQPS > maxQPS { - log.VEventf(ctx, 3, - "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards", - replWithStats.repl.RangeID, replWithStats.qps, candidateStoreID, maxQPS, newCandidateQPS) - return true - } - } else if newCandidateQPS > meanQPS { - log.VEventf(ctx, 3, - "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards", - replWithStats.repl.RangeID, replWithStats.qps, candidateStoreID, meanQPS, newCandidateQPS) - return true - } - - // If the target store is on a separate node, we will also care - // about node liveness. 
- targetNodeID := candidateStore.Node.NodeID - if targetNodeID != sr.rq.store.Ident.NodeID { - if !sr.rq.allocator.storePool.isStoreReadyForRoutineReplicaTransfer(ctx, candidateStore.StoreID) { - log.VEventf(ctx, 3, - "refusing to transfer replica to n%d/s%d", targetNodeID, candidateStore.StoreID) - return true - } - } - return false -} - func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor) for i := range sl.stores { diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 8bb5c91f87df..d2dd5facfdec 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -30,8 +30,6 @@ import ( "go.etcd.io/etcd/raft/v3/tracker" ) -const defaultQPSRebalanceThreshold = 0.25 - var ( // multiRegionStores specifies a set of stores across 3 regions. These stores // are arranged in descending order of the QPS they are receiving. Store 1 is @@ -271,7 +269,10 @@ func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { // TODO(a-robinson): The below three lines won't be needed once the old // rangeInfo code is ripped out of the allocator. repl.mu.state.Stats = &enginepb.MVCCStats{} + repl.leaseholderStats = newReplicaStats(s.Clock(), nil) + repl.leaseholderStats.setAvgQPSForTesting(r.qps) + repl.writeStats = newReplicaStats(s.Clock(), nil) acc.addReplica(replicaWithStats{ repl: repl, @@ -289,15 +290,18 @@ func TestChooseLeaseToTransfer(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + stopper, g, _, a, _ := createTestAllocatorWithKnobs( + 10, false /* deterministic */, &AllocatorTestingKnobs{ + // Let the allocator pick lease transfer targets that are replicas in need + // of snapshots, in order to avoid mocking out a fake raft group for the + // `replicaMayNeedSnapshot` checks inside `TransferLeaseTarget`. 
+ AllowLeaseTransfersToReplicasNeedingSnapshots: true, + }, + ) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) storeMap := storeListToMap(storeList) - - const minQPS = 800 - const maxQPS = 1200 - localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g @@ -331,38 +335,143 @@ func TestChooseLeaseToTransfer(t *testing.T) { qps float64 expectTarget roachpb.StoreID }{ - {[]roachpb.StoreID{1}, 100, 0}, - {[]roachpb.StoreID{1, 2}, 100, 0}, - {[]roachpb.StoreID{1, 3}, 100, 0}, - {[]roachpb.StoreID{1, 4}, 100, 4}, - {[]roachpb.StoreID{1, 5}, 100, 5}, - {[]roachpb.StoreID{5, 1}, 100, 0}, - {[]roachpb.StoreID{1, 2}, 200, 0}, - {[]roachpb.StoreID{1, 3}, 200, 0}, - {[]roachpb.StoreID{1, 4}, 200, 0}, - {[]roachpb.StoreID{1, 5}, 200, 5}, - {[]roachpb.StoreID{1, 2}, 500, 0}, - {[]roachpb.StoreID{1, 3}, 500, 0}, - {[]roachpb.StoreID{1, 4}, 500, 0}, - {[]roachpb.StoreID{1, 5}, 500, 5}, - {[]roachpb.StoreID{1, 5}, 600, 5}, - {[]roachpb.StoreID{1, 5}, 700, 5}, - {[]roachpb.StoreID{1, 5}, 800, 0}, - {[]roachpb.StoreID{1, 4}, 1.5, 4}, - {[]roachpb.StoreID{1, 5}, 1.5, 5}, - {[]roachpb.StoreID{1, 4}, 1.49, 0}, - {[]roachpb.StoreID{1, 5}, 1.49, 0}, + { + storeIDs: []roachpb.StoreID{1}, + qps: 100, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2}, + qps: 100, + expectTarget: 2, + }, + { + storeIDs: []roachpb.StoreID{1, 3}, + qps: 100, + expectTarget: 3, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 100, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 100, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{5, 1}, + qps: 100, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2}, + qps: 200, + expectTarget: 2, + }, + { + storeIDs: []roachpb.StoreID{1, 3}, + qps: 200, + expectTarget: 3, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 200, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 200, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 2}, + qps: 500, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 3}, + qps: 500, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 500, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 500, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 600, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 700, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 800, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 4, 5}, + qps: 800, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 3, 4, 5}, + qps: 800, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 1.5, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 1.5, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 1.49, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 1.49, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2, 3, 4}, + qps: 1500, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2, 3, 4, 5}, + qps: 1500, + expectTarget: 0, + }, } for _, tc := range testCases { - loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}) - hottestRanges := rr.topQPS() - _, target, _ := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) - if target.StoreID != tc.expectTarget { - t.Errorf("got target 
store %d for range with replicas %v and %f qps; want %d", - target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) - } + t.Run("", func(t *testing.T) { + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}) + hottestRanges := rr.topQPS() + _, target, _ := sr.chooseLeaseToTransfer(ctx, &hottestRanges, &localDesc, storeList, storeMap) + if target.StoreID != tc.expectTarget { + t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", + target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) + } + }) } } @@ -777,15 +886,16 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + stopper, g, _, a, _ := createTestAllocatorWithKnobs( + 10, + false, /* deterministic */ + &AllocatorTestingKnobs{AllowLeaseTransfersToReplicasNeedingSnapshots: true}, + ) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) storeMap := storeListToMap(storeList) - const minQPS = 800 - const maxQPS = 1200 - localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g @@ -823,8 +933,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { return status } - _, target, _ := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) + _, target, _ := sr.chooseLeaseToTransfer(ctx, &hottestRanges, &localDesc, storeList, storeMap) expectTarget := roachpb.StoreID(4) if target.StoreID != expectTarget { t.Errorf("got target store s%d for range with RaftStatus %v; want s%d", @@ -843,7 +952,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { &hottestRanges, &localDesc, storeList, - qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.25}, + qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.05}, ) expectTargets := []roachpb.ReplicationTarget{ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 73443190424e..7fa2b71e6de3 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -37,6 +37,7 @@ type StoreTestingKnobs struct { ConsistencyTestingKnobs ConsistencyTestingKnobs TenantRateKnobs tenantrate.TestingKnobs StorageKnobs storage.TestingKnobs + AllocatorKnobs *AllocatorTestingKnobs // TestingRequestFilter is called before evaluating each request on a // replica. The filter is run before the request acquires latches, so @@ -382,6 +383,14 @@ var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{} // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. func (NodeLivenessTestingKnobs) ModuleTestingKnobs() {} +// AllocatorTestingKnobs allows tests to override the behavior of `Allocator`. +type AllocatorTestingKnobs struct { + // AllowLeaseTransfersToReplicasNeedingSnapshots permits lease transfer + // targets produced by the Allocator to include replicas that may be waiting + // for snapshots. + AllowLeaseTransfersToReplicasNeedingSnapshots bool +} + // PinnedLeasesKnob is a testing know for controlling what store can acquire a // lease for specific ranges. type PinnedLeasesKnob struct {
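
Illustrative sketch (not part of the patch; the helper and variable names
below are hypothetical): the qpsConvergence goal above only accepts a lease
transfer when shifting the range's own QPS from the leaseholder's store to
the coldest candidate store narrows the gap between those two stores, which
should in turn shrink the max-min QPS delta across the replicas' stores. A
minimal standalone Go version of that check:

package main

import (
	"fmt"
	"math"
)

// qpsDelta returns the spread between the hottest and the coldest store among
// the given store IDs (analogous to the patch's getQPSDelta).
func qpsDelta(storeQPS map[int]float64, stores []int) float64 {
	minQPS, maxQPS := math.MaxFloat64, 0.0
	for _, s := range stores {
		q, ok := storeQPS[s]
		if !ok {
			continue
		}
		if q < minQPS {
			minQPS = q
		}
		if q > maxQPS {
			maxQPS = q
		}
	}
	return maxQPS - minQPS
}

// coldestStore returns the candidate store currently serving the lowest QPS
// (analogous to the patch's getCandidateWithMinQPS).
func coldestStore(storeQPS map[int]float64, stores []int) int {
	best, bestQPS := -1, math.MaxFloat64
	for _, s := range stores {
		if q, ok := storeQPS[s]; ok && q < bestQPS {
			best, bestQPS = s, q
		}
	}
	return best
}

func main() {
	// Hypothetical cluster state: per-store QPS and the QPS of a hot range
	// whose lease currently sits on store 1.
	storeQPS := map[int]float64{1: 1500, 2: 1000, 3: 900}
	replicaStores := []int{1, 2, 3}
	leaseholder, rangeQPS := 1, 400.0

	target := coldestStore(storeQPS, replicaStores)
	before := qpsDelta(storeQPS, replicaStores)

	// Accept the transfer only if the leaseholder, minus the range's own QPS,
	// would still be hotter than the coldest candidate. This guarantees the
	// gap between those two stores shrinks, which should also shrink the
	// overall delta.
	if target != leaseholder && storeQPS[leaseholder]-rangeQPS > storeQPS[target] {
		storeQPS[leaseholder] -= rangeQPS
		storeQPS[target] += rangeQPS
		fmt.Printf("transfer lease to s%d: delta %.0f -> %.0f\n",
			target, before, qpsDelta(storeQPS, replicaStores))
	} else {
		fmt.Println("no beneficial lease transfer")
	}
}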