-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
146 lines (140 loc) · 3.3 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package satori
import (
"encoding/base64"
"encoding/json"
"errors"
"net"
"net/url"
"strings"
"time"
"github.com/RomiChan/websocket"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
// connect establishes the websocket connection to the satori event endpoint
// derived from cli.api and performs the identify/ready handshake.
//
// It does not return until a connection has been established: every failed
// attempt is logged and retried after 5 seconds. On success it records the
// platform and self ID of the first login reported by the server, stores the
// connection in cli.ws and starts the heartbeat goroutine.
func (cli *Client) connect() {
	log.Infof("start connecting to the satori: %s", cli.api)
	network, address := resolveURI(cli.api + "/v1/events")

	// dial opens the raw transport used for the websocket handshake.
	// For the "unix" network the host part of addr carries the base64
	// encoded socket path produced by resolveURI.
	dial := func(_, addr string) (net.Conn, error) {
		if network != "unix" {
			return net.Dial(network, addr)
		}
		encoded := addr
		if host, _, err := net.SplitHostPort(addr); err == nil {
			encoded = host
		}
		if sock, err := base64.RawURLEncoding.DecodeString(encoded); err == nil {
			addr = string(sock)
		}
		return net.Dial(network, addr) // support unix socket transport
	}
	dialer := websocket.Dialer{NetDial: dial}

	// The identify payload is the same for every attempt, so build it once.
	identify, _ := json.Marshal(&Signal[Identify]{
		Op:   OpCodeIdentify,
		Body: Identify{Token: cli.token},
	})

	// handshake performs a single connection attempt. It returns nil only
	// after the client state has been updated and the heartbeat started.
	handshake := func() error {
		conn, res, err := dialer.Dial(address, nil) // nolint
		if err != nil {
			return err
		}
		_ = res.Body.Close()

		var ready Signal[Ready]
		err = conn.WriteMessage(websocket.TextMessage, identify)
		if err == nil {
			err = conn.ReadJSON(&ready)
		}
		if err == nil && (ready.Op != OpCodeReady || len(ready.Body.Logins) == 0) {
			err = errors.New("unknown satori ready signal")
		}
		if err != nil {
			_ = conn.Close()
			return err
		}

		login := ready.Body.Logins[0]
		cli.platform = login.Platform
		cli.selfID = login.SelfID
		log.Infof("successfully connected to satori: %s, platform: %s, self_id: %s",
			cli.api, cli.platform, cli.selfID)
		cli.ws = conn
		go cli.doheartbeat()
		return nil
	}

	for {
		err := handshake()
		if err == nil {
			return
		}
		log.Warnf("failed to connect to satori: %s %v", cli.api, err)
		time.Sleep(5 * time.Second)
	}
}
// Listen connects to satori and passes every received event to handler.
//
// It blocks forever. When reading from the websocket fails, the heartbeat
// goroutine is stopped, the broken connection is closed, and after a 2 second
// pause a new connection is established before reading resumes.
//
// handler is invoked synchronously from the read loop with a pointer to the
// decoded event body, so the next message is not read until it returns.
// Non-text frames, payloads without an "op" field and non-event signals are
// ignored.
func (cli *Client) Listen(handler func(*Event)) {
	cli.connect()
	for {
		t, payload, err := cli.ws.ReadMessage()
		if err != nil { // reconnect
			log.Warnf("lost connection to satori: %s %v", cli.api, err)
			// Stop the heartbeat that is bound to the old connection.
			cli.cancel <- true
			// connect overwrites cli.ws without closing it, so release the
			// broken connection here to avoid leaking its socket.
			_ = cli.ws.Close()
			time.Sleep(2 * time.Second)
			cli.connect()
			continue
		}
		if t != websocket.TextMessage {
			continue
		}
		op := gjson.ParseBytes(payload).Get("op")
		if !op.Exists() {
			continue
		}
		switch op.Int() {
		case OpCodeEvent:
			var event Signal[Event]
			if err := json.Unmarshal(payload, &event); err != nil {
				// Skip the malformed event but leave a trace for debugging.
				log.Warnf("failed to decode satori event: %v", err)
				continue
			}
			handler(&event.Body)
		case OpCodePing, OpCodePong, OpCodeIdentify, OpCodeReady:
			// Known signals that need no handling here.
		default:
			// Unknown signals are ignored.
		}
	}
}
// doheartbeat sends a ping signal over cli.ws every 5 seconds until a value
// is received from cli.cancel. A failed write is only logged; the loop keeps
// running until it is cancelled.
func (cli *Client) doheartbeat() {
	const interval = 5 * time.Second
	ping := []byte(`{"op": 1}`)

	for {
		// Wait for either cancellation or the next heartbeat tick.
		select {
		case <-cli.cancel:
			return
		case <-time.After(interval):
		}

		if err := cli.ws.WriteMessage(websocket.TextMessage, ping); err != nil {
			log.Warnf("an error occurred while sending heartbeat to satori: %v", err)
		}
	}
}
// resolveURI turns a satori endpoint into the network to dial and the
// websocket URL to hand to the dialer.
//
// The scheme may carry a "+network" suffix (for example "ws+unix" or
// "ws+tcp4"); the suffix becomes the returned network and is stripped from
// the URL. "http" and "https" are rewritten to "ws" and "wss". Without a
// suffix the network is "tcp". If addr cannot be parsed as a URL or has no
// scheme, it is returned unchanged with network "tcp".
//
// For the "unix" network the URL path has the form "<socket path>:<request
// path>". The socket path is moved into the host component, encoded with
// unpadded URL-safe base64 so that it is a valid host and can be decoded
// with base64.RawURLEncoding by the dial function.
func resolveURI(addr string) (network, address string) {
	uri, err := url.Parse(addr)
	if err != nil || uri.Scheme == "" {
		return "tcp", addr
	}

	network = "tcp"
	scheme, ext, _ := strings.Cut(uri.Scheme, "+")
	switch scheme {
	case "http":
		scheme = "ws"
	case "https":
		scheme = "wss"
	}

	if ext != "" {
		network = ext
		if ext == "unix" {
			sock, reqPath, _ := strings.Cut(uri.Path, ":")
			// Must match the RawURLEncoding decoder on the dialing side;
			// StdEncoding would add '=' padding and may emit '+' or '/'.
			uri.Host = base64.RawURLEncoding.EncodeToString([]byte(sock))
			uri.Path = reqPath
		}
	}

	uri.Scheme = scheme // remove `+unix`/`+tcp4`
	return network, uri.String()
}