diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml
index fd28442881ff..0f3333a46f18 100644
--- a/docs/generated/metrics/metrics.yaml
+++ b/docs/generated/metrics/metrics.yaml
@@ -13951,6 +13951,22 @@ layers:
         unit: COUNT
         aggregation: AVG
         derivative: NONE
+      - name: queue.replicate.priority_inversion.requeue
+        exported_name: queue_replicate_priority_inversion_requeue
+        description: Number of priority inversions in the replicate queue that resulted in requeuing the replica. A priority inversion occurs when the priority at processing time is lower than it was at enqueue time. When the priority has dropped from a high-priority repair action to a rebalance, the replica is requeued to avoid unfairness.
+        y_axis_label: Replicas
+        type: COUNTER
+        unit: COUNT
+        aggregation: AVG
+        derivative: NON_NEGATIVE_DERIVATIVE
+      - name: queue.replicate.priority_inversion.total
+        exported_name: queue_replicate_priority_inversion_total
+        description: Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time is lower than it was at enqueue time.
+        y_axis_label: Replicas
+        type: COUNTER
+        unit: COUNT
+        aggregation: AVG
+        derivative: NON_NEGATIVE_DERIVATIVE
       - name: queue.replicate.process.failure
         exported_name: queue_replicate_process_failure
         description: Number of replicas which failed processing in the replicate queue
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index 84f49f9f67cf..a23d5c75b692 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -1001,6 +1001,9 @@ func (bq *baseQueue) addInternal(
 	default:
 		// No need to signal again.
 	}
+	if postEnqueueInterceptor := bq.store.TestingKnobs().BaseQueuePostEnqueueInterceptor; postEnqueueInterceptor != nil {
+		postEnqueueInterceptor(bq.store.StoreID(), desc.RangeID)
+	}
 	// Note that we are bumping enqueueAdd here instead of during defer to avoid
 	// treating requeuing a processing replica as newly added. They will be
 	// re-added to the queue later which will double count them.
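The interceptor call added above fires at the exact moment a replica has been enqueued but not yet processed, which is the window in which a priority inversion can develop. The check consumed later in this patch compares the priority recorded at enqueue time with the priority of the action computed at processing time. Below is a minimal, self-contained sketch of those semantics; the type, field names, and sample priority value are illustrative assumptions rather than the actual allocatorimpl.CheckPriorityInversion implementation.

// Hypothetical model of the inversion check; the real implementation lives
// in pkg/kv/kvserver/allocator/allocatorimpl and may differ in detail.
package main

import "fmt"

// allocatorAction stands in for allocatorimpl.AllocatorAction; only the
// fields this sketch needs are modeled.
type allocatorAction struct {
	name        string
	priority    float64
	isRebalance bool // low-priority maintenance work such as ConsiderRebalance
}

// checkPriorityInversion reports whether the action computed at processing
// time has a lower priority than the replica had at enqueue time (an
// inversion), and whether the replica should be requeued: requeue only when
// a high-priority repair degraded into a plain rebalance, so that the
// rebalance does not jump ahead of genuinely urgent work.
func checkPriorityInversion(
	enqueuePriority float64, a allocatorAction,
) (inversion, shouldRequeue bool) {
	inversion = a.priority < enqueuePriority
	shouldRequeue = inversion && a.isRebalance
	return inversion, shouldRequeue
}

func main() {
	// Enqueued for a high-priority repair, but by processing time only a
	// rebalance remains. The priority values are made up for illustration.
	rebalance := allocatorAction{name: "consider rebalance", priority: 0, isRebalance: true}
	fmt.Println(checkPriorityInversion(12000, rebalance)) // true true
}

Requeuing in the shouldRequeue case lets the demoted rebalance re-enter the queue at its true, low priority instead of consuming a high-priority processing slot.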
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 6613aca510dc..133824a3d95e 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -322,6 +322,22 @@ var (
 		Measurement: "Replicas",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaReplicateQueueRequeueDueToPriorityInversion = metric.Metadata{
+		Name: "queue.replicate.priority_inversion.requeue",
+		Help: "Number of priority inversions in the replicate queue that resulted in requeuing the replica. " +
+			"A priority inversion occurs when the priority at processing time is lower " +
+			"than it was at enqueue time. When the priority has dropped from a high-priority repair action " +
+			"to a rebalance, the replica is requeued to avoid unfairness.",
+		Measurement: "Replicas",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaReplicateQueuePriorityInversionTotal = metric.Metadata{
+		Name: "queue.replicate.priority_inversion.total",
+		Help: "Total number of priority inversions in the replicate queue. " +
+			"A priority inversion occurs when the priority at processing time is lower than it was at enqueue time.",
+		Measurement: "Replicas",
+		Unit:        metric.Unit_COUNT,
+	}
 )

 // quorumError indicates a retryable error condition which sends replicas being
@@ -344,6 +360,7 @@ func (e *quorumError) Error() string {
 func (*quorumError) PurgatoryErrorMarker() {}

 // ReplicateQueueMetrics is the set of metrics for the replicate queue.
+// TODO(wenyihu6): metrics initialization could be cleaned up by using a map.
 type ReplicateQueueMetrics struct {
 	AddReplicaCount      *metric.Counter
 	AddVoterReplicaCount *metric.Counter
@@ -381,6 +398,10 @@ type ReplicateQueueMetrics struct {
 	// TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner,
 	// AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange
 	// allocator actions.
+
+	// Priority inversion.
+	RequeueDueToPriorityInversion *metric.Counter
+	PriorityInversionTotal        *metric.Counter
 }

 func makeReplicateQueueMetrics() ReplicateQueueMetrics {
@@ -417,6 +438,9 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
 		ReplaceDecommissioningReplicaErrorCount:  metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount),
 		RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount),
 		RemoveDecommissioningReplicaErrorCount:   metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount),
+
+		RequeueDueToPriorityInversion: metric.NewCounter(metaReplicateQueueRequeueDueToPriorityInversion),
+		PriorityInversionTotal:        metric.NewCounter(metaReplicateQueuePriorityInversionTotal),
 	}
 }

@@ -955,12 +979,14 @@ func (rq *replicateQueue) processOneChange(
 	// starving other higher priority work.
 	if PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) {
 		if inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action); inversion {
+			rq.metrics.PriorityInversionTotal.Inc(1)
 			if priorityInversionLogEveryN.ShouldLog() {
 				log.KvDistribution.Infof(ctx,
 					"priority inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v",
 					shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue)
 			}
 			if shouldRequeue {
+				rq.metrics.RequeueDueToPriorityInversion.Inc(1)
 				// Return true to requeue the range. Return the error to ensure it is
 				// logged and tracked in replicate queue bq.failures metrics. See
 				// replicateQueue.process for details.
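A note on how the two counters relate: every detected inversion increments queue.replicate.priority_inversion.total, and only the subset that leads to a requeue also increments queue.replicate.priority_inversion.requeue. The invariant total >= requeue therefore always holds, and the difference counts inversions that were processed in place. A quick illustration using the repo's metric package; the test name and the increments are made up for the example:

package kvserver_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// TestPriorityInversionCounterInvariant is illustrative only: it models the
// accounting in processOneChange, where every requeue decision is first
// counted as an inversion.
func TestPriorityInversionCounterInvariant(t *testing.T) {
	total := metric.NewCounter(metric.Metadata{Name: "queue.replicate.priority_inversion.total"})
	requeue := metric.NewCounter(metric.Metadata{Name: "queue.replicate.priority_inversion.requeue"})

	// Two inversions detected; only one leads to a requeue.
	total.Inc(1)
	requeue.Inc(1)
	total.Inc(1)

	if total.Count() < requeue.Count() {
		t.Fatalf("invariant violated: total=%d < requeue=%d", total.Count(), requeue.Count())
	}
}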
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 4e03789fadeb..cdef886a2606 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -2540,3 +2540,117 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
 	afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success")
 	require.Greater(t, afterProcessSuccess, int64(0))
 }
+
+// TestPriorityInversionRequeue tests that the replicate queue correctly
+// handles priority inversions by requeuing replicas when the
+// PriorityInversionRequeue setting is enabled.
+//
+// This test specifically targets a race condition where:
+// 1. A replica is enqueued for a high-priority repair action
+//    (FinalizeAtomicReplicationChange or RemoveLearner).
+// 2. By the time the replica is processed, the repair is no longer needed and
+//    only a low-priority rebalance action (ConsiderRebalance) is computed.
+// 3. This creates a priority inversion where a low-priority action blocks
+//    other higher-priority replicas in the queue from being processed.
+//
+// The race occurs during range rebalancing:
+// 1. A leaseholder replica of a range is rebalanced from one store to another.
+// 2. The new leaseholder enqueues the replica for repair (e.g. to finalize the
+//    atomic replication change or remove a learner replica).
+// 3. Before processing, the old leaseholder has left the atomic joint config
+//    state or removed the learner replica.
+// 4. When the new leaseholder processes the replica, it computes a
+//    ConsiderRebalance action, causing a priority inversion.
+//
+// With PriorityInversionRequeue enabled, the queue should detect this
+// condition and requeue the replica at the correct priority. The test
+// validates this behavior through the metrics that track priority inversions
+// and requeuing events.
+func TestPriorityInversionRequeue(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderDuress(t)
+
+	ctx := context.Background()
+	settings := cluster.MakeTestingClusterSettings()
+	kvserver.PriorityInversionRequeue.Override(ctx, &settings.SV, true)
+
+	var scratchRangeID int64
+	atomic.StoreInt64(&scratchRangeID, -1)
+	require.NoError(t, log.SetVModule("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5"))
+
+	const newLeaseholderStoreAndNodeID = 4
+	var waitUntilLeavingJoint = func() {}
+
+	tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: settings,
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
+						// Keep the replicate queue disabled for everything except
+						// the scratch range.
+						t.Logf("replicate queue disabled-bypass check for range %d", rangeID)
+						return rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID))
+					},
+					BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
+						// After enqueuing, wait for the old leaseholder to leave the
+						// atomic joint config state or remove the learner replica to
+						// force the priority inversion.
+						if storeID == newLeaseholderStoreAndNodeID && rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) {
+							t.Logf("waiting for range %d to leave joint config", rangeID)
+							waitUntilLeavingJoint()
+						}
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+
+	// Wait until the old leaseholder has left the atomic joint config state or
+	// removed the learner replica.
+	waitUntilLeavingJoint = func() {
+		testutils.SucceedsSoon(t, func() error {
+			rangeDesc := tc.LookupRangeOrFatal(t, scratchKey)
+			replicas := rangeDesc.Replicas()
+			t.Logf("range %v: waiting to leave joint config", rangeDesc)
+			if replicas.InAtomicReplicationChange() || len(replicas.LearnerDescriptors()) != 0 {
+				return errors.Newf("in between atomic changes: %v", replicas)
+			}
+			return nil
+		})
+	}
+
+	scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
+	tc.AddVotersOrFatal(t, scratchRange.StartKey.AsRawKey(), tc.Targets(1, 2)...)
+	atomic.StoreInt64(&scratchRangeID, int64(scratchRange.RangeID))
+	lh, err := tc.FindRangeLeaseHolder(scratchRange, nil)
+	require.NoError(t, err)
+
+	// Rebalance the leaseholder replica to a new store. This sets up the race
+	// where the new leaseholder enqueues the replica into the replicate queue
+	// at high priority but computes a low-priority action at processing time.
+	t.Logf("rebalancing range %d from s%d to s%d", scratchRange.RangeID, lh.StoreID, newLeaseholderStoreAndNodeID)
+	_, err = tc.RebalanceVoter(
+		ctx,
+		scratchRange.StartKey.AsRawKey(),
+		roachpb.ReplicationTarget{StoreID: lh.StoreID, NodeID: lh.NodeID},                                      /* src */
+		roachpb.ReplicationTarget{StoreID: newLeaseholderStoreAndNodeID, NodeID: newLeaseholderStoreAndNodeID}, /* dest */
+	)
+	require.NoError(t, err)
+
+	// Wait until the priority inversion is detected and the replica is
+	// requeued. Server index 3 is the new leaseholder's node (n4/s4).
+	testutils.SucceedsSoon(t, func() error {
+		store := tc.GetFirstStoreFromServer(t, 3)
+		if c := store.ReplicateQueueMetrics().PriorityInversionTotal.Count(); c == 0 {
+			return errors.New("expected non-zero priority inversion total count but got 0")
+		}
+		if c := store.ReplicateQueueMetrics().RequeueDueToPriorityInversion.Count(); c == 0 {
+			return errors.New("expected to requeue due to priority inversion but got 0")
+		}
+		return nil
+	})
+}
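The heart of the test is using the new post-enqueue hook as a synchronization point: the scratch range is held between enqueue and processing until the cluster state that justified the high enqueue priority has already been repaired. Stripped of the kvserver machinery, the pattern looks like the sketch below; the queue type and all names in it are invented for illustration.

package main

import "fmt"

// queue is a toy stand-in for baseQueue: items are enqueued, and a testing
// hook fires right after enqueue, before processing starts.
type queue struct {
	postEnqueue func(item string) // analogous to BaseQueuePostEnqueueInterceptor
	items       []string
}

func (q *queue) enqueue(item string) {
	q.items = append(q.items, item)
	if q.postEnqueue != nil {
		// Blocking here guarantees that whatever the hook waits for has
		// happened before the item can be processed.
		q.postEnqueue(item)
	}
}

func main() {
	stateChanged := make(chan struct{})
	q := &queue{postEnqueue: func(string) { <-stateChanged }}

	go func() {
		// Stand-in for the old leaseholder leaving the joint config or
		// removing the learner, which demotes the pending repair action.
		close(stateChanged)
	}()

	q.enqueue("scratch range")
	fmt.Println("processed only after the forced state change")
}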
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index facefb4569b3..549939b2e2c4 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -535,6 +535,10 @@ type StoreTestingKnobs struct {
 	// rangeID should ignore the queue being disabled, and be processed anyway.
 	BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool

+	// BaseQueuePostEnqueueInterceptor is called with the storeID and rangeID of
+	// a replica right after it is enqueued (before it is processed).
+	BaseQueuePostEnqueueInterceptor func(storeID roachpb.StoreID, rangeID roachpb.RangeID)
+
 	// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndexRaftMuLocked.
 	// If nil is returned, reproposal will be attempted.
 	InjectReproposalError func(p *ProposalData) error
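For reference, other tests can wire the new knob the same way. The sketch below condenses the pattern; the helper name and the ready channel are invented, while the knob field is the one added in this patch. Because the hook fires for every enqueue on every baseQueue-backed queue, it filters by store and range before blocking.

package kvserver_test

import (
	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// makePostEnqueueKnobs returns testing knobs that park processing of the
// given range on the given store until ready is closed. Sketch only; the
// helper is hypothetical.
func makePostEnqueueKnobs(
	targetStore roachpb.StoreID, targetRange roachpb.RangeID, ready <-chan struct{},
) base.TestingKnobs {
	return base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
				// Filter down to the replica under test before blocking, since
				// this hook runs for every enqueue on every queue.
				if storeID == targetStore && rangeID == targetRange {
					<-ready
				}
			},
		},
	}
}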