Skip to content

Commit ab7f701

Browse files
authored
Make WrappedQueues and PersistableChannelUniqueQueues Pausable (go-gitea#18393)
Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues Reference go-gitea#15928 Signed-off-by: Andrew Thornton art27@cantab.net
1 parent 43c6b27 commit ab7f701

9 files changed

+72
-14
lines changed

modules/queue/queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func RegisteredTypesAsString() []string {
196196
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
197197
newFn, ok := queuesMap[queueType]
198198
if !ok {
199-
return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
199+
return nil, fmt.Errorf("unsupported queue type: %v", queueType)
200200
}
201201
return newFn(handlerFunc, opts, exemplar)
202202
}

modules/queue/queue_bytefifo.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (q *ByteFIFOQueue) Push(data Data) error {
9292
// PushBack pushes data to the fifo
9393
func (q *ByteFIFOQueue) PushBack(data Data) error {
9494
if !assignableTo(data, q.exemplar) {
95-
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
95+
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
9696
}
9797
bs, err := json.Marshal(data)
9898
if err != nil {
@@ -110,7 +110,7 @@ func (q *ByteFIFOQueue) PushBack(data Data) error {
110110
// PushFunc pushes data to the fifo
111111
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
112112
if !assignableTo(data, q.exemplar) {
113-
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
113+
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
114114
}
115115
bs, err := json.Marshal(data)
116116
if err != nil {
@@ -398,7 +398,7 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
398398
// Has checks if the provided data is in the queue
399399
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
400400
if !assignableTo(data, q.exemplar) {
401-
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
401+
return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
402402
}
403403
bs, err := json.Marshal(data)
404404
if err != nil {

modules/queue/queue_channel.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
9393
// Push will push data into the queue
9494
func (q *ChannelQueue) Push(data Data) error {
9595
if !assignableTo(data, q.exemplar) {
96-
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
96+
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
9797
}
9898
q.WorkerPool.Push(data)
9999
return nil

modules/queue/queue_wrapped.go

+43-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
5959
if s, ok := cfg.([]byte); ok {
6060
cfg = string(s)
6161
}
62-
return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
62+
return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
6363
default:
6464
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
6565
if err == nil {
@@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
7676
i++
7777
if q.maxAttempts > 0 && i > q.maxAttempts {
7878
if bs, ok := q.cfg.([]byte); ok {
79-
return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
79+
return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
8080
}
81-
return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
81+
return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
8282
}
8383
sleepTime := 100 * time.Millisecond
8484
if q.timeout > 0 && q.maxAttempts > 0 {
@@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() {
271271
log.Debug("WrappedQueue: %s Terminated", q.name)
272272
}
273273

274+
// IsPaused will return if the pool or queue is paused
275+
func (q *WrappedQueue) IsPaused() bool {
276+
q.lock.Lock()
277+
defer q.lock.Unlock()
278+
pausable, ok := q.internal.(Pausable)
279+
return ok && pausable.IsPaused()
280+
}
281+
282+
// Pause will pause the pool or queue
283+
func (q *WrappedQueue) Pause() {
284+
q.lock.Lock()
285+
defer q.lock.Unlock()
286+
if pausable, ok := q.internal.(Pausable); ok {
287+
pausable.Pause()
288+
}
289+
}
290+
291+
// Resume will resume the pool or queue
292+
func (q *WrappedQueue) Resume() {
293+
q.lock.Lock()
294+
defer q.lock.Unlock()
295+
if pausable, ok := q.internal.(Pausable); ok {
296+
pausable.Resume()
297+
}
298+
}
299+
300+
// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
301+
func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
302+
q.lock.Lock()
303+
defer q.lock.Unlock()
304+
if pausable, ok := q.internal.(Pausable); ok {
305+
return pausable.IsPausedIsResumed()
306+
}
307+
return context.Background().Done(), closedChan
308+
}
309+
310+
var closedChan chan struct{}
311+
274312
func init() {
275313
queuesMap[WrappedQueueType] = NewWrappedQueue
314+
closedChan = make(chan struct{})
315+
close(closedChan)
276316
}

modules/queue/setting.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func validType(t string) (Type, error) {
2222
return typ, nil
2323
}
2424
}
25-
return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
25+
return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
2626
}
2727

2828
func getQueueSettings(name string) (setting.QueueSettings, []byte) {

modules/queue/unique_queue_channel.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (q *ChannelUniqueQueue) Push(data Data) error {
111111
// PushFunc will push data into the queue
112112
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
113113
if !assignableTo(data, q.exemplar) {
114-
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
114+
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
115115
}
116116

117117
bs, err := json.Marshal(data)

modules/queue/unique_queue_disk_channel.go

+20
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,26 @@ func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
239239
return q.channelQueue.IsEmpty()
240240
}
241241

242+
// IsPaused will return if the pool or queue is paused
243+
func (q *PersistableChannelUniqueQueue) IsPaused() bool {
244+
return q.channelQueue.IsPaused()
245+
}
246+
247+
// Pause will pause the pool or queue
248+
func (q *PersistableChannelUniqueQueue) Pause() {
249+
q.channelQueue.Pause()
250+
}
251+
252+
// Resume will resume the pool or queue
253+
func (q *PersistableChannelUniqueQueue) Resume() {
254+
q.channelQueue.Resume()
255+
}
256+
257+
// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
258+
func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
259+
return q.channelQueue.IsPausedIsResumed()
260+
}
261+
242262
// Shutdown processing this queue
243263
func (q *PersistableChannelUniqueQueue) Shutdown() {
244264
log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)

modules/queue/unique_queue_wrapped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (q *WrappedUniqueQueue) Push(data Data) error {
105105
// PushFunc will push the data to the internal channel checking it against the exemplar
106106
func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
107107
if !assignableTo(data, q.exemplar) {
108-
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
108+
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
109109
}
110110

111111
q.tlock.Lock()

modules/queue/workerpool.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,12 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
5757
ctx, cancel := context.WithCancel(context.Background())
5858

5959
dataChan := make(chan Data, config.QueueLength)
60-
resumed := make(chan struct{})
61-
close(resumed)
6260
pool := &WorkerPool{
6361
baseCtx: ctx,
6462
baseCtxCancel: cancel,
6563
batchLength: config.BatchLength,
6664
dataChan: dataChan,
67-
resumed: resumed,
65+
resumed: closedChan,
6866
paused: make(chan struct{}),
6967
handle: handle,
7068
blockTimeout: config.BlockTimeout,

0 commit comments

Comments
 (0)