diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index f756ceccf033..f19438d3dae9 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -436,12 +436,19 @@
| STORAGE | queue.replicate.addreplica.error | Number of failed replica additions processed by the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.addreplica.success | Number of successful replica additions processed by the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.addvoterreplica | Number of voter replica additions attempted by the replicate queue | Replica Additions | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | queue.replicate.enqueue.add | Number of replicas successfully added to the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | queue.replicate.enqueue.failedprecondition | Number of replicas that failed the precondition checks and were therefore not added to the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | queue.replicate.enqueue.noaction | Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | queue.replicate.enqueue.unexpectederror | Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.nonvoterpromotions | Number of non-voters promoted to voters by the replicate queue | Promotions of Non Voters to Voters | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.pending | Number of pending replicas in the replicate queue | Replicas | GAUGE | COUNT | AVG | NONE |
+| STORAGE | queue.replicate.priority_inversion.requeue | Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the change is requeued to avoid unfairness. | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | queue.replicate.priority_inversion.total | Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.process.failure | Number of replicas which failed processing in the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.process.success | Number of replicas successfully processed by the replicate queue | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.processingnanos | Nanoseconds spent processing replicas in the replicate queue | Processing Time | COUNTER | NANOSECONDS | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.purgatory | Number of replicas in the replicate queue's purgatory, awaiting allocation options | Replicas | GAUGE | COUNT | AVG | NONE |
+| STORAGE | queue.replicate.queue_full | Number of times a replica was dropped from the queue due to queue fullness | Replicas | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.rebalancenonvoterreplica | Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue | Replica Additions | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.rebalancereplica | Number of replica rebalancer-initiated additions attempted by the replicate queue | Replica Additions | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | queue.replicate.rebalancevoterreplica | Number of voter replica rebalancer-initiated additions attempted by the replicate queue | Replica Additions | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
@@ -596,6 +603,12 @@
| STORAGE | rangekeycount | Count of all range keys (e.g. MVCC range tombstones) | Keys | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges | Number of ranges | Ranges | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges.decommissioning | Number of ranges with at lease one replica on a decommissioning node | Ranges | GAUGE | COUNT | AVG | NONE |
+| STORAGE | ranges.decommissioning.nudger.enqueue | Number of ranges enqueued for decommissioning by the decommissioning nudger. Note: This metric tracks when the nudger attempts to enqueue, but the replica might not end up being enqueued by the priority queue due to various filtering or failure conditions. | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | ranges.decommissioning.nudger.enqueue.failure | Number of ranges that the decommissioning nudger failed to enqueue in the replicate queue | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | ranges.decommissioning.nudger.enqueue.success | Number of ranges that were successfully enqueued by the decommissioning nudger | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease | Number of ranges skipped by the decommissioning nudger because the local replica was not the leaseholder or did not have a valid lease | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | ranges.decommissioning.nudger.process.failure | Number of ranges enqueued by the decommissioning nudger that failed to be processed by the replicate queue | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+| STORAGE | ranges.decommissioning.nudger.process.success | Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
| STORAGE | ranges.overreplicated | Number of ranges with more live replicas than the replication target | Ranges | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges.unavailable | Number of ranges with fewer live replicas than needed for quorum | Ranges | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges.underreplicated | Number of ranges with fewer live replicas than the replication target | Ranges | GAUGE | COUNT | AVG | NONE |
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
index d432c584b732..d5fc9ca59070 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
+ "//pkg/util/buildutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/stop",
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index 9785af67e666..afc81c1dcf5f 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -138,6 +139,7 @@ const (
AllocatorConsiderRebalance
AllocatorRangeUnavailable
AllocatorFinalizeAtomicReplicationChange
+ AllocatorMaxPriority
)
// Add indicates an action adding a replica.
@@ -163,6 +165,15 @@ func (a AllocatorAction) Remove() bool {
a == AllocatorRemoveDecommissioningNonVoter
}
+// Decommissioning indicates an action replacing or removing decommissioning
+// replicas.
+func (a AllocatorAction) Decommissioning() bool {
+ return a == AllocatorRemoveDecommissioningVoter ||
+ a == AllocatorRemoveDecommissioningNonVoter ||
+ a == AllocatorReplaceDecommissioningVoter ||
+ a == AllocatorReplaceDecommissioningNonVoter
+}
+
// TargetReplicaType returns that the action is for a voter or non-voter replica.
func (a AllocatorAction) TargetReplicaType() TargetReplicaType {
var t TargetReplicaType
@@ -242,10 +253,27 @@ func (a AllocatorAction) SafeValue() {}
// range. Within a given range, the ordering of the various checks inside
// `Allocator.computeAction` determines which repair/rebalancing actions are
// taken before the others.
+//
+// NB: Priorities should be non-negative and should be spaced in multiples of
+// 100 unless you believe they should belong to the same priority category.
+// AllocatorNoop should have the lowest priority. CheckPriorityInversion depends
+// on this contract. In most cases, the allocator returns a priority that
+// matches the definitions below. For AllocatorAddVoter,
+// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter, the priority may be
+// adjusted (see ComputeAction for details), but the adjustment is expected to
+// be small (<49).
+//
+// Exceptions: AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner,
+// and AllocatorReplaceDeadVoter violate the spacing of 100. These cases
+// predate this comment, so we allow them as they belong to the same general
+// priority category.
func (a AllocatorAction) Priority() float64 {
+ const maxPriority = 12002
switch a {
+ case AllocatorMaxPriority:
+ return maxPriority
case AllocatorFinalizeAtomicReplicationChange:
- return 12002
+ return maxPriority
case AllocatorRemoveLearner:
return 12001
case AllocatorReplaceDeadVoter:
@@ -946,10 +974,68 @@ func (a *Allocator) ComputeAction(
return action, action.Priority()
}
- return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
+ action, priority = a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
desc.Replicas().NonVoterDescriptors())
+	// Ensure that priority is never -1. Typically, computeAction returns
+ // action.Priority(), but we sometimes modify the priority for specific
+ // actions like AllocatorAddVoter, AllocatorRemoveDeadVoter, and
+ // AllocatorRemoveVoter. A priority of -1 is a special case, indicating that
+ // the caller expects the processing logic to be invoked even if there's a
+ // priority inversion. If the priority is not -1, the range might be re-queued
+ // to be processed with the correct priority.
+ if priority == -1 {
+ if buildutil.CrdbTestBuild {
+ log.Fatalf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
+ } else {
+ log.Warningf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
+ }
+ }
+ return action, priority
}
+// computeAction determines the action to take on a range along with its
+// priority.
+//
+// NB: The returned priority may include a small adjustment and therefore might
+// not exactly match action.Priority(). See AllocatorAddVoter,
+// AllocatorRemoveDeadVoter, AllocatorRemoveVoter below. The adjustment should
+// be <49 with two assumptions below. New uses on this contract should be
+// avoided since the assumptions are not strong guarantees (especially the
+// second one).
+//
+// The claim that the adjustment is < 49 has two assumptions:
+// 1. min(num_replicas,total_nodes) in zone configuration is < 98.
+// 2. when ranges are not under-replicated, the difference between
+// min(num_replicas,total_nodes)/2-1 and existing_replicas is < 49.
+//
+// neededVoters <= min(num_replicas,total_nodes)
+// desiredQuorum = neededVoters/2-1
+// quorum = haveVoters/2-1
+//
+// For AllocatorAddVoter, we know haveVoters < neededVoters
+// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
+// To find the worst case (largest adjustment),
+// 1. haveVoters = neededVoters-1,
+// adjustment = neededVoters/2-1-(neededVoters-1)
+// = neededVoters/2-neededVoters = -neededVoters/2
+// 2. haveVoters = 0
+//    adjustment = neededVoters/2-1
+//
+// In order for adjustment to be <49, neededVoters/2<49 => neededVoters<98.
+// Hence the first assumption.
+//
+// For AllocatorRemoveDeadVoter, we know haveVoters >= neededVoters
+// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
+// To find the worst case (largest adjustment),
+// 1. neededVoters/2-1 is much larger than haveVoters: given haveVoters >=
+// neededVoters, haveVoters/2-1 >= neededVoters/2-1. So this case is impossible.
+// 2. neededVoters/2-1 is much smaller than haveVoters: since ranges could be
+// over-replicated, theoretically speaking, there may be no upper bounds on
+// haveVoters. In order for adjustment to be < 49, we can only make an
+// assumption here that the difference between neededVoters/2-1 and haveVoters
+// cannot be >= 49 in this case.
+//
+// For AllocatorRemoveVoter, adjustment is haveVoters%2 = 0 or 1 < 49.
func (a *Allocator) computeAction(
ctx context.Context,
storePool storepool.AllocatorStorePool,
@@ -3220,3 +3306,65 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
}
return ret
}
+
+// roundToNearestPriorityCategory rounds a priority to the nearest 100. n should
+// be non-negative.
+func roundToNearestPriorityCategory(n float64) float64 {
+ return math.Round(n/100.0) * 100
+}
+
+// CheckPriorityInversion returns whether there was a priority inversion (and
+// the range should not be processed at this time, since doing so could starve
+// higher-priority items), and whether the caller should re-add the range to the
+// queue (presumably under its new priority). A priority inversion happens if
+// the priority at enqueue time is higher than the priority corresponding to the
+// action computed at processing time. Caller should re-add the range to the
+// queue if it has gone from a repair action to lowest priority
+// (AllocatorConsiderRebalance).
+//
+// Note: Changing from AllocatorRangeUnavailable/AllocatorNoop to
+// AllocatorConsiderRebalance is not treated as a priority inversion. Going from
+// a repair action to AllocatorRangeUnavailable/AllocatorNoop is considered a
+// priority inversion but shouldRequeue = false.
+//
+// INVARIANT: shouldRequeue => isInversion
+func CheckPriorityInversion(
+ priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
+) (isInversion bool, shouldRequeue bool) {
+ // NB: priorityAtEnqueue is -1 for callers such as scatter, dry runs, and
+ // manual queue runs. Priority inversion does not apply to these calls.
+ if priorityAtEnqueue == -1 {
+ return false, false
+ }
+
+ // NB: we need to check for when priorityAtEnqueue falls within the range
+ // of the allocator actions because store.Enqueue might enqueue things with
+ // a very high priority (1e5). In those cases, we do not want to requeue
+ // these actions or count it as an inversion.
+ withinPriorityRange := func(priority float64) bool {
+ return AllocatorNoop.Priority() <= priority && priority <= AllocatorMaxPriority.Priority()
+ }
+ if !withinPriorityRange(priorityAtEnqueue) {
+ return false, false
+ }
+
+ if priorityAtEnqueue > AllocatorConsiderRebalance.Priority() && actionAtProcessing == AllocatorConsiderRebalance {
+ return true, true
+ }
+
+ // NB: Usually, the priority at enqueue time should correspond to
+ // action.Priority(). However, for AllocatorAddVoter,
+ // AllocatorRemoveDeadVoter, AllocatorRemoveVoter, the priority can be
+ // adjusted at enqueue time (See ComputeAction for more details). However, we
+ // expect the adjustment to be relatively small (<49). So we round the
+ // priority to the nearest 100 to compare against
+ // actionAtProcessing.Priority(). Without this rounding, we might treat going
+ // from 10000 to 999 as an inversion, but it was just due to the adjustment.
+ // Note that priorities at AllocatorFinalizeAtomicReplicationChange,
+ // AllocatorRemoveLearner, and AllocatorReplaceDeadVoter will be rounded to
+	// the same priority category. They are so close to each other that we don't
+	// count inversions among them.
+ normPriorityAtEnqueue := roundToNearestPriorityCategory(priorityAtEnqueue)
+ isInversion = normPriorityAtEnqueue > actionAtProcessing.Priority()
+ return isInversion, false
+}
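
To make the adjustment bound above concrete, here is a small worked example that follows the formulas in the computeAction comment (integer division, as in Go). It is a sketch only; the numbers and variable names are illustrative and are not taken from the allocator code itself:

```go
package main

import "fmt"

func main() {
	// AllocatorAddVoter case from the comment above: the range wants 5 voters
	// but only has 3, so the priority gets a small adjustment.
	neededVoters, haveVoters := 5, 3
	desiredQuorum := neededVoters/2 - 1      // 5/2 - 1 = 1 (integer division)
	adjustment := desiredQuorum - haveVoters // 1 - 3 = -2
	fmt.Println(adjustment)
	// The magnitude (2) is well under the 49 bound, so rounding the
	// enqueue-time priority to the nearest 100 still recovers the action's
	// base priority category.
}
```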
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
index a7491a59c836..ea2bdd572a4d 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -9622,3 +9622,181 @@ func TestAllocatorRebalanceTargetVoterConstraintUnsatisfied(t *testing.T) {
})
}
}
+
+// TestRoundToNearestPriorityCategory tests the roundToNearestPriorityCategory
+// function.
+func TestRoundToNearestPriorityCategory(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ testCases := []struct {
+ name string
+ input float64
+ expected float64
+ }{
+ {
+ name: "zero",
+ input: 0.0,
+ expected: 0.0,
+ },
+ {
+ name: "exact multiple of 100",
+ input: 100.0,
+ expected: 100.0,
+ },
+ {
+ name: "round down to nearest 100",
+ input: 149.0,
+ expected: 100.0,
+ },
+ {
+ name: "round up to nearest 100",
+ input: 151.0,
+ expected: 200.0,
+ },
+ {
+ name: "negative exact multiple of 100",
+ input: -200.0,
+ expected: -200.0,
+ },
+ {
+ name: "negative round down to nearest 100",
+ input: -249.0,
+ expected: -200.0,
+ },
+ {
+ name: "negative round up to nearest 100",
+ input: -251.0,
+ expected: -300.0,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ require.Equal(t, tc.expected, roundToNearestPriorityCategory(tc.input))
+ })
+ }
+}
+
+// TestCheckPriorityInversion tests the CheckPriorityInversion function.
+func TestCheckPriorityInversion(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ for action := AllocatorNoop; action <= AllocatorFinalizeAtomicReplicationChange; action++ {
+ t.Run(action.String(), func(t *testing.T) {
+ if action == AllocatorConsiderRebalance || action == AllocatorNoop || action == AllocatorRangeUnavailable {
+ inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
+ require.False(t, inversion)
+ require.False(t, requeue)
+ } else {
+ inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
+ require.True(t, inversion)
+ require.True(t, requeue)
+ }
+ })
+ }
+
+ testCases := []struct {
+ name string
+ priorityAtEnqueue float64
+ actionAtProcessing AllocatorAction
+ expectedInversion bool
+ expectedRequeue bool
+ }{
+ {
+ name: "AllocatorNoop at processing is noop",
+ priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
+ actionAtProcessing: AllocatorNoop,
+ expectedInversion: true,
+ expectedRequeue: false,
+ },
+ {
+ name: "AllocatorRangeUnavailable at processing is noop",
+ priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
+ actionAtProcessing: AllocatorRangeUnavailable,
+ expectedInversion: true,
+ expectedRequeue: false,
+ },
+ {
+ name: "priority -1 bypasses",
+ priorityAtEnqueue: -1,
+ actionAtProcessing: AllocatorConsiderRebalance,
+ expectedInversion: false,
+ expectedRequeue: false,
+ },
+ {
+ name: "priority increase",
+ priorityAtEnqueue: 0,
+ actionAtProcessing: AllocatorFinalizeAtomicReplicationChange,
+ expectedInversion: false,
+ expectedRequeue: false,
+ },
+ {
+ name: "above range priority(1e5)",
+ priorityAtEnqueue: 1e5,
+ actionAtProcessing: AllocatorConsiderRebalance,
+ expectedInversion: false,
+ expectedRequeue: false,
+ },
+ {
+ name: "below range priority at -10",
+ priorityAtEnqueue: -10,
+ actionAtProcessing: -100,
+ expectedInversion: false,
+ expectedRequeue: false,
+ },
+ {
+ name: "inversion but small priority changes",
+ priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
+ actionAtProcessing: AllocatorReplaceDecommissioningNonVoter,
+ expectedInversion: true,
+ expectedRequeue: false,
+ },
+ {
+ name: "inversion but small priority changes",
+ priorityAtEnqueue: AllocatorRemoveDeadVoter.Priority(),
+ actionAtProcessing: AllocatorAddNonVoter,
+ expectedInversion: true,
+ expectedRequeue: false,
+ },
+ {
+			name:               "rebalance to noop is not an inversion",
+ priorityAtEnqueue: AllocatorConsiderRebalance.Priority(),
+ actionAtProcessing: AllocatorNoop,
+ expectedInversion: false,
+ expectedRequeue: false,
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ inversion, requeue := CheckPriorityInversion(tc.priorityAtEnqueue, tc.actionAtProcessing)
+ require.Equal(t, tc.expectedInversion, inversion)
+ require.Equal(t, tc.expectedRequeue, requeue)
+ })
+ }
+}
+
+// TestAllocatorPriorityInvariance verifies that allocator priorities remain
+// spaced in multiples of 100. This prevents regressions against the contract
+// relied on by CheckPriorityInversion. For details, see the comment above
+// action.Priority().
+func TestAllocatorPriorityInvariance(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ exceptions := map[AllocatorAction]struct{}{
+ AllocatorFinalizeAtomicReplicationChange: {},
+ AllocatorRemoveLearner: {},
+ AllocatorReplaceDeadVoter: {},
+ }
+ lowestPriority := AllocatorNoop.Priority()
+ for action := AllocatorNoop; action < AllocatorMaxPriority; action++ {
+ require.GreaterOrEqualf(t, action.Priority(), lowestPriority,
+ "priority %f is less than AllocatorNoop: likely violating contract",
+ action.Priority())
+ if _, ok := exceptions[action]; !ok {
+ require.Equalf(t, int(action.Priority())%100, 0,
+ "priority %f is not a multiple of 100: likely violating contract",
+ action.Priority())
+
+ }
+ }
+}
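
As a sketch of how a processing path could consume CheckPriorityInversion: the helper below and its requeue hook are hypothetical (the real caller is the replicate queue, whose changes are not part of the hunks shown here), and the allocatorimpl import is assumed to be the package modified above.

```go
// shouldSkipForInversion is a hypothetical helper, not code from this change.
func shouldSkipForInversion(
	priorityAtEnqueue float64,
	actionAtProcessing allocatorimpl.AllocatorAction,
	requeue func(priority float64),
) bool {
	isInversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(
		priorityAtEnqueue, actionAtProcessing)
	if !isInversion {
		return false // no inversion: process the computed action now
	}
	if shouldRequeue {
		// The action degraded from a repair to a plain rebalance; re-add the
		// range at its new, lower priority instead of letting it jump ahead of
		// genuinely urgent work.
		requeue(actionAtProcessing.Priority())
	}
	return true
}
```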
diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go
index 59ad7056fe93..d0f2c132056d 100644
--- a/pkg/kv/kvserver/consistency_queue.go
+++ b/pkg/kv/kvserver/consistency_queue.go
@@ -166,7 +166,7 @@ func consistencyQueueShouldQueueImpl(
// process() is called on every range for which this node is a lease holder.
func (q *consistencyQueue) process(
- ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
) (bool, error) {
if q.interval() <= 0 {
return false, nil
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 86499eb6c147..48002bcff709 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -203,7 +203,7 @@ func manualQueue(s *Store, q queueImpl, repl *Replica) error {
return fmt.Errorf("%s: system config not yet available", s)
}
ctx := repl.AnnotateCtx(context.Background())
- _, err := q.process(ctx, repl, cfg)
+ _, err := q.process(ctx, repl, cfg, -1 /*priorityAtEnqueue*/)
return err
}
diff --git a/pkg/kv/kvserver/lease_queue.go b/pkg/kv/kvserver/lease_queue.go
index 61e1cbe1813c..a79868e77420 100644
--- a/pkg/kv/kvserver/lease_queue.go
+++ b/pkg/kv/kvserver/lease_queue.go
@@ -114,7 +114,7 @@ func (lq *leaseQueue) shouldQueue(
}
func (lq *leaseQueue) process(
- ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
if tokenErr := repl.allocatorToken.TryAcquire(ctx, lq.name); tokenErr != nil {
return false, tokenErr
diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go
index c8063c71f07c..1c767f952198 100644
--- a/pkg/kv/kvserver/merge_queue.go
+++ b/pkg/kv/kvserver/merge_queue.go
@@ -240,7 +240,7 @@ func (mq *mergeQueue) requestRangeStats(
}
func (mq *mergeQueue) process(
- ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader,
+ ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
lhsDesc := lhsRepl.Desc()
@@ -421,7 +421,7 @@ func (mq *mergeQueue) process(
return false, rangeMergePurgatoryError{err}
}
if testingAggressiveConsistencyChecks {
- if _, err := mq.store.consistencyQueue.process(ctx, lhsRepl, confReader); err != nil {
+ if _, err := mq.store.consistencyQueue.process(ctx, lhsRepl, confReader, -1 /*priorityAtEnqueue*/); err != nil {
log.Warningf(ctx, "%v", err)
}
}
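
The consistency, lease, merge, and GC queues above all discard the new priority argument; only the replicate queue is expected to inspect it, and callers that bypass normal enqueueing (tests, manual runs, scatter) pass -1. A minimal sketch of the convention, with exampleQueue as a placeholder type that is not part of this change:

```go
func (q *exampleQueue) process(
	ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
	// Queue-specific work goes here; this queue does not care at which
	// priority the replica was originally enqueued.
	return true, nil
}
```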
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 4dbac0c55590..01efea2d0db4 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -142,6 +142,44 @@ var (
Unit: metric.Unit_COUNT,
}
+	// Decommissioning nudger metrics.
+ metaDecommissioningNudgerEnqueue = metric.Metadata{
+ Name: "ranges.decommissioning.nudger.enqueue",
+		Help:        "Number of ranges enqueued for decommissioning by the decommissioning nudger. Note: This metric tracks when the nudger attempts to enqueue, but the replica might not end up being enqueued by the priority queue due to various filtering or failure conditions.",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
+ metaDecommissioningNudgerEnqueueSuccess = metric.Metadata{
+ Name: "ranges.decommissioning.nudger.enqueue.success",
+		Help:        "Number of ranges that were successfully enqueued by the decommissioning nudger",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
+ metaDecommissioningNudgerEnqueueFailure = metric.Metadata{
+ Name: "ranges.decommissioning.nudger.enqueue.failure",
+		Help:        "Number of ranges that the decommissioning nudger failed to enqueue in the replicate queue",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
+ metaDecommissioningNudgerProcessSuccess = metric.Metadata{
+ Name: "ranges.decommissioning.nudger.process.success",
+ Help: "Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
+ metaDecommissioningNudgerProcessFailure = metric.Metadata{
+ Name: "ranges.decommissioning.nudger.process.failure",
+		Help:        "Number of ranges enqueued by the decommissioning nudger that failed to be processed by the replicate queue",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
+ metaDecommissioningNudgerNotLeaseholderOrInvalidLease = metric.Metadata{
+ Name: "ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease",
+		Help:        "Number of ranges skipped by the decommissioning nudger because the local replica was not the leaseholder or did not have a valid lease",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
+
// Lease request metrics.
metaLeaseRequestSuccessCount = metric.Metadata{
Name: "leases.success",
@@ -1979,6 +2017,33 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
Measurement: "Processing Time",
Unit: metric.Unit_NANOSECONDS,
}
+ metaReplicateQueueEnqueueAdd = metric.Metadata{
+ Name: "queue.replicate.enqueue.add",
+ Help: "Number of replicas successfully added to the replicate queue",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
+ metaReplicateQueueEnqueueFailedPrecondition = metric.Metadata{
+ Name: "queue.replicate.enqueue.failedprecondition",
+ Help: "Number of replicas that failed the precondition checks and were therefore not added to the replicate " +
+ "queue",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
+ metaReplicateQueueEnqueueNoAction = metric.Metadata{
+ Name: "queue.replicate.enqueue.noaction",
+ Help: "Number of replicas for which ShouldQueue determined no action was needed and were therefore not " +
+ "added to the replicate queue",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
+ metaReplicateQueueEnqueueUnexpectedError = metric.Metadata{
+ Name: "queue.replicate.enqueue.unexpectederror",
+ Help: "Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to " +
+ "add to the replicate queue directly), but failed to be enqueued due to unexpected errors",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
metaLeaseQueueSuccesses = metric.Metadata{
Name: "queue.lease.process.success",
Help: "Number of replicas successfully processed by the replica lease queue",
@@ -2027,6 +2092,12 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
+ metaReplicateQueueFull = metric.Metadata{
+ Name: "queue.replicate.queue_full",
+ Help: "Number of times a replica was dropped from the queue due to queue fullness",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
metaReplicateQueueProcessingNanos = metric.Metadata{
Name: "queue.replicate.processingnanos",
Help: "Nanoseconds spent processing replicas in the replicate queue",
@@ -2631,6 +2702,14 @@ type StoreMetrics struct {
OverReplicatedRangeCount *metric.Gauge
DecommissioningRangeCount *metric.Gauge
+ // Decommissioning nudger metrics.
+ DecommissioningNudgerEnqueue *metric.Counter
+ DecommissioningNudgerEnqueueSuccess *metric.Counter
+ DecommissioningNudgerEnqueueFailure *metric.Counter
+ DecommissioningNudgerProcessSuccess *metric.Counter
+ DecommissioningNudgerProcessFailure *metric.Counter
+ DecommissioningNudgerNotLeaseholderOrInvalidLease *metric.Counter
+
// Lease request metrics for successful and failed lease requests. These
// count proposals (i.e. it does not matter how many replicas apply the
// lease).
@@ -2908,9 +2987,14 @@ type StoreMetrics struct {
ReplicaGCQueueFailures *metric.Counter
ReplicaGCQueuePending *metric.Gauge
ReplicaGCQueueProcessingNanos *metric.Counter
+ ReplicateQueueEnqueueAdd *metric.Counter
+ ReplicateQueueEnqueueFailedPrecondition *metric.Counter
+ ReplicateQueueEnqueueNoAction *metric.Counter
+ ReplicateQueueEnqueueUnexpectedError *metric.Counter
ReplicateQueueSuccesses *metric.Counter
ReplicateQueueFailures *metric.Counter
ReplicateQueuePending *metric.Gauge
+ ReplicateQueueFull *metric.Counter
ReplicateQueueProcessingNanos *metric.Counter
ReplicateQueuePurgatory *metric.Gauge
SplitQueueSuccesses *metric.Counter
@@ -3335,6 +3419,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
OverReplicatedRangeCount: metric.NewGauge(metaOverReplicatedRangeCount),
DecommissioningRangeCount: metric.NewGauge(metaDecommissioningRangeCount),
+		// Decommissioning nudger metrics.
+ DecommissioningNudgerEnqueue: metric.NewCounter(metaDecommissioningNudgerEnqueue),
+ DecommissioningNudgerEnqueueSuccess: metric.NewCounter(metaDecommissioningNudgerEnqueueSuccess),
+ DecommissioningNudgerEnqueueFailure: metric.NewCounter(metaDecommissioningNudgerEnqueueFailure),
+ DecommissioningNudgerProcessSuccess: metric.NewCounter(metaDecommissioningNudgerProcessSuccess),
+ DecommissioningNudgerProcessFailure: metric.NewCounter(metaDecommissioningNudgerProcessFailure),
+ DecommissioningNudgerNotLeaseholderOrInvalidLease: metric.NewCounter(metaDecommissioningNudgerNotLeaseholderOrInvalidLease),
+
// Lease request metrics.
LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
LeaseRequestErrorCount: metric.NewCounter(metaLeaseRequestErrorCount),
@@ -3681,9 +3773,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
ReplicaGCQueueFailures: metric.NewCounter(metaReplicaGCQueueFailures),
ReplicaGCQueuePending: metric.NewGauge(metaReplicaGCQueuePending),
ReplicaGCQueueProcessingNanos: metric.NewCounter(metaReplicaGCQueueProcessingNanos),
+ ReplicateQueueEnqueueAdd: metric.NewCounter(metaReplicateQueueEnqueueAdd),
+ ReplicateQueueEnqueueFailedPrecondition: metric.NewCounter(metaReplicateQueueEnqueueFailedPrecondition),
+ ReplicateQueueEnqueueNoAction: metric.NewCounter(metaReplicateQueueEnqueueNoAction),
+ ReplicateQueueEnqueueUnexpectedError: metric.NewCounter(metaReplicateQueueEnqueueUnexpectedError),
ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses),
ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures),
ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending),
+ ReplicateQueueFull: metric.NewCounter(metaReplicateQueueFull),
ReplicateQueueProcessingNanos: metric.NewCounter(metaReplicateQueueProcessingNanos),
ReplicateQueuePurgatory: metric.NewGauge(metaReplicateQueuePurgatory),
SplitQueueSuccesses: metric.NewCounter(metaSplitQueueSuccesses),
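
The new counters reach the base queue through the queueConfig fields added in pkg/kv/kvserver/queue.go below. The replicate queue wiring itself is not part of the hunks shown here, so the following is only a sketch of what that hookup could look like, reusing the StoreMetrics fields from this diff inside a hypothetical constructor:

```go
// Inside a hypothetical newReplicateQueue-style constructor; abridged sketch only.
cfg := queueConfig{
	// ... existing replicate queue configuration elided ...
	enqueueAdd:                store.metrics.ReplicateQueueEnqueueAdd,
	enqueueFailedPrecondition: store.metrics.ReplicateQueueEnqueueFailedPrecondition,
	enqueueNoAction:           store.metrics.ReplicateQueueEnqueueNoAction,
	enqueueUnexpectedError:    store.metrics.ReplicateQueueEnqueueUnexpectedError,
	full:                      store.metrics.ReplicateQueueFull,
	successes:                 store.metrics.ReplicateQueueSuccesses,
	failures:                  store.metrics.ReplicateQueueFailures,
	pending:                   store.metrics.ReplicateQueuePending,
	processingNanos:           store.metrics.ReplicateQueueProcessingNanos,
	purgatory:                 store.metrics.ReplicateQueuePurgatory,
}
```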
diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go
index 4002f46bcb98..1db607aaa5a9 100644
--- a/pkg/kv/kvserver/mvcc_gc_queue.go
+++ b/pkg/kv/kvserver/mvcc_gc_queue.go
@@ -689,7 +689,7 @@ func (r *replicaGCer) GC(
// 7. push these transactions (again, recreating txn entries).
// 8. send a GCRequest.
func (mgcq *mvccGCQueue) process(
- ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
@@ -921,7 +921,7 @@ func (mgcq *mvccGCQueue) scanReplicasForHiPriGCHints(
if !isLeaseHolder {
return true
}
- added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority)
+ added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority, noopProcessCallback)
if added {
mgcq.store.metrics.GCEnqueueHighPriority.Inc(1)
foundReplicas++
diff --git a/pkg/kv/kvserver/mvcc_gc_queue_test.go b/pkg/kv/kvserver/mvcc_gc_queue_test.go
index 4479286c5899..e6b6a72664b0 100644
--- a/pkg/kv/kvserver/mvcc_gc_queue_test.go
+++ b/pkg/kv/kvserver/mvcc_gc_queue_test.go
@@ -912,7 +912,7 @@ func testMVCCGCQueueProcessImpl(t *testing.T, useEfos bool) {
// Process through a scan queue.
mgcq := newMVCCGCQueue(tc.store)
- processed, err := mgcq.process(ctx, tc.repl, cfg)
+ processed, err := mgcq.process(ctx, tc.repl, cfg, -1 /* priorityAtEnqueue */)
if err != nil {
t.Fatal(err)
}
@@ -1163,7 +1163,7 @@ func TestMVCCGCQueueTransactionTable(t *testing.T) {
t.Fatal(err)
}
- processed, err := mgcq.process(ctx, tc.repl, cfg)
+ processed, err := mgcq.process(ctx, tc.repl, cfg, -1 /* priorityAtEnqueue */)
if err != nil {
t.Fatal(err)
}
@@ -1180,7 +1180,7 @@ func TestMVCCGCQueueTransactionTable(t *testing.T) {
if err != nil {
return err
}
		if expGC := (sp.newStatus == -1); expGC {
if expGC != !ok {
return fmt.Errorf("%s: expected gc: %t, but found %s\n%s", strKey, expGC, txn, roachpb.Key(strKey))
}
@@ -1297,7 +1297,7 @@ func TestMVCCGCQueueIntentResolution(t *testing.T) {
t.Fatal(err)
}
mgcq := newMVCCGCQueue(tc.store)
- processed, err := mgcq.process(ctx, tc.repl, confReader)
+ processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
if err != nil {
t.Fatal(err)
}
@@ -1362,7 +1362,7 @@ func TestMVCCGCQueueLastProcessedTimestamps(t *testing.T) {
// Process through a scan queue.
mgcq := newMVCCGCQueue(tc.store)
- processed, err := mgcq.process(ctx, tc.repl, confReader)
+ processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
if err != nil {
t.Fatal(err)
}
@@ -1470,7 +1470,7 @@ func TestMVCCGCQueueChunkRequests(t *testing.T) {
}
tc.manualClock.Advance(conf.TTL() + 1)
mgcq := newMVCCGCQueue(tc.store)
- processed, err := mgcq.process(ctx, tc.repl, confReader)
+ processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
if err != nil {
t.Fatal(err)
}
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index 376888bd969e..c34161962b86 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -113,9 +113,60 @@ type PurgatoryError interface {
PurgatoryErrorMarker() // dummy method for unique interface
}
-// processCallback is a hook that is called when a replica finishes processing.
-// It is called with the result of the process attempt.
-type processCallback func(error)
+// noopProcessCallback is a processCallback that does nothing.
+var noopProcessCallback = processCallback{
+ onProcessResult: func(err error) {},
+ onEnqueueResult: func(indexOnHeap int, err error) {},
+}
+
+// processCallback is a hook that is called when a replica is enqueued or
+// finishes processing.
+//
+// NB: None of the fields below can be nil. Use noopProcessCallback if you do
+// not need to register any callback.
+//
+// The callback behavior depends on when it's registered. Currently, addInternal
+// and MaybeAddCallback are the only two users. See comments above them for more
+// details on the exact behaviour.
+//
+// NB: Callback execution is not guaranteed since removeLocked or
+// removeFromReplicaSetLocked may be called without executing callbacks. This
+// happens when the replica is destroyed or recreated with a new replica ID.
+//
+// For now, the two use cases (decommissioning nudger and
+// maybeBackpressureBatch) are okay with the current behaviour. But adding new
+// uses is discouraged without cleaning up the contract of processCallback.
+// TODO(wenyihu6): consider cleaning up the semantics after backports
+type processCallback struct {
+	// onEnqueueResult is called with the result of the enqueue attempt. It is
+	// invoked when the range is added to the queue, and again if the range
+	// encounters an error or is enqueued again before being processed.
+ //
+ // If error is nil, the index on the priority queue where this item sits is
+ // also passed in the callback. If error is non-nil, the index passed in the
+ // callback is -1. Note: indexOnHeap does not represent the item's exact rank
+ // by priority. It only reflects the item's position in the heap array, which
+ // gives a rough idea of where it sits in the priority hierarchy.
+ //
+ // - May be invoked multiple times:
+ // 1. Immediately after successful enqueue (err = nil).
+ // 2. If the replica is later dropped due to full queue (err =
+ // errDroppedDueToFullQueueSize).
+ // 3. If re-added with updated priority (err = nil, new heap index).
+	//  4. If the replica is already processing or in purgatory (with the corresponding error).
+ // - May be skipped if the replica is already in queue and no priority changes
+ // occur.
+ onEnqueueResult func(indexOnHeap int, err error)
+
+ // onProcessResult is called with the result of any process attempts. It is
+ // only invoked if the base queue gets a chance to process this replica.
+ //
+ // - May be invoked multiple times if the replica goes through purgatory or
+ // re-processing.
+ // - May be skipped if the replica is removed with removeFromReplicaSetLocked
+ // or registered with a new replica id before processing begins.
+ onProcessResult func(err error)
+}
// A replicaItem holds a replica and metadata about its queue state and
// processing state.
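
A minimal sketch of constructing a processCallback; the variable is illustrative only, and callers that need no hooks pass noopProcessCallback instead:

```go
// Sketch: a callback observing both stages; nothing here is part of this patch.
cb := processCallback{
	onEnqueueResult: func(indexOnHeap int, err error) {
		if err != nil {
			// e.g. errDroppedDueToFullQueueSize or errReplicaAlreadyInPurgatory.
			return
		}
		// indexOnHeap is only a rough position in the heap array, not an exact rank.
	},
	onProcessResult: func(err error) {
		// May fire more than once, e.g. if the replica passes through purgatory.
	},
}
_ = cb // typically passed to AddAsyncWithCallback or registered via registerCallback
```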
@@ -124,7 +175,9 @@ type replicaItem struct {
replicaID roachpb.ReplicaID
seq int // enforce FIFO order for equal priorities
- // fields used when a replicaItem is enqueued in a priority queue.
+ // fields used when a replicaItem is enqueued in a priority queue. This field
+	// is preserved for the purgatory queue as well since baseQueue.processReplica
+	// requires it.
priority float64
index int // The index of the item in the heap, maintained by the heap.Interface methods
@@ -136,7 +189,6 @@ type replicaItem struct {
// setProcessing moves the item from an enqueued state to a processing state.
func (i *replicaItem) setProcessing() {
- i.priority = 0
if i.index >= 0 {
log.Fatalf(context.Background(),
"r%d marked as processing but appears in prioQ", i.rangeID,
@@ -145,8 +197,15 @@ func (i *replicaItem) setProcessing() {
i.processing = true
}
-// registerCallback adds a new callback to be executed when the replicaItem
-// finishes processing.
+// registerCallback adds a new callback to be executed when the replicaItem is
+// enqueued or finishes processing. There are two places where the callback may
+// be registered:
+// 1. bq.MaybeAddCallback: register the callback if the replicaItem has been
+// added to bq.mu.replicas
+// 2. bq.addInternal: register the callback if the replicaItem has not been
+// added to bq.mu.replicas yet.
+// Note that the contract here is tricky, so adding new uses is discouraged. See
+// the comment on processCallback for more details.
func (i *replicaItem) registerCallback(cb processCallback) {
i.callbacks = append(i.callbacks, cb)
}
@@ -204,8 +263,13 @@ func (pq *priorityQueue) update(item *replicaItem, priority float64) {
}
var (
- errQueueDisabled = errors.New("queue disabled")
- errQueueStopped = errors.New("queue stopped")
+ errQueueDisabled = errors.New("queue disabled")
+ errQueueStopped = errors.New("queue stopped")
+ errReplicaNotInitialized = errors.New("replica not initialized")
+ errReplicaAlreadyProcessing = errors.New("replica already processing")
+ errReplicaAlreadyInPurgatory = errors.New("replica in purgatory")
+ errReplicaAlreadyInQueue = errors.New("replica already in queue")
+ errDroppedDueToFullQueueSize = errors.New("queue full")
)
func isExpectedQueueError(err error) bool {
@@ -261,7 +325,7 @@ type queueImpl interface {
// queue-specific work on it. The Replica is guaranteed to be initialized.
// We return a boolean to indicate if the Replica was processed successfully
// (vs. it being a no-op or an error).
- process(context.Context, *Replica, spanconfig.StoreReader) (processed bool, err error)
+ process(context.Context, *Replica, spanconfig.StoreReader, float64) (processed bool, err error)
// processScheduled is called after async task was created to run process.
// This function is called by the process loop synchronously. This method is
@@ -325,6 +389,20 @@ type queueConfig struct {
processDestroyedReplicas bool
// processTimeout returns the timeout for processing a replica.
processTimeoutFunc queueProcessTimeoutFunc
+ // enqueueAdd is a counter of replicas that were successfully added to the
+ // queue.
+ enqueueAdd *metric.Counter
+ // enqueueFailedPrecondition is a counter of replicas that failed the
+ // precondition checks and were therefore not added to the queue.
+ enqueueFailedPrecondition *metric.Counter
+ // enqueueNoAction is a counter of replicas that had ShouldQueue determine no
+ // action was needed and were therefore not added to the queue.
+ enqueueNoAction *metric.Counter
+ // enqueueUnexpectedError is a counter of replicas that were expected to be
+ // enqueued (either had ShouldQueue return true or the caller explicitly
+ // requested to be added to the queue directly), but failed to be enqueued
+	// during the enqueue process (such as when Async was rate limited).
+ enqueueUnexpectedError *metric.Counter
// successes is a counter of replicas processed successfully.
successes *metric.Counter
// failures is a counter of replicas which failed processing.
@@ -335,6 +413,11 @@ type queueConfig struct {
storeFailures *metric.Counter
// pending is a gauge measuring current replica count pending.
pending *metric.Gauge
+ // full is a counter measuring replicas dropped due to exceeding the queue max
+ // size.
+ // NB: this metric may be nil for queues that are not interested in tracking
+ // this.
+ full *metric.Counter
// processingNanos is a counter measuring total nanoseconds spent processing
// replicas.
processingNanos *metric.Counter
@@ -445,6 +528,7 @@ type baseQueue struct {
purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
stopped bool
disabled bool
+ maxSize int64
}
}
@@ -497,6 +581,7 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
},
}
bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
+ bq.mu.maxSize = int64(cfg.maxSize)
bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
cfg.disabledConfig.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
@@ -541,6 +626,23 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
bq.mu.Unlock()
}
+// SetMaxSize sets the max size of the queue.
+func (bq *baseQueue) SetMaxSize(maxSize int64) {
+ bq.mu.Lock()
+ defer bq.mu.Unlock()
+ bq.mu.maxSize = maxSize
+ // Drop replicas until no longer exceeding the max size. Note: We call
+ // removeLocked to match the behavior of addInternal. In theory, only
+ // removeFromQueueLocked should be triggered in removeLocked, since the item
+ // is in the priority queue, it should not be processing or in the purgatory
+ // queue. To be safe, however, we use removeLocked.
+ for int64(bq.mu.priorityQ.Len()) > maxSize {
+ pqLen := bq.mu.priorityQ.Len()
+		if bq.full != nil {
+			bq.full.Inc(1)
+		}
+ bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
+ }
+}
+
// lockProcessing locks all processing in the baseQueue. It returns
// a function to unlock processing.
func (bq *baseQueue) lockProcessing() func() {
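
The TODO later in addInternal anticipates driving the maximum queue size from a cluster setting. A hedged sketch of what that hookup could look like; the setting name, default, and helper are hypothetical and are not introduced by this patch:

```go
// Hypothetical setting, for illustration only; not part of this change.
var replicateQueueMaxSize = settings.RegisterIntSetting(
	settings.SystemOnly,
	"kv.replicate_queue.max_size", // hypothetical name
	"maximum number of replicas the replicate queue will hold",
	10000,
)

// hookMaxSizeSetting mirrors how disabledConfig is wired up in newBaseQueue.
func hookMaxSizeSetting(bq *baseQueue, st *cluster.Settings) {
	bq.SetMaxSize(replicateQueueMaxSize.Get(&st.SV))
	replicateQueueMaxSize.SetOnChange(&st.SV, func(ctx context.Context) {
		bq.SetMaxSize(replicateQueueMaxSize.Get(&st.SV))
	})
}
```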
@@ -575,8 +677,10 @@ func (h baseQueueHelper) MaybeAdd(
h.bq.maybeAdd(ctx, repl, now)
}
-func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) {
- _, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio)
+func (h baseQueueHelper) Add(
+ ctx context.Context, repl replicaInQueue, prio float64, cb processCallback,
+) {
+ _, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio, cb)
if err != nil && log.V(1) {
log.Infof(ctx, "during Add: %s", err)
}
@@ -584,21 +688,26 @@ func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio floa
type queueHelper interface {
MaybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp)
- Add(ctx context.Context, repl replicaInQueue, prio float64)
+ Add(ctx context.Context, repl replicaInQueue, prio float64, cb processCallback)
}
+// baseQueueAsyncRateLimited indicates that the base queue async task was rate
+// limited and the task was not executed.
+var baseQueueAsyncRateLimited = errors.New("async task rate limited")
+
// Async is a more performant substitute for calling AddAsync or MaybeAddAsync
// when many operations are going to be carried out. It invokes the given helper
// function in a goroutine if semaphore capacity is available. If the semaphore
-// is not available, the 'wait' parameter decides whether to wait or to return
-// as a noop. Note that if the system is quiescing, fn may never be called in-
-// dependent of the value of 'wait'.
+// is at capacity, the 'wait' parameter determines whether to block until
+// capacity becomes available or return immediately with an error. Note that if
+// the system is shutting down, the function may not be executed regardless of
+// the 'wait' value.
//
// The caller is responsible for ensuring that opName does not contain PII.
// (Best is to pass a constant string.)
func (bq *baseQueue) Async(
ctx context.Context, opName string, wait bool, fn func(ctx context.Context, h queueHelper),
-) {
+) error {
if log.V(3) {
log.InfofDepth(ctx, 2, "%s", redact.Safe(opName))
}
@@ -612,9 +721,13 @@ func (bq *baseQueue) Async(
},
func(ctx context.Context) {
fn(ctx, baseQueueHelper{bq})
- }); err != nil && bq.addLogN.ShouldLog() {
- log.Infof(ctx, "rate limited in %s: %s", redact.Safe(opName), err)
+ }); err != nil {
+ if bq.addLogN.ShouldLog() {
+ log.Infof(ctx, "rate limited in %s: %s", redact.Safe(opName), err)
+ }
+ return baseQueueAsyncRateLimited
}
+ return nil
}
// MaybeAddAsync offers the replica to the queue. The queue will only process a
@@ -623,18 +736,77 @@ func (bq *baseQueue) Async(
func (bq *baseQueue) MaybeAddAsync(
ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp,
) {
- bq.Async(ctx, "MaybeAdd", false /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = bq.Async(ctx, "MaybeAdd", false /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
+// AddAsyncWithCallback is the same as AddAsync, but allows the caller to
+// register a process callback that will be invoked when the replica is enqueued
+// or processed.
+func (bq *baseQueue) AddAsyncWithCallback(
+ ctx context.Context, repl replicaInQueue, prio float64, cb processCallback,
+) {
+ if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
+ h.Add(ctx, repl, prio, cb)
+ }); err != nil {
+ cb.onEnqueueResult(-1 /*indexOnHeap*/, err)
+ bq.updateMetricsOnEnqueueUnexpectedError()
+ }
+}
+
// AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait
// for other operations to finish instead of turning into a noop (because
// unlikely MaybeAdd, Add is not subject to being called opportunistically).
func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) {
- bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
- h.Add(ctx, repl, prio)
- })
+ if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
+ h.Add(ctx, repl, prio, noopProcessCallback)
+ }); err != nil {
+ // We don't update metrics in MaybeAddAsync because we don't know if the
+ // replica should be queued at this point. We only count it as an unexpected
+ // error when we're certain the replica should be enqueued. In this case,
+ // the caller explicitly wants to add the replica to the queue directly, so
+ // we do update the metrics on unexpected error.
+ bq.updateMetricsOnEnqueueUnexpectedError()
+ }
+}
+
+// updateMetricsOnEnqueueFailedPrecondition updates the metrics when a replica
+// fails precondition checks (replicaCanBeProcessed) and should not be
+// considered for enqueueing. This may include cases where the replica does not
+// have a valid lease, is uninitialized, is destroyed, the span config reader
+// could not be retrieved, or the range still needs to be split.
+func (bq *baseQueue) updateMetricsOnEnqueueFailedPrecondition() {
+ if bq.enqueueFailedPrecondition != nil {
+ bq.enqueueFailedPrecondition.Inc(1)
+ }
+}
+
+// updateMetricsOnEnqueueNoAction updates the metrics when shouldQueue
+// determines no action is needed and the replica is not added to the queue.
+func (bq *baseQueue) updateMetricsOnEnqueueNoAction() {
+ if bq.enqueueNoAction != nil {
+ bq.enqueueNoAction.Inc(1)
+ }
+}
+
+// updateMetricsOnEnqueueUnexpectedError updates the metrics when an unexpected
+// error occurs during enqueue operations. This should be called for replicas
+// that were expected to be enqueued (either had ShouldQueue return true or the
+// caller explicitly requested to be added to the queue directly), but failed to
+// be enqueued during the enqueue process (such as when Async was rate limited).
+func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
+ if bq.enqueueUnexpectedError != nil {
+ bq.enqueueUnexpectedError.Inc(1)
+ }
+}
+
+// updateMetricsOnEnqueueAdd updates the metrics when a replica is successfully
+// added to the queue.
+func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
+ if bq.enqueueAdd != nil {
+ bq.enqueueAdd.Inc(1)
+ }
}
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
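
Tying the enqueue path to the new callbacks, a caller such as the decommissioning nudger could enqueue a range directly and record both the enqueue and the processing outcome. This is a sketch under the assumption that the caller holds the store metrics; the nudger's own code is not part of the hunks shown here and the helper name is hypothetical:

```go
// nudgeDecommissioningRange is a hypothetical helper, not part of this patch.
func nudgeDecommissioningRange(ctx context.Context, store *Store, repl *Replica) {
	store.replicateQueue.AddAsyncWithCallback(ctx, repl,
		allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority(),
		processCallback{
			onEnqueueResult: func(_ int, err error) {
				if err != nil {
					store.metrics.DecommissioningNudgerEnqueueFailure.Inc(1)
					return
				}
				store.metrics.DecommissioningNudgerEnqueueSuccess.Inc(1)
			},
			onProcessResult: func(err error) {
				if err != nil {
					store.metrics.DecommissioningNudgerProcessFailure.Inc(1)
					return
				}
				store.metrics.DecommissioningNudgerProcessSuccess.Inc(1)
			},
		})
}
```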
@@ -671,6 +843,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
// Load the system config if it's needed.
confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */)
if err != nil {
+ bq.updateMetricsOnEnqueueFailedPrecondition()
return
}
@@ -680,6 +853,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
realRepl, _ := repl.(*Replica)
should, priority := bq.impl.shouldQueue(ctx, now, realRepl, confReader)
if !should {
+ bq.updateMetricsOnEnqueueNoAction()
return
}
@@ -687,15 +861,17 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
if extConf != nil && extConf.Get(&bq.store.cfg.Settings.SV) {
hasExternal, err := realRepl.HasExternalBytes()
if err != nil {
+ bq.updateMetricsOnEnqueueUnexpectedError()
log.Warningf(ctx, "could not determine if %s has external bytes: %s", realRepl, err)
return
}
if hasExternal {
+ bq.updateMetricsOnEnqueueUnexpectedError()
log.VInfof(ctx, 1, "skipping %s for %s because it has external bytes", bq.name, realRepl)
return
}
}
- _, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
+ _, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback)
if !isExpectedQueueError(err) {
log.Errorf(ctx, "unable to add: %+v", err)
}
@@ -704,15 +880,47 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
// addInternal adds the replica the queue with specified priority. If
// the replica is already queued at a lower priority, updates the existing
// priority. Expects the queue lock to be held by caller.
+//
+// processCallback allows the caller to register a callback that will be invoked
+// when the replica is enqueued or processed.
+// - If the replicaItem has not been added to bq.mu.replicas yet, the callback
+// is registered and onEnqueueResult is invoked immediately with the result of
+// the enqueue attempt. If successfully enqueued, onProcessResult will be
+// invoked when processing completes.
+// - If the replicaItem has already been added to bq.mu.replicas, no new
+// callbacks will be registered. The onEnqueueResult registered the first time
+// will be invoked with the result of subsequent enqueue attempts:
+// 1. Already processing or in purgatory: invoked with
+// errReplicaAlreadyProcessing/errReplicaAlreadyInPurgatory
+// 2. Priority updated: invoked with error = nil and new heap index
+// 3. Waiting in queue without priority change: not invoked
+// 4. Dropped due to full queue: invoked with
+//     errDroppedDueToFullQueueSize (possibly on a later enqueue attempt that
+//     pushes the queue over its maximum size).
+// 5. Other errors: invoked with the error.
+//
+// NB: callback invocation is not guaranteed since removeFromReplicaSetLocked
+// may remove the replica from the queue at any time without invoking them.
func (bq *baseQueue) addInternal(
- ctx context.Context, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, priority float64,
-) (bool, error) {
+ ctx context.Context,
+ desc *roachpb.RangeDescriptor,
+ replicaID roachpb.ReplicaID,
+ priority float64,
+ cb processCallback,
+) (added bool, err error) {
+ defer func() {
+ // INVARIANT: added => err == nil.
+ if err != nil {
+ cb.onEnqueueResult(-1 /* indexOnHeap */, err)
+ bq.updateMetricsOnEnqueueUnexpectedError()
+ }
+ }()
// NB: this is intentionally outside of bq.mu to avoid having to consider
// lock ordering constraints.
if !desc.IsInitialized() {
// We checked this above in MaybeAdd(), but we need to check it
// again for Add().
- return false, errors.New("replica not initialized")
+ return false, errReplicaNotInitialized
}
bq.mu.Lock()
@@ -737,6 +945,7 @@ func (bq *baseQueue) addInternal(
// If the replica is currently in purgatory, don't re-add it.
if _, ok := bq.mu.purgatory[desc.RangeID]; ok {
+ cb.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyInPurgatory)
return false, nil
}
@@ -746,6 +955,7 @@ func (bq *baseQueue) addInternal(
if item.processing {
wasRequeued := item.requeue
item.requeue = true
+ cb.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyProcessing)
return !wasRequeued, nil
}
@@ -757,6 +967,8 @@ func (bq *baseQueue) addInternal(
log.Infof(ctx, "updating priority: %0.3f -> %0.3f", item.priority, priority)
}
bq.mu.priorityQ.update(item, priority)
+			// item.index has now been updated to the item's position in the heap.
+ cb.onEnqueueResult(item.index /*indexOnHeap*/, nil)
}
return false, nil
}
@@ -765,12 +977,27 @@ func (bq *baseQueue) addInternal(
log.Infof(ctx, "adding: priority=%0.3f", priority)
}
item = &replicaItem{rangeID: desc.RangeID, replicaID: replicaID, priority: priority}
+ item.registerCallback(cb)
bq.addLocked(item)
- // If adding this replica has pushed the queue past its maximum size,
- // remove the lowest priority element.
- if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize {
- bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
+ // If adding this replica has pushed the queue past its maximum size, remove
+	// an element. Note that it might not be the lowest-priority one, since a heap
+	// is not guaranteed to be globally ordered. Ideally, we would remove the lowest
+ // priority element, but it would require additional bookkeeping or a linear
+ // scan.
+ if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
+ replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
+ if bq.full != nil {
+ bq.full.Inc(1)
+ }
+ log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
+ priority, replicaItemToDrop.replicaID)
+		// TODO(wenyihu6): when we introduce a base queue max size cluster setting,
+		// remember to invoke this callback when shrinking the size.
+ for _, callback := range replicaItemToDrop.callbacks {
+ callback.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
+ }
+ bq.removeLocked(replicaItemToDrop)
}
// Signal the processLoop that a replica has been added.
select {
@@ -778,26 +1005,51 @@ func (bq *baseQueue) addInternal(
default:
// No need to signal again.
}
+ if postEnqueueInterceptor := bq.store.TestingKnobs().BaseQueuePostEnqueueInterceptor; postEnqueueInterceptor != nil {
+ postEnqueueInterceptor(bq.store.StoreID(), desc.RangeID)
+ }
+ // Note that we are bumping enqueueAdd here instead of during defer to avoid
+	// treating the requeue of a currently processing replica as newly added.
+	// Such replicas are re-added to the queue later, which would double count them.
+ bq.updateMetricsOnEnqueueAdd()
+ // Note: it may already be dropped or dropped afterwards.
+ cb.onEnqueueResult(item.index /*indexOnHeap*/, nil)
return true, nil
}
// MaybeAddCallback adds a callback to be called when the specified range
-// finishes processing if the range is in the queue. If the range is in
-// purgatory, the callback is called immediately with the purgatory error. If
-// the range is not in the queue (either waiting or processing), the method
-// returns false.
+// finishes processing. The replica can be in one of several states:
+//
+// - waiting: not in mu.replicas
+// Returns false and no callback is executed.
+//
+// - queued: in mu.replicas and mu.priorityQ
+// Returns true. onProcessResult is executed when the replica is processed.
+//
+// - purgatory: in mu.replicas and mu.purgatory
+// Returns true, and onProcessResult is called immediately with the
+// purgatory error. Note that onProcessResult may be invoked again when
+// purgatory finishes processing the replica.
+//
+// - processing: only in mu.replicas and currently being processed
+// Returns true and onProcessResult is executed when processing completes.
+// If the replica is currently being processed by the purgatory queue, it
+// will not be in bq.mu.purgatory and the onProcessResult will only execute
+// when the purgatory finishes processing the replica.
+//
+// If it returns true, the registered onEnqueueResult is also invoked by
+// subsequent calls to addInternal for this replica.
//
-// NB: If the replica this attaches to is dropped from an overfull queue, this
-// callback is never called. This is surprising, but the single caller of this
-// is okay with these semantics. Adding new uses is discouraged without cleaning
-// up the contract of this method, but this code doesn't lend itself readily to
-// upholding invariants so there may need to be some cleanup first.
+// NB: Adding new uses is discouraged without cleaning up the contract of
+// processCallback. For example, removeFromReplicaSetLocked may be called
+// without invoking these callbacks. See replicaItem.registerCallback for more
+// details.
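+//
+// As an illustrative sketch (hypothetical caller, not part of this change),
+// a caller that wants to block on the processing result might register:
+//
+//	done := make(chan error, 1)
+//	ok := bq.MaybeAddCallback(rangeID, processCallback{
+//		onEnqueueResult: func(indexOnHeap int, err error) {},
+//		onProcessResult: func(err error) {
+//			select {
+//			case done <- err:
+//			default: // may be invoked more than once; keep the first result
+//			}
+//		},
+//	})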
func (bq *baseQueue) MaybeAddCallback(rangeID roachpb.RangeID, cb processCallback) bool {
bq.mu.Lock()
defer bq.mu.Unlock()
if purgatoryErr, ok := bq.mu.purgatory[rangeID]; ok {
- cb(purgatoryErr)
+ cb.onProcessResult(purgatoryErr)
return true
}
if item, ok := bq.mu.replicas[rangeID]; ok {
@@ -871,10 +1123,10 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
// Acquire from the process semaphore.
bq.processSem <- struct{}{}
- repl, priority := bq.pop()
+ repl, priorityAtEnqueue := bq.pop()
if repl != nil {
- bq.processOneAsyncAndReleaseSem(ctx, repl, stopper)
- bq.impl.postProcessScheduled(ctx, repl, priority)
+ bq.processOneAsyncAndReleaseSem(ctx, repl, stopper, priorityAtEnqueue)
+ bq.impl.postProcessScheduled(ctx, repl, priorityAtEnqueue)
} else {
// Release semaphore if no replicas were available.
<-bq.processSem
@@ -902,23 +1154,16 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
// processOneAsyncAndReleaseSem processes a replica if possible and releases the
// processSem when the processing is complete.
func (bq *baseQueue) processOneAsyncAndReleaseSem(
- ctx context.Context, repl replicaInQueue, stopper *stop.Stopper,
+ ctx context.Context, repl replicaInQueue, stopper *stop.Stopper, priorityAtEnqueue float64,
) {
ctx = repl.AnnotateCtx(ctx)
taskName := bq.processOpName() + " [outer]"
- // Validate that the replica is still in a state that can be processed. If
- // it is no longer processable, return immediately.
- if _, err := bq.replicaCanBeProcessed(ctx, repl, false /*acquireLeaseIfNeeded */); err != nil {
- bq.finishProcessingReplica(ctx, stopper, repl, err)
- <-bq.processSem
- return
- }
if err := stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: taskName},
func(ctx context.Context) {
// Release semaphore when finished processing.
defer func() { <-bq.processSem }()
start := timeutil.Now()
- err := bq.processReplica(ctx, repl)
+ err := bq.processReplica(ctx, repl, priorityAtEnqueue)
bq.recordProcessDuration(ctx, timeutil.Since(start))
bq.finishProcessingReplica(ctx, stopper, repl, err)
}); err != nil {
@@ -951,7 +1196,9 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
//
// ctx should already be annotated by both bq.AnnotateCtx() and
// repl.AnnotateCtx().
-func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
+func (bq *baseQueue) processReplica(
+ ctx context.Context, repl replicaInQueue, priorityAtEnqueue float64,
+) error {
ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
defer span.Finish()
@@ -975,7 +1222,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
// it may not be and shouldQueue will be passed a nil realRepl. These tests
// know what they're getting into so that's fine.
realRepl, _ := repl.(*Replica)
- processed, err := bq.impl.process(ctx, realRepl, conf)
+ processed, err := bq.impl.process(ctx, realRepl, conf, priorityAtEnqueue)
if err != nil {
return err
}
@@ -1090,19 +1337,24 @@ func IsPurgatoryError(err error) (PurgatoryError, bool) {
}
// assertInvariants codifies the guarantees upheld by the data structures in the
-// base queue. In summary, a replica is one of:
+// base queue.
+// 1. A replica is one of:
// - "queued" and in mu.replicas and mu.priorityQ
// - "processing" and only in mu.replicas
// - "purgatory" and in mu.replicas and mu.purgatory
+// 2. The assertOnReplicaItem callback is called for every item in
+// bq.mu.priorityQ.sl, bq.mu.purgatory, and bq.mu.replicas. Note that items in
+// priorityQ and purgatory are expected to also be in replicas.
//
// Note that in particular, nothing is ever in both mu.priorityQ and
// mu.purgatory.
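+//
+// Callers that only care about the structural invariants can pass a no-op
+// callback, e.g. assertInvariants(func(item *replicaItem) {}).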
-func (bq *baseQueue) assertInvariants() {
+func (bq *baseQueue) assertInvariants(assertOnReplicaItem func(item *replicaItem)) {
bq.mu.Lock()
defer bq.mu.Unlock()
ctx := bq.AnnotateCtx(context.Background())
for _, item := range bq.mu.priorityQ.sl {
+ assertOnReplicaItem(item)
if item.processing {
log.Fatalf(ctx, "processing item found in prioQ: %v", item)
}
@@ -1115,6 +1367,7 @@ func (bq *baseQueue) assertInvariants() {
}
for rangeID := range bq.mu.purgatory {
item, inReplicas := bq.mu.replicas[rangeID]
if !inReplicas {
log.Fatalf(ctx, "item found in purg but not in mu.replicas: %v", item)
}
+ assertOnReplicaItem(item)
@@ -1129,6 +1382,7 @@ func (bq *baseQueue) assertInvariants() {
// that there aren't any non-processing replicas *only* in bq.mu.replicas.
var nNotProcessing int
for _, item := range bq.mu.replicas {
+ assertOnReplicaItem(item)
if !item.processing {
nNotProcessing++
}
@@ -1153,6 +1407,7 @@ func (bq *baseQueue) finishProcessingReplica(
processing := item.processing
callbacks := item.callbacks
requeue := item.requeue
+ priority := item.priority
item.callbacks = nil
bq.removeFromReplicaSetLocked(repl.GetRangeID())
item = nil // prevent accidental use below
@@ -1164,7 +1419,7 @@ func (bq *baseQueue) finishProcessingReplica(
// Call any registered callbacks.
for _, cb := range callbacks {
- cb(err)
+ cb.onProcessResult(err)
}
// Handle failures.
@@ -1188,7 +1443,7 @@ func (bq *baseQueue) finishProcessingReplica(
// purgatory.
if purgErr, ok := IsPurgatoryError(err); ok {
bq.mu.Lock()
- bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
+ bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /*priorityAtEnqueue*/, callbacks /*processCallback*/)
bq.mu.Unlock()
return
}
@@ -1208,7 +1463,12 @@ func (bq *baseQueue) finishProcessingReplica(
// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
// holds replicas which have failed processing.
func (bq *baseQueue) addToPurgatoryLocked(
- ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
+ ctx context.Context,
+ stopper *stop.Stopper,
+ repl replicaInQueue,
+ purgErr PurgatoryError,
+ priorityAtEnqueue float64,
+ cbs []processCallback,
) {
bq.mu.AssertHeld()
@@ -1232,7 +1492,14 @@ func (bq *baseQueue) addToPurgatoryLocked(
return
}
- item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1}
+ item := &replicaItem{
+ rangeID: repl.GetRangeID(),
+ replicaID: repl.ReplicaID(),
+ index: -1,
+ priority: priorityAtEnqueue,
+ callbacks: cbs,
+ }
+
bq.mu.replicas[repl.GetRangeID()] = item
defer func() {
@@ -1318,12 +1585,8 @@ func (bq *baseQueue) processReplicasInPurgatory(
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunTask(
annotatedCtx, bq.processOpName(), func(ctx context.Context) {
- if _, err := bq.replicaCanBeProcessed(ctx, repl, false); err != nil {
- bq.finishProcessingReplica(ctx, stopper, repl, err)
- } else {
- err = bq.processReplica(ctx, repl)
- bq.finishProcessingReplica(ctx, stopper, repl, err)
- }
+ err = bq.processReplica(ctx, repl, item.priority /*priorityAtEnqueue*/)
+ bq.finishProcessingReplica(ctx, stopper, repl, err)
},
) != nil {
// NB: We do not need to worry about removing any unprocessed replicas
@@ -1448,7 +1711,7 @@ func (bq *baseQueue) DrainQueue(ctx context.Context, stopper *stop.Stopper) {
if _, err := bq.replicaCanBeProcessed(annotatedCtx, repl, false); err != nil {
bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
} else {
- err = bq.processReplica(annotatedCtx, repl)
+ err = bq.processReplica(annotatedCtx, repl, -1 /*priorityAtEnqueue*/)
bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
}
}
diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go
index 63c0c0dad1d3..96c737384017 100644
--- a/pkg/kv/kvserver/queue_concurrency_test.go
+++ b/pkg/kv/kvserver/queue_concurrency_test.go
@@ -113,7 +113,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
})
}
g.Go(func() error {
- bq.assertInvariants()
+ bq.assertInvariants(func(item *replicaItem) {})
return nil
})
}
@@ -141,7 +141,7 @@ func (fakeQueueImpl) shouldQueue(
}
func (fq fakeQueueImpl) process(
- ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
) (bool, error) {
return fq.pr(ctx, repl, confReader)
}
diff --git a/pkg/kv/kvserver/queue_helpers_testutil.go b/pkg/kv/kvserver/queue_helpers_testutil.go
index a667a15a43c0..ecf2a09dc229 100644
--- a/pkg/kv/kvserver/queue_helpers_testutil.go
+++ b/pkg/kv/kvserver/queue_helpers_testutil.go
@@ -18,7 +18,16 @@ import (
func (bq *baseQueue) testingAdd(
ctx context.Context, repl replicaInQueue, priority float64,
) (bool, error) {
- return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
+ return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback)
+}
+
+// testingAddWithCallback is the same as testingAdd, but allows the caller to
+// register a process callback that will be invoked when the replica is enqueued
+// or processed.
+func (bq *baseQueue) testingAddWithCallback(
+ ctx context.Context, repl replicaInQueue, priority float64, cb processCallback,
+) (bool, error) {
+ return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, cb)
}
func forceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) error {
diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go
index c0274e8f7192..abdb51e1fba6 100644
--- a/pkg/kv/kvserver/queue_test.go
+++ b/pkg/kv/kvserver/queue_test.go
@@ -58,7 +58,7 @@ func (tq *testQueueImpl) shouldQueue(
}
func (tq *testQueueImpl) process(
- _ context.Context, _ *Replica, _ spanconfig.StoreReader,
+ _ context.Context, _ *Replica, _ spanconfig.StoreReader, _ float64,
) (bool, error) {
defer atomic.AddInt32(&tq.processed, 1)
if tq.err != nil {
@@ -112,6 +112,8 @@ func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfi
cfg.pending = metric.NewGauge(metric.Metadata{Name: "pending"})
cfg.processingNanos = metric.NewCounter(metric.Metadata{Name: "processingnanos"})
cfg.purgatory = metric.NewGauge(metric.Metadata{Name: "purgatory"})
+ cfg.enqueueAdd = metric.NewCounter(metric.Metadata{Name: "enqueueadd"})
+ cfg.enqueueUnexpectedError = metric.NewCounter(metric.Metadata{Name: "enqueueunexpectederror"})
cfg.disabledConfig = testQueueEnabled
return newBaseQueue(name, impl, store, cfg)
}
@@ -226,6 +228,11 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
} else {
bq.finishProcessingReplica(ctx, stopper, r2, nil)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ replica, err := bq.getReplica(item.rangeID)
+ require.NoError(t, err)
+ require.Equal(t, priorityMap[replica.(*Replica)], item.priority)
+ })
if v := bq.pending.Value(); v != 1 {
t.Errorf("expected 1 pending replicas; got %d", v)
}
@@ -298,6 +305,11 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
if r, _ := bq.pop(); r != nil {
t.Errorf("expected empty queue; got %v", r)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ replica, err := bq.getReplica(item.rangeID)
+ require.NoError(t, err)
+ require.Equal(t, priorityMap[replica.(*Replica)], item.priority)
+ })
// Try removing a replica.
bq.maybeAdd(ctx, r1, hlc.ClockTimestamp{})
@@ -317,6 +329,11 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
if v := bq.pending.Value(); v != 0 {
t.Errorf("expected 0 pending replicas; got %d", v)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ replica, err := bq.getReplica(item.rangeID)
+ require.NoError(t, err)
+ require.Equal(t, priorityMap[replica.(*Replica)], item.priority)
+ })
}
// TestBaseQueueSamePriorityFIFO verifies that if multiple items are queued at
@@ -542,11 +559,12 @@ func TestBaseQueueAddRemove(t *testing.T) {
t.Fatal(err)
}
+ const testPriority = 1.0
testQueue := &testQueueImpl{
blocker: make(chan struct{}, 1),
shouldQueueFn: func(now hlc.ClockTimestamp, r *Replica) (shouldQueue bool, priority float64) {
shouldQueue = true
- priority = 1.0
+ priority = testPriority
return
},
}
@@ -554,7 +572,14 @@ func TestBaseQueueAddRemove(t *testing.T) {
bq.Start(stopper)
bq.maybeAdd(ctx, r, hlc.ClockTimestamp{})
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, testPriority, item.priority)
+ })
+
bq.MaybeRemove(r.RangeID)
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, testPriority, item.priority)
+ })
// Wake the queue
close(testQueue.blocker)
@@ -841,10 +866,19 @@ func TestBaseQueuePurgatory(t *testing.T) {
bq.maybeAdd(context.Background(), r, hlc.ClockTimestamp{})
}
+ // Make sure priority is preserved during processing.
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(item.rangeID), item.priority)
+ })
+
testutils.SucceedsSoon(t, func() error {
if pc := testQueue.getProcessed(); pc != replicaCount {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
}
+ // Make sure priorities are preserved with the purgatory queue.
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(item.rangeID), item.priority)
+ })
// We have to loop checking the following conditions because the increment
// of testQueue.processed does not happen atomically with the replica being
// placed in purgatory.
@@ -856,6 +890,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
if l := bq.Length(); l != 0 {
return errors.Errorf("expected empty priorityQ; got %d", l)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(item.rangeID), item.priority)
+ })
// Check metrics.
if v := bq.successes.Count(); v != 0 {
return errors.Errorf("expected 0 processed replicas; got %d", v)
@@ -890,6 +927,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
if l := bq.Length(); l != 0 {
return errors.Errorf("expected empty priorityQ; got %d", l)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(item.rangeID), item.priority)
+ })
// Check metrics.
if v := bq.successes.Count(); v != 0 {
return errors.Errorf("expected 0 processed replicas; got %d", v)
@@ -925,6 +965,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
if pc := testQueue.getProcessed(); pc != replicaCount*3-rmReplCount {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3-rmReplCount, pc)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(item.rangeID), item.priority)
+ })
// Check metrics.
if v := bq.successes.Count(); v != int64(replicaCount)-rmReplCount {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount-rmReplCount, v)
@@ -968,6 +1011,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
if pc := testQueue.getProcessed(); pc != beforeProcessCount+1 {
return errors.Errorf("expected %d processed replicas; got %d", beforeProcessCount+1, pc)
}
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(item.rangeID), item.priority)
+ })
if v := bq.successes.Count(); v != beforeSuccessCount+1 {
return errors.Errorf("expected %d processed replicas; got %d", beforeSuccessCount+1, v)
}
@@ -991,7 +1037,7 @@ type processTimeoutQueueImpl struct {
var _ queueImpl = &processTimeoutQueueImpl{}
func (pq *processTimeoutQueueImpl) process(
- ctx context.Context, r *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, r *Replica, _ spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
<-ctx.Done()
atomic.AddInt32(&pq.processed, 1)
@@ -1121,7 +1167,7 @@ type processTimeQueueImpl struct {
var _ queueImpl = &processTimeQueueImpl{}
func (pq *processTimeQueueImpl) process(
- _ context.Context, _ *Replica, _ spanconfig.StoreReader,
+ _ context.Context, _ *Replica, _ spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
time.Sleep(5 * time.Millisecond)
return true, nil
@@ -1250,6 +1296,281 @@ func TestBaseQueueDisable(t *testing.T) {
}
}
+// TestBaseQueueCallbackOnEnqueueResult tests the onEnqueueResult callback for
+// 1. successful add: the replica is successfully enqueued.
+// 2. priority update: the priority of an already-queued replica is updated
+// without enqueuing it again.
+// 3. disabled: the queue is disabled and the replica is not enqueued.
+// 4. stopped: the queue is stopped and the replica is not enqueued.
+// 5. already queued: the replica is already in the queue and is not enqueued
+// again.
+// 6. purgatory: the replica is in purgatory and is not enqueued again.
+// 7. processing: the replica is already being processed and is not enqueued
+// again.
+// 8. full queue: the queue is full and the replica is dropped after being
+// enqueued.
+func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ tc := testContext{}
+ stopper := stop.NewStopper()
+ ctx := context.Background()
+ defer stopper.Stop(ctx)
+ tc.Start(ctx, t, stopper)
+
+ t.Run("successfuladd", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 1})
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, 0, indexOnHeap)
+ require.NoError(t, err)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.Equal(t, int64(1), bq.enqueueAdd.Count())
+ require.True(t, queued)
+ })
+
+ t.Run("priority", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 5})
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ priorities := []float64{5.0, 4.0, 8.0, 1.0, 3.0}
+ expectedIndices := []int{0, 1, 0, 3, 4}
+ // When inserting 5, [5], index 0.
+ // When inserting 4, [5, 4], index 1.
+ // When inserting 8, [8, 4, 5], index 0.
+ // When inserting 1, [8, 4, 5, 1], index 3.
+ // When inserting 3, [8, 4, 5, 1, 3], index 4.
+ for i, priority := range priorities {
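+ // Use a distinct range ID per insertion so each add is treated as a new
+ // replica item rather than a priority update of an existing one.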
+ r.Desc().RangeID = roachpb.RangeID(i + 1)
+ queued, _ := bq.testingAddWithCallback(ctx, r, priority, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, expectedIndices[i], indexOnHeap)
+ require.NoError(t, err)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.Equal(t, int64(i+1), bq.enqueueAdd.Count())
+ require.True(t, queued)
+ }
+ // Set range id back to 1.
+ r.Desc().RangeID = 1
+ })
+ t.Run("disabled", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
+ bq.SetDisabled(true)
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, -1, indexOnHeap)
+ require.ErrorIs(t, err, errQueueDisabled)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.Equal(t, int64(0), bq.enqueueAdd.Count())
+ require.Equal(t, int64(1), bq.enqueueUnexpectedError.Count())
+ require.False(t, queued)
+ })
+ t.Run("stopped", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
+ bq.mu.stopped = true
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, -1, indexOnHeap)
+ require.ErrorIs(t, err, errQueueStopped)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.False(t, queued)
+ require.Equal(t, int64(0), bq.enqueueAdd.Count())
+ require.Equal(t, int64(1), bq.enqueueUnexpectedError.Count())
+ })
+
+ t.Run("alreadyqueued", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, 0, indexOnHeap)
+ require.NoError(t, err)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.True(t, queued)
+ require.Equal(t, int64(1), bq.enqueueAdd.Count())
+ require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
+
+ // Inserting the same range again should not add it to the queue again.
+ queued, _ = bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, -1, indexOnHeap)
+ require.ErrorIs(t, err, errReplicaAlreadyInQueue)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.False(t, queued)
+ require.Equal(t, int64(1), bq.enqueueAdd.Count())
+ require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
+ })
+
+ t.Run("purgatory", func(t *testing.T) {
+ testQueue := &testQueueImpl{
+ pChan: make(chan time.Time, 1),
+ }
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ bq.mu.Lock()
+ bq.addToPurgatoryLocked(ctx, stopper, r, &testPurgatoryError{}, 1.0, nil)
+ bq.mu.Unlock()
+ // Inserting a range that is in purgatory should not enqueue it again.
+ queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, -1, indexOnHeap)
+ require.ErrorIs(t, err, errReplicaAlreadyInPurgatory)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.False(t, queued)
+ require.Equal(t, int64(0), bq.enqueueAdd.Count())
+ require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
+ })
+
+ t.Run("processing", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ item := &replicaItem{rangeID: r.Desc().RangeID, replicaID: r.ReplicaID(), index: -1}
+ item.setProcessing()
+ bq.addLocked(item)
+ // Inserting a range that is already being processed should not enqueue it again.
+ markedAsRequeued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.Equal(t, -1, indexOnHeap)
+ require.ErrorIs(t, err, errReplicaAlreadyProcessing)
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.True(t, markedAsRequeued)
+ require.Equal(t, int64(0), bq.enqueueAdd.Count())
+ require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
+ })
+ t.Run("fullqueue", func(t *testing.T) {
+ testQueue := &testQueueImpl{}
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 0})
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+ // Max size is 0, so the replica is added and then immediately dropped.
+ queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ // onEnqueueResult may be invoked both for the add (err == nil) and for
+ // the drop (errDroppedDueToFullQueueSize).
+ if err != nil {
+ require.ErrorIs(t, err, errDroppedDueToFullQueueSize)
+ }
+ },
+ onProcessResult: func(err error) {
+ t.Fatal("unexpected call to onProcessResult")
+ },
+ })
+ require.True(t, queued)
+ require.Equal(t, int64(1), bq.enqueueAdd.Count())
+ require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
+ })
+}
+
+// TestBaseQueueCallbackOnProcessResult tests that the processCallback is
+// invoked when the replica is processed, and is invoked again if the replica
+// ends up in purgatory and is processed again.
+func TestBaseQueueCallbackOnProcessResult(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ tc := testContext{}
+ stopper := stop.NewStopper()
+ ctx := context.Background()
+ defer stopper.Stop(ctx)
+ tsc := TestStoreConfig(nil)
+ tc.StartWithStoreConfig(ctx, t, stopper, tsc)
+
+ testQueue := &testQueueImpl{
+ duration: time.Nanosecond,
+ pChan: make(chan time.Time, 1),
+ err: &testPurgatoryError{},
+ }
+
+ const replicaCount = 10
+ repls := createReplicas(t, &tc, replicaCount)
+
+ bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: replicaCount})
+ bq.Start(stopper)
+
+ var totalProcessedCalledWithErr atomic.Int32
+ for _, r := range repls {
+ queued, _ := bq.testingAddWithCallback(context.Background(), r, 1.0, processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ require.NoError(t, err)
+ },
+ onProcessResult: func(err error) {
+ if err != nil {
+ totalProcessedCalledWithErr.Add(1)
+ }
+ },
+ })
+ require.True(t, queued)
+ }
+
+ testutils.SucceedsSoon(t, func() error {
+ if pc := testQueue.getProcessed(); pc != replicaCount {
+ return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
+ }
+
+ if totalProcessedCalledWithErr.Load() != int32(replicaCount) {
+ return errors.Errorf("expected %d processed replicas with err; got %d", replicaCount, totalProcessedCalledWithErr.Load())
+ }
+ return nil
+ })
+
+ // Now, signal that purgatoried replicas should retry.
+ testQueue.pChan <- timeutil.Now()
+
+ testutils.SucceedsSoon(t, func() error {
+ if pc := testQueue.getProcessed(); pc != replicaCount*2 {
+ return errors.Errorf("expected %d processed replicas; got %d", replicaCount*2, pc)
+ }
+
+ if totalProcessedCalledWithErr.Load() != int32(replicaCount*2) {
+ return errors.Errorf("expected %d processed replicas with err; got %d", replicaCount*2, totalProcessedCalledWithErr.Load())
+ }
+ return nil
+ })
+}
+
// TestQueueDisable verifies that setting the set of queue.enabled cluster
// settings actually disables the base queue. This test works alongside
// TestBaseQueueDisable to verify the entire disable workflow.
@@ -1345,13 +1666,13 @@ type parallelQueueImpl struct {
var _ queueImpl = ¶llelQueueImpl{}
func (pq *parallelQueueImpl) process(
- ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, priority float64,
) (processed bool, err error) {
atomic.AddInt32(&pq.processing, 1)
if pq.processBlocker != nil {
<-pq.processBlocker
}
- processed, err = pq.testQueueImpl.process(ctx, repl, confReader)
+ processed, err = pq.testQueueImpl.process(ctx, repl, confReader, priority)
atomic.AddInt32(&pq.processing, -1)
return processed, err
}
@@ -1372,11 +1693,12 @@ func TestBaseQueueProcessConcurrently(t *testing.T) {
repls := createReplicas(t, &tc, 3)
r1, r2, r3 := repls[0], repls[1], repls[2]
+ const testPriority = 1
pQueue := ¶llelQueueImpl{
testQueueImpl: testQueueImpl{
blocker: make(chan struct{}, 1),
shouldQueueFn: func(now hlc.ClockTimestamp, r *Replica) (shouldQueue bool, priority float64) {
- return true, 1
+ return true, testPriority
},
},
processBlocker: make(chan struct{}, 1),
@@ -1421,6 +1743,9 @@ func TestBaseQueueProcessConcurrently(t *testing.T) {
pQueue.processBlocker <- struct{}{}
assertProcessedAndProcessing(3, 0)
+ bq.assertInvariants(func(item *replicaItem) {
+ require.Equal(t, float64(testPriority), item.priority)
+ })
}
// TestBaseQueueReplicaChange ensures that if a replica is added to the queue
diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go
index 819e881297a3..92feac867d94 100644
--- a/pkg/kv/kvserver/raft_log_queue.go
+++ b/pkg/kv/kvserver/raft_log_queue.go
@@ -673,7 +673,7 @@ func (rlq *raftLogQueue) shouldQueueImpl(
// leader and if the total number of the range's raft log's stale entries
// exceeds RaftLogQueueStaleThreshold.
func (rlq *raftLogQueue) process(
- ctx context.Context, r *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, r *Replica, _ spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
decision, err := newTruncateDecision(ctx, r)
if err != nil {
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 13a1cb6a2663..68dc49298ff7 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -79,7 +79,7 @@ func (rq *raftSnapshotQueue) shouldQueue(
}
func (rq *raftSnapshotQueue) process(
- ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
) (anyProcessed bool, _ error) {
// If a follower requires a Raft snapshot, perform it.
if status := repl.RaftStatus(); status != nil {
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 0b2fbc22408d..0e430b919ad5 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1065,25 +1065,25 @@ func (r *Replica) SetSpanConfig(conf roachpb.SpanConfig, sp roachpb.Span) bool {
// impacted by changes to the SpanConfig. This should be called after any
// changes to the span configs.
func (r *Replica) MaybeQueue(ctx context.Context, now hlc.ClockTimestamp) {
- r.store.splitQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = r.store.splitQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, r, now)
})
- r.store.mergeQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = r.store.mergeQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, r, now)
})
if EnqueueInMvccGCQueueOnSpanConfigUpdateEnabled.Get(&r.store.GetStoreConfig().Settings.SV) {
- r.store.mvccGCQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = r.store.mvccGCQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, r, now)
})
}
- r.store.leaseQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = r.store.leaseQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, r, now)
})
// The replicate queue has a relatively more expensive queue check
// (shouldQueue), because it scales with the number of stores, and
// performs more checks.
if EnqueueInReplicateQueueOnSpanConfigUpdateEnabled.Get(&r.store.GetStoreConfig().Settings.SV) {
- r.store.replicateQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = r.store.replicateQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, r, now)
})
}
@@ -2567,15 +2567,19 @@ func (r *Replica) GetMutexForTesting() *ReplicaMutex {
// manner via the replica scanner, see #130199. This functionality is disabled
// by default for this reason.
func (r *Replica) maybeEnqueueProblemRange(
- ctx context.Context, now time.Time, leaseValid, isLeaseholder bool,
+ ctx context.Context, now time.Time, leaseValid, isLeaseholder bool, maybeLog bool,
) {
// The method expects the caller to provide whether the lease is valid and
// the replica is the leaseholder for the range, so that it can avoid
// unnecessary work. We expect this method to be called in the context of
// updating metrics.
if !isLeaseholder || !leaseValid {
// The replicate queue will not process the replica without a valid lease.
- // Nothing to do.
+ // Track when we skip enqueuing for these reasons.
+ log.KvDistribution.Infof(ctx, "not enqueuing replica %s because isLeaseholder=%t, leaseValid=%t",
+ r.Desc(), isLeaseholder, leaseValid)
+ r.store.metrics.DecommissioningNudgerNotLeaseholderOrInvalidLease.Inc(1)
return
}
@@ -2596,10 +2600,43 @@ func (r *Replica) maybeEnqueueProblemRange(
// expect a race, however if the value changed underneath us we won't enqueue
// the replica as we lost the race.
if !r.lastProblemRangeReplicateEnqueueTime.CompareAndSwap(lastTime, now) {
+ // This race condition is expected to be rare.
+ log.KvDistribution.VInfof(ctx, 1, "not enqueuing replica %s due to race: "+
+ "lastProblemRangeReplicateEnqueueTime was updated concurrently", r.Desc())
return
}
- r.store.replicateQueue.AddAsync(ctx, r,
- allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority())
+ r.store.metrics.DecommissioningNudgerEnqueue.Inc(1)
+ // TODO(dodeca12): Figure out a better way to track the
+ // decommissioning nudger enqueue failures/errors.
+ level := log.Level(2)
+ if maybeLog {
+ level = log.Level(0)
+ }
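+ // With maybeLog set, the nudger logs below are emitted unconditionally
+ // (level 0); otherwise they only show up at vmodule level 2.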
+ r.store.replicateQueue.AddAsyncWithCallback(ctx, r,
+ allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority(), processCallback{
+ onEnqueueResult: func(indexOnHeap int, err error) {
+ if err != nil {
+ log.KvDistribution.VInfof(ctx, level,
+ "decommissioning nudger failed to enqueue range %v due to %v", r.Desc(), err)
+ r.store.metrics.DecommissioningNudgerEnqueueFailure.Inc(1)
+ } else {
+ log.KvDistribution.VInfof(ctx, level,
+ "decommissioning nudger successfully enqueued range %v at index %d", r.Desc(), indexOnHeap)
+ r.store.metrics.DecommissioningNudgerEnqueueSuccess.Inc(1)
+ }
+ },
+ onProcessResult: func(err error) {
+ if err != nil {
+ log.KvDistribution.VInfof(ctx, level,
+ "decommissioning nudger failed to process range %v due to %v", r.Desc(), err)
+ r.store.metrics.DecommissioningNudgerProcessFailure.Inc(1)
+ } else {
+ log.KvDistribution.VInfof(ctx, level,
+ "decommissioning nudger successfully processed replica %s", r.Desc())
+ r.store.metrics.DecommissioningNudgerProcessSuccess.Inc(1)
+ }
+ },
+ })
}
// SendStreamStats sets the stats for the replica send streams that belong to
diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go
index 01b2c1183821..b19ca69972e1 100644
--- a/pkg/kv/kvserver/replica_backpressure.go
+++ b/pkg/kv/kvserver/replica_backpressure.go
@@ -192,8 +192,17 @@ func (r *Replica) maybeBackpressureBatch(ctx context.Context, ba *kvpb.BatchRequ
// Register a callback on an ongoing split for this range in the splitQueue.
splitC := make(chan error, 1)
- if !r.store.splitQueue.MaybeAddCallback(r.RangeID, func(err error) {
- splitC <- err
+ if !r.store.splitQueue.MaybeAddCallback(r.RangeID, processCallback{
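+ // Only the processing result matters for backpressure; the enqueue
+ // callback is intentionally a no-op.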
+ onEnqueueResult: func(_ int, _ error) {},
+ onProcessResult: func(err error) {
+ select {
+ case splitC <- err:
+ default:
+ // Drop the error if the channel is already full. This prevents
+ // blocking if the callback is invoked multiple times.
+ return
+ }
+ },
}) {
// No split ongoing. We may have raced with its completion. There's
// no good way to prevent this race, so we conservatively allow the
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 1df7a12a4e9d..17f3501ad601 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -4194,7 +4194,7 @@ func (r *Replica) scatterRangeAndRandomizeLeases(ctx context.Context, randomizeL
break
}
_, err = rq.processOneChange(
- ctx, r, desc, conf, true /* scatter */, false, /* dryRun */
+ ctx, r, desc, conf, true /* scatter */, false /* dryRun */, -1, /*priorityAtEnqueue*/
)
if err != nil {
// If the error is expected to be transient, retry processing the range.
diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go
index 2874c355ad9a..7acaf4f99914 100644
--- a/pkg/kv/kvserver/replica_gc_queue.go
+++ b/pkg/kv/kvserver/replica_gc_queue.go
@@ -216,7 +216,7 @@ func replicaGCShouldQueueImpl(now, lastCheck hlc.Timestamp, isSuspect bool) (boo
// process performs a consistent lookup on the range descriptor to see if we are
// still a member of the range.
func (rgcq *replicaGCQueue) process(
- ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
// Note that the Replicas field of desc is probably out of date, so
// we should only use `desc` for its static fields like RangeID and
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index ada083bbbe95..8c5f802a8008 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -10432,7 +10432,7 @@ func TestConsistenctQueueErrorFromCheckConsistency(t *testing.T) {
}
for i := 0; i < 2; i++ {
// Do this twice because it used to deadlock. See #25456.
- processed, err := tc.store.consistencyQueue.process(ctx, tc.repl, confReader)
+ processed, err := tc.store.consistencyQueue.process(ctx, tc.repl, confReader, -1 /*priorityAtEnqueue*/)
if !testutils.IsError(err, "boom") {
t.Fatal(err)
}
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 69e433280913..5e4528d95742 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -100,6 +100,38 @@ var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetti
settings.NonNegativeDuration,
)
+// TODO(wenyihu6): move these cluster settings to kvserverbase
+
+// PriorityInversionRequeue is a setting that controls whether to requeue
+// replicas whose priority at processing time has dropped too far below their
+// priority at enqueue time (e.g. from a repair action down to
+// AllocatorConsiderRebalance).
+// TODO(wenyihu6): flip the default to true after landing 152596 and letting it bake.
+var PriorityInversionRequeue = settings.RegisterBoolSetting(
+ settings.SystemOnly,
+ "kv.priority_inversion_requeue_replicate_queue.enabled",
+ "whether the replicate queue should requeue replicas that were enqueued for "+
+ "a repair action but only needed a rebalance at processing time",
+ false,
+)
+
+// ReplicateQueueMaxSize is a setting that controls the max size of the
+// replicate queue. When this limit is exceeded, lower priority replicas (not
+// guaranteed to be the lowest) are dropped from the queue.
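+// The limit can be raised at runtime, e.g. (illustrative; the value must be
+// at least defaultQueueMaxSize per the validator below):
+//
+//	SET CLUSTER SETTING kv.replicate_queue.max_size = <value>;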
+var ReplicateQueueMaxSize = settings.RegisterIntSetting(
+ settings.ApplicationLevel,
+ "kv.replicate_queue.max_size",
+ "maximum number of replicas that can be queued for replicate queue processing; "+
+ "when this limit is exceeded, lower priority (not guaranteed to be the lowest) "+
+ "replicas are dropped from the queue",
+ defaultQueueMaxSize,
+ settings.WithValidateInt(func(v int64) error {
+ if v < defaultQueueMaxSize {
+ return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
+ }
+ return nil
+ }),
+)
+
var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
@@ -287,6 +319,22 @@ var (
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
+ metaReplicateQueueRequeueDueToPriorityInversion = metric.Metadata{
+ Name: "queue.replicate.priority_inversion.requeue",
+ Help: "Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. " +
+ "A priority inversion occurs when the priority at processing time ends up being lower " +
+ "than at enqueue time. When the priority has changed from a high priority repair action to rebalance, " +
+ "the change is requeued to avoid unfairness.",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
+ metaReplicateQueuePriorityInversionTotal = metric.Metadata{
+ Name: "queue.replicate.priority_inversion.total",
+ Help: "Total number of priority inversions in the replicate queue. " +
+ "A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time",
+ Measurement: "Replicas",
+ Unit: metric.Unit_COUNT,
+ }
)
// quorumError indicates a retryable error condition which sends replicas being
@@ -309,6 +357,7 @@ func (e *quorumError) Error() string {
func (*quorumError) PurgatoryErrorMarker() {}
// ReplicateQueueMetrics is the set of metrics for the replicate queue.
+// TODO(wenyihu6): metrics initialization could be cleaned up by using a map.
type ReplicateQueueMetrics struct {
AddReplicaCount *metric.Counter
AddVoterReplicaCount *metric.Counter
@@ -346,6 +395,10 @@ type ReplicateQueueMetrics struct {
// TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner,
// AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange
// allocator actions.
+
+ // Priority Inversion.
+ RequeueDueToPriorityInversion *metric.Counter
+ PriorityInversionTotal *metric.Counter
}
func makeReplicateQueueMetrics() ReplicateQueueMetrics {
@@ -382,6 +435,9 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount),
RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount),
RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount),
+
+ RequeueDueToPriorityInversion: metric.NewCounter(metaReplicateQueueRequeueDueToPriorityInversion),
+ PriorityInversionTotal: metric.NewCounter(metaReplicateQueuePriorityInversionTotal),
}
}
@@ -521,6 +577,7 @@ func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
// additional replica to their range.
type replicateQueue struct {
*baseQueue
+ maxSize *settings.IntSetting
metrics ReplicateQueueMetrics
allocator allocatorimpl.Allocator
storePool storepool.AllocatorStorePool
@@ -545,6 +602,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
storePool = store.cfg.StorePool
}
rq := &replicateQueue{
+ maxSize: ReplicateQueueMaxSize,
metrics: makeReplicateQueueMetrics(),
planner: plan.NewReplicaPlanner(allocator, storePool,
store.TestingKnobs().ReplicaPlannerKnobs),
@@ -569,16 +627,25 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
- processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
- successes: store.metrics.ReplicateQueueSuccesses,
- failures: store.metrics.ReplicateQueueFailures,
- storeFailures: store.metrics.StoreFailures,
- pending: store.metrics.ReplicateQueuePending,
- processingNanos: store.metrics.ReplicateQueueProcessingNanos,
- purgatory: store.metrics.ReplicateQueuePurgatory,
- disabledConfig: kvserverbase.ReplicateQueueEnabled,
+ processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
+ successes: store.metrics.ReplicateQueueSuccesses,
+ failures: store.metrics.ReplicateQueueFailures,
+ storeFailures: store.metrics.StoreFailures,
+ pending: store.metrics.ReplicateQueuePending,
+ full: store.metrics.ReplicateQueueFull,
+ processingNanos: store.metrics.ReplicateQueueProcessingNanos,
+ purgatory: store.metrics.ReplicateQueuePurgatory,
+ disabledConfig: kvserverbase.ReplicateQueueEnabled,
+ enqueueAdd: store.metrics.ReplicateQueueEnqueueAdd,
+ enqueueFailedPrecondition: store.metrics.ReplicateQueueEnqueueFailedPrecondition,
+ enqueueNoAction: store.metrics.ReplicateQueueEnqueueNoAction,
+ enqueueUnexpectedError: store.metrics.ReplicateQueueEnqueueUnexpectedError,
},
)
+ rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
+ ReplicateQueueMaxSize.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
+ rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
+ })
updateFn := func() {
select {
case rq.updateCh <- timeutil.Now():
@@ -632,7 +699,7 @@ func (rq *replicateQueue) shouldQueue(
}
func (rq *replicateQueue) process(
- ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, priorityAtEnqueue float64,
) (processed bool, err error) {
if tokenErr := repl.allocatorToken.TryAcquire(ctx, rq.name); tokenErr != nil {
log.KvDistribution.VEventf(ctx,
@@ -658,7 +725,7 @@ func (rq *replicateQueue) process(
// usually signaling that a rebalancing reservation could not be made with the
// selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
- requeue, err := rq.processOneChangeWithTracing(ctx, repl, desc, &conf)
+ requeue, err := rq.processOneChangeWithTracing(ctx, repl, desc, &conf, priorityAtEnqueue)
if isSnapshotError(err) {
// If ChangeReplicas failed because the snapshot failed, we attempt to
// retry the operation. The most likely causes of the snapshot failing
@@ -677,21 +744,21 @@ func (rq *replicateQueue) process(
log.KvDistribution.Infof(ctx, "%v", err)
continue
}
-
+ // At the time of writing, requeue => err == nil except for priority
+ // inversions. Priority inversion intentionally returns a priority inversion
+ // error along with requeue = true.
+ if requeue {
+ log.KvDistribution.VEventf(ctx, 1, "re-queuing: %v", err)
+ rq.maybeAdd(ctx, repl, rq.store.Clock().NowAsClockTimestamp())
+ }
if err != nil {
return false, err
}
-
if testingAggressiveConsistencyChecks {
- if _, err := rq.store.consistencyQueue.process(ctx, repl, confReader); err != nil {
+ if _, err := rq.store.consistencyQueue.process(ctx, repl, confReader, -1 /*priorityAtEnqueue*/); err != nil {
log.KvDistribution.Warningf(ctx, "%v", err)
}
}
-
- if requeue {
- log.KvDistribution.VEventf(ctx, 1, "re-queuing")
- rq.maybeAdd(ctx, repl, rq.store.Clock().NowAsClockTimestamp())
- }
return true, nil
}
@@ -742,7 +809,11 @@ func filterTracingSpans(rec tracingpb.Recording, opNamesToFilter ...string) trac
// logging the resulting traces to the DEV channel in the case of errors or
// when the configured log traces threshold is exceeded.
func (rq *replicateQueue) processOneChangeWithTracing(
- ctx context.Context, repl *Replica, desc *roachpb.RangeDescriptor, conf *roachpb.SpanConfig,
+ ctx context.Context,
+ repl *Replica,
+ desc *roachpb.RangeDescriptor,
+ conf *roachpb.SpanConfig,
+ priorityAtEnqueue float64,
) (requeue bool, _ error) {
processStart := timeutil.Now()
startTracing := log.ExpensiveLogEnabled(ctx, 1)
@@ -757,7 +828,7 @@ func (rq *replicateQueue) processOneChangeWithTracing(
defer sp.Finish()
requeue, err := rq.processOneChange(ctx, repl, desc, conf,
- false /* scatter */, false, /* dryRun */
+ false /* scatter */, false /* dryRun */, priorityAtEnqueue,
)
processDuration := timeutil.Since(processStart)
loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
@@ -858,15 +929,21 @@ func ShouldRequeue(
return requeue
}
+// priorityInversionLogEveryN rate limits how often we log about priority
+// inversions to avoid log spam.
+var priorityInversionLogEveryN = log.Every(3 * time.Second)
+
func (rq *replicateQueue) processOneChange(
ctx context.Context,
repl *Replica,
desc *roachpb.RangeDescriptor,
conf *roachpb.SpanConfig,
scatter, dryRun bool,
+ priorityAtEnqueue float64,
) (requeue bool, _ error) {
change, err := rq.planner.PlanOneChange(
ctx, repl, desc, conf, plan.PlannerOptions{Scatter: scatter})
+
// When there is an error planning a change, return the error immediately
// and do not requeue. It is unlikely that the range or storepool state
// will change quickly enough in order to not get the same error and
@@ -891,6 +968,30 @@ func (rq *replicateQueue) processOneChange(
return false, nil
}
+ // At this point, planning returned no error, and we're not doing a dry run.
+ // Check for priority inversion if enabled. If one is detected, we may return
+ // an error early and requeue the replica instead of processing it, to avoid
+ // starving other higher-priority work.
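+ // For example (illustrative): a replica enqueued at a decommissioning-repair
+ // priority that only plans AllocatorConsiderRebalance by processing time
+ // counts as an inversion and is requeued.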
+ if PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) {
+ if inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action); inversion {
+ rq.metrics.PriorityInversionTotal.Inc(1)
+ if priorityInversionLogEveryN.ShouldLog() {
+ log.KvDistribution.Infof(ctx,
+ "priority inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v",
+ shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue)
+ }
+ if shouldRequeue {
+ rq.metrics.RequeueDueToPriorityInversion.Inc(1)
+ // Return true to requeue the range. Return the error to ensure it is
+ // logged and tracked in replicate queue bq.failures metrics. See
+ // replicateQueue.process for details.
+ return true /*requeue*/, maybeAnnotateDecommissionErr(
+ errors.Errorf("requeuing due to priority inversion: action=%s, priority=%v, enqueuePriority=%v",
+ change.Action, change.Action.Priority(), priorityAtEnqueue), change.Action)
+ }
+ }
+ }
+
// Track the metrics generated during planning. These are not updated
// directly during planning to avoid pushing the dryRun flag into every
// function.
@@ -922,19 +1023,12 @@ func (rq *replicateQueue) processOneChange(
}
func maybeAnnotateDecommissionErr(err error, action allocatorimpl.AllocatorAction) error {
- if err != nil && isDecommissionAction(action) {
+ if err != nil && action.Decommissioning() {
err = decommissionPurgatoryError{err}
}
return err
}
-func isDecommissionAction(action allocatorimpl.AllocatorAction) bool {
- return action == allocatorimpl.AllocatorRemoveDecommissioningVoter ||
- action == allocatorimpl.AllocatorRemoveDecommissioningNonVoter ||
- action == allocatorimpl.AllocatorReplaceDecommissioningVoter ||
- action == allocatorimpl.AllocatorReplaceDecommissioningNonVoter
-}
-
// shedLease takes in a leaseholder replica, looks for a target for transferring
// the lease and, if a suitable target is found (e.g. alive, not draining),
// transfers the lease away.
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 1f1e02fcd8e8..372de7f75e94 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -2469,12 +2469,58 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
return nil
})
+ getDecommissioningNudgerMetricValue := func(t *testing.T, tc *testcluster.TestCluster, metricType string,
+ ) int64 {
+ var total int64
+
+ for i := 0; i < tc.NumServers(); i++ {
+ store := tc.GetFirstStoreFromServer(t, i)
+ var value int64
+
+ switch metricType {
+ case "decommissioning_ranges":
+ value = store.Metrics().DecommissioningRangeCount.Value()
+ case "enqueue":
+ value = store.Metrics().DecommissioningNudgerEnqueue.Count()
+ case "enqueue_success":
+ value = store.Metrics().DecommissioningNudgerEnqueueSuccess.Count()
+ case "process_success":
+ value = store.Metrics().DecommissioningNudgerProcessSuccess.Count()
+ default:
+ t.Fatalf("unknown metric type: %s", metricType)
+ }
+
+ total += value
+ }
+ return total
+ }
+
+ initialDecommissioningRanges := getDecommissioningNudgerMetricValue(t, tc, "decommissioning_ranges")
+
// Now add a replica to the decommissioning node and then enable the
// replicate queue. We expect that the replica will be removed after the
// decommissioning replica is noticed via maybeEnqueueProblemRange.
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
tc.ToggleReplicateQueues(true /* active */)
+
+ // Wait for the enqueue logic to trigger and validate metrics were updated.
+ testutils.SucceedsSoon(t, func() error {
+ // TODO(wenyihu6): is there a race condition here where we might not observe
+ // decommissioning_ranges increasing?
+ afterDecommissioningRanges := getDecommissioningNudgerMetricValue(t, tc, "decommissioning_ranges")
+ afterEnqueued := getDecommissioningNudgerMetricValue(t, tc, "enqueue")
+ if afterDecommissioningRanges <= initialDecommissioningRanges {
+ return errors.New("expected DecommissioningRangeCount to increase")
+ }
+ if afterEnqueued <= 0 {
+ return errors.New("expected DecommissioningNudgerEnqueue to be greater than 0")
+ }
+
+ return nil
+ })
+
+ // Verify that the decommissioning node has no replicas left.
testutils.SucceedsSoon(t, func() error {
var descs []*roachpb.RangeDescriptor
tc.GetFirstStoreFromServer(t, decommissioningSrvIdx).VisitReplicas(func(r *kvserver.Replica) bool {
@@ -2486,4 +2532,122 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
}
return nil
})
+ afterEnqueueSuccess := getDecommissioningNudgerMetricValue(t, tc, "enqueue_success")
+ require.Greater(t, afterEnqueueSuccess, int64(0))
+ afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success")
+ require.Greater(t, afterProcessSuccess, int64(0))
+}
+
+// TestPriorityInversionRequeue tests that the replicate queue correctly handles
+// priority inversions by requeuing replicas when the PriorityInversionRequeue
+// setting is enabled.
+//
+// This test specifically targets a race condition where:
+// 1. A replica is enqueued for a high-priority repair action
+// (FinalizeAtomicReplicationChange or RemoveLearner).
+// 2. By the time the replica is processed, the repair is no longer needed and
+// only a low-priority rebalance action (ConsiderRebalance) is computed.
+// 3. This creates a priority inversion where a low-priority action blocks
+// other higher-priority replicas in the queue from being processed.
+//
+// The race occurs during range rebalancing:
+// 1. A leaseholder replica of a range is rebalanced from one store to another.
+// 2. The new leaseholder enqueues the replica for repair (e.g. to finalize
+// the atomic replication change or remove a learner replica).
+// 3. Before processing, the old leaseholder has left the atomic joint config
+// state or removed the learner replica.
+// 4. When the new leaseholder processes the replica, it computes a
+// ConsiderRebalance action, causing a priority inversion.
+//
+// With PriorityInversionRequeue enabled, the queue should detect this condition
+// and requeue the replica at the correct priority. The test validates this
+// behavior through metrics that track priority inversions and requeuing events.
+func TestPriorityInversionRequeue(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ skip.UnderDuress(t)
+
+ ctx := context.Background()
+ settings := cluster.MakeTestingClusterSettings()
+ kvserver.PriorityInversionRequeue.Override(ctx, &settings.SV, true)
+
+ var scratchRangeID int64
+ atomic.StoreInt64(&scratchRangeID, -1)
+ require.NoError(t, log.SetVModule("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5"))
+
+ const newLeaseholderStoreAndNodeID = 4
+ var waitUntilLeavingJoint = func() {}
+
+ tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+ ReplicationMode: base.ReplicationManual,
+ ServerArgs: base.TestServerArgs{
+ Settings: settings,
+ Knobs: base.TestingKnobs{
+ Store: &kvserver.StoreTestingKnobs{
+ BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
+ // Keep the replicate queue disabled for every range except the
+ // scratch range.
+ t.Logf("disabled-bypass filter consulted for r%d", rangeID)
+ return rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID))
+ },
+ BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
+ // After enqueuing, wait for the old leaseholder to leave the atomic
+ // joint config state or remove the learner replica to force the
+ // priority inversion.
+ if storeID == newLeaseholderStoreAndNodeID && rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) {
+ t.Logf("waiting for r%d to leave joint config", rangeID)
+ waitUntilLeavingJoint()
+ }
+ },
+ },
+ },
+ },
+ })
+ defer tc.Stopper().Stop(ctx)
+
+ scratchKey := tc.ScratchRange(t)
+
+ // Wait until the old leaseholder has left the atomic joint config state or
+ // removed the learner replica.
+ waitUntilLeavingJoint = func() {
+ testutils.SucceedsSoon(t, func() error {
+ rangeDesc := tc.LookupRangeOrFatal(t, scratchKey)
+ replicas := rangeDesc.Replicas()
+ t.Logf("range %v: waiting to leave joint conf", rangeDesc)
+ if replicas.InAtomicReplicationChange() || len(replicas.LearnerDescriptors()) != 0 {
+ return errors.Newf("in between atomic changes: %v", replicas)
+ }
+ return nil
+ })
+ }
+
+ scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
+ tc.AddVotersOrFatal(t, scratchRange.StartKey.AsRawKey(), tc.Targets(1, 2)...)
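+ // Publish the scratch range ID so that the testing knobs above start
+ // targeting it.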
+ atomic.StoreInt64(&scratchRangeID, int64(scratchRange.RangeID))
+ lh, err := tc.FindRangeLeaseHolder(scratchRange, nil)
+ require.NoError(t, err)
+
+ // Rebalance the leaseholder replica to a new store. This triggers the race
+ // where the new leaseholder enqueues the replica into the replicate queue at
+ // a high priority but computes a low-priority action at processing time.
+ t.Logf("rebalancing range %d from s%d to s%d", scratchRange, lh.StoreID, newLeaseholderStoreAndNodeID)
+ _, err = tc.RebalanceVoter(
+ ctx,
+ scratchRange.StartKey.AsRawKey(),
+ roachpb.ReplicationTarget{StoreID: lh.StoreID, NodeID: lh.NodeID}, /* src */
+ roachpb.ReplicationTarget{StoreID: newLeaseholderStoreAndNodeID, NodeID: newLeaseholderStoreAndNodeID}, /* dest */
+ )
+ require.NoError(t, err)
+
+ // Wait until the priority inversion is detected and the replica is requeued.
+ testutils.SucceedsSoon(t, func() error {
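+ // Server index 3 hosts the new leaseholder (n4/s4).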
+ store := tc.GetFirstStoreFromServer(t, 3)
+ if c := store.ReplicateQueueMetrics().PriorityInversionTotal.Count(); c == 0 {
+ return errors.New("expected non-zero priority inversion total count but got 0")
+ }
+ if c := store.ReplicateQueueMetrics().RequeueDueToPriorityInversion.Count(); c == 0 {
+ return errors.New("expected to requeue due to priority inversion but got 0")
+ }
+ return nil
+ })
}
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index 170964f3d5cb..37779e5096a5 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -219,7 +219,7 @@ var _ PurgatoryError = unsplittableRangeError{}
// process synchronously invokes admin split for each proposed split key.
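+// The trailing float64 argument carries the replica's priority at enqueue
+// time; the split queue does not use it.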
func (sq *splitQueue) process(
- ctx context.Context, r *Replica, confReader spanconfig.StoreReader,
+ ctx context.Context, r *Replica, confReader spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
processed, err = sq.processAttemptWithTracing(ctx, r, confReader)
if errors.HasType(err, (*kvpb.ConditionFailedError)(nil)) {
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index af919377613c..00233c5df87a 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -3354,6 +3354,12 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
ioOverload, _ = s.ioThreshold.t.Score()
s.ioThreshold.Unlock()
+ // TODO(wenyihu6): it would be nicer to sort the replicas so that the nudger
+ // always reports on the same set of replicas. Would sorting add too much
+ // overhead? For now this seems fine, since we usually see fewer than 15
+ // ranges during a decommission stall.
+ logBudgetOnDecommissioningNudger := 15
+
// We want to avoid having to read this multiple times during the replica
// visiting, so load it once up front for all nodes.
livenessMap := s.cfg.NodeLiveness.ScanNodeVitalityFromCache()
@@ -3417,7 +3423,11 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
if metrics.Decommissioning {
// NB: Enqueue is disabled by default from here and throttled async if
// enabled.
- rep.maybeEnqueueProblemRange(ctx, goNow, metrics.LeaseValid, metrics.Leaseholder)
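+ // Only the first logBudgetOnDecommissioningNudger decommissioning replicas
+ // visited in this pass get verbose nudger logging; the rest stay quiet to
+ // bound log volume.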
+ maybeLog := logBudgetOnDecommissioningNudger > 0
+ if maybeLog {
+ logBudgetOnDecommissioningNudger--
+ }
+ rep.maybeEnqueueProblemRange(ctx, goNow, metrics.LeaseValid, metrics.Leaseholder, maybeLog)
decommissioningRangeCount++
}
}
@@ -3840,7 +3850,7 @@ func (s *Store) ReplicateQueueDryRun(
return collectAndFinish(), nil
}
_, err = s.replicateQueue.processOneChange(
- ctx, repl, desc, conf, false /* scatter */, true, /* dryRun */
+ ctx, repl, desc, conf, false /* scatter */, true /* dryRun */, -1, /*priorityAtEnqueue*/
)
if err != nil {
log.Eventf(ctx, "error simulating allocator on replica %s: %s", repl, err)
@@ -4019,7 +4029,7 @@ func (s *Store) Enqueue(
}
log.Eventf(ctx, "running %s.process", queueName)
- processed, processErr := qImpl.process(ctx, repl, confReader)
+ processed, processErr := qImpl.process(ctx, repl, confReader, -1 /*priorityAtEnqueue*/)
log.Eventf(ctx, "processed: %t (err: %v)", processed, processErr)
return processErr, nil
}
@@ -4056,7 +4066,7 @@ func (s *Store) PurgeOutdatedReplicas(ctx context.Context, version roachpb.Versi
g.GoCtx(func(ctx context.Context) error {
defer alloc.Release()
- processed, err := s.replicaGCQueue.process(ctx, repl, nil)
+ processed, err := s.replicaGCQueue.process(ctx, repl, nil, -1 /*priorityAtEnqueue*/)
if err != nil {
return errors.Wrapf(err, "on %s", repl.Desc())
}
diff --git a/pkg/kv/kvserver/store_gossip.go b/pkg/kv/kvserver/store_gossip.go
index 3232d213b7f9..a50c70397b1d 100644
--- a/pkg/kv/kvserver/store_gossip.go
+++ b/pkg/kv/kvserver/store_gossip.go
@@ -206,10 +206,10 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
now := s.cfg.Clock.NowAsClockTimestamp()
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
if shouldQueue {
- s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
- s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
+ _ = s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go
index a3da9e7ee11c..e4a2c01c871a 100644
--- a/pkg/kv/kvserver/store_rebalancer_test.go
+++ b/pkg/kv/kvserver/store_rebalancer_test.go
@@ -1860,3 +1860,67 @@ func TestingRaftStatusFn(desc *roachpb.RangeDescriptor, storeID roachpb.StoreID)
}
return status
}
+
+// TestReplicateQueueMaxSize tests the max size of the replicate queue and
+// verifies that replicas are dropped when the max size is exceeded. It also
+// checks that the ReplicateQueueFull metric is updated correctly.
+func TestReplicateQueueMaxSize(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ tc := testContext{}
+ stopper := stop.NewStopper()
+ ctx := context.Background()
+ defer stopper.Stop(ctx)
+ tc.Start(ctx, t, stopper)
+
+ r, err := tc.store.GetReplica(1)
+ require.NoError(t, err)
+
+ allocStopper, _, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, true /* deterministic */)
+ defer allocStopper.Stop(ctx)
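+ // Cap the replicate queue at a single replica so that additional adds are
+ // dropped and counted as queue-full events.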
+ ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 1)
+ replicateQueue := newReplicateQueue(tc.store, a)
+
+ // Helper function to add a replica and verify queue state.
+ verify := func(expectedLength int, expectedDropped int64) {
+ require.Equal(t, expectedLength, replicateQueue.Length())
+ require.Equal(t, expectedDropped, replicateQueue.full.Count())
+ require.Equal(t, expectedDropped, tc.store.metrics.ReplicateQueueFull.Count())
+ }
+
+ addReplicaAndVerify := func(rangeID roachpb.RangeID, expectedLength int, expectedDropped int64) {
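+ // Reuse the same replica but overwrite its range ID so each call looks like
+ // a distinct range to the queue.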
+ r.Desc().RangeID = rangeID
+ enqueued, err := replicateQueue.testingAdd(ctx, r, 0.0)
+ require.NoError(t, err)
+ require.True(t, enqueued)
+ verify(expectedLength, expectedDropped)
+ }
+
+ // First replica should be added.
+ addReplicaAndVerify(1 /* rangeID */, 1 /* expectedLength */, 0 /* expectedDropped */)
+ // Second replica should be dropped.
+ addReplicaAndVerify(2 /* rangeID */, 1 /* expectedLength */, 1 /* expectedDropped */)
+ // Third replica should be dropped.
+ addReplicaAndVerify(3 /* rangeID */, 1 /* expectedLength */, 2 /* expectedDropped */)
+
+ // Increase the max size to 100 and add more replicas.
+ ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
+ for i := 2; i <= 100; i++ {
+ // Should be added.
+ addReplicaAndVerify(roachpb.RangeID(i+1 /* rangeID */), i /* expectedLength */, 2 /* expectedDropped */)
+ }
+
+ // Add one more to exceed the max size. Should be dropped.
+ addReplicaAndVerify(102 /* rangeID */, 100 /* expectedLength */, 3 /* expectedDropped */)
+
+ // Resetting to the same size should not change the queue length.
+ ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
+ verify(100 /* expectedLength */, 3 /* expectedDropped */)
+
+ // Decrease the max size to 10, which should drop 90 replicas.
+ ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 10)
+ verify(10 /* expectedLength */, 93 /* expectedDropped: 3 + 90 */)
+
+ // Should drop another one now that max size is 10.
+ addReplicaAndVerify(103 /* rangeID */, 10 /* expectedLength */, 94 /* expectedDropped: 3 + 90 + 1 */)
+}
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 7e2d38e30dc5..48208e8f07a7 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -527,6 +527,10 @@ type StoreTestingKnobs struct {
// rangeID should ignore the queue being disabled, and be processed anyway.
BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool
+ // BaseQueuePostEnqueueInterceptor is called with the store ID and range ID of
+ // a replica right after it is enqueued (before it is processed).
+ BaseQueuePostEnqueueInterceptor func(storeID roachpb.StoreID, rangeID roachpb.RangeID)
+
// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndexRaftMuLocked.
// If nil is returned, reproposal will be attempted.
InjectReproposalError func(p *ProposalData) error
diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go
index 5b047edff3e8..a226f73b6c60 100644
--- a/pkg/kv/kvserver/ts_maintenance_queue.go
+++ b/pkg/kv/kvserver/ts_maintenance_queue.go
@@ -136,7 +136,7 @@ func (q *timeSeriesMaintenanceQueue) shouldQueue(
}
func (q *timeSeriesMaintenanceQueue) process(
- ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+ ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
) (processed bool, err error) {
desc := repl.Desc()
eng := repl.store.StateEngine()