-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.go
75 lines (62 loc) · 1.57 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package websub
import (
	"log"

	"meow.tf/websub/model"
)
// PublishJob represents a job to publish data to a subscription.
// The struct tags give the JSON key used for each field when a job is
// serialized.
type PublishJob struct {
// Hub is the hub value carried with the job.
// NOTE(review): presumably the hub the publish belongs to — confirm against Notify.
Hub model.Hub `json:"hub"`
// Subscription is the subscription the job is for; the worker removes it
// from the hub's store when Notify reports the job as not sent.
Subscription model.Subscription `json:"subscription"`
// ContentType presumably holds the MIME type of Data — TODO confirm against Notify.
ContentType string `json:"contentType"`
// Data is the raw payload bytes to publish.
Data []byte `json:"data"`
}
// Worker is an interface to allow other types of workers to be created.
// GoWorker is the implementation provided in this file.
type Worker interface {
// Add submits a publish job to the worker for processing.
Add(f PublishJob)
// Start begins processing of submitted jobs.
Start()
// Stop ends processing of jobs.
Stop()
}
// NewGoWorker creates a new worker from the specified hub and worker count.
//
// h is the hub whose client and store the worker routines use when
// delivering jobs. workerCount is the number of goroutines Start launches;
// values below 1 are raised to 1, because with no worker routine nothing
// would ever receive from the unbuffered job channel and every call to Add
// would block forever.
//
// The returned worker is idle until Start is called.
func NewGoWorker(h *Hub, workerCount int) *GoWorker {
	if workerCount < 1 {
		workerCount = 1
	}

	return &GoWorker{
		hub:         h,
		workerCount: workerCount,
		jobCh:       make(chan PublishJob),
	}
}
// GoWorker is a basic Goroutine-based worker.
// It will start workerCount workers and process jobs from a channel.
type GoWorker struct {
// hub supplies the client passed to Notify and the store from which
// failed subscriptions are removed.
hub *Hub
// workerCount is the number of goroutines launched by Start.
workerCount int
// jobCh carries queued jobs: Add sends on it, run receives from it, and
// Stop closes it.
jobCh chan PublishJob
}
// Add enqueues job for delivery by one of the worker routines.
//
// NewGoWorker creates the job channel unbuffered, so the call blocks until a
// worker routine receives the job. Sending on a closed channel panics, so Add
// must not be called once Stop has closed the queue.
func (w *GoWorker) Add(job PublishJob) {
	queue := w.jobCh
	queue <- job
}
// Start launches workerCount goroutines, each executing run until the job
// channel is closed. It returns immediately without waiting for them.
func (w *GoWorker) Start() {
	for remaining := w.workerCount; remaining > 0; remaining-- {
		go w.run()
	}
}
// Stop shuts the queue down by closing the job channel; each worker routine
// returns once its receive reports the channel as closed. Stop does not wait
// for the routines to exit. Closing an already-closed channel panics, so Stop
// must be called at most once, and Add must not be called afterwards.
func (w *GoWorker) Stop() {
	queue := w.jobCh
	close(queue)
}
// run is the body of a single worker routine. It receives jobs from the job
// channel until the channel is closed and delivers each one with Notify,
// using the hub's client.
//
// A delivery error is logged and the subscription is kept. When Notify
// returns no error but reports the job as not sent, the subscription is
// removed from the hub's store.
func (w *GoWorker) run() {
	// Ranging over the channel ends the loop as soon as the channel is closed.
	for job := range w.jobCh {
		sent, err := Notify(w.hub.client, job)
		if err != nil {
			// Keep the subscription on error, but log it so failed
			// deliveries are visible instead of vanishing silently.
			log.Printf("websub: notify failed: %v", err)
			continue
		}

		// Remove failed subscriptions
		if !sent {
			w.hub.store.Remove(job.Subscription)
		}
	}
}