Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wxiaoguang committed Mar 2, 2024
1 parent 9de5e39 commit 4e0f19b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
20 changes: 16 additions & 4 deletions modules/queue/workergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
full = true
}

// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
q.workerNumMu.Lock()
noWorker := q.workerNum == 0
if full || noWorker {
Expand Down Expand Up @@ -143,7 +146,11 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
log.Debug("Queue %q starts new worker", q.GetName())
defer log.Debug("Queue %q stops idle worker", q.GetName())

atomic.AddInt32(&q.workerStartedCounter, 1)

t := time.NewTicker(workerIdleDuration)
defer t.Stop()

keepWorking := true
stopWorking := func() {
q.workerNumMu.Lock()
Expand All @@ -158,13 +165,18 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
case batch, ok := <-q.batchChan:
if !ok {
stopWorking()
} else {
q.doWorkerHandle(batch)
t.Reset(workerIdleDuration)
continue
}
q.doWorkerHandle(batch)
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
t.Reset(workerIdleDuration)
select {
case <-t.C:
default:
}
case <-t.C:
q.workerNumMu.Lock()
keepWorking = q.workerNum <= 1
keepWorking = q.workerNum <= 1 // keep the last worker running
if !keepWorking {
q.workerNum--
}
Expand Down
2 changes: 2 additions & 0 deletions modules/queue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type WorkerPoolQueue[T any] struct {
workerMaxNum int
workerActiveNum int
workerNumMu sync.Mutex

workerStartedCounter int32
}

type flushType chan struct{}
Expand Down
29 changes: 24 additions & 5 deletions modules/queue/workerqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/test"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -175,11 +176,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
}

func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
oldWorkerIdleDuration := workerIdleDuration
workerIdleDuration = 300 * time.Millisecond
defer func() {
workerIdleDuration = oldWorkerIdleDuration
}()
defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)()

handler := func(items ...int) (unhandled []int) {
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -250,3 +247,25 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
assert.EqualValues(t, 20, q.GetQueueItemNumber())
}

func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()

handler := func(items ...int) (unhandled []int) {
time.Sleep(50 * time.Millisecond)
return nil
}

q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < 20; i++ {
assert.NoError(t, q.Push(i))
}

time.Sleep(500 * time.Millisecond)
assert.EqualValues(t, 2, q.GetWorkerNumber())
assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
// when the queue never becomes empty, the existing workers should keep working
assert.EqualValues(t, 2, q.workerStartedCounter)
stop()
}

0 comments on commit 4e0f19b

Please sign in to comment.