Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13951,6 +13951,22 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: queue.replicate.priority_inversion.requeue
exported_name: queue_replicate_priority_inversion_requeue
description: Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the change is requeued to avoid unfairness.
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.total
exported_name: queue_replicate_priority_inversion_total
description: Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.process.failure
exported_name: queue_replicate_process_failure
description: Number of replicas which failed processing in the replicate queue
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,9 @@ func (bq *baseQueue) addInternal(
default:
// No need to signal again.
}
if postEnqueueInterceptor := bq.store.TestingKnobs().BaseQueuePostEnqueueInterceptor; postEnqueueInterceptor != nil {
postEnqueueInterceptor(bq.store.StoreID(), desc.RangeID)
}
// Note that we are bumping enqueueAdd here instead of during defer to avoid
// treating requeuing a processing replica as newly added. They will be
// re-added to the queue later which will double count them.
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,22 @@ var (
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
// metaReplicateQueueRequeueDueToPriorityInversion is the metadata for the
// counter of priority inversions in the replicate queue that led to the
// replica being requeued. It is a subset of the inversions counted under
// "queue.replicate.priority_inversion.total".
metaReplicateQueueRequeueDueToPriorityInversion = metric.Metadata{
Name: "queue.replicate.priority_inversion.requeue",
Help: "Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. " +
"A priority inversion occurs when the priority at processing time ends up being lower " +
"than at enqueue time. When the priority has changed from a high priority repair action to rebalance, " +
"the change is requeued to avoid unfairness.",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
// metaReplicateQueuePriorityInversionTotal is the metadata for the counter of
// all priority inversions observed in the replicate queue, whether or not the
// replica ended up being requeued.
metaReplicateQueuePriorityInversionTotal = metric.Metadata{
Name: "queue.replicate.priority_inversion.total",
Help: "Total number of priority inversions in the replicate queue. " +
"A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
)

// quorumError indicates a retryable error condition which sends replicas being
Expand All @@ -344,6 +360,7 @@ func (e *quorumError) Error() string {
func (*quorumError) PurgatoryErrorMarker() {}

// ReplicateQueueMetrics is the set of metrics for the replicate queue.
// TODO(wenyihu6): metrics initialization could be cleaned up by using a map.
type ReplicateQueueMetrics struct {
AddReplicaCount *metric.Counter
AddVoterReplicaCount *metric.Counter
Expand Down Expand Up @@ -381,6 +398,10 @@ type ReplicateQueueMetrics struct {
// TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner,
// AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange
// allocator actions.

// Priority Inversion.
RequeueDueToPriorityInversion *metric.Counter
PriorityInversionTotal *metric.Counter
}

func makeReplicateQueueMetrics() ReplicateQueueMetrics {
Expand Down Expand Up @@ -417,6 +438,9 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount),
RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount),
RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount),

RequeueDueToPriorityInversion: metric.NewCounter(metaReplicateQueueRequeueDueToPriorityInversion),
PriorityInversionTotal: metric.NewCounter(metaReplicateQueuePriorityInversionTotal),
}
}

