-
Notifications
You must be signed in to change notification settings - Fork 3
/
signal_client.go
185 lines (158 loc) · 4.67 KB
/
signal_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package star
import (
"encoding/json"
"github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
"time"
)
func startClient(url string, peerMultiaddr ma.Multiaddr, addressBook addressBook,
handshakeSubscription *handshakeSubscription, stopCh <-chan struct{}) (<-chan transport.CapableConn,
chan<- handshakeData) {
logger.Debugf("Use signal server: %s", url)
acceptedCh := make(chan transport.CapableConn)
handshakeDataCh := make(chan handshakeData)
internalStopCh := make(chan struct{})
threadsRunning := false
stopSessionThreads := func() {
logger.Debugf("Stop active session threads")
internalStopCh <- struct{}{}
internalStopCh <- struct{}{}
}
go func() {
var connection *websocket.Conn
var sp *sessionProperties
var err error
for {
if stopSignalReceived(stopCh) {
logger.Debugf("Stop signal received. Closing")
stopSessionThreads()
close(handshakeDataCh)
return
}
if !isConnectionHealthy(connection) {
if threadsRunning {
stopSessionThreads()
threadsRunning = false
}
connection, err = openConnection(url)
if err != nil {
logger.Errorf("Can't establish connection: %v", err)
time.Sleep(3 * time.Second)
continue
}
logger.Debugf("Connection to signal server established")
sp, err = openSession(connection, peerMultiaddr, handshakeDataCh, internalStopCh)
if err != nil {
logger.Errorf("Can't open session: %v", err)
connection = nil
continue
}
threadsRunning = true
}
message, err := readMessage(connection)
if err != nil {
logger.Errorf("%s: Can't read message: %v", sp.SID, err)
connection = nil
continue
}
logger.Debugf("%s: Received message: %s", sp.SID, message)
err = processMessage(addressBook, handshakeSubscription, message)
if err != nil {
logger.Warningf("%s: Can't process message: %v", sp.SID, err)
continue
}
}
}()
return acceptedCh, handshakeDataCh
}
func openSession(connection *websocket.Conn, peerMultiaddr ma.Multiaddr,
handshakeDataCh <-chan handshakeData, stopCh <-chan struct{}) (*sessionProperties, error) {
message, err := readMessage(connection)
if err != nil {
return nil, err
}
var sp sessionProperties
err = json.Unmarshal(message, &sp)
if err != nil {
return nil, err
}
pingInterval := time.Duration(sp.PingIntervalMillis * int64(time.Millisecond))
pingTimeout := time.Duration(sp.PingTimeoutMillis * int64(time.Millisecond))
logger.Debugf("%s: Ping interval: %v, Ping timeout: %v", sp.SID, pingInterval, pingTimeout)
connection.SetReadLimit(maxMessageSize)
connection.SetPongHandler(func(string) error {
logger.Debugf("%s: Pong message received", sp.SID)
return connection.SetReadDeadline(time.Time{})
})
err = readEmptyMessage(connection)
if err != nil {
return nil, err
}
go func() {
pingTicker := time.NewTicker(pingInterval)
for {
select {
case <-stopCh:
logger.Debugf("%s: Stop signal received. Close ping ticker", sp.SID)
pingTicker.Stop()
return
case <-pingTicker.C:
logger.Debugf("%s: Send ping message", sp.SID)
err := connection.SetReadDeadline(time.Now().Add(pingTimeout))
if err != nil {
logger.Errorf("%s: Can't set connection read deadline: %v", sp.SID, err)
continue
}
err = sendMessage(connection, "ping", nil) // Application layer ping?
if err != nil {
logger.Errorf("%s: Can't send ping message: %v", sp.SID, err)
continue
}
err = connection.WriteControl(websocket.PingMessage, []byte("ping"), time.Time{})
if err != nil {
logger.Errorf("%s: Can't send ping message: %v", sp.SID, err)
continue
}
}
}
}()
logger.Debugf("%s: Join peer network (peerID: %s)", sp.SID, peerMultiaddr.String())
err = sendMessage(connection, "ss-join", peerMultiaddr.String())
if err != nil {
return nil, err
}
go func() {
for {
select {
case <-stopCh:
logger.Debugf("%s: Stop signal received. Close handshake offer sender", sp.SID)
return
case offer := <-handshakeDataCh:
logger.Debugf("%s: Send handshake message", sp.SID)
err = sendMessage(connection, "ss-handshake", offer)
if err != nil {
logger.Errorf("%s: Can't send handshake offer: %v", sp.SID, err)
continue
}
}
}
}()
return &sp, nil
}
func stopSignalReceived(stopCh <-chan struct{}) bool {
select {
case <-stopCh:
return true
default:
return false
}
}
func isConnectionHealthy(connection *websocket.Conn) bool {
return connection != nil
}
func openConnection(url string) (*websocket.Conn, error) {
logger.Debugf("Open new connection: %s", url)
connection, _, err := websocket.DefaultDialer.Dial(url, nil)
return connection, err
}