diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 25c1657ec1c1..694347335798 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -354,11 +354,15 @@ type Allocator struct { storePool *StorePool nodeLatencyFn func(addr string) (time.Duration, bool) randGen allocatorRand + + knobs *AllocatorTestingKnobs } // MakeAllocator creates a new allocator using the specified StorePool. func MakeAllocator( - storePool *StorePool, nodeLatencyFn func(addr string) (time.Duration, bool), + storePool *StorePool, + nodeLatencyFn func(addr string) (time.Duration, bool), + knobs *AllocatorTestingKnobs, ) Allocator { var randSource rand.Source // There are number of test cases that make a test store but don't add @@ -373,6 +377,7 @@ func MakeAllocator( storePool: storePool, nodeLatencyFn: nodeLatencyFn, randGen: makeAllocatorRand(randSource), + knobs: knobs, } } @@ -1271,15 +1276,16 @@ func (a *Allocator) TransferLeaseTarget( GetRangeID() roachpb.RangeID }, stats *replicaStats, - checkTransferLeaseSource bool, - checkCandidateFullness bool, - alwaysAllowDecisionWithoutStats bool, + forceDecisionWithoutStats bool, + opts transferLeaseOptions, ) roachpb.ReplicaDescriptor { + allStoresList, _, _ := a.storePool.getStoreList(storeFilterNone) + storeDescMap := storeListToMap(allStoresList) + sl, _, _ := a.storePool.getStoreList(storeFilterSuspect) sl = sl.excludeInvalid(conf.Constraints) sl = sl.excludeInvalid(conf.VoterConstraints) - // The only thing we use the storeList for is for the lease mean across the - // eligible stores, make that explicit here. + candidateLeasesMean := sl.candidateLeases.mean source, ok := a.storePool.getStoreDescriptor(leaseRepl.StoreID()) @@ -1292,6 +1298,7 @@ func (a *Allocator) TransferLeaseTarget( // store matches, it's where the lease should be (unless the preferred store // is the current one and checkTransferLeaseSource is false). var preferred []roachpb.ReplicaDescriptor + checkTransferLeaseSource := opts.checkTransferLeaseSource if checkTransferLeaseSource { preferred = a.preferredLeaseholders(conf, existing) } else { @@ -1328,24 +1335,26 @@ func (a *Allocator) TransferLeaseTarget( // Only consider live, non-draining, non-suspect replicas. existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */) - // Only proceed with the lease transfer if we are also the raft leader (we - // already know we are the leaseholder at this point), and only consider - // replicas that are in `StateReplicate` as potential candidates. - // - // NB: The RaftStatus() only returns a non-empty and non-nil result on the - // Raft leader (since Raft followers do not track the progress of other - // replicas, only the leader does). - // - // NB: On every Raft tick, we try to ensure that leadership is collocated with - // leaseholdership (see - // Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that - // on a range that is not already borked (i.e. can accept writes), periods of - // leader/leaseholder misalignment should be ephemeral and rare. We choose to - // be pessimistic here and choose to bail on the lease transfer, as opposed to - // potentially transferring the lease to a replica that may be waiting for a - // snapshot (which will wedge the range until the replica applies that - // snapshot). 
- existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing) + if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots { + // Only proceed with the lease transfer if we are also the raft leader (we + // already know we are the leaseholder at this point), and only consider + // replicas that are in `StateReplicate` as potential candidates. + // + // NB: The RaftStatus() only returns a non-empty and non-nil result on the + // Raft leader (since Raft followers do not track the progress of other + // replicas, only the leader does). + // + // NB: On every Raft tick, we try to ensure that leadership is collocated with + // leaseholdership (see + // Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that + // on a range that is not already borked (i.e. can accept writes), periods of + // leader/leaseholder misalignment should be ephemeral and rare. We choose to + // be pessimistic here and choose to bail on the lease transfer, as opposed to + // potentially transferring the lease to a replica that may be waiting for a + // snapshot (which will wedge the range until the replica applies that + // snapshot). + existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing) + } // Short-circuit if there are no valid targets out there. if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) { @@ -1353,65 +1362,172 @@ func (a *Allocator) TransferLeaseTarget( return roachpb.ReplicaDescriptor{} } - // Try to pick a replica to transfer the lease to while also determining - // whether we actually should be transferring the lease. The transfer - // decision is only needed if we've been asked to check the source. - transferDec, repl := a.shouldTransferLeaseUsingStats( - ctx, source, existing, stats, nil, candidateLeasesMean, - ) - if checkTransferLeaseSource { - switch transferDec { - case shouldNotTransfer: - if !alwaysAllowDecisionWithoutStats { - return roachpb.ReplicaDescriptor{} + switch g := opts.goal; g { + case followTheWorkload: + // Try to pick a replica to transfer the lease to while also determining + // whether we actually should be transferring the lease. The transfer + // decision is only needed if we've been asked to check the source. + transferDec, repl := a.shouldTransferLeaseForAccessLocality( + ctx, source, existing, stats, nil, candidateLeasesMean, + ) + if checkTransferLeaseSource { + switch transferDec { + case shouldNotTransfer: + if !forceDecisionWithoutStats { + return roachpb.ReplicaDescriptor{} + } + fallthrough + case decideWithoutStats: + if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) { + return roachpb.ReplicaDescriptor{} + } + case shouldTransfer: + default: + log.Fatalf(ctx, "unexpected transfer decision %d with replica %+v", transferDec, repl) + } + } + if repl != (roachpb.ReplicaDescriptor{}) { + return repl + } + fallthrough + + case leaseCountConvergence: + // Fall back to logic that doesn't take request counts and latency into + // account if the counts/latency-based logic couldn't pick a best replica. 
+ candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)) + var bestOption roachpb.ReplicaDescriptor + bestOptionLeaseCount := int32(math.MaxInt32) + for _, repl := range existing { + if leaseRepl.StoreID() == repl.StoreID { + continue + } + storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID) + if !ok { + continue } - fallthrough - case decideWithoutStats: - if !a.shouldTransferLeaseWithoutStats(ctx, sl, source, existing) { - return roachpb.ReplicaDescriptor{} + if !opts.checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 { + candidates = append(candidates, repl) + } else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount { + bestOption = repl + bestOptionLeaseCount = storeDesc.Capacity.LeaseCount } - case shouldTransfer: - default: - log.Fatalf(ctx, "unexpected transfer decision %d with replica %+v", transferDec, repl) } - } + if len(candidates) == 0 { + // If we aren't supposed to be considering the current leaseholder (e.g. + // because we need to remove this replica for some reason), return + // our best option if we otherwise wouldn't want to do anything. + if !checkTransferLeaseSource { + return bestOption + } + return roachpb.ReplicaDescriptor{} + } + a.randGen.Lock() + defer a.randGen.Unlock() + return candidates[a.randGen.Intn(len(candidates))] + + case qpsConvergence: + // When the goal is to further QPS convergence across stores, we ensure that + // any lease transfer decision we make *reduces the delta between the store + // serving the highest QPS and the store serving the lowest QPS* among our + // list of candidates. + + // Create a separate map of store_id -> qps that we can manipulate in order + // to simulate the resulting QPS distribution of various potential lease + // transfer decisions. + storeQPSMap := make(map[roachpb.StoreID]float64) + for _, storeDesc := range storeDescMap { + storeQPSMap[storeDesc.StoreID] = storeDesc.Capacity.QueriesPerSecond + } - if repl != (roachpb.ReplicaDescriptor{}) { - return repl + leaseholderStoreQPS, ok := storeQPSMap[leaseRepl.StoreID()] + if !ok { + log.VEventf( + ctx, 3, "cannot find store descriptor for leaseholder s%d;"+ + " skipping this range", leaseRepl.StoreID(), + ) + return roachpb.ReplicaDescriptor{} + } + + leaseholderReplQPS, _ := stats.avgQPS() + currentDelta := getQPSDelta(storeQPSMap, existing) + bestOption := getCandidateWithMinQPS(storeQPSMap, existing) + if bestOption != (roachpb.ReplicaDescriptor{}) && + // It is always beneficial to transfer the lease to the coldest candidate + // if the range's own qps is smaller than the difference between the + // leaseholder store and the candidate store. This will always drive down + // the difference between those two stores, which should always drive down + // the difference between the store serving the highest QPS and the store + // serving the lowest QPS. + // + // TODO(aayush): We should think about whether we need any padding here. + // Not adding any sort of padding could make this a little sensitive, but + // there are some downsides to doing so. If the padding here is too high, + // we're going to keep ignoring opportunities for lease transfers for + // ranges with low QPS. This can add up and prevent us from achieving + // convergence in cases where we're dealing with a ton of very low-QPS + // ranges. 
+ (leaseholderStoreQPS-leaseholderReplQPS) > storeQPSMap[bestOption.StoreID] { + storeQPSMap[leaseRepl.StoreID()] -= leaseholderReplQPS + storeQPSMap[bestOption.StoreID] += leaseholderReplQPS + minDelta := getQPSDelta(storeQPSMap, existing) + log.VEventf( + ctx, + 3, + "lease transfer to s%d would reduce the QPS delta between this ranges' stores from %.2f to %.2f", + bestOption.StoreID, + currentDelta, + minDelta, + ) + return bestOption + } + return roachpb.ReplicaDescriptor{} + default: + log.Fatalf(ctx, "unexpected lease transfer goal %d", g) } + panic("unreachable") +} - // Fall back to logic that doesn't take request counts and latency into - // account if the counts/latency-based logic couldn't pick a best replica. - candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)) - var bestOption roachpb.ReplicaDescriptor - bestOptionLeaseCount := int32(math.MaxInt32) +// getCandidateWithMinQPS returns the `ReplicaDescriptor` that belongs to the +// store serving the lowest QPS among all the `existing` replicas. +func getCandidateWithMinQPS( + storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor, +) roachpb.ReplicaDescriptor { + minCandidateQPS := math.MaxFloat64 + var candidateWithMin roachpb.ReplicaDescriptor for _, repl := range existing { - if leaseRepl.StoreID() == repl.StoreID { + candidateQPS, ok := storeQPSMap[repl.StoreID] + if !ok { continue } - storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID) + if minCandidateQPS > candidateQPS { + minCandidateQPS = candidateQPS + candidateWithMin = repl + } + } + return candidateWithMin +} + +// getQPSDelta returns the difference between the store serving the highest QPS +// and the store serving the lowest QPS, among the set of stores that have an +// `existing` replica. +func getQPSDelta( + storeQPSMap map[roachpb.StoreID]float64, existing []roachpb.ReplicaDescriptor, +) float64 { + maxCandidateQPS := float64(0) + minCandidateQPS := math.MaxFloat64 + for _, repl := range existing { + candidateQPS, ok := storeQPSMap[repl.StoreID] if !ok { continue } - if !checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 { - candidates = append(candidates, repl) - } else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount { - bestOption = repl - bestOptionLeaseCount = storeDesc.Capacity.LeaseCount + if maxCandidateQPS < candidateQPS { + maxCandidateQPS = candidateQPS } - } - if len(candidates) == 0 { - // If we aren't supposed to be considering the current leaseholder (e.g. - // because we need to remove this replica for some reason), return - // our best option if we otherwise wouldn't want to do anything. 
- if !checkTransferLeaseSource { - return bestOption + if minCandidateQPS > candidateQPS { + minCandidateQPS = candidateQPS } - return roachpb.ReplicaDescriptor{} } - a.randGen.Lock() - defer a.randGen.Unlock() - return candidates[a.randGen.Intn(len(candidates))] + return maxCandidateQPS - minCandidateQPS } // ShouldTransferLease returns true if the specified store is overfull in terms @@ -1457,7 +1573,14 @@ func (a *Allocator) ShouldTransferLease( return false } - transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, source, existing, stats, nil, sl.candidateLeases.mean) + transferDec, _ := a.shouldTransferLeaseForAccessLocality( + ctx, + source, + existing, + stats, + nil, + sl.candidateLeases.mean, + ) var result bool switch transferDec { case shouldNotTransfer: @@ -1465,7 +1588,7 @@ func (a *Allocator) ShouldTransferLease( case shouldTransfer: result = true case decideWithoutStats: - result = a.shouldTransferLeaseWithoutStats(ctx, sl, source, existing) + result = a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) default: log.Fatalf(ctx, "unexpected transfer decision %d", transferDec) } @@ -1483,7 +1606,7 @@ func (a Allocator) followTheWorkloadPrefersLocal( stats *replicaStats, ) bool { adjustments := make(map[roachpb.StoreID]float64) - decision, _ := a.shouldTransferLeaseUsingStats(ctx, source, existing, stats, adjustments, sl.candidateLeases.mean) + decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, stats, adjustments, sl.candidateLeases.mean) if decision == decideWithoutStats { return false } @@ -1497,7 +1620,7 @@ func (a Allocator) followTheWorkloadPrefersLocal( return false } -func (a Allocator) shouldTransferLeaseUsingStats( +func (a Allocator) shouldTransferLeaseForAccessLocality( ctx context.Context, source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, @@ -1681,7 +1804,7 @@ func loadBasedLeaseRebalanceScore( return totalScore, rebalanceAdjustment } -func (a Allocator) shouldTransferLeaseWithoutStats( +func (a Allocator) shouldTransferLeaseForLeaseCountConvergence( ctx context.Context, sl StoreList, source roachpb.StoreDescriptor, diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 8d36cbb18a47..f3a991fe7fc5 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -373,6 +373,15 @@ func replicas(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor { // use in tests. Stopper must be stopped by the caller. func createTestAllocator( numNodes int, deterministic bool, +) (*stop.Stopper, *gossip.Gossip, *StorePool, Allocator, *hlc.ManualClock) { + return createTestAllocatorWithKnobs(numNodes, deterministic, nil /* knobs */) +} + +// createTestAllocatorWithKnobs is like `createTestAllocator`, but allows the +// caller to pass in custom AllocatorTestingKnobs. Stopper must be stopped by +// the caller. 
+func createTestAllocatorWithKnobs( + numNodes int, deterministic bool, knobs *AllocatorTestingKnobs, ) (*stop.Stopper, *gossip.Gossip, *StorePool, Allocator, *hlc.ManualClock) { stopper, g, manual, storePool, _ := createTestStorePool( TestTimeUntilStoreDeadOff, deterministic, @@ -381,7 +390,9 @@ func createTestAllocator( a := MakeAllocator( storePool, func(string) (time.Duration, bool) { return 0, true - }) + }, + knobs, + ) return stopper, g, storePool, a, manual } @@ -1700,10 +1711,12 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) { replicationFactor: 3, storeID: c.leaseholder, }, - nil, /* replicaStats */ - c.check, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: c.check, + checkCandidateFullness: true, + }, ) if c.expected != target.StoreID { t.Fatalf("expected %d, but found %d", c.expected, target.StoreID) @@ -1811,9 +1824,11 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) { c.existing, repl, nil, - c.checkSource, - true, /* checkCandidateFullness */ false, /* alwaysAllowDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: c.checkSource, + checkCandidateFullness: true, + }, ) if c.transferTarget != target.StoreID { t.Fatalf("expected %d, but found %d", c.transferTarget, target.StoreID) @@ -1901,10 +1916,12 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) { replicationFactor: 3, storeID: c.leaseholder, }, - nil, /* replicaStats */ - true, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: true, + checkCandidateFullness: true, + }, ) if c.expected != target.StoreID { t.Fatalf("expected %d, but found %d", c.expected, target.StoreID) @@ -1922,9 +1939,11 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + storePool, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) defer stopper.Stop(context.Background()) // 3 stores where the lease count for each store is equal to 100x the store @@ -2003,10 +2022,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { replicationFactor: 3, storeID: c.leaseholder, }, - nil, /* replicaStats */ - c.check, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: c.check, + checkCandidateFullness: true, + }, ) if c.expected != target.StoreID { t.Fatalf("expected %d, but found %d", c.expected, target.StoreID) @@ -2293,9 +2314,11 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + storePool, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) defer stopper.Stop(context.Background()) // 4 stores where the lease count for each store is equal to 10x the store @@ -2355,9 +2378,11 @@ func 
TestAllocatorShouldTransferSuspected(t *testing.T) { TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + storePool, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) defer stopper.Stop(context.Background()) var stores []*roachpb.StoreDescriptor @@ -2533,10 +2558,12 @@ func TestAllocatorLeasePreferences(t *testing.T) { replicationFactor: 5, storeID: c.leaseholder, }, - nil, /* replicaStats */ - true, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: true, + checkCandidateFullness: true, + }, ) if c.expectedCheckTrue != target.StoreID { t.Errorf("expected s%d for check=true, but found %v", c.expectedCheckTrue, target) @@ -2549,10 +2576,12 @@ func TestAllocatorLeasePreferences(t *testing.T) { replicationFactor: 5, storeID: c.leaseholder, }, - nil, /* replicaStats */ - false, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: false, + checkCandidateFullness: true, + }, ) if c.expectedCheckFalse != target.StoreID { t.Errorf("expected s%d for check=false, but found %v", c.expectedCheckFalse, target) @@ -2635,10 +2664,12 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { replicationFactor: 6, storeID: c.leaseholder, }, - nil, /* replicaStats */ - true, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: true, + checkCandidateFullness: true, + }, ) if c.expectedCheckTrue != target.StoreID { t.Errorf("expected s%d for check=true, but found %v", c.expectedCheckTrue, target) @@ -2651,10 +2682,12 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { replicationFactor: 6, storeID: c.leaseholder, }, - nil, /* replicaStats */ - false, /* checkTransferLeaseSource */ - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + nil, /* stats */ + false, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + checkTransferLeaseSource: false, + checkCandidateFullness: true, + }, ) if c.expectedCheckFalse != target.StoreID { t.Errorf("expected s%d for check=false, but found %v", c.expectedCheckFalse, target) @@ -5057,9 +5090,11 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { - a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) { - return c.latency[addr], true - }) + a := MakeAllocator( + storePool, func(addr string) (time.Duration, bool) { + return c.latency[addr], true + }, nil, /* knobs */ + ) target := a.TransferLeaseTarget( context.Background(), emptySpanConfig(), @@ -5069,9 +5104,12 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { storeID: c.leaseholder, }, c.stats, - c.check, - true, /* checkCandidateFullness */ - false, /* alwaysAllowDecisionWithoutStats */ + false, + transferLeaseOptions{ + checkTransferLeaseSource: c.check, + checkCandidateFullness: true, + dryRun: false, 
+ }, ) if c.expected != target.StoreID { t.Errorf("expected %d, got %d", c.expected, target.StoreID) @@ -6631,9 +6669,11 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(sp, func(string) (time.Duration, bool) { - return 0, true - }) + a := MakeAllocator( + sp, func(string) (time.Duration, bool) { + return 0, true + }, nil, /* knobs */ + ) ctx := context.Background() defer stopper.Stop(ctx) @@ -6738,7 +6778,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - a := MakeAllocator(nil /* storePool */, nil /* rpcContext */) + a := MakeAllocator(nil /* storePool */, nil /* nodeLatencyFn */, nil /* knobs */) action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) @@ -7258,9 +7298,11 @@ func TestAllocatorFullDisks(t *testing.T) { mockNodeLiveness.nodeLivenessFunc, false, /* deterministic */ ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { - return 0, false - }) + alloc := MakeAllocator( + sp, func(string) (time.Duration, bool) { + return 0, false + }, nil, /* knobs */ + ) var wg sync.WaitGroup g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStorePrefix), @@ -7404,9 +7446,11 @@ func Example_rebalancing() { newMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).nodeLivenessFunc, /* deterministic */ true, ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { - return 0, false - }) + alloc := MakeAllocator( + sp, func(string) (time.Duration, bool) { + return 0, false + }, nil, /* knobs */ + ) var wg sync.WaitGroup g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStorePrefix), diff --git a/pkg/kv/kvserver/replica_stats.go b/pkg/kv/kvserver/replica_stats.go index 77f889d9fbba..51d09555eed0 100644 --- a/pkg/kv/kvserver/replica_stats.go +++ b/pkg/kv/kvserver/replica_stats.go @@ -57,6 +57,9 @@ type replicaStats struct { requests [6]perLocalityCounts lastRotate time.Time lastReset time.Time + + // Testing only. + avgQPSForTesting float64 } } @@ -186,6 +189,9 @@ func (rs *replicaStats) avgQPS() (float64, time.Duration) { rs.mu.Lock() defer rs.mu.Unlock() + if rs.mu.avgQPSForTesting != 0 { + return rs.mu.avgQPSForTesting, 0 + } rs.maybeRotateLocked(now) @@ -224,3 +230,9 @@ func (rs *replicaStats) resetRequestCounts() { rs.mu.lastRotate = timeutil.Unix(0, rs.clock.PhysicalNow()) rs.mu.lastReset = rs.mu.lastRotate } + +func (rs *replicaStats) setAvgQPSForTesting(qps float64) { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.mu.avgQPSForTesting = qps +} diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 7d4042f2c8fe..734f514b0c8d 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -1164,6 +1164,7 @@ func (rq *replicateQueue) considerRebalance( desc, conf, transferLeaseOptions{ + goal: followTheWorkload, checkTransferLeaseSource: true, checkCandidateFullness: true, dryRun: dryRun, @@ -1271,7 +1272,19 @@ func replicationChangesForRebalance( return chgs, performingSwap, nil } +// transferLeaseGoal dictates whether a call to TransferLeaseTarget should +// improve locality of access, convergence of lease counts or convergence of +// QPS. 
+type transferLeaseGoal int + +const ( + followTheWorkload transferLeaseGoal = iota + leaseCountConvergence + qpsConvergence +) + type transferLeaseOptions struct { + goal transferLeaseGoal checkTransferLeaseSource bool checkCandidateFullness bool dryRun bool @@ -1320,9 +1333,8 @@ func (rq *replicateQueue) shedLease( desc.Replicas().VoterDescriptors(), repl, repl.leaseholderStats, - opts.checkTransferLeaseSource, - opts.checkCandidateFullness, - false, /* alwaysAllowDecisionWithoutStats */ + false, /* forceDecisionWithoutStats */ + opts, ) if target == (roachpb.ReplicaDescriptor{}) { return noSuitableTarget, nil diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 1c40e0d5b58c..5b476a2e7941 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -835,11 +835,18 @@ func NewStore( ctSender: cfg.ClosedTimestampSender, } if cfg.RPCContext != nil { - s.allocator = MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency) + s.allocator = MakeAllocator( + cfg.StorePool, + cfg.RPCContext.RemoteClocks.Latency, + cfg.TestingKnobs.AllocatorKnobs, + ) } else { - s.allocator = MakeAllocator(cfg.StorePool, func(string) (time.Duration, bool) { - return 0, false - }) + s.allocator = MakeAllocator( + cfg.StorePool, func(string) (time.Duration, bool) { + return 0, false + }, + cfg.TestingKnobs.AllocatorKnobs, + ) } s.replRankings = newReplicaRankings() diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 55ab046c8d55..d28f86450a8a 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -14,14 +14,12 @@ import ( "context" "math" "math/rand" - "sort" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/contextutil" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -233,7 +231,8 @@ func (sr *StoreRebalancer) rebalanceStore( if !ok { log.Fatalf(ctx, "expected the `StoreRebalancer` to be using a `qpsScorerOptions`") } - qpsMinThreshold := underfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) + // We only bother rebalancing stores that are fielding more than the + // cluster-level overfull threshold of QPS. qpsMaxThreshold := overfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) var localDesc *roachpb.StoreDescriptor @@ -264,14 +263,17 @@ func (sr *StoreRebalancer) rebalanceStore( hottestRanges := sr.replRankings.topQPS() for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { replWithStats, target, considerForRebalance := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, localDesc, allStoresList, storeMap, qpsMinThreshold, qpsMaxThreshold) + ctx, + &hottestRanges, + localDesc, + allStoresList, + storeMap, + ) replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) 
if replWithStats.repl == nil { break } - log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", - replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl) if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error { return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps) @@ -381,16 +383,12 @@ func (sr *StoreRebalancer) rebalanceStore( localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, allStoresList.candidateQueriesPerSecond.mean, qpsMaxThreshold) } -// TODO(a-robinson): Should we take the number of leases on each store into -// account here or just continue to let that happen in allocator.go? func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, hottestRanges *[]replicaWithStats, localDesc *roachpb.StoreDescriptor, storeList StoreList, storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - minQPS float64, - maxQPS float64, ) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) { var considerForRebalance []replicaWithStats now := sr.rq.store.Clock().NowAsClockTimestamp() @@ -410,11 +408,6 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) continue } - if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { - log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", - replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) - continue - } // Don't bother moving leases whose QPS is below some small fraction of the // store's QPS (unless the store has extra leases to spare anyway). It's @@ -423,7 +416,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( const minQPSFraction = .001 if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction && float64(localDesc.Capacity.LeaseCount) <= storeList.candidateLeases.mean { - log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + log.VEventf(ctx, 3, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond) continue } @@ -436,74 +429,68 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( // Learners or non-voters aren't allowed to become leaseholders or raft // leaders, so only consider the `Voter` replicas. candidates := desc.Replicas().DeepCopy().VoterDescriptors() - sort.Slice(candidates, func(i, j int) bool { - var iQPS, jQPS float64 - if desc := storeMap[candidates[i].StoreID]; desc != nil { - iQPS = desc.Capacity.QueriesPerSecond - } - if desc := storeMap[candidates[j].StoreID]; desc != nil { - jQPS = desc.Capacity.QueriesPerSecond - } - return iQPS < jQPS - }) - - var raftStatus *raft.Status - - preferred := sr.rq.allocator.preferredLeaseholders(conf, candidates) - - // Filter both the list of preferred stores as well as the list of all - // candidate replicas to only consider live (non-suspect, non-draining) - // nodes. 
- const includeSuspectAndDrainingStores = false - preferred, _ = sr.rq.allocator.storePool.liveAndDeadReplicas(preferred, includeSuspectAndDrainingStores) - candidates, _ = sr.rq.allocator.storePool.liveAndDeadReplicas(candidates, includeSuspectAndDrainingStores) - - for _, candidate := range candidates { - if candidate.StoreID == localDesc.StoreID { - continue - } - - meanQPS := storeList.candidateQueriesPerSecond.mean - if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) { - continue - } - if raftStatus == nil { - raftStatus = sr.getRaftStatusFn(replWithStats.repl) - } - if replicaIsBehind(raftStatus, candidate.ReplicaID) { - log.VEventf(ctx, 3, "%v is behind or this store isn't the raft leader for r%d; raftStatus: %v", - candidate, desc.RangeID, raftStatus) - continue - } + // Only consider replicas that are not lagging behind the leader in order to + // avoid hurting QPS in the short term. This is a stronger check than what + // `TransferLeaseTarget` performs (it only excludes replicas that are + // waiting for a snapshot). + candidates = filterBehindReplicas(ctx, sr.getRaftStatusFn(replWithStats.repl), candidates) - if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, roachpb.MakeReplicaSet(preferred).ReplicationTargets()) { - log.VEventf(ctx, 3, "s%d not a preferred leaseholder for r%d; preferred: %v", - candidate.StoreID, desc.RangeID, preferred) - continue - } + candidate := sr.rq.allocator.TransferLeaseTarget( + ctx, + conf, + candidates, + replWithStats.repl, + replWithStats.repl.leaseholderStats, + true, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + goal: qpsConvergence, + checkTransferLeaseSource: true, + }, + ) - filteredStoreList := storeList.excludeInvalid(conf.Constraints) - filteredStoreList = storeList.excludeInvalid(conf.VoterConstraints) - if sr.rq.allocator.followTheWorkloadPrefersLocal( + if candidate == (roachpb.ReplicaDescriptor{}) { + log.VEventf( ctx, - filteredStoreList, - *localDesc, - candidate.StoreID, - candidates, - replWithStats.repl.leaseholderStats, - ) { - log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping", - desc.RangeID, localDesc.StoreID) - continue - } - - return replWithStats, candidate, considerForRebalance + 3, + "could not find a better lease transfer target for r%d; considering replica rebalance instead", + desc.RangeID, + ) + considerForRebalance = append(considerForRebalance, replWithStats) + continue } - // If none of the other replicas are valid lease transfer targets, consider - // this range for replica rebalancing. 
- considerForRebalance = append(considerForRebalance, replWithStats) + filteredStoreList := storeList.excludeInvalid(conf.Constraints) + filteredStoreList = storeList.excludeInvalid(conf.VoterConstraints) + if sr.rq.allocator.followTheWorkloadPrefersLocal( + ctx, + filteredStoreList, + *localDesc, + candidate.StoreID, + candidates, + replWithStats.repl.leaseholderStats, + ) { + log.VEventf( + ctx, 3, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead", + desc.RangeID, localDesc.StoreID, + ) + considerForRebalance = append(considerForRebalance, replWithStats) + continue + } + if targetStore, ok := storeMap[candidate.StoreID]; ok { + log.VEventf( + ctx, + 1, + "transferring lease for r%d (qps=%.2f) to store s%d (qps=%.2f) from local store s%d (qps=%.2f)", + desc.RangeID, + replWithStats.qps, + targetStore.StoreID, + targetStore.Capacity.QueriesPerSecond, + localDesc.StoreID, + localDesc.Capacity.QueriesPerSecond, + ) + } + return replWithStats, candidate, considerForRebalance } } @@ -514,8 +501,6 @@ type rangeRebalanceContext struct { replWithStats replicaWithStats rangeDesc *roachpb.RangeDescriptor conf roachpb.SpanConfig - - qpsThresholdFraction float64 } func (sr *StoreRebalancer) chooseRangeToRebalance( @@ -753,68 +738,6 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS( return finalVoterTargets, finalNonVoterTargets } -func shouldNotMoveAway( - ctx context.Context, - replWithStats replicaWithStats, - localDesc *roachpb.StoreDescriptor, - now hlc.ClockTimestamp, - minQPS float64, -) bool { - if !replWithStats.repl.OwnsValidLease(ctx, now) { - log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) - return true - } - if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { - log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", - replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) - return true - } - return false -} - -func (sr *StoreRebalancer) shouldNotMoveTo( - ctx context.Context, - storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - replWithStats replicaWithStats, - candidateStoreID roachpb.StoreID, - meanQPS float64, - minQPS float64, - maxQPS float64, -) bool { - candidateStore, ok := storeMap[candidateStoreID] - if !ok { - log.VEventf(ctx, 3, "missing store descriptor for s%d", candidateStoreID) - return true - } - - newCandidateQPS := candidateStore.Capacity.QueriesPerSecond + replWithStats.qps - if candidateStore.Capacity.QueriesPerSecond < minQPS { - if newCandidateQPS > maxQPS { - log.VEventf(ctx, 3, - "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards", - replWithStats.repl.RangeID, replWithStats.qps, candidateStoreID, maxQPS, newCandidateQPS) - return true - } - } else if newCandidateQPS > meanQPS { - log.VEventf(ctx, 3, - "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards", - replWithStats.repl.RangeID, replWithStats.qps, candidateStoreID, meanQPS, newCandidateQPS) - return true - } - - // If the target store is on a separate node, we will also care - // about node liveness. 
- targetNodeID := candidateStore.Node.NodeID - if targetNodeID != sr.rq.store.Ident.NodeID { - if !sr.rq.allocator.storePool.isStoreReadyForRoutineReplicaTransfer(ctx, candidateStore.StoreID) { - log.VEventf(ctx, 3, - "refusing to transfer replica to n%d/s%d", targetNodeID, candidateStore.StoreID) - return true - } - } - return false -} - func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor) for i := range sl.stores { diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 8bb5c91f87df..d2dd5facfdec 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -30,8 +30,6 @@ import ( "go.etcd.io/etcd/raft/v3/tracker" ) -const defaultQPSRebalanceThreshold = 0.25 - var ( // multiRegionStores specifies a set of stores across 3 regions. These stores // are arranged in descending order of the QPS they are receiving. Store 1 is @@ -271,7 +269,10 @@ func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { // TODO(a-robinson): The below three lines won't be needed once the old // rangeInfo code is ripped out of the allocator. repl.mu.state.Stats = &enginepb.MVCCStats{} + repl.leaseholderStats = newReplicaStats(s.Clock(), nil) + repl.leaseholderStats.setAvgQPSForTesting(r.qps) + repl.writeStats = newReplicaStats(s.Clock(), nil) acc.addReplica(replicaWithStats{ repl: repl, @@ -289,15 +290,18 @@ func TestChooseLeaseToTransfer(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + stopper, g, _, a, _ := createTestAllocatorWithKnobs( + 10, false /* deterministic */, &AllocatorTestingKnobs{ + // Let the allocator pick lease transfer targets that are replicas in need + // of snapshots, in order to avoid mocking out a fake raft group for the + // `replicaMayNeedSnapshot` checks inside `TransferLeaseTarget`. 
+ AllowLeaseTransfersToReplicasNeedingSnapshots: true, + }, + ) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) storeMap := storeListToMap(storeList) - - const minQPS = 800 - const maxQPS = 1200 - localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g @@ -331,38 +335,143 @@ func TestChooseLeaseToTransfer(t *testing.T) { qps float64 expectTarget roachpb.StoreID }{ - {[]roachpb.StoreID{1}, 100, 0}, - {[]roachpb.StoreID{1, 2}, 100, 0}, - {[]roachpb.StoreID{1, 3}, 100, 0}, - {[]roachpb.StoreID{1, 4}, 100, 4}, - {[]roachpb.StoreID{1, 5}, 100, 5}, - {[]roachpb.StoreID{5, 1}, 100, 0}, - {[]roachpb.StoreID{1, 2}, 200, 0}, - {[]roachpb.StoreID{1, 3}, 200, 0}, - {[]roachpb.StoreID{1, 4}, 200, 0}, - {[]roachpb.StoreID{1, 5}, 200, 5}, - {[]roachpb.StoreID{1, 2}, 500, 0}, - {[]roachpb.StoreID{1, 3}, 500, 0}, - {[]roachpb.StoreID{1, 4}, 500, 0}, - {[]roachpb.StoreID{1, 5}, 500, 5}, - {[]roachpb.StoreID{1, 5}, 600, 5}, - {[]roachpb.StoreID{1, 5}, 700, 5}, - {[]roachpb.StoreID{1, 5}, 800, 0}, - {[]roachpb.StoreID{1, 4}, 1.5, 4}, - {[]roachpb.StoreID{1, 5}, 1.5, 5}, - {[]roachpb.StoreID{1, 4}, 1.49, 0}, - {[]roachpb.StoreID{1, 5}, 1.49, 0}, + { + storeIDs: []roachpb.StoreID{1}, + qps: 100, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2}, + qps: 100, + expectTarget: 2, + }, + { + storeIDs: []roachpb.StoreID{1, 3}, + qps: 100, + expectTarget: 3, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 100, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 100, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{5, 1}, + qps: 100, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2}, + qps: 200, + expectTarget: 2, + }, + { + storeIDs: []roachpb.StoreID{1, 3}, + qps: 200, + expectTarget: 3, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 200, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 200, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 2}, + qps: 500, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 3}, + qps: 500, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 500, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 500, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 600, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 700, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 800, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 4, 5}, + qps: 800, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 3, 4, 5}, + qps: 800, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 1.5, + expectTarget: 4, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 1.5, + expectTarget: 5, + }, + { + storeIDs: []roachpb.StoreID{1, 4}, + qps: 1.49, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 5}, + qps: 1.49, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2, 3, 4}, + qps: 1500, + expectTarget: 0, + }, + { + storeIDs: []roachpb.StoreID{1, 2, 3, 4, 5}, + qps: 1500, + expectTarget: 0, + }, } for _, tc := range testCases { - loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}) - hottestRanges := rr.topQPS() - _, target, _ := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) - if target.StoreID != tc.expectTarget { - t.Errorf("got target 
store %d for range with replicas %v and %f qps; want %d", - target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) - } + t.Run("", func(t *testing.T) { + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}) + hottestRanges := rr.topQPS() + _, target, _ := sr.chooseLeaseToTransfer(ctx, &hottestRanges, &localDesc, storeList, storeMap) + if target.StoreID != tc.expectTarget { + t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", + target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) + } + }) } } @@ -777,15 +886,16 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + stopper, g, _, a, _ := createTestAllocatorWithKnobs( + 10, + false, /* deterministic */ + &AllocatorTestingKnobs{AllowLeaseTransfersToReplicasNeedingSnapshots: true}, + ) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) storeMap := storeListToMap(storeList) - const minQPS = 800 - const maxQPS = 1200 - localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g @@ -823,8 +933,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { return status } - _, target, _ := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) + _, target, _ := sr.chooseLeaseToTransfer(ctx, &hottestRanges, &localDesc, storeList, storeMap) expectTarget := roachpb.StoreID(4) if target.StoreID != expectTarget { t.Errorf("got target store s%d for range with RaftStatus %v; want s%d", @@ -843,7 +952,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { &hottestRanges, &localDesc, storeList, - qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.25}, + qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.05}, ) expectTargets := []roachpb.ReplicationTarget{ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 73443190424e..7fa2b71e6de3 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -37,6 +37,7 @@ type StoreTestingKnobs struct { ConsistencyTestingKnobs ConsistencyTestingKnobs TenantRateKnobs tenantrate.TestingKnobs StorageKnobs storage.TestingKnobs + AllocatorKnobs *AllocatorTestingKnobs // TestingRequestFilter is called before evaluating each request on a // replica. The filter is run before the request acquires latches, so @@ -382,6 +383,14 @@ var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{} // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. func (NodeLivenessTestingKnobs) ModuleTestingKnobs() {} +// AllocatorTestingKnobs allows tests to override the behavior of `Allocator`. +type AllocatorTestingKnobs struct { + // AllowLeaseTransfersToReplicasNeedingSnapshots permits lease transfer + // targets produced by the Allocator to include replicas that may be waiting + // for snapshots. + AllowLeaseTransfersToReplicasNeedingSnapshots bool +} + // PinnedLeasesKnob is a testing know for controlling what store can acquire a // lease for specific ranges. type PinnedLeasesKnob struct {
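To make the new qpsConvergence branch in TransferLeaseTarget easier to review, here is a minimal, self-contained sketch of the decision rule it applies. This is not part of the patch: store IDs and QPS values are plain Go types, and candidateWithMinQPS / qpsDelta are simplified stand-ins for the getCandidateWithMinQPS and getQPSDelta helpers added above.

package main

import (
	"fmt"
	"math"
)

type storeID int

// candidateWithMinQPS returns the candidate store currently serving the
// lowest QPS, mirroring getCandidateWithMinQPS in the diff.
func candidateWithMinQPS(storeQPS map[storeID]float64, candidates []storeID) (storeID, bool) {
	lowest := math.MaxFloat64
	var best storeID
	found := false
	for _, s := range candidates {
		qps, ok := storeQPS[s]
		if !ok {
			continue
		}
		if qps < lowest {
			lowest, best, found = qps, s, true
		}
	}
	return best, found
}

// qpsDelta returns the spread between the hottest and coldest candidate
// stores, mirroring getQPSDelta in the diff.
func qpsDelta(storeQPS map[storeID]float64, candidates []storeID) float64 {
	maxQPS, minQPS := 0.0, math.MaxFloat64
	for _, s := range candidates {
		qps, ok := storeQPS[s]
		if !ok {
			continue
		}
		if qps > maxQPS {
			maxQPS = qps
		}
		if qps < minQPS {
			minQPS = qps
		}
	}
	return maxQPS - minQPS
}

func main() {
	// The leaseholder is s1 and the range being considered serves 200 QPS.
	storeQPS := map[storeID]float64{1: 1000, 2: 300, 3: 500}
	candidates := []storeID{1, 2, 3}
	const leaseholder storeID = 1
	const rangeQPS = 200.0

	target, ok := candidateWithMinQPS(storeQPS, candidates)
	// Transfer only if the range's own QPS is smaller than the gap between the
	// leaseholder's store and the coldest candidate; that guarantees the
	// post-transfer gap between those two stores shrinks.
	if ok && storeQPS[leaseholder]-rangeQPS > storeQPS[target] {
		before := qpsDelta(storeQPS, candidates)
		storeQPS[leaseholder] -= rangeQPS
		storeQPS[target] += rangeQPS
		fmt.Printf("transfer to s%d: delta %.0f -> %.0f\n", target, before, qpsDelta(storeQPS, candidates))
	}
}

With these numbers the transfer moves the 200-QPS range from s1 (1000 QPS) to s2 (300 QPS) and shrinks the hottest-to-coldest spread from 700 to 300. Had the range served 800 QPS instead, the guard would reject the transfer (1000 - 800 = 200, which is not greater than 300), since moving it would only swap which store is hot.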
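The leaseCountConvergence branch preserves the pre-existing fallback behaviour: candidate stores below the mean lease count are acceptable targets, and otherwise the least-loaded store is returned only when the caller must shed the lease. A simplified sketch of that selection, again with plain types rather than the kvserver ones and without the random tie-breaking the allocator performs:

package main

import (
	"fmt"
	"math"
)

// pickByLeaseCount mirrors the lease-count branch above: stores whose lease
// count sits below the candidate mean (minus 0.5) are acceptable targets;
// otherwise the least-loaded store is remembered as a fallback, returned only
// when the caller is not allowed to keep the lease where it is
// (checkTransferLeaseSource == false in the real code).
func pickByLeaseCount(leaseCounts map[int]int32, mean float64, leaseholder int, mustMove bool) int {
	candidates := make([]int, 0, len(leaseCounts))
	bestOption, bestCount := 0, int32(math.MaxInt32)
	for store, count := range leaseCounts {
		if store == leaseholder {
			continue
		}
		if float64(count) < mean-0.5 {
			candidates = append(candidates, store)
		} else if count < bestCount {
			bestOption, bestCount = store, count
		}
	}
	if len(candidates) == 0 {
		if mustMove {
			return bestOption
		}
		return 0 // no transfer
	}
	// The allocator picks one of the candidates at random; take the first here.
	return candidates[0]
}

func main() {
	counts := map[int]int32{1: 120, 2: 100, 3: 95}
	fmt.Println(pickByLeaseCount(counts, 105, 1, false)) // 2 or 3: both are below 104.5
	fmt.Println(pickByLeaseCount(counts, 90, 1, true))   // nothing below 89.5, so the fallback s3
}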
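The new AllocatorTestingKnobs field is deliberately a pointer that production callers leave nil, which is why the check in TransferLeaseTarget is written as a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots. A tiny nil-first sketch of that gating pattern, using a local stand-in type rather than the real knobs struct:

package main

import "fmt"

type knobs struct {
	allowReplicasNeedingSnapshots bool
}

// filterCandidates drops candidates that would need a snapshot unless a test
// has explicitly opted out of the filter; a nil knobs pointer always takes the
// production path.
func filterCandidates(k *knobs, candidates []int, needsSnapshot map[int]bool) []int {
	if k != nil && k.allowReplicasNeedingSnapshots {
		return candidates // test-only escape hatch
	}
	out := make([]int, 0, len(candidates))
	for _, c := range candidates {
		if !needsSnapshot[c] {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	needsSnapshot := map[int]bool{3: true}
	fmt.Println(filterCandidates(nil, []int{1, 2, 3}, needsSnapshot))          // [1 2]
	fmt.Println(filterCandidates(&knobs{true}, []int{1, 2, 3}, needsSnapshot)) // [1 2 3]
}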