diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index c34161962b86..4886a3bcc6b3 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -156,6 +156,11 @@ type processCallback struct { // 4. If the replica is already in the queue and processing. // - May be skipped if the replica is already in queue and no priority changes // occur. + // + // Note that the callback may be invoked with (during bq.addInternal) or + // without holding the lock (bq.AddAsyncWithCallback, bq.SetMaxSize, defer in + // bq.addInternal) on baseQueue.mu. Important that the callback does not take + // too long to execute. onEnqueueResult func(indexOnHeap int, err error) // onProcessResult is called with the result of any process attempts. It is @@ -165,6 +170,10 @@ type processCallback struct { // re-processing. // - May be skipped if the replica is removed with removeFromReplicaSetLocked // or registered with a new replica id before processing begins. + // + // Note that the callback may be invoked with (during bq.MaybeAddCallback) or + // without holding the lock (bq.finishProcessingReplica) on baseQueue.mu. + // Important that the callback does not take too long to execute. onProcessResult func(err error) } @@ -628,18 +637,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) { // SetMaxSize sets the max size of the queue. func (bq *baseQueue) SetMaxSize(maxSize int64) { - bq.mu.Lock() - defer bq.mu.Unlock() - bq.mu.maxSize = maxSize - // Drop replicas until no longer exceeding the max size. Note: We call - // removeLocked to match the behavior of addInternal. In theory, only - // removeFromQueueLocked should be triggered in removeLocked, since the item - // is in the priority queue, it should not be processing or in the purgatory - // queue. To be safe, however, we use removeLocked. 
- for int64(bq.mu.priorityQ.Len()) > maxSize { - pqLen := bq.mu.priorityQ.Len() - bq.full.Inc(1) - bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1]) + var droppedItems []*replicaItem + func() { + bq.mu.Lock() + defer bq.mu.Unlock() + bq.mu.maxSize = maxSize + currentLen := int64(bq.mu.priorityQ.Len()) + if currentLen > maxSize { + itemsToDrop := currentLen - maxSize + droppedItems = make([]*replicaItem, 0, itemsToDrop) + + // Drop lower-priority replicas until no longer exceeding the max size. + // Note: We call removeLocked to match the behavior of addInternal. In + // theory, only removeFromQueueLocked should be triggered in removeLocked, + // since the item is in the priority queue, it should not be processing or + // in the purgatory queue. To be safe, however, we use removeLocked. + for int64(bq.mu.priorityQ.Len()) > maxSize { + lastIdx := bq.mu.priorityQ.Len() - 1 + item := bq.mu.priorityQ.sl[lastIdx] + droppedItems = append(droppedItems, item) + bq.removeLocked(item) + } + } + }() + + // Notify callbacks outside the lock to avoid holding onto the lock for too + // long. + for _, item := range droppedItems { + bq.updateMetricsOnDroppedDueToFullQueue() + for _, cb := range item.callbacks { + cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize) + } } } @@ -809,6 +837,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() { } } +// updateMetricsOnDroppedDueToFullQueue updates the metrics when a replica is +// dropped due to a full queue size. +func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() { + if bq.full != nil { + bq.full.Inc(1) + } +} + func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) { ctx = repl.AnnotateCtx(ctx) ctx = bq.AnnotateCtx(ctx) @@ -987,9 +1023,7 @@ func (bq *baseQueue) addInternal( // scan. 
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize { replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1] - if bq.full != nil { - bq.full.Inc(1) - } + bq.updateMetricsOnDroppedDueToFullQueue() log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v", priority, replicaItemToDrop.replicaID) // TODO(wenyihu6): when we introduce base queue max size cluster setting, diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index abdb51e1fba6..3929b2bb3b4c 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -1503,6 +1503,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { require.Equal(t, int64(1), bq.enqueueAdd.Count()) require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) }) + t.Run("queuesizeshrinking", func(t *testing.T) { + testQueue := &testQueueImpl{} + const oldMaxSize = 15 + const newMaxSize = 5 + expectedEnqueueErrorCount := oldMaxSize - newMaxSize + bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize}) + r, err := tc.store.GetReplica(1) + require.NoError(t, err) + var enqueueErrorCount atomic.Int64 + // Max size is oldMaxSize, so every replica should be enqueued. + for i := 0; i < oldMaxSize; i++ { + r.Desc().RangeID = roachpb.RangeID(i + 1) + queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{ + onEnqueueResult: func(indexOnHeap int, err error) { + if err != nil { + enqueueErrorCount.Add(1) + } + }, + onProcessResult: func(err error) { + t.Fatal("unexpected call to onProcessResult") + }, + }) + require.True(t, queued) + } + require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) + + // Shrink the max size to newMaxSize; the excess replicas should be dropped. 
+ bq.SetMaxSize(newMaxSize) + testutils.SucceedsSoon(t, func() error { + if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) { + return errors.Errorf("expected %d enqueue errors; got %d", + expectedEnqueueErrorCount, enqueueErrorCount.Load()) + } + return nil + }) + }) } // TestBaseQueueCallbackOnProcessResult tests that the processCallback is