Skip to content

Commit d12d5b3

Browse files
authored
Merge branch 'main' into fix-18444-no-more-annotated-failures
2 parents a4ae0dc + 92b715e commit d12d5b3

7 files changed

+119
-95
lines changed

Diff for: modules/queue/queue_bytefifo.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ loop:
205205
// tell the pool to shutdown.
206206
q.baseCtxCancel()
207207
return
208-
case data := <-q.dataChan:
208+
case data, ok := <-q.dataChan:
209+
if !ok {
210+
return
211+
}
209212
if err := q.PushBack(data); err != nil {
210213
log.Error("Unable to push back data into queue %s", q.name)
211214
}

Diff for: modules/queue/queue_channel.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
117117
select {
118118
case <-paused:
119119
return nil
120-
case data := <-q.dataChan:
120+
case data, ok := <-q.dataChan:
121+
if !ok {
122+
return nil
123+
}
121124
if unhandled := q.handle(data); unhandled != nil {
122125
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
123126
}

Diff for: modules/queue/queue_channel_test.go

+48-21
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"code.gitea.io/gitea/modules/log"
1213
"github.com/stretchr/testify/assert"
1314
)
1415

@@ -111,7 +112,6 @@ func TestChannelQueue_Pause(t *testing.T) {
111112
if pausable, ok := queue.(Pausable); ok {
112113
pausable.Pause()
113114
}
114-
pushBack = false
115115
lock.Unlock()
116116
return data
117117
}
@@ -123,7 +123,9 @@ func TestChannelQueue_Pause(t *testing.T) {
123123
}
124124
return nil
125125
}
126-
nilFn := func(_ func()) {}
126+
127+
queueShutdown := []func(){}
128+
queueTerminate := []func(){}
127129

128130
queue, err = NewChannelQueue(handle,
129131
ChannelQueueConfiguration{
@@ -139,7 +141,34 @@ func TestChannelQueue_Pause(t *testing.T) {
139141
}, &testData{})
140142
assert.NoError(t, err)
141143

142-
go queue.Run(nilFn, nilFn)
144+
go queue.Run(func(shutdown func()) {
145+
lock.Lock()
146+
defer lock.Unlock()
147+
queueShutdown = append(queueShutdown, shutdown)
148+
}, func(terminate func()) {
149+
lock.Lock()
150+
defer lock.Unlock()
151+
queueTerminate = append(queueTerminate, terminate)
152+
})
153+
154+
// Shutdown and Terminate in defer
155+
defer func() {
156+
lock.Lock()
157+
callbacks := make([]func(), len(queueShutdown))
158+
copy(callbacks, queueShutdown)
159+
lock.Unlock()
160+
for _, callback := range callbacks {
161+
callback()
162+
}
163+
lock.Lock()
164+
log.Info("Finally terminating")
165+
callbacks = make([]func(), len(queueTerminate))
166+
copy(callbacks, queueTerminate)
167+
lock.Unlock()
168+
for _, callback := range callbacks {
169+
callback()
170+
}
171+
}()
143172

144173
test1 := testData{"A", 1}
145174
test2 := testData{"B", 2}
@@ -155,14 +184,11 @@ func TestChannelQueue_Pause(t *testing.T) {
155184

156185
pausable.Pause()
157186

158-
paused, resumed := pausable.IsPausedIsResumed()
187+
paused, _ := pausable.IsPausedIsResumed()
159188

160189
select {
161190
case <-paused:
162-
case <-resumed:
163-
assert.Fail(t, "Queue should not be resumed")
164-
return
165-
default:
191+
case <-time.After(100 * time.Millisecond):
166192
assert.Fail(t, "Queue is not paused")
167193
return
168194
}
@@ -179,10 +205,11 @@ func TestChannelQueue_Pause(t *testing.T) {
179205
assert.Nil(t, result2)
180206

181207
pausable.Resume()
208+
_, resumed := pausable.IsPausedIsResumed()
182209

183210
select {
184211
case <-resumed:
185-
default:
212+
case <-time.After(100 * time.Millisecond):
186213
assert.Fail(t, "Queue should be resumed")
187214
}
188215

@@ -199,47 +226,47 @@ func TestChannelQueue_Pause(t *testing.T) {
199226
pushBack = true
200227
lock.Unlock()
201228

202-
paused, resumed = pausable.IsPausedIsResumed()
229+
_, resumed = pausable.IsPausedIsResumed()
203230

204231
select {
205-
case <-paused:
206-
assert.Fail(t, "Queue should not be paused")
207-
return
208232
case <-resumed:
209-
default:
233+
case <-time.After(100 * time.Millisecond):
210234
assert.Fail(t, "Queue is not resumed")
211235
return
212236
}
213237

214238
queue.Push(&test1)
239+
paused, _ = pausable.IsPausedIsResumed()
215240

216241
select {
217242
case <-paused:
218243
case <-handleChan:
219244
assert.Fail(t, "handler chan should not contain test1")
220245
return
221-
case <-time.After(500 * time.Millisecond):
246+
case <-time.After(100 * time.Millisecond):
222247
assert.Fail(t, "queue should be paused")
223248
return
224249
}
225250

226-
paused, resumed = pausable.IsPausedIsResumed()
251+
lock.Lock()
252+
pushBack = false
253+
lock.Unlock()
254+
255+
paused, _ = pausable.IsPausedIsResumed()
227256

228257
select {
229258
case <-paused:
230-
case <-resumed:
231-
assert.Fail(t, "Queue should not be resumed")
232-
return
233-
default:
259+
case <-time.After(100 * time.Millisecond):
234260
assert.Fail(t, "Queue is not paused")
235261
return
236262
}
237263

238264
pausable.Resume()
265+
_, resumed = pausable.IsPausedIsResumed()
239266

240267
select {
241268
case <-resumed:
242-
default:
269+
case <-time.After(100 * time.Millisecond):
243270
assert.Fail(t, "Queue should be resumed")
244271
}
245272

Diff for: modules/queue/queue_disk_channel.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -313,14 +313,13 @@ func (q *PersistableChannelQueue) Shutdown() {
313313
q.channelQueue.Wait()
314314
q.internal.(*LevelQueue).Wait()
315315
// Redirect all remaining data in the chan to the internal channel
316-
go func() {
317-
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
318-
for data := range q.channelQueue.dataChan {
319-
_ = q.internal.Push(data)
320-
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
321-
}
322-
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
323-
}()
316+
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
317+
close(q.channelQueue.dataChan)
318+
for data := range q.channelQueue.dataChan {
319+
_ = q.internal.Push(data)
320+
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
321+
}
322+
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
324323

325324
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
326325
}

0 commit comments

Comments
 (0)