Skip to content

Commit ed31412

Browse files
committed
fix test
1 parent b1778a0 commit ed31412

File tree

3 files changed

+12
-9
lines changed

3 files changed

+12
-9
lines changed

modules/queue/manager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ MAX_WORKERS = 2
6060

6161
q2 := createWorkerPoolQueue("sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
6262
assert.Equal(t, "sub", q2.GetName())
63-
assert.Equal(t, "levelqueue", q2.GetType()) // no handler
63+
assert.Equal(t, "level", q2.GetType()) // no handler
6464
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues-dir2"), q2.baseConfig.DataFullDir)
6565
assert.Equal(t, 102, q2.baseConfig.Length)
6666
assert.Equal(t, 22, q2.batchLength)

modules/queue/workergroup.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package queue
66
import (
77
"context"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"code.gitea.io/gitea/modules/log"
@@ -18,9 +19,13 @@ var (
1819

1920
var (
2021
workerIdleDuration = 1 * time.Second
21-
unhandledItemRequeueDuration = 1 * time.Second
22+
unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
2223
)
2324

25+
func init() {
26+
unhandledItemRequeueDuration.Store(int64(5 * time.Second))
27+
}
28+
2429
// workerGroup is a group of workers to work with a WorkerPoolQueue
2530
type workerGroup[T any] struct {
2631
q *WorkerPoolQueue[T]
@@ -93,9 +98,9 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
9398
unhandled := q.safeHandler(batch...)
9499
// if none of the items were handled, it should back-off for a few seconds
95100
// in this case the handler (eg: document indexer) may have encountered some errors/failures
96-
if len(unhandled) == len(batch) && unhandledItemRequeueDuration != 0 {
101+
if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
97102
log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
98-
time.Sleep(unhandledItemRequeueDuration)
103+
time.Sleep(time.Duration(unhandledItemRequeueDuration.Load()))
99104
}
100105
for _, item := range unhandled {
101106
if err := q.Push(item); err != nil {

modules/queue/workerqueue_test.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,9 @@ func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
3131
}
3232

3333
func TestWorkerPoolQueueUnhandled(t *testing.T) {
34-
oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration
35-
unhandledItemRequeueDuration = 0
36-
defer func() {
37-
unhandledItemRequeueDuration = oldUnhandledItemRequeueDuration
38-
}()
34+
oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
35+
unhandledItemRequeueDuration.Store(0)
36+
defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)
3937

4038
mu := sync.Mutex{}
4139

0 commit comments

Comments
 (0)