Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e6d05be
kvserver: add logging & metrics for decommissioning nudger
dodeca12 Aug 19, 2025
c3ee387
allocator: move isDecommissionAction to allocatorimpl
wenyihu6 Aug 25, 2025
9ddbd22
kvserver: simplify logging in maybeEnqueueProblemRange
wenyihu6 Aug 26, 2025
a145b88
kvserver: fix comment when dropping due to exceeding size
wenyihu6 Aug 26, 2025
926f353
kvserver: add logging for ranges dropped from base queue
wenyihu6 Aug 26, 2025
fc3eeb2
kvserver: plumb enqueue time priority
wenyihu6 Aug 22, 2025
c64055e
kvserver: remove priority reset during setProcessing
wenyihu6 Aug 22, 2025
b6f5eac
kvserver: plumb priority at enqueue for purgatory queue
wenyihu6 Aug 22, 2025
b6c5e5a
allocatorimpl: adds a priority assertion to computeAction
wenyihu6 Aug 26, 2025
1a85966
allocatorimpl: add invariants on priority to base queue tests
wenyihu6 Aug 26, 2025
612eff2
allocator: correct logging for priority assertion
wenyihu6 Aug 28, 2025
977fd2b
kvserver: remove bq.replicaCanBeProcessed right before bq.processReplica
wenyihu6 Aug 26, 2025
7f1141a
kvserver: add ReplicateQueueDroppedDueToSize
wenyihu6 Aug 29, 2025
f6fbcdc
kvserver: add ReplicateQueueMaxSize
wenyihu6 Aug 29, 2025
7b1934f
kvserver: add TestReplicateQueueMaxSize
wenyihu6 Aug 29, 2025
e2ff61c
kvserver: drop excess replicas when lowering ReplicateQueueMaxSize
wenyihu6 Aug 29, 2025
39ace7b
kvserver: rename ReplicateQueueDroppedDueToSize to ReplicateQueueFull
wenyihu6 Aug 29, 2025
d8bcf53
kvserver: add PriorityInversionRequeue
wenyihu6 Aug 27, 2025
cd3fd27
kvserver: requeue on priority inversion for replicate queue
wenyihu6 Aug 27, 2025
5fefded
kvserver: use priorityInversionLogEveryN
wenyihu6 Aug 28, 2025
7914bde
kvserver: improve comments for PriorityInversionRequeue
wenyihu6 Aug 29, 2025
7bcae61
allocator: small refactor for CheckPriorityInversion
wenyihu6 Aug 29, 2025
c8a882a
allocator: add TestAllocatorPriorityInvariance
wenyihu6 Aug 29, 2025
1703464
kvserver: guard inversion check and requeue behind PriorityInversionR…
wenyihu6 Aug 29, 2025
0f19382
kvserver: move priority inversion check before applyChange
wenyihu6 Sep 1, 2025
ed70f3c
kvserver: check for requeue before error checking in rq.process
wenyihu6 Sep 2, 2025
871a85d
kvserver: use non-blocking send on errors for maybeBackpressureBatch
wenyihu6 Aug 31, 2025
b17a4c7
kvserver: return baseQueueAsyncRateLimited from bq.Async
wenyihu6 Aug 29, 2025
900b3e2
kvserver: add onProcessResult and onEnqueueResult to processCallback
wenyihu6 Aug 28, 2025
0c7917c
kvserver: add TestBaseQueueCallback
wenyihu6 Aug 30, 2025
9eee9e1
kvserver: better comments for on processCallback
wenyihu6 Sep 2, 2025
2d301d0
kvserver: treat priority update as a success with onEnqueueResult
wenyihu6 Sep 2, 2025
55f2293
kvserver: rename processCallback processCallback to cb processCallback
wenyihu6 Sep 2, 2025
cb21338
kvserver: call cb.onEnqueueResult in defer on errors
wenyihu6 Sep 2, 2025
28ec8d4
fixup! kvserver: treat priority update as a success with onEnqueueResult
wenyihu6 Sep 2, 2025
f83869f
kvserver: allow logs from callbacks up to 15 replicas per updateRepli…
wenyihu6 Sep 2, 2025
131cc9e
kvserver: rename shouldLog to maybeLog and change vlevel to a var
wenyihu6 Sep 3, 2025
02dfa8f
kvserver: improve observability with decommission nudger
wenyihu6 Aug 31, 2025
fc26f6c
kvserver: add enqueue metrics to base queue
wenyihu6 Sep 3, 2025
47425ed
kvserver: move bq.enqueueAdd update to be outside of defer
wenyihu6 Sep 3, 2025
c301eba
kvserver: test metrics in TestBaseQueueCallback* and TestReplicateQue…
wenyihu6 Sep 3, 2025
b7eeaee
kvserver: track priority inversion in replicate queue metrics
wenyihu6 Aug 27, 2025
c354041
kvserver: add TestPriorityInversionRequeue
wenyihu6 Aug 27, 2025
9a7fb40
kvserver: delete per action priority inversion metrics
wenyihu6 Sep 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,19 @@
<tr><td>STORAGE</td><td>queue.replicate.addreplica.error</td><td>Number of failed replica additions processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.addreplica.success</td><td>Number of successful replica additions processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.addvoterreplica</td><td>Number of voter replica additions attempted by the replicate queue</td><td>Replica Additions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.enqueue.add</td><td>Number of replicas successfully added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.enqueue.failedprecondition</td><td>Number of replicas that failed the precondition checks and were therefore not added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.enqueue.noaction</td><td>Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.enqueue.unexpectederror</td><td>Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.nonvoterpromotions</td><td>Number of non-voters promoted to voters by the replicate queue</td><td>Promotions of Non Voters to Voters</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.pending</td><td>Number of pending replicas in the replicate queue</td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.priority_inversion.requeue</td><td>Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the change is requeued to avoid unfairness.</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.priority_inversion.total</td><td>Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.process.failure</td><td>Number of replicas which failed processing in the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.process.success</td><td>Number of replicas successfully processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.processingnanos</td><td>Nanoseconds spent processing replicas in the replicate queue</td><td>Processing Time</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.purgatory</td><td>Number of replicas in the replicate queue&#39;s purgatory, awaiting allocation options</td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.queue_full</td><td>Number of times a replica was dropped from the queue due to queue fullness</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.rebalancenonvoterreplica</td><td>Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue</td><td>Replica Additions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.rebalancereplica</td><td>Number of replica rebalancer-initiated additions attempted by the replicate queue</td><td>Replica Additions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.replicate.rebalancevoterreplica</td><td>Number of voter replica rebalancer-initiated additions attempted by the replicate queue</td><td>Replica Additions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down Expand Up @@ -596,6 +603,12 @@
<tr><td>STORAGE</td><td>rangekeycount</td><td>Count of all range keys (e.g. MVCC range tombstones)</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges</td><td>Number of ranges</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning</td><td>Number of ranges with at least one replica on a decommissioning node</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning.nudger.enqueue</td><td>Number of attempts by the decommissioning nudger to enqueue a range for decommissioning. Note: This metric tracks when the nudger attempts to enqueue, but the replica might not end up being enqueued by the priority queue due to various filtering or failure conditions.</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning.nudger.enqueue.failure</td><td>Number of ranges that failed to enqueue at the replicate queue</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning.nudger.enqueue.success</td><td>Number of ranges that were successfully enqueued by the decommissioning nudger</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease</td><td>Number of ranges that were not the leaseholder or had an invalid lease at the decommissioning nudger</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning.nudger.process.failure</td><td>Number of ranges enqueued by the decommissioning nudger that failed to process by the replicate queue</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning.nudger.process.success</td><td>Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>ranges.overreplicated</td><td>Number of ranges with more live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.unavailable</td><td>Number of ranges with fewer live replicas than needed for quorum</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.underreplicated</td><td>Number of ranges with fewer live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/stop",
Expand Down
152 changes: 150 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -138,6 +139,7 @@ const (
AllocatorConsiderRebalance
AllocatorRangeUnavailable
AllocatorFinalizeAtomicReplicationChange
AllocatorMaxPriority
)

