@@ -54,6 +54,18 @@ type Flushable interface {
5454 IsEmpty () bool
5555}
5656
57+ // Pausable represents a pool or queue that is Pausable
58+ type Pausable interface {
59+ // IsPaused will return if the pool or queue is paused
60+ IsPaused () bool
61+ // Pause will pause the pool or queue
62+ Pause ()
63+ // Resume will resume the pool or queue
64+ Resume ()
65+ // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
66+ IsPausedIsResumed () (paused , resumed <- chan struct {})
67+ }
68+
5769// ManagedPool is a simple interface to get certain details from a worker pool
5870type ManagedPool interface {
5971 // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
@@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
192204 wg .Done ()
193205 continue
194206 }
207+ if pausable , ok := mq .Managed .(Pausable ); ok {
208+ // no point flushing paused queues
209+ if pausable .IsPaused () {
210+ wg .Done ()
211+ continue
212+ }
213+ }
214+
195215 allEmpty = false
196216 if flushable , ok := mq .Managed .(Flushable ); ok {
197217 log .Debug ("Flushing (flushable) queue: %s" , mq .Name )
@@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
215235 log .Debug ("All queues are empty" )
216236 break
217237 }
218- // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign
238+ // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
219239 // but don't delay cancellation here.
220240 select {
221241 case <- ctx .Done ():
@@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can
298318 return nil
299319}
300320
321+ // Flushable returns true if the queue is flushable
322+ func (q * ManagedQueue ) Flushable () bool {
323+ _ , ok := q .Managed .(Flushable )
324+ return ok
325+ }
326+
301327// Flush flushes the queue with a timeout
302328func (q * ManagedQueue ) Flush (timeout time.Duration ) error {
303329 if flushable , ok := q .Managed .(Flushable ); ok {
@@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool {
315341 return true
316342}
317343
344+ // Pausable returns whether the queue is Pausable
345+ func (q * ManagedQueue ) Pausable () bool {
346+ _ , ok := q .Managed .(Pausable )
347+ return ok
348+ }
349+
350+ // Pause pauses the queue
351+ func (q * ManagedQueue ) Pause () {
352+ if pausable , ok := q .Managed .(Pausable ); ok {
353+ pausable .Pause ()
354+ }
355+ }
356+
357+ // IsPaused reveals if the queue is paused
358+ func (q * ManagedQueue ) IsPaused () bool {
359+ if pausable , ok := q .Managed .(Pausable ); ok {
360+ return pausable .IsPaused ()
361+ }
362+ return false
363+ }
364+
365+ // Resume resumes the queue
366+ func (q * ManagedQueue ) Resume () {
367+ if pausable , ok := q .Managed .(Pausable ); ok {
368+ pausable .Resume ()
369+ }
370+ }
371+
318372// NumberOfWorkers returns the number of workers in the queue
319373func (q * ManagedQueue ) NumberOfWorkers () int {
320374 if pool , ok := q .Managed .(ManagedPool ); ok {
0 commit comments