Skip to content

Commit

Permalink
Fix queue test (#30646)
Browse files Browse the repository at this point in the history
Fix #30643

The old test code is not stable due to the data-race described in the
TODO added at that time.

Make it stable, and remove a debug-only field from old test code.
  • Loading branch information
wxiaoguang authored Apr 22, 2024
1 parent 99c5683 commit e610395
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
18 changes: 11 additions & 7 deletions modules/queue/workergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
// This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers,
// so no need to hugely refactor at the moment.
q.workerNumMu.Lock()
noWorker := q.workerNum == 0
if full || noWorker {
Expand Down Expand Up @@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
return true
}

func resetIdleTicker(t *time.Ticker, dur time.Duration) {
t.Reset(dur)
select {
case <-t.C:
default:
}
}

// doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
wp.wg.Add(1)
Expand All @@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
log.Debug("Queue %q starts new worker", q.GetName())
defer log.Debug("Queue %q stops idle worker", q.GetName())

atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging

t := time.NewTicker(workerIdleDuration)
defer t.Stop()

Expand All @@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
}
q.doWorkerHandle(batch)
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
t.Reset(workerIdleDuration)
select {
case <-t.C:
default:
}
resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
case <-t.C:
q.workerNumMu.Lock()
keepWorking = q.workerNum <= 1 // keep the last worker running
Expand Down
2 changes: 0 additions & 2 deletions modules/queue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct {
workerMaxNum int
workerActiveNum int
workerNumMu sync.Mutex

workerStartedCounter int32
}

type flushType chan struct{}
Expand Down
33 changes: 23 additions & 10 deletions modules/queue/workerqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package queue

import (
"context"
"slices"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -250,23 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {

func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
defer mockBackoffDuration(10 * time.Millisecond)()
defer mockBackoffDuration(5 * time.Millisecond)()

var q *WorkerPoolQueue[int]
var handledCount atomic.Int32
var hasOnlyOneWorkerRunning atomic.Bool
handler := func(items ...int) (unhandled []int) {
time.Sleep(50 * time.Millisecond)
handledCount.Add(int32(len(items)))
// make each work have different duration, and check the active worker number periodically
var activeNums []int
for i := 0; i < 5-items[0]%2; i++ {
time.Sleep(workerIdleDuration * 2)
activeNums = append(activeNums, q.GetWorkerActiveNumber())
}
// When the queue never becomes empty, the existing workers should keep working
// It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
// If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
hasOnlyOneWorkerRunning.Store(true)
}
return nil
}

q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < 20; i++ {
for i := 0; i < 100; i++ {
assert.NoError(t, q.Push(i))
}

time.Sleep(500 * time.Millisecond)
assert.EqualValues(t, 2, q.GetWorkerNumber())
assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
// when the queue never becomes empty, the existing workers should keep working
assert.EqualValues(t, 2, q.workerStartedCounter)
assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test
assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
stop()
}

0 comments on commit e610395

Please sign in to comment.