diff --git a/queue.go b/queue.go index 11f59e1..836de8a 100644 --- a/queue.go +++ b/queue.go @@ -26,6 +26,7 @@ type ( routineGroup *routineGroup quit chan struct{} ready chan struct{} + notify chan struct{} worker core.Worker stopOnce sync.Once stopFlag int32 @@ -44,6 +45,7 @@ func NewQueue(opts ...Option) (*Queue, error) { routineGroup: newRoutineGroup(), quit: make(chan struct{}), ready: make(chan struct{}, 1), + notify: make(chan struct{}, 1), workerCount: o.workerCount, logger: o.logger, worker: o.worker, @@ -149,6 +151,13 @@ func (q *Queue) queue(m *job.Message) error { } q.metric.IncSubmittedTask() + // notify worker + // if the channel is full, it means that the worker is busy + // and we don't want to block the main thread + select { + case q.notify <- struct{}{}: + default: + } return nil } @@ -325,6 +334,7 @@ func (q *Queue) start() { return } case <-ticker.C: + case <-q.notify: } } }