// Add indicates an action adding a replica.
Expand All @@ -163,6 +165,15 @@ func (a AllocatorAction) Remove() bool {
a == AllocatorRemoveDecommissioningNonVoter
}

// Decommissioning reports whether the action replaces or removes a
// decommissioning replica (voter or non-voter).
func (a AllocatorAction) Decommissioning() bool {
	switch a {
	case AllocatorRemoveDecommissioningVoter,
		AllocatorRemoveDecommissioningNonVoter,
		AllocatorReplaceDecommissioningVoter,
		AllocatorReplaceDecommissioningNonVoter:
		return true
	default:
		return false
	}
}

// TargetReplicaType returns that the action is for a voter or non-voter replica.
func (a AllocatorAction) TargetReplicaType() TargetReplicaType {
var t TargetReplicaType
Expand Down Expand Up @@ -242,10 +253,27 @@ func (a AllocatorAction) SafeValue() {}
// range. Within a given range, the ordering of the various checks inside
// `Allocator.computeAction` determines which repair/rebalancing actions are
// taken before the others.
//
// NB: Priorities should be non-negative and should be spaced in multiples of
// 100 unless you believe they should belong to the same priority category.
// AllocatorNoop should have the lowest priority. CheckPriorityInversion depends
// on this contract. In most cases, the allocator returns a priority that
// matches the definitions below. For AllocatorAddVoter,
// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter, the priority may be
// adjusted (see ComputeAction for details), but the adjustment is expected to
// be small (<49).
//
// Exceptions: AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner,
// and AllocatorReplaceDeadVoter violate the spacing of 100. These cases
// predate this comment, so we allow them as they belong to the same general
// priority category.
func (a AllocatorAction) Priority() float64 {
const maxPriority = 12002
switch a {
case AllocatorMaxPriority:
return maxPriority
case AllocatorFinalizeAtomicReplicationChange:
return 12002
return maxPriority
case AllocatorRemoveLearner:
return 12001
case AllocatorReplaceDeadVoter:
Expand Down Expand Up @@ -946,10 +974,68 @@ func (a *Allocator) ComputeAction(
return action, action.Priority()
}

return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
action, priority = a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
desc.Replicas().NonVoterDescriptors())
// Ensure that priority is never -1. Typically, computeAction returns
// action.Priority(), but we sometimes modify the priority for specific
// actions like AllocatorAddVoter, AllocatorRemoveDeadVoter, and
// AllocatorRemoveVoter. A priority of -1 is a special case, indicating that
// the caller expects the processing logic to be invoked even if there's a
// priority inversion. If the priority is not -1, the range might be re-queued
// to be processed with the correct priority.
if priority == -1 {
if buildutil.CrdbTestBuild {
log.Fatalf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
} else {
log.Warningf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
}
}
return action, priority
}

// computeAction determines the action to take on a range along with its
// priority.
//
// NB: The returned priority may include a small adjustment and therefore might
// not exactly match action.Priority(). See AllocatorAddVoter,
// AllocatorRemoveDeadVoter, AllocatorRemoveVoter below. The adjustment should
// be <49 under the two assumptions below. New uses of this contract should be
// avoided since the assumptions are not strong guarantees (especially the
// second one).
//
// The claim that the adjustment is < 49 has two assumptions:
// 1. min(num_replicas,total_nodes) in zone configuration is < 98.
// 2. when ranges are not under-replicated, the difference between
// min(num_replicas,total_nodes)/2-1 and existing_replicas is < 49.
//
// neededVoters <= min(num_replicas,total_nodes)
// desiredQuorum = neededVoters/2-1
// quorum = haveVoters/2-1
//
// For AllocatorAddVoter, we know haveVoters < neededVoters
// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
// To find the worst case (largest adjustment),
// 1. haveVoters = neededVoters-1,
// adjustment = neededVoters/2-1-(neededVoters-1)
// = neededVoters/2-neededVoters = -neededVoters/2
// 2. haveVoters = 0
// adjustment = neededVoters/2-1
//
// In order for adjustment to be <49, neededVoters/2<49 => neededVoters<98.
// Hence the first assumption.
//
// For AllocatorRemoveDeadVoter, we know haveVoters >= neededVoters
// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
// To find the worst case (largest adjustment),
// 1. neededVoters/2-1 is much larger than haveVoters: given haveVoters >=
// neededVoters, haveVoters/2-1 >= neededVoters/2-1. So this case is impossible.
// 2. neededVoters/2-1 is much smaller than haveVoters: since ranges could be
// over-replicated, theoretically speaking, there may be no upper bounds on
// haveVoters. In order for adjustment to be < 49, we can only make an
// assumption here that the difference between neededVoters/2-1 and haveVoters
// cannot be >= 49 in this case.
//
// For AllocatorRemoveVoter, adjustment is haveVoters%2 = 0 or 1 < 49.
func (a *Allocator) computeAction(
ctx context.Context,
storePool storepool.AllocatorStorePool,
Expand Down Expand Up @@ -3220,3 +3306,65 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
}
return ret
}

