Skip to content

Commit 6acb453

Browse files
authored
feat: implement listener for asynchronous notifications in queue (#146)
- Add `notify` channel to `Queue` - Notify worker in the `queue` function without blocking the main thread - Add case to listen for `notify` channel in the `start` function Signed-off-by: appleboy <appleboy.tw@gmail.com>
1 parent ccb9133 commit 6acb453

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

queue.go

+10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type (
2626
routineGroup *routineGroup
2727
quit chan struct{}
2828
ready chan struct{}
29+
notify chan struct{}
2930
worker core.Worker
3031
stopOnce sync.Once
3132
stopFlag int32
@@ -44,6 +45,7 @@ func NewQueue(opts ...Option) (*Queue, error) {
4445
routineGroup: newRoutineGroup(),
4546
quit: make(chan struct{}),
4647
ready: make(chan struct{}, 1),
48+
notify: make(chan struct{}, 1),
4749
workerCount: o.workerCount,
4850
logger: o.logger,
4951
worker: o.worker,
@@ -149,6 +151,13 @@ func (q *Queue) queue(m *job.Message) error {
149151
}
150152

151153
q.metric.IncSubmittedTask()
154+
// notify worker
155+
// if the channel is full, it means that the worker is busy
156+
// and we don't want to block the main thread
157+
select {
158+
case q.notify <- struct{}{}:
159+
default:
160+
}
152161

153162
return nil
154163
}
@@ -325,6 +334,7 @@ func (q *Queue) start() {
325334
return
326335
}
327336
case <-ticker.C:
337+
case <-q.notify:
328338
}
329339
}
330340
}

0 commit comments

Comments
 (0)