From 5280ea4f0e424d308d10d8361c03c92cdfc007c6 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Thu, 4 Sep 2025 22:28:25 -0400 Subject: [PATCH] kvserver: invoke callback when dropping replicas in bq.SetMaxSize MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, when SetMaxSize shrank the queue, replicas were dropped from the priority queue without invoking their callbacks. This commit ensures callbacks are properly invoked when SetMaxSize drops replicas. Replicas removed via removeFromReplicaSetLocked (such as when a replica is destroyed) still don’t always have their callbacks invoked. While the natural place to invoke the callback would be at removeFromReplicaSetLocked, invoking callbacks while holding a lock risks blocking for too long. (We are doing this already for addInternal though.) This PR focuses specifically on handling the SetMaxSize case since this PR is intended to be backported. We can follow up with a more complex but more principled approach on master if needed in the future. --- pkg/kv/kvserver/queue.go | 64 +++++++++++++++++++++++++++-------- pkg/kv/kvserver/queue_test.go | 37 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index c34161962b86..4886a3bcc6b3 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -156,6 +156,11 @@ type processCallback struct { // 4. If the replica is already in the queue and processing. // - May be skipped if the replica is already in queue and no priority changes // occur. + // + // Note that the callback may be invoked with (during bq.addInternal) or + // without holding the lock (bq.AddAsyncWithCallback, bq.SetMaxSize, defer in + // bq.addInternal) on baseQueue.mu. Important that the callback does not take + // too long to execute. onEnqueueResult func(indexOnHeap int, err error) // onProcessResult is called with the result of any process attempts. 
It is @@ -165,6 +170,10 @@ type processCallback struct { // re-processing. // - May be skipped if the replica is removed with removeFromReplicaSetLocked // or registered with a new replica id before processing begins. + // + // Note that the callback may be invoked with (during bq.MaybeAddCallback) or + // without holding the lock (bq.finishProcessingReplica) on baseQueue.mu. + // Important that the callback does not take too long to execute. onProcessResult func(err error) } @@ -628,18 +637,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) { // SetMaxSize sets the max size of the queue. func (bq *baseQueue) SetMaxSize(maxSize int64) { - bq.mu.Lock() - defer bq.mu.Unlock() - bq.mu.maxSize = maxSize - // Drop replicas until no longer exceeding the max size. Note: We call - // removeLocked to match the behavior of addInternal. In theory, only - // removeFromQueueLocked should be triggered in removeLocked, since the item - // is in the priority queue, it should not be processing or in the purgatory - // queue. To be safe, however, we use removeLocked. - for int64(bq.mu.priorityQ.Len()) > maxSize { - pqLen := bq.mu.priorityQ.Len() - bq.full.Inc(1) - bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1]) + var droppedItems []*replicaItem + func() { + bq.mu.Lock() + defer bq.mu.Unlock() + bq.mu.maxSize = maxSize + currentLen := int64(bq.mu.priorityQ.Len()) + if currentLen > maxSize { + itemsToDrop := currentLen - maxSize + droppedItems = make([]*replicaItem, 0, itemsToDrop) + + // Drop lower-priority replicas until no longer exceeding the max size. + // Note: We call removeLocked to match the behavior of addInternal. In + // theory, only removeFromQueueLocked should be triggered in removeLocked, + // since the item is in the priority queue, it should not be processing or + // in the purgatory queue. To be safe, however, we use removeLocked. 
+ for int64(bq.mu.priorityQ.Len()) > maxSize { + lastIdx := bq.mu.priorityQ.Len() - 1 + item := bq.mu.priorityQ.sl[lastIdx] + droppedItems = append(droppedItems, item) + bq.removeLocked(item) + } + } + }() + + // Notify callbacks outside the lock to avoid holding onto the lock for too + // long. + for _, item := range droppedItems { + bq.updateMetricsOnDroppedDueToFullQueue() + for _, cb := range item.callbacks { + cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize) + } } } @@ -809,6 +837,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() { } } +// updateMetricsOnDroppedDueToFullQueue updates the metrics when a replica is +// dropped due to a full queue size. +func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() { + if bq.full != nil { + bq.full.Inc(1) + } +} + func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) { ctx = repl.AnnotateCtx(ctx) ctx = bq.AnnotateCtx(ctx) @@ -987,9 +1023,7 @@ func (bq *baseQueue) addInternal( // scan. 
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize { replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1] - if bq.full != nil { - bq.full.Inc(1) - } + bq.updateMetricsOnDroppedDueToFullQueue() log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v", priority, replicaItemToDrop.replicaID) // TODO(wenyihu6): when we introduce base queue max size cluster setting, diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index abdb51e1fba6..3929b2bb3b4c 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -1503,6 +1503,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { require.Equal(t, int64(1), bq.enqueueAdd.Count()) require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) }) + t.Run("queuesizeshrinking", func(t *testing.T) { + testQueue := &testQueueImpl{} + const oldMaxSize = 15 + const newMaxSize = 5 + expectedEnqueueErrorCount := oldMaxSize - newMaxSize + bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize}) + r, err := tc.store.GetReplica(1) + require.NoError(t, err) + var enqueueErrorCount atomic.Int64 + // Max size is 15, so every replica should be enqueued. + for i := 0; i < oldMaxSize; i++ { + r.Desc().RangeID = roachpb.RangeID(i + 1) + queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{ + onEnqueueResult: func(indexOnHeap int, err error) { + if err != nil { + enqueueErrorCount.Add(1) + } + }, + onProcessResult: func(err error) { + t.Fatal("unexpected call to onProcessResult") + }, + }) + require.True(t, queued) + } + require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) + + // Shrink max size to 5; the 10 excess replicas should be dropped.
+ bq.SetMaxSize(newMaxSize) + testutils.SucceedsSoon(t, func() error { + if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) { + return errors.Errorf("expected %d enqueue errors; got %d", + expectedEnqueueErrorCount, enqueueErrorCount.Load()) + } + return nil + }) + }) } // TestBaseQueueCallbackOnProcessResult tests that the processCallback is