diff --git a/queue.go b/queue.go index a502821..a1f008e 100644 --- a/queue.go +++ b/queue.go @@ -26,6 +26,7 @@ type ( routineGroup *routineGroup quit chan struct{} ready chan struct{} + newTaskAdded chan struct{} worker core.Worker stopOnce sync.Once stopFlag int32 @@ -43,6 +44,7 @@ func NewQueue(opts ...Option) (*Queue, error) { routineGroup: newRoutineGroup(), quit: make(chan struct{}), ready: make(chan struct{}, 1), + newTaskAdded: make(chan struct{}), workerCount: o.workerCount, logger: o.logger, worker: o.worker, @@ -147,6 +149,7 @@ func (q *Queue) queue(m *job.Message) error { } q.metric.IncSubmittedTask() + q.newTaskAdded <- struct{}{} return nil } @@ -320,8 +323,8 @@ func (q *Queue) start() { close(tasks) return } - case <-time.After(time.Second): - // sleep 1 second to fetch new task + case <-q.newTaskAdded: + // New task added } } }