diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index f05caa2933ba..35dc7fb22dba 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -1272,6 +1272,9 @@ func (bq *baseQueue) processReplicasInPurgatory( for _, item := range ranges { repl, err := bq.getReplica(item.rangeID) if err != nil || item.replicaID != repl.ReplicaID() { + bq.mu.Lock() + bq.removeFromReplicaSetLocked(item.rangeID) + bq.mu.Unlock() continue } annotatedCtx := repl.AnnotateCtx(ctx) @@ -1281,6 +1284,10 @@ func (bq *baseQueue) processReplicasInPurgatory( bq.finishProcessingReplica(ctx, stopper, repl, err) }, ) != nil { + // NB: We do not need to worry about removing any unprocessed replicas + // from the replica set here, as RunTask will only return an error when + // the stopper is quiescing or stopping -- meaning the process is + // shutting down. return } } diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index 484aaa2f26e7..f793f3653127 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -904,17 +904,28 @@ func TestBaseQueuePurgatory(t *testing.T) { return nil }) + // Change the replicaID of the first replica and destroy the second + // replica. These replicas should not be processed and should be removed from + // the replica set. The number of processed replicas will be 2 less. + const rmReplCount = 2 + repls[0].replicaID = 2 + if err := tc.store.RemoveReplica(ctx, repls[1], repls[1].Desc().NextReplicaID, RemoveOptions{ + DestroyData: true, + }); err != nil { + t.Fatal(err) + } + // Remove error and reprocess. 
testQueue.err = nil testQueue.pChan <- timeutil.Now() testutils.SucceedsSoon(t, func() error { - if pc := testQueue.getProcessed(); pc != replicaCount*3 { - return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3, pc) + if pc := testQueue.getProcessed(); pc != replicaCount*3-rmReplCount { + return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3-rmReplCount, pc) } // Check metrics. - if v := bq.successes.Count(); v != int64(replicaCount) { - return errors.Errorf("expected %d processed replicas; got %d", replicaCount, v) + if v := bq.successes.Count(); v != int64(replicaCount)-rmReplCount { + return errors.Errorf("expected %d processed replicas; got %d", replicaCount-rmReplCount, v) } if v := bq.failures.Count(); v != int64(replicaCount*2) { return errors.Errorf("expected %d failed replicas; got %d", replicaCount*2, v) @@ -925,6 +936,15 @@ func TestBaseQueuePurgatory(t *testing.T) { if v := bq.purgatory.Value(); v != 0 { return errors.Errorf("expected 0 purgatory replicas; got %d", v) } + // Verify there are no replicas left in the replica set after finishing + // processing. This is within the retry loop as the above conditions can + // pass without considering the removed replicas. + bq.mu.Lock() + replicasCount := len(bq.mu.replicas) + bq.mu.Unlock() + if replicasCount != 0 { + return errors.Errorf("expected no replicas in the replica set: got %d", replicasCount) + } return nil }) @@ -936,6 +956,30 @@ func TestBaseQueuePurgatory(t *testing.T) { if l := bq.Length(); l != 0 { t.Errorf("expected empty priorityQ; got %d", l) } + + // Verify that the replica with a changed replicaID can be processed. 
+ beforeProcessCount := testQueue.getProcessed() + beforeSuccessCount := bq.successes.Count() + beforeFailureCount := bq.failures.Count() + bq.maybeAdd(ctx, repls[0], hlc.ClockTimestamp{}) + testutils.SucceedsSoon(t, func() error { + if pc := testQueue.getProcessed(); pc != beforeProcessCount+1 { + return errors.Errorf("expected %d processed replicas; got %d", beforeProcessCount+1, pc) + } + if v := bq.successes.Count(); v != beforeSuccessCount+1 { + return errors.Errorf("expected %d processed replicas; got %d", beforeSuccessCount+1, v) + } + if v := bq.failures.Count(); v != beforeFailureCount { + return errors.Errorf("expected %d failed replicas; got %d", beforeFailureCount, v) + } + if v := bq.pending.Value(); v != 0 { + return errors.Errorf("expected 0 pending replicas; got %d", v) + } + if v := bq.purgatory.Value(); v != 0 { + return errors.Errorf("expected 0 purgatory replicas; got %d", v) + } + return nil + }) } type processTimeoutQueueImpl struct {