-
Notifications
You must be signed in to change notification settings - Fork 0
/
web_sse.go
194 lines (160 loc) · 4.98 KB
/
web_sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package main
import (
"fmt"
"github.com/gin-gonic/gin"
"io"
"time"
)
// EventName identifies the kind of server-sent event. Its String form is the
// event name a browser client subscribes to (e.g. via EventSource in JS).
type EventName int

const (
	// KeepAlive is sent periodically by a background routine.
	KeepAlive EventName = iota
	// SessionCreated is emitted when a new session is created on the server;
	// the payload is the session info.
	SessionCreated
	// SessionDeleted is emitted when a session is deleted on the server;
	// the payload is the session key.
	SessionDeleted
	// SessionUpdated is emitted when a session is updated on the server;
	// the payload is the session info.
	SessionUpdated
)

// String returns the wire name of the event, or "unknown" for any value that
// is not one of the declared constants.
func (n EventName) String() string {
	// Table indexed by the constant values; anything outside it is unknown.
	names := [...]string{
		KeepAlive:      "keepalive",
		SessionCreated: "sessionCreated",
		SessionDeleted: "sessionDeleted",
		SessionUpdated: "sessionUpdated",
	}
	if n < 0 || int(n) >= len(names) {
		return "unknown"
	}
	return names[n]
}
// SetupSSERouter creates the package-level broker, starts its Listen and
// KeepAlive goroutines, and registers the broker's handler for GET requests
// on path.
func SetupSSERouter(router gin.IRouter, path string) {
	b := NewBroker()
	broker = b

	// Start the event loop and the keep-alive emitter before serving clients.
	go b.Listen()
	go b.KeepAlive()

	router.GET(path, b.ServeHTTP)
}
// Broadcast hands an event with the given name and payload to the
// package-level broker for delivery to listening clients. It does nothing
// when no broker has been set up yet.
func Broadcast(name EventName, payload interface{}) {
	if broker != nil {
		event := NotificationEvent{Name: name, Payload: payload}
		broker.Notifier <- event
	}
}
// patience is how long the broker waits when pushing a message to a single
// client before giving up on it — e.g. a slow client, or one that closed
// after iteration over the client registry had already started.
const patience = 1 * time.Second
// NotificationEvent is an event name together with the payload to be sent to
// the client.
type NotificationEvent struct {
	Name    EventName
	Payload interface{}
}

// NotifierChan is a channel over which NotificationEvents are passed.
type NotifierChan chan NotificationEvent

// Broker holds the channels and client registry used to distribute events.
type Broker struct {
	// Notifier receives the events that are to be relayed to clients.
	Notifier NotifierChan

	// newClients carries the message channel of each newly connected client.
	newClients chan NotifierChan

	// closingClients carries the message channel of each client that went away.
	closingClients chan NotifierChan

	// clients is the set of currently registered client message channels.
	clients map[NotifierChan]struct{}
}
// String returns a human-readable rendering of the event: its name (in wire
// form) and its payload formatted with %+v.
//
// The result type is string so that NotificationEvent satisfies fmt.Stringer
// and is picked up automatically by the fmt verbs %s and %v.
func (e NotificationEvent) String() string {
	return fmt.Sprintf("NotificationEvent{ Name: %s Payload: %+v}", e.Name.String(), e.Payload)
}
// broker is the package-level broker instance. It is assigned by
// SetupSSERouter and stays nil until then; Broadcast checks for nil before
// using it.
var broker *Broker
// NewBroker allocates a Broker with all of its channels and its client
// registry initialised. The Notifier channel has a buffer of one event; the
// client registration channels are unbuffered.
func NewBroker() (broker *Broker) {
	broker = new(Broker)
	broker.Notifier = make(NotifierChan, 1)
	broker.newClients = make(chan NotifierChan)
	broker.closingClients = make(chan NotifierChan)
	broker.clients = make(map[NotifierChan]struct{})
	return broker
}
// ServeHTTP is the gin handler for one SSE client connection. It sets the
// event-stream response headers, registers a private message channel with the
// broker, and then streams every event received on that channel to the client
// until the request is cancelled (client disconnected). On exit the channel is
// unregistered from the broker.
func (broker *Broker) ServeHTTP(c *gin.Context) {
	url := c.FullPath()
	fmt.Printf("[%s] Requested topic: %s\n", c.ClientIP(), url)

	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("Access-Control-Allow-Origin", "*")

	// Each connection registers its own message channel with the broker.
	messageChan := make(NotifierChan)
	broker.newClients <- messageChan

	// Remove this client from the broker's registry when the handler exits.
	defer func() {
		fmt.Printf("[%s] Closing client down\n", c.ClientIP())
		broker.closingClients <- messageChan
	}()

	// Closed when the request is cancelled, i.e. the client went away.
	clientGone := c.Request.Context().Done()

	c.Stream(func(w io.Writer) bool {
		// Wait for either the next event or the client disconnecting, so a
		// dead connection is released immediately instead of lingering until
		// the next event happens to arrive.
		select {
		case <-clientGone:
			return false
		case event := <-messageChan:
			// Emit in Server-Sent Events format.
			c.SSEvent(event.Name.String(), event.Payload)
			// Flush the data immediately instead of buffering it for later.
			c.Writer.Flush()
			return true
		}
	})
}
// Listen runs the broker's event loop: it registers newly connected clients,
// drops clients that have closed, and relays each event arriving on Notifier
// to every registered client. It never returns.
func (broker *Broker) Listen() {
	for {
		select {
		case joined := <-broker.newClients:
			// A client connected: add its message channel to the registry.
			broker.clients[joined] = struct{}{}
			fmt.Printf("Client added. %d registered clients\n", len(broker.clients))

		case left := <-broker.closingClients:
			// A client detached: stop sending it messages.
			delete(broker.clients, left)
			fmt.Printf("Removed client. %d registered clients\n", len(broker.clients))

		case event := <-broker.Notifier:
			// A new event came in from outside: pass it on to everyone.
			broker.fanOut(event)
		}
	}
}

// fanOut offers event to each registered client in turn, waiting at most
// patience for each one before skipping it.
func (broker *Broker) fanOut(event NotificationEvent) {
	for client := range broker.clients {
		select {
		case client <- event:
		case <-time.After(patience):
			fmt.Print("Skipping client.\n")
		}
	}
}
// KeepAlive emits a KeepAlive event with an empty payload every 30 seconds,
// starting immediately. It never returns.
//
// The event is published on Notifier rather than written to the client
// channels directly: the clients map is mutated by Listen, and ranging over
// it from this goroutine as well would be a data race (the runtime can abort
// with "concurrent map iteration and map write"). Going through Notifier
// leaves Listen as the only goroutine touching the map; Listen then performs
// the per-client delivery, so it must be running for keep-alives to go out.
func (broker *Broker) KeepAlive() {
	event := NotificationEvent{Name: KeepAlive, Payload: ""}
	for {
		broker.Notifier <- event
		time.Sleep(30 * time.Second)
	}
}