-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathwebsocket.go
147 lines (128 loc) · 3.37 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"fmt"
"net/http"
"time"
"encoding/json"
"github.com/gorilla/websocket"
)
// Timing and size limits applied to every websocket connection.
const (
	// writeWait is the deadline for completing a single write to the peer.
	writeWait = 10 * time.Second

	// pongWait is how long a read may wait for the peer's next pong.
	pongWait = time.Minute

	// pingPeriod is the interval between pings sent to the peer. It is 90%
	// of pongWait and must always stay below it.
	pingPeriod = pongWait / 10 * 9

	// maxMessageSize is the largest message accepted from the peer.
	maxMessageSize = 512
)
// upgrader upgrades incoming HTTP requests to websocket connections using
// 1 KiB read and write buffers.
var upgrader = websocket.Upgrader{
	// Every origin is accepted so that the front end served on :3000 can
	// connect to this server on :8081.
	// NOTE(review): this disables the origin check for all callers, not just
	// the :3000 front end — confirm that is acceptable outside development.
	CheckOrigin:     func(*http.Request) bool { return true },
	ReadBufferSize:  1 << 10,
	WriteBufferSize: 1 << 10,
}
// JsonData is the message payload that readPump decodes from a client's
// websocket connection and re-encodes as JSON for the hub's broadcast channel.
type JsonData struct {
// Name is carried under the "name" JSON key.
// NOTE(review): presumably the sender's display name — confirm against the client.
Name string `json:"name"`
// Text is carried under the "text" JSON key.
Text string `json:"text"`
// Timestamp is carried under the "timestamp" JSON key as a plain string; it
// is passed through without being parsed or validated here.
Timestamp string `json:"timestamp"`
}
// Client ties one websocket connection to the hub it is registered with.
type Client struct {
// name is the value of the "name" query parameter from the upgrade request.
name string
// hub is the ConnHub this client registers with, unregisters from, and
// broadcasts through.
hub *ConnHub
// conn is the underlying websocket connection.
conn *websocket.Conn
// send queues outbound messages; writePump receives from it and writes each
// message to conn. wsHandler creates it with a buffer of 256.
// NOTE(review): writePump treats a closed channel as the signal to stop;
// presumably the hub closes it on unregister — confirm in the hub code.
send chan []byte
}
// readPump reads JSON messages from the client's websocket connection and
// forwards each one, re-encoded as JSON, to the hub's broadcast channel.
//
// It applies the read limit and pong-based read deadline to the connection,
// then loops until a read fails. Any read error (peer closed, deadline
// exceeded, oversized or malformed message) ends the loop: a failed websocket
// read is permanent, so retrying would only spin and broadcast empty messages.
// On return the client is sent to the hub's unregister channel and the
// connection is closed.
func (c *Client) readPump() {
	// schedule client to be disconnected
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	// init client connection
	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	// Every pong from the peer pushes the read deadline forward again.
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	// handle connection read
	for {
		fmt.Println("reading from client")

		// read JSON data from connection
		message := JsonData{}
		if err := c.conn.ReadJSON(&message); err != nil {
			fmt.Println("Error reading JSON", err)
			// Leave the loop so the deferred cleanup runs; do not broadcast
			// the zero-value message.
			return
		}
		fmt.Printf("Get response: %#v\n", message)

		messageJSON, err := json.Marshal(message)
		if err != nil {
			// Nothing valid to broadcast; the connection itself is still fine.
			fmt.Println("Error encoding JSON", err)
			continue
		}

		// queue message for writing
		c.hub.broadcast <- messageJSON
	}
}
// writePump writes messages queued on c.send to the websocket connection and
// sends a ping every pingPeriod.
//
// Each queued message is written as its own text frame. Messages are JSON
// documents, so concatenating several into one frame without a delimiter
// would hand the peer something that is not a single valid JSON value.
//
// The pump stops when c.send is closed or when any write fails; on return the
// ticker is stopped and the connection is closed.
func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	// writeFrame sends one message of the given type under a fresh write deadline.
	writeFrame := func(messageType int, payload []byte) error {
		c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		return c.conn.WriteMessage(messageType, payload)
	}

	for {
		select {
		case message, ok := <-c.send:
			if !ok {
				// channel has been closed by the hub
				// c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if err := writeFrame(websocket.TextMessage, message); err != nil {
				return
			}

			// Flush whatever else is already queued. This goroutine is the only
			// receiver in this file, so the messages counted by len are still
			// there to be received.
			for pending := len(c.send); pending > 0; pending-- {
				if err := writeFrame(websocket.TextMessage, <-c.send); err != nil {
					return
				}
			}

		// send ping over websocket
		case <-ticker.C:
			if err := writeFrame(websocket.PingMessage, []byte{}); err != nil {
				return
			}
		}
	}
}
// wsHandler handles the /ws route: it upgrades the HTTP request to a
// websocket connection, registers a new Client with hub, broadcasts the list
// of connected client names, and starts the client's read and write pumps.
//
// The client's name is taken from the "name" query parameter. If the upgrade
// fails, the handler logs the error and returns without creating a client.
func wsHandler(hub *ConnHub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so
		// only log here; continuing would use a nil connection.
		fmt.Println("Could not open websocket connection", err)
		return
	}

	// init new client
	name := r.URL.Query().Get("name")
	client := &Client{
		name: name, // sent in http query params
		hub:  hub,
		conn: conn,
		send: make(chan []byte, 256),
	}

	// Collect the names of the clients already connected, then add the new
	// one. This is done before registering so the new name cannot be listed
	// twice, and with append so the slice cannot be indexed out of range.
	// NOTE(review): hub.clients is read here without synchronization while the
	// hub may be modifying it — confirm how the hub guards this map.
	names := make([]string, 0, len(hub.clients)+1)
	for _, connected := range hub.clients {
		names = append(names, connected)
	}
	names = append(names, name)

	// register to hub
	hub.register <- client

	// Send the JSON list of connected client names over the hub's broadcast
	// channel for display.
	namesJSON, err := json.Marshal(names)
	if err != nil {
		fmt.Println("Error encoding client names", err)
	} else {
		hub.broadcast <- namesJSON
	}

	// separate reads and writes to conform to WebSocket standard of one concurrent reader and writer
	go client.writePump()
	go client.readPump()
}