-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsubscribe.go
72 lines (64 loc) · 1.4 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package natssse
import (
"context"
"errors"
"net/http"
"time"
"github.com/nats-io/nats.go"
)
// NewSubHandler creates a handler that does server side event subscribing
func NewSubHandler(conn *nats.Conn, authFunc AuthFunc) http.HandlerFunc {
n := NatsContext{
Conn: conn,
Auth: authFunc,
}
return func(w http.ResponseWriter, r *http.Request) {
newSSEHandler(w, r, n, Subscribe)
}
}
// Subscribe wraps handleSubscription and handles flushing the writer.
func Subscribe(ctx context.Context, flusher http.Flusher, opts options) {
go handleSubscription(ctx, opts)
for {
select {
case <-ctx.Done():
return
default:
writeAndFlushResponse(opts.writer, flusher, <-opts.ch)
}
}
}
// handleSubscription creates the NATS subscription and iterates over the messages
func handleSubscription(ctx context.Context, opts options) {
sub, err := opts.nc.Conn.SubscribeSync(opts.subject)
if err != nil {
msg := nats.Msg{
Subject: "natssse.system",
Data: []byte(err.Error()),
}
opts.ch <- msg
opts.cancel()
return
}
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
return
default:
msg, err := sub.NextMsg(10 * time.Second)
if err != nil && errors.Is(err, nats.ErrTimeout) {
continue
}
if err != nil {
msg := nats.Msg{
Subject: "natssse.system",
Data: []byte(err.Error()),
}
opts.ch <- msg
continue
}
opts.ch <- *msg
}
}
}