Skip to content

Commit ad8631c

Browse files
zeripathKN4CK3R
andauthored
Fix intermittent CI failure in EmptyQueue (#23753)
The ordering of the final token causing a close of the queue in this test may be out of sync due to concurrency. Instead just use ensure that the queue is closed when everything expected is done. Fixes: #23608 Fixes: #23977 --------- Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: KN4CK3R <admin@oldschoolhack.me>
1 parent 402df1d commit ad8631c

File tree

1 file changed

+29
-26
lines changed

1 file changed

+29
-26
lines changed

modules/queue/unique_queue_disk_channel_test.go

+29-26
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
package queue
55

66
import (
7-
"os"
87
"strconv"
98
"sync"
9+
"sync/atomic"
1010
"testing"
1111
"time"
1212

@@ -16,10 +16,7 @@ import (
1616
)
1717

1818
func TestPersistableChannelUniqueQueue(t *testing.T) {
19-
if os.Getenv("CI") != "" {
20-
t.Skip("Skipping because test is flaky on CI")
21-
}
22-
19+
// Create a temporary directory for the queue
2320
tmpDir := t.TempDir()
2421
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
2522

@@ -100,7 +97,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
10097
executedInitial := map[string][]string{}
10198
hasInitial := map[string][]string{}
10299

103-
fillQueue := func(name string, done chan struct{}) {
100+
fillQueue := func(name string, done chan int64) {
104101
t.Run("Initial Filling: "+name, func(t *testing.T) {
105102
lock := sync.Mutex{}
106103

@@ -157,33 +154,39 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
157154
assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
158155
mapLock.Unlock()
159156
})
157+
mapLock.Lock()
158+
count := int64(len(hasInitial[name]))
159+
mapLock.Unlock()
160+
done <- count
160161
close(done)
161162
}
162163

163-
doneA := make(chan struct{})
164-
doneB := make(chan struct{})
164+
hasQueueAChan := make(chan int64)
165+
hasQueueBChan := make(chan int64)
165166

166-
go fillQueue("QueueA", doneA)
167-
go fillQueue("QueueB", doneB)
167+
go fillQueue("QueueA", hasQueueAChan)
168+
go fillQueue("QueueB", hasQueueBChan)
168169

169-
<-doneA
170-
<-doneB
170+
hasA := <-hasQueueAChan
171+
hasB := <-hasQueueBChan
171172

172173
executedEmpty := map[string][]string{}
173174
hasEmpty := map[string][]string{}
174-
emptyQueue := func(name string, done chan struct{}) {
175+
emptyQueue := func(name string, numInQueue int64, done chan struct{}) {
175176
t.Run("Empty Queue: "+name, func(t *testing.T) {
176177
lock := sync.Mutex{}
177178
stop := make(chan struct{})
178179

179180
// collect the tasks that have been executed
181+
atomicCount := int64(0)
180182
handle := func(data ...Data) []Data {
181183
lock.Lock()
182184
for _, datum := range data {
183185
mapLock.Lock()
184186
executedEmpty[name] = append(executedEmpty[name], datum.(string))
185187
mapLock.Unlock()
186-
if datum.(string) == "final" {
188+
count := atomic.AddInt64(&atomicCount, 1)
189+
if count >= numInQueue {
187190
close(stop)
188191
}
189192
}
@@ -217,11 +220,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
217220
close(done)
218221
}
219222

220-
doneA = make(chan struct{})
221-
doneB = make(chan struct{})
223+
doneA := make(chan struct{})
224+
doneB := make(chan struct{})
222225

223-
go emptyQueue("QueueA", doneA)
224-
go emptyQueue("QueueB", doneB)
226+
go emptyQueue("QueueA", hasA, doneA)
227+
go emptyQueue("QueueB", hasB, doneB)
225228

226229
<-doneA
227230
<-doneB
@@ -237,20 +240,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
237240
hasEmpty = map[string][]string{}
238241
mapLock.Unlock()
239242

240-
doneA = make(chan struct{})
241-
doneB = make(chan struct{})
243+
hasQueueAChan = make(chan int64)
244+
hasQueueBChan = make(chan int64)
242245

243-
go fillQueue("QueueA", doneA)
244-
go fillQueue("QueueB", doneB)
246+
go fillQueue("QueueA", hasQueueAChan)
247+
go fillQueue("QueueB", hasQueueBChan)
245248

246-
<-doneA
247-
<-doneB
249+
hasA = <-hasQueueAChan
250+
hasB = <-hasQueueBChan
248251

249252
doneA = make(chan struct{})
250253
doneB = make(chan struct{})
251254

252-
go emptyQueue("QueueA", doneA)
253-
go emptyQueue("QueueB", doneB)
255+
go emptyQueue("QueueA", hasA, doneA)
256+
go emptyQueue("QueueB", hasB, doneB)
254257

255258
<-doneA
256259
<-doneB

0 commit comments

Comments
 (0)