Expand Down Expand Up @@ -955,12 +979,14 @@ func (rq *replicateQueue) processOneChange(
// starving other higher priority work.
if PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) {
if inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action); inversion {
rq.metrics.PriorityInversionTotal.Inc(1)
if priorityInversionLogEveryN.ShouldLog() {
log.KvDistribution.Infof(ctx,
"priority inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v",
shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue)
}
if shouldRequeue {
rq.metrics.RequeueDueToPriorityInversion.Inc(1)
// Return true to requeue the range. Return the error to ensure it is
// logged and tracked in replicate queue bq.failures metrics. See
// replicateQueue.process for details.
Expand Down
114 changes: 114 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2540,3 +2540,117 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success")
require.Greater(t, afterProcessSuccess, int64(0))
}

// TestPriorityInversionRequeue tests that the replicate queue correctly handles
// priority inversions by requeuing replicas when the PriorityInversionRequeue
// setting is enabled.
//
// This test specifically targets a race condition where:
//  1. A replica is enqueued for a high-priority repair action
//     (FinalizeAtomicReplicationChange or RemoveLearner).
//  2. By the time the replica is processed, the repair is no longer needed and
//     only a low-priority rebalance action (ConsiderRebalance) is computed.
//  3. This creates a priority inversion where a low-priority action blocks
//     other higher-priority replicas in the queue from being processed.
//
// The race occurs during range rebalancing:
//  1. A leaseholder replica of a range is rebalanced from one store to another.
//  2. The new leaseholder enqueues the replica for repair (e.g. to finalize
//     the atomic replication change or remove a learner replica).
//  3. Before processing, the old leaseholder has left the atomic joint config
//     state or removed the learner replica.
//  4. When the new leaseholder processes the replica, it computes a
//     ConsiderRebalance action, causing priority inversion.
//
// With PriorityInversionRequeue enabled, the queue should detect this condition
// and requeue the replica at the correct priority. The test validates this
// behavior through metrics that track priority inversions and requeuing events.
func TestPriorityInversionRequeue(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	skip.UnderDuress(t)

	ctx := context.Background()
	settings := cluster.MakeTestingClusterSettings()
	kvserver.PriorityInversionRequeue.Override(ctx, &settings.SV, true)

	// scratchRangeID holds -1 until the scratch range has been replicated, so
	// neither testing knob below matches any range before the test is ready.
	var scratchRangeID int64
	atomic.StoreInt64(&scratchRangeID, -1)
	require.NoError(t, log.SetVModule("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5"))

	// The store (and node) that the leaseholder replica is rebalanced onto.
	const newLeaseholderStoreAndNodeID = 4
	// Replaced with the real wait once the scratch range exists. The
	// interceptor only invokes it after scratchRangeID has been published,
	// which happens after the replacement below.
	var waitUntilLeavingJoint = func() {}

	tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs: base.TestServerArgs{
			Settings: settings,
			Knobs: base.TestingKnobs{
				Store: &kvserver.StoreTestingKnobs{
					BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
						// Let only the scratch range bypass the disabled queue.
						t.Logf("range %d is added to replicate queue store", rangeID)
						return rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID))
					},
					BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
						// After the scratch range is enqueued on the new leaseholder's
						// store, wait for the range to leave the atomic joint config
						// state (or drop its learner) to force the priority inversion.
						if storeID == newLeaseholderStoreAndNodeID &&
							rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) {
							t.Logf("waiting for %d to leave joint config", rangeID)
							waitUntilLeavingJoint()
						}
					},
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	scratchKey := tc.ScratchRange(t)

	// Wait until the range descriptor is no longer in an atomic replication
	// change and has no learner replicas.
	waitUntilLeavingJoint = func() {
		testutils.SucceedsSoon(t, func() error {
			rangeDesc := tc.LookupRangeOrFatal(t, scratchKey)
			replicas := rangeDesc.Replicas()
			t.Logf("range %v: waiting to leave joint conf", rangeDesc)
			if replicas.InAtomicReplicationChange() || len(replicas.LearnerDescriptors()) != 0 {
				return errors.Newf("in between atomic changes: %v", replicas)
			}
			return nil
		})
	}

	scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
	tc.AddVotersOrFatal(t, scratchRange.StartKey.AsRawKey(), tc.Targets(1, 2)...)
	// Publishing the range ID arms both testing knobs.
	atomic.StoreInt64(&scratchRangeID, int64(scratchRange.RangeID))
	lh, err := tc.FindRangeLeaseHolder(scratchRange, nil)
	require.NoError(t, err)

	// Rebalance the leaseholder replica to a new store. This will cause the race
	// condition where the new leaseholder can enqueue a replica to replicate
	// queue with high priority but compute a low priority action at processing
	// time.
	t.Logf("rebalancing range %d from s%d to s%d",
		scratchRange.RangeID, lh.StoreID, newLeaseholderStoreAndNodeID)
	_, err = tc.RebalanceVoter(
		ctx,
		scratchRange.StartKey.AsRawKey(),
		roachpb.ReplicationTarget{StoreID: lh.StoreID, NodeID: lh.NodeID},                                      /* src */
		roachpb.ReplicationTarget{StoreID: newLeaseholderStoreAndNodeID, NodeID: newLeaseholderStoreAndNodeID}, /* dest */
	)
	require.NoError(t, err)

	// Wait until the priority inversion is detected and the replica is requeued.
	testutils.SucceedsSoon(t, func() error {
		// Same server index (3) as before, now derived from the destination
		// node ID rather than hard-coded.
		store := tc.GetFirstStoreFromServer(t, newLeaseholderStoreAndNodeID-1)
		metrics := store.ReplicateQueueMetrics()
		if c := metrics.PriorityInversionTotal.Count(); c == 0 {
			return errors.New("expected non-zero priority inversion total count but got 0")
		}
		if c := metrics.RequeueDueToPriorityInversion.Count(); c == 0 {
			return errors.New("expected to requeue due to priority inversion but got 0")
		}
		return nil
	})
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ type StoreTestingKnobs struct {
// rangeID should ignore the queue being disabled, and be processed anyway.
BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool

// BaseQueuePostEnqueueInterceptor is called with the storeID and rangeID of
// the replica right after a replica is enqueued (before it is processed).
BaseQueuePostEnqueueInterceptor func(storeID roachpb.StoreID, rangeID roachpb.RangeID)

// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndexRaftMuLocked.
// If nil is returned, reproposal will be attempted.
InjectReproposalError func(p *ProposalData) error
Expand Down