diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 1f0c1a44e10d..b2d38985bdb6 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -355,18 +355,15 @@ func (a *Allocator) ComputeAction(
 		return AllocatorRemoveLearner, removeLearnerReplicaPriority
 	}
 	// computeAction expects to operate only on voters.
-	return a.computeAction(ctx, zone, desc.RangeID, desc.Replicas().Voters())
+	return a.computeAction(ctx, zone, desc.Replicas().Voters())
 }
 
 func (a *Allocator) computeAction(
-	ctx context.Context,
-	zone *zonepb.ZoneConfig,
-	rangeID roachpb.RangeID,
-	voterReplicas []roachpb.ReplicaDescriptor,
+	ctx context.Context, zone *zonepb.ZoneConfig, voterReplicas []roachpb.ReplicaDescriptor,
 ) (AllocatorAction, float64) {
 	// TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
 	have := len(voterReplicas)
-	decommissioningReplicas := a.storePool.decommissioningReplicas(rangeID, voterReplicas)
+	decommissioningReplicas := a.storePool.decommissioningReplicas(voterReplicas)
 	clusterNodes := a.storePool.ClusterNodeCount()
 	need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
 	desiredQuorum := computeQuorum(need)
@@ -383,7 +380,7 @@ func (a *Allocator) computeAction(
 		return action, priority
 	}
 
-	liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(rangeID, voterReplicas)
+	liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(voterReplicas)
 
 	if len(liveVoterReplicas) < quorum {
 		// Do not take any replacement/removal action if we do not have a quorum of live
@@ -469,12 +466,9 @@ type decisionDetails struct {
 //
 // TODO(tbg): AllocateReplacement?
 func (a *Allocator) AllocateTarget(
-	ctx context.Context,
-	zone *zonepb.ZoneConfig,
-	rangeID roachpb.RangeID,
-	existingReplicas []roachpb.ReplicaDescriptor,
+	ctx context.Context, zone *zonepb.ZoneConfig, existingReplicas []roachpb.ReplicaDescriptor,
 ) (*roachpb.StoreDescriptor, string, error) {
-	sl, aliveStoreCount, throttled := a.storePool.getStoreList(rangeID, storeFilterThrottled)
+	sl, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled)
 
 	target, details := a.allocateTargetFromList(
 		ctx, sl, zone, existingReplicas, a.scorerOptions())
@@ -566,7 +560,7 @@ func (a Allocator) RemoveTarget(
 	for i, exist := range candidates {
 		existingStoreIDs[i] = exist.StoreID
 	}
-	sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, roachpb.RangeID(0), storeFilterNone)
+	sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
 
 	analyzedConstraints := constraint.AnalyzeConstraints(
 		ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
@@ -625,12 +619,11 @@ func (a Allocator) RebalanceTarget(
 	ctx context.Context,
 	zone *zonepb.ZoneConfig,
 	raftStatus *raft.Status,
-	rangeID roachpb.RangeID,
 	existingReplicas []roachpb.ReplicaDescriptor,
 	rangeUsageInfo RangeUsageInfo,
 	filter storeFilter,
 ) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) {
-	sl, _, _ := a.storePool.getStoreList(rangeID, filter)
+	sl, _, _ := a.storePool.getStoreList(filter)
 
 	zero := roachpb.ReplicationTarget{}
 
@@ -778,13 +771,12 @@ func (a *Allocator) TransferLeaseTarget(
 	zone *zonepb.ZoneConfig,
 	existing []roachpb.ReplicaDescriptor,
 	leaseStoreID roachpb.StoreID,
-	rangeID roachpb.RangeID,
 	stats *replicaStats,
 	checkTransferLeaseSource bool,
 	checkCandidateFullness bool,
 	alwaysAllowDecisionWithoutStats bool,
 ) roachpb.ReplicaDescriptor {
-	sl, _, _ := a.storePool.getStoreList(rangeID, storeFilterNone)
+	sl, _, _ := a.storePool.getStoreList(storeFilterNone)
 	sl = sl.filter(zone.Constraints)
 
 	// Filter stores that are on nodes containing existing replicas, but leave
@@ -849,7 +841,7 @@ func (a *Allocator) TransferLeaseTarget(
 	}
 
 	// Only consider live, non-draining replicas.
-	existing, _ = a.storePool.liveAndDeadReplicas(rangeID, existing)
+	existing, _ = a.storePool.liveAndDeadReplicas(existing)
 
 	// Short-circuit if there are no valid targets out there.
 	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
@@ -926,7 +918,6 @@ func (a *Allocator) ShouldTransferLease(
 	zone *zonepb.ZoneConfig,
 	existing []roachpb.ReplicaDescriptor,
 	leaseStoreID roachpb.StoreID,
-	rangeID roachpb.RangeID,
 	stats *replicaStats,
 ) bool {
 	source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
@@ -949,12 +940,12 @@ func (a *Allocator) ShouldTransferLease(
 		}
 	}
 
-	sl, _, _ := a.storePool.getStoreList(rangeID, storeFilterNone)
+	sl, _, _ := a.storePool.getStoreList(storeFilterNone)
 	sl = sl.filter(zone.Constraints)
 	log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)
 
 	// Only consider live, non-draining replicas.
-	existing, _ = a.storePool.liveAndDeadReplicas(rangeID, existing)
+	existing, _ = a.storePool.liveAndDeadReplicas(existing)
 
 	// Short-circuit if there are no valid targets out there.
 	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == source.StoreID) {
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index f529937cedbf..a5c0e1a0ed9f 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -391,7 +391,6 @@ func TestAllocatorSimpleRetrieval(t *testing.T) {
 	result, _, err := a.AllocateTarget(
 		context.Background(),
 		&simpleZoneConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{},
 	)
 	if err != nil {
@@ -410,7 +409,6 @@ func TestAllocatorNoAvailableDisks(t *testing.T) {
 	result, _, err := a.AllocateTarget(
 		context.Background(),
 		&simpleZoneConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{},
 	)
 	if result != nil {
@@ -431,7 +429,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
 	result1, _, err := a.AllocateTarget(
 		ctx,
 		&multiDCConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{},
 	)
 	if err != nil {
@@ -440,7 +437,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
 	result2, _, err := a.AllocateTarget(
 		ctx,
 		&multiDCConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{{
 			NodeID:  result1.Node.NodeID,
 			StoreID: result1.StoreID,
@@ -458,7 +454,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
 	result3, _, err := a.AllocateTarget(
 		ctx,
 		&multiDCConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{
 			{
 				NodeID:  result1.Node.NodeID,
@@ -494,7 +489,6 @@ func TestAllocatorExistingReplica(t *testing.T) {
 				},
 			},
 		},
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{
 			{
 				NodeID:  2,
@@ -597,7 +591,6 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 			result, _, err := a.AllocateTarget(
 				context.Background(),
 				zonepb.EmptyCompleteZoneConfig(),
-				firstRangeID,
 				tc.existing,
 			)
 			if e, a := tc.expectTarget, result != nil; e != a {
@@ -612,7 +605,6 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 				context.Background(),
 				zonepb.EmptyCompleteZoneConfig(),
 				nil, /* raftStatus */
-				firstRangeID,
 				tc.existing,
 				rangeUsageInfo,
 				storeFilterThrottled,
@@ -686,7 +678,6 @@ func TestAllocatorRebalance(t *testing.T) {
 			ctx,
 			zonepb.EmptyCompleteZoneConfig(),
 			nil,
-			firstRangeID,
 			[]roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}},
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -708,7 +699,7 @@ func TestAllocatorRebalance(t *testing.T) {
 		if !ok {
 			t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID)
 		}
-		sl, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+		sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 		result := shouldRebalance(ctx, desc, sl, a.scorerOptions())
 		if expResult := (i >= 2); expResult != result {
 			t.Errorf("%d: expected rebalance %t; got %t; desc %+v; sl: %+v", i, expResult, result, desc, sl)
@@ -837,7 +828,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 			context.Background(),
 			zonepb.EmptyCompleteZoneConfig(),
 			status,
-			firstRangeID,
 			replicas,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -858,7 +848,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 			context.Background(),
 			zonepb.EmptyCompleteZoneConfig(),
 			status,
-			firstRangeID,
 			replicas,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -876,7 +865,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 			context.Background(),
 			zonepb.EmptyCompleteZoneConfig(),
 			status,
-			firstRangeID,
 			replicas,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -956,7 +944,6 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) {
 				ctx,
 				zonepb.EmptyCompleteZoneConfig(),
 				nil,
-				firstRangeID,
 				c.existing,
 				rangeUsageInfo,
 				storeFilterThrottled)
@@ -1084,7 +1071,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 			// Ensure gossiped store descriptor changes have propagated.
 			testutils.SucceedsSoon(t, func() error {
-				sl, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+				sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 				for j, s := range sl.stores {
 					if a, e := s.Capacity.RangeCount, cluster[j].rangeCount; a != e {
 						return errors.Errorf("range count for %d = %d != expected %d", j, a, e)
@@ -1092,7 +1079,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 				}
 				return nil
 			})
-			sl, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+			sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 
 			// Verify shouldRebalance returns the expected value.
 			for j, store := range stores {
@@ -1151,7 +1138,6 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 			ctx,
 			zonepb.EmptyCompleteZoneConfig(),
 			nil,
-			firstRangeID,
 			[]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}},
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -1167,7 +1153,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 		if !ok {
 			t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID)
 		}
-		sl, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+		sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 		result := shouldRebalance(ctx, desc, sl, a.scorerOptions())
 		if expResult := (i < 3); expResult != result {
 			t.Errorf("%d: expected rebalance %t; got %t", i, expResult, result)
@@ -1225,7 +1211,6 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 				zonepb.EmptyCompleteZoneConfig(),
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				c.check,
 				true, /* checkCandidateFullness */
@@ -1301,7 +1286,6 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
 				zonepb.EmptyCompleteZoneConfig(),
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				c.check,
 				true, /* checkCandidateFullness */
@@ -1435,7 +1419,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
 			ctx,
 			zonepb.EmptyCompleteZoneConfig(),
 			nil, /* raftStatus */
-			firstRangeID,
 			tc.existing,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -1506,7 +1489,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
 			ctx,
 			zonepb.EmptyCompleteZoneConfig(),
 			nil, /* raftStatus */
-			firstRangeID,
 			tc.existing,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -1573,7 +1555,6 @@ func TestAllocatorTransferLeaseTargetMultiStore(t *testing.T) {
 				zonepb.EmptyCompleteZoneConfig(),
 				existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				c.check,
 				true, /* checkCandidateFullness */
@@ -1630,7 +1611,6 @@ func TestAllocatorShouldTransferLease(t *testing.T) {
 				zonepb.EmptyCompleteZoneConfig(),
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 			)
 			if c.expected != result {
@@ -1692,7 +1672,6 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
 				zonepb.EmptyCompleteZoneConfig(),
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 			)
 			if c.expected != result {
@@ -1822,7 +1801,6 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 				zone,
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 			)
 			expectTransfer := c.expectedCheckTrue != 0
@@ -1834,7 +1812,6 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 				zone,
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				true, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -1848,7 +1825,6 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 				zone,
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				false, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -1931,7 +1907,6 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 				zone,
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				true, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -1945,7 +1920,6 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 				zone,
 				c.existing,
 				c.leaseholder,
-				0,
 				nil, /* replicaStats */
 				false, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -2091,7 +2065,6 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) {
 		targetStore, details, err := a.AllocateTarget(
 			context.Background(),
 			zonepb.EmptyCompleteZoneConfig(),
-			firstRangeID,
 			existingRepls,
 		)
 		if err != nil {
@@ -2214,7 +2187,6 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) {
 			context.Background(),
 			zonepb.EmptyCompleteZoneConfig(),
 			nil,
-			firstRangeID,
 			existingRepls,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -2358,7 +2330,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
 	defer stopper.Stop(context.Background())
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
-	sl, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+	sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 
 	// Given a set of existing replicas for a range, rank which of the remaining
 	// stores from multiDiversityDCStores would be the best addition to the range
@@ -2773,7 +2745,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 	}
 
 	for testIdx, tc := range testCases {
-		sl, _, _ := a.storePool.getStoreListFromIDs(tc.existing, roachpb.RangeID(0), storeFilterNone)
+		sl, _, _ := a.storePool.getStoreListFromIDs(tc.existing, storeFilterNone)
 		existingRepls := make([]roachpb.ReplicaDescriptor, len(tc.existing))
 		for i, storeID := range tc.existing {
 			existingRepls[i] = roachpb.ReplicaDescriptor{
@@ -2819,7 +2791,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 	defer stopper.Stop(context.Background())
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
-	sl, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+	sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 
 	// Given a set of existing replicas for a range, rank which of the remaining
 	// stores would be best to remove if we had to remove one purely on the basis
@@ -3609,7 +3581,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 		// Also verify that RebalanceTarget picks out one of the best options as
 		// the final rebalance choice.
 		target, _, details, ok := a.RebalanceTarget(
-			context.Background(), zone, nil, firstRangeID, existingRepls, rangeUsageInfo, storeFilterThrottled)
+			context.Background(), zone, nil, existingRepls, rangeUsageInfo, storeFilterThrottled)
 		var found bool
 		if !ok && len(tc.validTargets) == 0 {
 			found = true
@@ -3786,7 +3758,6 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
 				zonepb.EmptyCompleteZoneConfig(),
 				existing,
 				c.leaseholder,
-				0,
 				c.stats,
 				c.check,
 				true, /* checkCandidateFullness */
@@ -5178,7 +5149,6 @@ func TestAllocatorThrottled(t *testing.T) {
 	_, _, err := a.AllocateTarget(
 		ctx,
 		&simpleZoneConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{},
 	)
 	if !errors.HasInterface(err, (*purgatoryError)(nil)) {
@@ -5190,7 +5160,6 @@ func TestAllocatorThrottled(t *testing.T) {
 	result, _, err := a.AllocateTarget(
 		ctx,
 		&simpleZoneConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{},
 	)
 	if err != nil {
@@ -5212,7 +5181,6 @@ func TestAllocatorThrottled(t *testing.T) {
 	_, _, err = a.AllocateTarget(
 		ctx,
 		&simpleZoneConfig,
-		firstRangeID,
 		[]roachpb.ReplicaDescriptor{},
 	)
 	if errors.HasInterface(err, (*purgatoryError)(nil)) {
@@ -5533,7 +5501,6 @@ func TestAllocatorRebalanceAway(t *testing.T) {
 			ctx,
 			&zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: []zonepb.ConstraintsConjunction{constraints}},
 			nil,
-			firstRangeID,
 			existingReplicas,
 			rangeUsageInfo,
 			storeFilterThrottled,
@@ -5699,7 +5666,6 @@ func TestAllocatorFullDisks(t *testing.T) {
 				ctx,
 				zonepb.EmptyCompleteZoneConfig(),
 				nil,
-				firstRangeID,
 				[]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}},
 				rangeUsageInfo,
 				storeFilterThrottled,
@@ -5828,7 +5794,6 @@ func Example_rebalancing() {
 				context.Background(),
 				zonepb.EmptyCompleteZoneConfig(),
 				nil,
-				firstRangeID,
 				[]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}},
 				rangeUsageInfo,
 				storeFilterThrottled,
diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go
index b46cde7a916a..0dec18f7ca1b 100644
--- a/pkg/kv/kvserver/client_test.go
+++ b/pkg/kv/kvserver/client_test.go
@@ -515,7 +515,7 @@ func (m *multiTestContext) initGossipNetwork() {
 	m.gossipStores()
 	testutils.SucceedsSoon(m.t, func() error {
 		for i := 0; i < len(m.stores); i++ {
-			if _, alive, _ := m.storePools[i].GetStoreList(roachpb.RangeID(0)); alive != len(m.stores) {
+			if _, alive, _ := m.storePools[i].GetStoreList(); alive != len(m.stores) {
 				return errors.Errorf("node %d's store pool only has %d alive stores, expected %d",
 					m.stores[i].Ident.NodeID, alive, len(m.stores))
 			}
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 821f1893cc0a..edfc2ea480e4 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -375,8 +375,8 @@ func (r *Replica) IsRaftGroupInitialized() bool {
 
 // GetStoreList exposes getStoreList for testing only, but with a hardcoded
 // storeFilter of storeFilterNone.
-func (sp *StorePool) GetStoreList(rangeID roachpb.RangeID) (StoreList, int, int) {
-	list, available, throttled := sp.getStoreList(rangeID, storeFilterNone)
+func (sp *StorePool) GetStoreList() (StoreList, int, int) {
+	list, available, throttled := sp.getStoreList(storeFilterNone)
 	return list, available, len(throttled)
 }
 
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index 68da2a66c4e2..9ed729d0d024 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -656,7 +656,7 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 	testutils.SucceedsSoon(t, func() error {
 		for i, sp := range mtc.storePools {
 			curNodeID := mtc.gossips[i].NodeID.Get()
-			sl, alive, _ := sp.GetStoreList(0)
+			sl, alive, _ := sp.GetStoreList()
 			if alive != expectedLive {
 				return errors.Errorf(
 					"expected %d live stores but got %d from node %d",
@@ -690,7 +690,7 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 	testutils.SucceedsSoon(t, func() error {
 		for i, sp := range mtc.storePools {
 			curNodeID := mtc.gossips[i].NodeID.Get()
-			sl, alive, _ := sp.GetStoreList(0)
+			sl, alive, _ := sp.GetStoreList()
 			if alive != expectedLive {
 				return errors.Errorf(
 					"expected %d live stores but got %d from node %d",
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index fd6c78d8143d..2f2bb7aef137 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2134,7 +2134,7 @@ func (s *Store) relocateOne(
 		return nil, nil, err
 	}
 
-	storeList, _, _ := s.allocator.storePool.getStoreList(desc.RangeID, storeFilterNone)
+	storeList, _, _ := s.allocator.storePool.getStoreList(storeFilterNone)
 	storeMap := storeListToMap(storeList)
 
 	// Compute which replica to add and/or remove, respectively. We ask the allocator
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index e4d2e594580e..b7d37114e649 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -233,7 +233,7 @@ func (rq *replicateQueue) shouldQueue(
 	if !rq.store.TestingKnobs().DisableReplicaRebalancing {
 		rangeUsageInfo := rangeUsageInfoForRepl(repl)
 		_, _, _, ok := rq.allocator.RebalanceTarget(
-			ctx, zone, repl.RaftStatus(), desc.RangeID, voterReplicas, rangeUsageInfo, storeFilterThrottled)
+			ctx, zone, repl.RaftStatus(), voterReplicas, rangeUsageInfo, storeFilterThrottled)
 		if ok {
 			log.VEventf(ctx, 2, "rebalance target found, enqueuing")
 			return true, 0
@@ -245,7 +245,7 @@ func (rq *replicateQueue) shouldQueue(
 	if lease, _ := repl.GetLease(); repl.IsLeaseValid(lease, now) {
 		if rq.canTransferLease() &&
 			rq.allocator.ShouldTransferLease(
-				ctx, zone, voterReplicas, lease.Replica.StoreID, desc.RangeID, repl.leaseholderStats) {
+				ctx, zone, voterReplicas, lease.Replica.StoreID, repl.leaseholderStats) {
 			log.VEventf(ctx, 2, "lease transfer needed, enqueuing")
 			return true, 0
 		}
@@ -320,8 +320,7 @@ func (rq *replicateQueue) processOneChange(
 	// Avoid taking action if the range has too many dead replicas to make
 	// quorum.
 	voterReplicas := desc.Replicas().Voters()
-	liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-		desc.RangeID, voterReplicas)
+	liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas)
 	{
 		unavailable := !desc.Replicas().CanMakeProgress(func(rDesc roachpb.ReplicaDescriptor) bool {
 			for _, inner := range liveVoterReplicas {
@@ -379,8 +378,7 @@ func (rq *replicateQueue) processOneChange(
 		}
 		return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, removeIdx, dryRun)
 	case AllocatorReplaceDecommissioning:
-		decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(
-			desc.RangeID, voterReplicas)
+		decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas)
 		if len(decommissioningReplicas) == 0 {
 			// Nothing to do.
 			return false, nil
@@ -481,7 +479,6 @@ func (rq *replicateQueue) addOrReplace(
 	newStore, details, err := rq.allocator.AllocateTarget(
 		ctx,
 		zone,
-		desc.RangeID,
 		remainingLiveReplicas,
 	)
 	if err != nil {
@@ -522,7 +519,6 @@ func (rq *replicateQueue) addOrReplace(
 		_, _, err := rq.allocator.AllocateTarget(
 			ctx,
 			zone,
-			desc.RangeID,
 			oldPlusNewReplicas,
 		)
 		if err != nil {
@@ -719,8 +715,7 @@ func (rq *replicateQueue) removeDecommissioning(
 	ctx context.Context, repl *Replica, dryRun bool,
 ) (requeue bool, _ error) {
 	desc, _ := repl.DescAndZone()
-	decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(
-		desc.RangeID, desc.Replicas().All())
+	decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(desc.Replicas().All())
 	if len(decommissioningReplicas) == 0 {
 		log.VEventf(ctx, 1, "range of replica %s was identified as having decommissioning replicas, "+
 			"but no decommissioning replicas were found", repl)
@@ -837,7 +832,7 @@ func (rq *replicateQueue) considerRebalance(
 	if !rq.store.TestingKnobs().DisableReplicaRebalancing {
 		rangeUsageInfo := rangeUsageInfoForRepl(repl)
 		addTarget, removeTarget, details, ok := rq.allocator.RebalanceTarget(
-			ctx, zone, repl.RaftStatus(), desc.RangeID, existingReplicas, rangeUsageInfo,
+			ctx, zone, repl.RaftStatus(), existingReplicas, rangeUsageInfo,
 			storeFilterThrottled)
 		if !ok {
 			log.VEventf(ctx, 1, "no suitable rebalance target")
@@ -952,7 +947,6 @@ func (rq *replicateQueue) findTargetAndTransferLease(
 		zone,
 		desc.Replicas().Voters(),
 		repl.store.StoreID(),
-		desc.RangeID,
 		repl.leaseholderStats,
 		opts.checkTransferLeaseSource,
 		opts.checkCandidateFullness,
diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go
index f1d10bf48e35..6ef626e70a89 100644
--- a/pkg/kv/kvserver/store_pool.go
+++ b/pkg/kv/kvserver/store_pool.go
@@ -186,19 +186,14 @@ const (
 	storeStatusUnknown
 	// The store is alive but it is throttled.
 	storeStatusThrottled
-	// The store is alive but a replica for the same rangeID was recently
-	// discovered to be corrupt.
-	storeStatusReplicaCorrupted
 	// The store is alive and available.
 	storeStatusAvailable
 	// The store is decommissioning.
 	storeStatusDecommissioning
 )
 
-// status returns the current status of the store, including whether
-// any of the replicas for the specified rangeID are corrupted.
 func (sd *storeDetail) status(
-	now time.Time, threshold time.Duration, rangeID roachpb.RangeID, nl NodeLivenessFunc,
+	now time.Time, threshold time.Duration, nl NodeLivenessFunc,
 ) storeStatus {
 	// The store is considered dead if it hasn't been updated via gossip
 	// within the liveness threshold. Note that lastUpdatedTime is set
@@ -318,7 +313,7 @@ func (sp *StorePool) String() string {
 	for _, id := range ids {
 		detail := sp.detailsMu.storeDetails[id]
 		fmt.Fprintf(&buf, "%d", id)
-		status := detail.status(now, timeUntilStoreDead, 0, sp.nodeLivenessFn)
+		status := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn)
 		if status != storeStatusAvailable {
 			fmt.Fprintf(&buf, " (status=%d)", status)
 		}
@@ -472,7 +467,7 @@ func (sp *StorePool) getStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreD
 // decommissioningReplicas filters out replicas on decommissioning node/store
 // from the provided repls and returns them in a slice.
 func (sp *StorePool) decommissioningReplicas(
-	rangeID roachpb.RangeID, repls []roachpb.ReplicaDescriptor,
+	repls []roachpb.ReplicaDescriptor,
 ) (decommissioningReplicas []roachpb.ReplicaDescriptor) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -484,7 +479,7 @@ func (sp *StorePool) decommissioningReplicas(
 
 	for _, repl := range repls {
 		detail := sp.getStoreDetailLocked(repl.StoreID)
-		switch detail.status(now, timeUntilStoreDead, rangeID, sp.nodeLivenessFn) {
+		switch detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn) {
 		case storeStatusDecommissioning:
 			decommissioningReplicas = append(decommissioningReplicas, repl)
 		}
@@ -505,7 +500,7 @@ func (sp *StorePool) ClusterNodeCount() int {
 // from the returned slices. Replicas on decommissioning node/store are
 // considered live.
 func (sp *StorePool) liveAndDeadReplicas(
-	rangeID roachpb.RangeID, repls []roachpb.ReplicaDescriptor,
+	repls []roachpb.ReplicaDescriptor,
 ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -516,7 +511,7 @@ func (sp *StorePool) liveAndDeadReplicas(
 	for _, repl := range repls {
 		detail := sp.getStoreDetailLocked(repl.StoreID)
 		// Mark replica as dead if store is dead.
-		status := detail.status(now, timeUntilStoreDead, rangeID, sp.nodeLivenessFn)
+		status := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn)
 		switch status {
 		case storeStatusDead:
 			deadReplicas = append(deadReplicas, repl)
@@ -646,11 +641,8 @@ type throttledStoreReasons []string
 // getStoreList returns a storeList that contains all active stores that contain
 // the required attributes and their associated stats. The storeList is filtered
 // according to the provided storeFilter. It also returns the total number of
-// alive and throttled stores. The passed in rangeID is used to check for
-// corrupted replicas.
-func (sp *StorePool) getStoreList(
-	rangeID roachpb.RangeID, filter storeFilter,
-) (StoreList, int, throttledStoreReasons) {
+// alive and throttled stores.
+func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttledStoreReasons) {
 	sp.detailsMu.RLock()
 	defer sp.detailsMu.RUnlock()
 
@@ -658,23 +650,23 @@ func (sp *StorePool) getStoreList(
 	for storeID := range sp.detailsMu.storeDetails {
 		storeIDs = append(storeIDs, storeID)
 	}
-	return sp.getStoreListFromIDsRLocked(storeIDs, rangeID, filter)
+	return sp.getStoreListFromIDsRLocked(storeIDs, filter)
 }
 
 // getStoreListFromIDs is the same function as getStoreList but only returns stores
 // from the subset of passed in store IDs.
 func (sp *StorePool) getStoreListFromIDs(
-	storeIDs roachpb.StoreIDSlice, rangeID roachpb.RangeID, filter storeFilter,
+	storeIDs roachpb.StoreIDSlice, filter storeFilter,
 ) (StoreList, int, throttledStoreReasons) {
 	sp.detailsMu.RLock()
 	defer sp.detailsMu.RUnlock()
-	return sp.getStoreListFromIDsRLocked(storeIDs, rangeID, filter)
+	return sp.getStoreListFromIDsRLocked(storeIDs, filter)
 }
 
 // getStoreListFromIDsRLocked is the same function as getStoreList but requires
 // that the detailsMU read lock is held.
 func (sp *StorePool) getStoreListFromIDsRLocked(
-	storeIDs roachpb.StoreIDSlice, rangeID roachpb.RangeID, filter storeFilter,
+	storeIDs roachpb.StoreIDSlice, filter storeFilter,
 ) (StoreList, int, throttledStoreReasons) {
 	if sp.deterministic {
 		sort.Sort(storeIDs)
@@ -695,15 +687,13 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
 			// Do nothing; this store is not in the StorePool.
 			continue
 		}
-		switch s := detail.status(now, timeUntilStoreDead, rangeID, sp.nodeLivenessFn); s {
+		switch s := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn); s {
 		case storeStatusThrottled:
 			aliveStoreCount++
 			throttled = append(throttled, detail.throttledBecause)
 			if filter != storeFilterThrottled {
 				storeDescriptors = append(storeDescriptors, *detail.desc)
 			}
-		case storeStatusReplicaCorrupted:
-			aliveStoreCount++
 		case storeStatusAvailable:
 			aliveStoreCount++
 			storeDescriptors = append(storeDescriptors, *detail.desc)
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index b86d509dd6a8..ab1494bc5df6 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -150,7 +150,6 @@ func verifyStoreList(
 	sp *StorePool,
 	constraints []zonepb.ConstraintsConjunction,
 	storeIDs roachpb.StoreIDSlice, // optional
-	rangeID roachpb.RangeID,
 	filter storeFilter,
 	expected []int,
 	expectedAliveStoreCount int,
@@ -160,9 +159,9 @@ func verifyStoreList(
 	var aliveStoreCount int
 	var throttled throttledStoreReasons
 	if storeIDs == nil {
-		sl, aliveStoreCount, throttled = sp.getStoreList(rangeID, filter)
+		sl, aliveStoreCount, throttled = sp.getStoreList(filter)
 	} else {
-		sl, aliveStoreCount, throttled = sp.getStoreListFromIDs(storeIDs, rangeID, filter)
+		sl, aliveStoreCount, throttled = sp.getStoreListFromIDs(storeIDs, filter)
 	}
 	throttledStoreCount := len(throttled)
 	sl = sl.filter(constraints)
@@ -207,7 +206,7 @@ func TestStorePoolGetStoreList(t *testing.T) {
 	}
 	required := []string{"ssd", "dc"}
 	// Nothing yet.
-	sl, _, _ := sp.getStoreList(roachpb.RangeID(0), storeFilterNone)
+	sl, _, _ := sp.getStoreList(storeFilterNone)
 	sl = sl.filter(constraints)
 	if len(sl.stores) != 0 {
 		t.Errorf("expected no stores, instead %+v", sl.stores)
@@ -249,8 +248,6 @@ func TestStorePoolGetStoreList(t *testing.T) {
 		Attrs: roachpb.Attributes{Attrs: required},
 	}
 
-	const rangeID = roachpb.RangeID(1)
-
 	// Gossip and mark all alive initially.
 	sg.GossipStores([]*roachpb.StoreDescriptor{
 		&matchingStore,
@@ -277,7 +274,6 @@ func TestStorePoolGetStoreList(t *testing.T) {
 		sp,
 		constraints,
 		nil, /* storeIDs */
-		rangeID,
 		storeFilterNone,
 		[]int{
 			int(matchingStore.StoreID),
@@ -295,7 +291,6 @@ func TestStorePoolGetStoreList(t *testing.T) {
 		sp,
 		constraints,
 		nil, /* storeIDs */
-		rangeID,
 		storeFilterThrottled,
 		[]int{
 			int(matchingStore.StoreID),
@@ -319,7 +314,6 @@ func TestStorePoolGetStoreList(t *testing.T) {
 		sp,
 		constraints,
 		limitToStoreIDs,
-		rangeID,
 		storeFilterNone,
 		[]int{
 			int(matchingStore.StoreID),
@@ -337,7 +331,6 @@ func TestStorePoolGetStoreList(t *testing.T) {
 		sp,
 		constraints,
 		limitToStoreIDs,
-		rangeID,
 		storeFilterThrottled,
 		[]int{
 			int(matchingStore.StoreID),
@@ -702,7 +695,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 		mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_LIVE)
 	}
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(0, replicas)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
 	if len(liveReplicas) != 5 {
 		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
 	}
@@ -713,7 +706,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 	mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_DEAD)
 	mnl.setNodeStatus(5, kvserverpb.NodeLivenessStatus_DEAD)
 
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(0, replicas)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
 	if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected live replicas %+v; got %+v", e, a)
 	}
@@ -724,7 +717,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 	// Mark node 4 as merely unavailable.
 	mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_UNAVAILABLE)
 
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(0, replicas)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
 	if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected live replicas %+v; got %+v", e, a)
 	}
@@ -748,12 +741,12 @@ func TestStorePoolDefaultState(t *testing.T) {
 		kvserverpb.NodeLivenessStatus_DEAD)
 	defer stopper.Stop(context.TODO())
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(0, []roachpb.ReplicaDescriptor{{StoreID: 1}})
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas([]roachpb.ReplicaDescriptor{{StoreID: 1}})
 	if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
 		t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
 	}
 
-	sl, alive, throttled := sp.getStoreList(roachpb.RangeID(0), storeFilterNone)
+	sl, alive, throttled := sp.getStoreList(storeFilterNone)
 	if len(sl.stores) > 0 {
 		t.Errorf("expected no live stores; got list of %v", sl)
 	}
@@ -939,7 +932,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 		mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_LIVE)
 	}
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(0, replicas)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
 	if len(liveReplicas) != 5 {
 		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
 	}
@@ -951,7 +944,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 	// Mark node 5 as dead.
 	mnl.setNodeStatus(5, kvserverpb.NodeLivenessStatus_DEAD)
 
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(0, replicas)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
 	// Decommissioning replicas are considered live.
 	if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected live replicas %+v; got %+v", e, a)
@@ -960,7 +953,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 		t.Fatalf("expected dead replicas %+v; got %+v", e, a)
 	}
 
-	decommissioningReplicas := sp.decommissioningReplicas(0, replicas)
+	decommissioningReplicas := sp.decommissioningReplicas(replicas)
 	if a, e := decommissioningReplicas, replicas[3:4]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected decommissioning replicas %+v; got %+v", e, a)
 	}
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index fc69683bd81b..30e793164c93 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -196,7 +196,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
 				continue
 			}
 
-			storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone)
+			storeList, _, _ := sr.rq.allocator.storePool.getStoreList(storeFilterNone)
 			sr.rebalanceStore(ctx, mode, storeList)
 		}
 	})
diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go
index d63076b78ad3..a72b388e9958 100644
--- a/pkg/kv/kvserver/store_rebalancer_test.go
+++ b/pkg/kv/kvserver/store_rebalancer_test.go
@@ -116,7 +116,7 @@ func TestChooseLeaseToTransfer(t *testing.T) {
 	stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
 	defer stopper.Stop(context.Background())
 	gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
-	storeList, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+	storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 	storeMap := storeListToMap(storeList)
 
 	const minQPS = 800
@@ -199,7 +199,7 @@ func TestChooseReplicaToRebalance(t *testing.T) {
 	stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
 	defer stopper.Stop(context.Background())
 	gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
-	storeList, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+	storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 	storeMap := storeListToMap(storeList)
 
 	const minQPS = 800
@@ -307,7 +307,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
 	stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
 	defer stopper.Stop(context.Background())
 	gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
-	storeList, _, _ := a.storePool.getStoreList(firstRangeID, storeFilterThrottled)
+	storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled)
 	storeMap := storeListToMap(storeList)
 
 	const minQPS = 800