Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 49 additions & 15 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ type processCallback struct {
// 4. If the replica is already in the queue and processing.
// - May be skipped if the replica is already in queue and no priority changes
// occur.
//
// Note that the callback may be invoked either while holding baseQueue.mu
// (during bq.addInternal) or without holding it (bq.AddAsyncWithCallback,
// bq.SetMaxSize, defer in bq.addInternal). It is therefore important that
// the callback does not take too long to execute.
onEnqueueResult func(indexOnHeap int, err error)

// onProcessResult is called with the result of any process attempts. It is
Expand All @@ -165,6 +170,10 @@ type processCallback struct {
// re-processing.
// - May be skipped if the replica is removed with removeFromReplicaSetLocked
// or registered with a new replica id before processing begins.
//
// Note that the callback may be invoked either while holding baseQueue.mu
// (during bq.MaybeAddCallback) or without holding it
// (bq.finishProcessingReplica). It is therefore important that the callback
// does not take too long to execute.
onProcessResult func(err error)
}

Expand Down Expand Up @@ -624,18 +633,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) {

// SetMaxSize sets the max size of the queue.
func (bq *baseQueue) SetMaxSize(maxSize int64) {
bq.mu.Lock()
defer bq.mu.Unlock()
bq.mu.maxSize = maxSize
// Drop replicas until no longer exceeding the max size. Note: We call
// removeLocked to match the behavior of addInternal. In theory, only
// removeFromQueueLocked should be triggered in removeLocked, since the item
// is in the priority queue, it should not be processing or in the purgatory
// queue. To be safe, however, we use removeLocked.
for int64(bq.mu.priorityQ.Len()) > maxSize {
pqLen := bq.mu.priorityQ.Len()
bq.full.Inc(1)
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
var droppedItems []*replicaItem
func() {
bq.mu.Lock()
defer bq.mu.Unlock()
bq.mu.maxSize = maxSize
currentLen := int64(bq.mu.priorityQ.Len())
if currentLen > maxSize {
itemsToDrop := currentLen - maxSize
droppedItems = make([]*replicaItem, 0, itemsToDrop)

// Drop lower-priority replicas until no longer exceeding the max size.
// Note: We call removeLocked to match the behavior of addInternal. In
// theory, only removeFromQueueLocked should be triggered in removeLocked,
// since the item is in the priority queue, it should not be processing or
// in the purgatory queue. To be safe, however, we use removeLocked.
for int64(bq.mu.priorityQ.Len()) > maxSize {
lastIdx := bq.mu.priorityQ.Len() - 1
item := bq.mu.priorityQ.sl[lastIdx]
droppedItems = append(droppedItems, item)
bq.removeLocked(item)
}
}
}()

// Notify callbacks outside the lock to avoid holding onto the lock for too
// long.
for _, item := range droppedItems {
bq.updateMetricsOnDroppedDueToFullQueue()
for _, cb := range item.callbacks {
cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
}
}
}

Expand Down Expand Up @@ -805,6 +833,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
}
}

// updateMetricsOnDroppedDueToFullQueue records that a replica was dropped
// because the queue is full by incrementing bq.full by one. It does nothing
// when bq.full is nil.
func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() {
	if bq.full == nil {
		return
	}
	bq.full.Inc(1)
}

func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
ctx = repl.AnnotateCtx(ctx)
ctx = bq.AnnotateCtx(ctx)
Expand Down Expand Up @@ -983,9 +1019,7 @@ func (bq *baseQueue) addInternal(
// scan.
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
if bq.full != nil {
bq.full.Inc(1)
}
bq.updateMetricsOnDroppedDueToFullQueue()
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
priority, replicaItemToDrop.replicaID)
// TODO(wenyihu6): when we introduce base queue max size cluster setting,
Expand Down
37 changes: 37 additions & 0 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
require.Equal(t, int64(1), bq.enqueueAdd.Count())
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
})
t.Run("queuesizeshrinking", func(t *testing.T) {
testQueue := &testQueueImpl{}
const oldMaxSize = 15
const newMaxSize = 5
expectedEnqueueErrorCount := oldMaxSize - newMaxSize
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize})
r, err := tc.store.GetReplica(1)
require.NoError(t, err)
var enqueueErrorCount atomic.Int64
// Max size is oldMaxSize (15), so every replica should be enqueued.
for i := 0; i < oldMaxSize; i++ {
r.Desc().RangeID = roachpb.RangeID(i + 1)
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
onEnqueueResult: func(indexOnHeap int, err error) {
if err != nil {
enqueueErrorCount.Add(1)
}
},
onProcessResult: func(err error) {
t.Fatal("unexpected call to onProcessResult")
},
})
require.True(t, queued)
}
require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count())
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())

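// Shrink the max size to newMaxSize (5). The oldMaxSize-newMaxSize excess
// replicas should be dropped, each invoking onEnqueueResult with an error.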
bq.SetMaxSize(newMaxSize)
testutils.SucceedsSoon(t, func() error {
if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) {
return errors.Errorf("expected %d enqueue errors; got %d",
expectedEnqueueErrorCount, enqueueErrorCount.Load())
}
return nil
})
})
}

// TestBaseQueueCallbackOnProcessResult tests that the processCallback is
Expand Down