// roundToNearestPriorityCategory snaps a priority to the closest multiple of
// 100, which is the spacing between priority categories. Values exactly
// halfway between two categories round away from zero (math.Round semantics).
// n is expected to be non-negative.
func roundToNearestPriorityCategory(n float64) float64 {
	const categoryWidth = 100.0
	categories := math.Round(n / categoryWidth)
	return categories * categoryWidth
}

// CheckPriorityInversion compares the priority a range was enqueued with
// against the action computed for it at processing time. It reports:
//
//   - isInversion: the enqueue-time priority is higher than the priority of the
//     action at processing time, so processing now could starve higher-priority
//     work.
//   - shouldRequeue: the caller should put the range back into the queue
//     (presumably under its new priority). This is only set when an enqueue
//     priority above AllocatorConsiderRebalance's has turned into an
//     AllocatorConsiderRebalance action.
//
// Moving from AllocatorRangeUnavailable/AllocatorNoop to
// AllocatorConsiderRebalance is not an inversion. Moving from a repair action
// to AllocatorRangeUnavailable/AllocatorNoop is an inversion, but does not
// request a requeue.
//
// INVARIANT: shouldRequeue => isInversion
func CheckPriorityInversion(
	priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
) (isInversion bool, shouldRequeue bool) {
	// A priority of -1 is used by callers such as scatter, dry runs, and manual
	// queue runs; inversion checks do not apply to them.
	if priorityAtEnqueue == -1 {
		return false, false
	}

	// store.Enqueue may enqueue with a very high priority (1e5) that lies
	// outside the span of allocator action priorities. Such entries are neither
	// requeued nor counted as inversions.
	floor, ceiling := AllocatorNoop.Priority(), AllocatorMaxPriority.Priority()
	if inRange := floor <= priorityAtEnqueue && priorityAtEnqueue <= ceiling; !inRange {
		return false, false
	}

	// Anything enqueued above the rebalance priority that now only warrants a
	// rebalance is both an inversion and a requeue candidate.
	if actionAtProcessing == AllocatorConsiderRebalance &&
		priorityAtEnqueue > AllocatorConsiderRebalance.Priority() {
		return true, true
	}

	// The enqueue-time priority normally equals action.Priority(), but for
	// AllocatorAddVoter, AllocatorRemoveDeadVoter, and AllocatorRemoveVoter it
	// may carry a small adjustment (expected to be <49; see ComputeAction).
	// Rounding to the nearest category of 100 keeps that adjustment from being
	// mistaken for an inversion. As a side effect,
	// AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner, and
	// AllocatorReplaceDeadVoter collapse into one category, so moving among them
	// is not counted as an inversion.
	enqueueCategory := roundToNearestPriorityCategory(priorityAtEnqueue)
	return enqueueCategory > actionAtProcessing.Priority(), false
}
Loading