kvserver: TestReplicateQueueDownReplicate #49812

Merged
55 changes: 16 additions & 39 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -241,6 +241,7 @@ func TestReplicateQueueUpReplicate(t *testing.T) {
 // notice over-replicated ranges and remove replicas from them.
 func TestReplicateQueueDownReplicate(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
 	const replicaCount = 3
 
 	// The goal of this test is to ensure that down replication occurs correctly
@@ -255,46 +256,22 @@ func TestReplicateQueueDownReplicate(t *testing.T) {
 			},
 		},
 	)
-	defer tc.Stopper().Stop(context.Background())
-
-	// Split off a range from the initial range for testing; there are
-	// complications if the metadata ranges are moved.
-	testKey := roachpb.Key("m")
-	if _, _, err := tc.SplitRange(testKey); err != nil {
-		t.Fatal(err)
-	}
+	defer tc.Stopper().Stop(ctx)
 
-	allowedErrs := strings.Join([]string{
-		// If a node is already present, we expect this error.
-		"unable to add replica .* which is already present",
-		// If a replica for this range was previously present on this store and
-		// it has already been removed but has not yet been GCed, this error
-		// is expected.
-		kvserver.IntersectingSnapshotMsg,
-	}, "|")
-
-	// Up-replicate the new range to all nodes to create redundant replicas.
-	// Every time a new replica is added, there's a very good chance that
-	// another one is removed. So all the replicas can't be added at once and
-	// instead need to be added one at a time ensuring that the replica did
-	// indeed make it to the desired target.
-	for _, server := range tc.Servers {
-		nodeID := server.NodeID()
-		// If this is not wrapped in a SucceedsSoon, then other temporary
-		// failures unlike the ones listed below, such as rejected reservations
-		// can cause the test to fail. When encountering those failures, a
-		// retry is in order.
-		testutils.SucceedsSoon(t, func() error {
-			_, err := tc.AddReplicas(testKey, roachpb.ReplicationTarget{
-				NodeID:  nodeID,
-				StoreID: server.GetFirstStoreID(),
-			})
-			if testutils.IsError(err, allowedErrs) {
-				return nil
-			}
-			return err
-		})
-	}
+	// Disable the replication queues so that the range we're about to create
+	// doesn't get down-replicated too soon.
+	tc.ToggleReplicateQueues(false)
+
+	testKey := tc.ScratchRange(t)
+	desc := tc.LookupRangeOrFatal(t, testKey)
+	// At the end of StartTestCluster(), all ranges have 5 replicas since they're
+	// all "system ranges". When the ScratchRange() splits its range, it also
+	// starts up with 5 replicas. Since it's not a system range, its default zone
+	// config asks for 3x replication, and the replication queue will
+	// down-replicate it.
+	require.Len(t, desc.Replicas().All(), 5)
+	// Re-enable the replication queue.
+	tc.ToggleReplicateQueues(true)
 
 	// Now wait until the replicas have been down-replicated back to the
 	// desired number.
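
For context, the collapsed tail of this hunk is the part that waits for the down-replication to finish. A minimal sketch of what such a wait could look like, assuming the test polls the range descriptor with testutils.SucceedsSoon until only replicaCount replicas remain (the actual body is not shown in this diff):

	testutils.SucceedsSoon(t, func() error {
		desc := tc.LookupRangeOrFatal(t, testKey)
		// The replicate queue works asynchronously; returning an error here
		// just retries until the replica count drops to the target.
		if n := len(desc.Replicas().All()); n != replicaCount {
			return fmt.Errorf("expected %d replicas, found %d", replicaCount, n)
		}
		return nil
	})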
12 changes: 12 additions & 0 deletions pkg/testutils/testcluster/testcluster.go
@@ -753,6 +753,7 @@ func (tc *TestCluster) findMemberStore(storeID roachpb.StoreID) (*kvserver.Store
 // TODO(andrei): This method takes inexplicably long.
 // I think it shouldn't need any retries. See #38565.
 func (tc *TestCluster) WaitForFullReplication() error {
+	log.Infof(context.TODO(), "WaitForFullReplication")
 	start := timeutil.Now()
 	defer func() {
 		end := timeutil.Now()
@@ -874,6 +875,17 @@ func (tc *TestCluster) ReplicationMode() base.TestClusterReplicationMode {
 	return tc.replicationMode
 }
 
+// ToggleReplicateQueues activates or deactivates the replication queues on all
+// the stores on all the nodes.
+func (tc *TestCluster) ToggleReplicateQueues(active bool) {
+	for _, s := range tc.Servers {
+		_ = s.Stores().VisitStores(func(store *kvserver.Store) error {
+			store.SetReplicateQueueActive(active)
+			return nil
+		})
+	}
+}
+
 type testClusterFactoryImpl struct{}
 
 // TestClusterFactory can be passed to serverutils.InitTestClusterFactory
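
ToggleReplicateQueues is deliberately fire-and-forget: the closure passed to VisitStores always returns nil, so the visitor's error can be discarded. A hypothetical usage sketch mirroring the test above (the assertion placeholder is illustrative):

	// Pause down-replication while setting up an over-replicated range.
	tc.ToggleReplicateQueues(false)
	testKey := tc.ScratchRange(t)
	// ... assert on the range's replica count here ...
	// Re-enable the queues; the replicate queue then shrinks the range back
	// to its zone config's target (3x by default).
	tc.ToggleReplicateQueues(true)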