Skip to content

Commit 1a26431

Browse files
authored
chore(cloud): websocket client improvements (#323)
2 parents 65e4a58 + f3b5011 commit 1a26431

File tree

3 files changed

+184
-10
lines changed

3 files changed

+184
-10
lines changed

cloud.go

Lines changed: 148 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"time"
1111

1212
"github.com/coder/websocket/wsjson"
13+
"github.com/prometheus/client_golang/prometheus"
14+
"github.com/prometheus/client_golang/prometheus/promauto"
1315

1416
"github.com/coreos/go-oidc/v3/oidc"
1517

@@ -36,6 +38,97 @@ const (
3638
CloudWebSocketPingInterval = 15 * time.Second
3739
)
3840

41+
var (
42+
metricCloudConnectionStatus = promauto.NewGauge(
43+
prometheus.GaugeOpts{
44+
Name: "jetkvm_cloud_connection_status",
45+
Help: "The status of the cloud connection",
46+
},
47+
)
48+
metricCloudConnectionEstablishedTimestamp = promauto.NewGauge(
49+
prometheus.GaugeOpts{
50+
Name: "jetkvm_cloud_connection_established_timestamp",
51+
Help: "The timestamp when the cloud connection was established",
52+
},
53+
)
54+
metricCloudConnectionLastPingTimestamp = promauto.NewGauge(
55+
prometheus.GaugeOpts{
56+
Name: "jetkvm_cloud_connection_last_ping_timestamp",
57+
Help: "The timestamp when the last ping response was received",
58+
},
59+
)
60+
metricCloudConnectionLastPingDuration = promauto.NewGauge(
61+
prometheus.GaugeOpts{
62+
Name: "jetkvm_cloud_connection_last_ping_duration",
63+
Help: "The duration of the last ping response",
64+
},
65+
)
66+
metricCloudConnectionPingDuration = promauto.NewHistogram(
67+
prometheus.HistogramOpts{
68+
Name: "jetkvm_cloud_connection_ping_duration",
69+
Help: "The duration of the ping response",
70+
Buckets: []float64{
71+
0.1, 0.5, 1, 10,
72+
},
73+
},
74+
)
75+
metricCloudConnectionTotalPingCount = promauto.NewCounter(
76+
prometheus.CounterOpts{
77+
Name: "jetkvm_cloud_connection_total_ping_count",
78+
Help: "The total number of pings sent to the cloud",
79+
},
80+
)
81+
metricCloudConnectionSessionRequestCount = promauto.NewCounter(
82+
prometheus.CounterOpts{
83+
Name: "jetkvm_cloud_connection_session_total_request_count",
84+
Help: "The total number of session requests received from the cloud",
85+
},
86+
)
87+
metricCloudConnectionSessionRequestDuration = promauto.NewHistogram(
88+
prometheus.HistogramOpts{
89+
Name: "jetkvm_cloud_connection_session_request_duration",
90+
Help: "The duration of session requests",
91+
Buckets: []float64{
92+
0.1, 0.5, 1, 10,
93+
},
94+
},
95+
)
96+
metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge(
97+
prometheus.GaugeOpts{
98+
Name: "jetkvm_cloud_connection_last_session_request_timestamp",
99+
Help: "The timestamp of the last session request",
100+
},
101+
)
102+
metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge(
103+
prometheus.GaugeOpts{
104+
Name: "jetkvm_cloud_connection_last_session_request_duration",
105+
Help: "The duration of the last session request",
106+
},
107+
)
108+
metricCloudConnectionFailureCount = promauto.NewCounter(
109+
prometheus.CounterOpts{
110+
Name: "jetkvm_cloud_connection_failure_count",
111+
Help: "The number of times the cloud connection has failed",
112+
},
113+
)
114+
)
115+
116+
func cloudResetMetrics(established bool) {
117+
metricCloudConnectionLastPingTimestamp.Set(-1)
118+
metricCloudConnectionLastPingDuration.Set(-1)
119+
120+
metricCloudConnectionLastSessionRequestTimestamp.Set(-1)
121+
metricCloudConnectionLastSessionRequestDuration.Set(-1)
122+
123+
if established {
124+
metricCloudConnectionEstablishedTimestamp.SetToCurrentTime()
125+
metricCloudConnectionStatus.Set(1)
126+
} else {
127+
metricCloudConnectionEstablishedTimestamp.Set(-1)
128+
metricCloudConnectionStatus.Set(-1)
129+
}
130+
}
131+
39132
func handleCloudRegister(c *gin.Context) {
40133
var req CloudRegisterRequest
41134

@@ -90,11 +183,6 @@ func handleCloudRegister(c *gin.Context) {
90183
return
91184
}
92185

93-
if config.CloudToken == "" {
94-
cloudLogger.Info("Starting websocket client due to adoption")
95-
go RunWebsocketClient()
96-
}
97-
98186
config.CloudToken = tokenResp.SecretToken
99187

100188
provider, err := oidc.NewProvider(c, "https://accounts.google.com")
@@ -130,19 +218,23 @@ func runWebsocketClient() error {
130218
time.Sleep(5 * time.Second)
131219
return fmt.Errorf("cloud token is not set")
132220
}
221+
133222
wsURL, err := url.Parse(config.CloudURL)
134223
if err != nil {
135224
return fmt.Errorf("failed to parse config.CloudURL: %w", err)
136225
}
226+
137227
if wsURL.Scheme == "http" {
138228
wsURL.Scheme = "ws"
139229
} else {
140230
wsURL.Scheme = "wss"
141231
}
232+
142233
header := http.Header{}
143234
header.Set("X-Device-ID", GetDeviceID())
144235
header.Set("Authorization", "Bearer "+config.CloudToken)
145236
dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout)
237+
146238
defer cancelDial()
147239
c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{
148240
HTTPHeader: header,
@@ -152,17 +244,35 @@ func runWebsocketClient() error {
152244
}
153245
defer c.CloseNow() //nolint:errcheck
154246
cloudLogger.Infof("websocket connected to %s", wsURL)
247+
248+
// set the metrics when we successfully connect to the cloud.
249+
cloudResetMetrics(true)
250+
155251
runCtx, cancelRun := context.WithCancel(context.Background())
156252
defer cancelRun()
157253
go func() {
158254
for {
159255
time.Sleep(CloudWebSocketPingInterval)
256+
257+
// set the timer for the ping duration
258+
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
259+
metricCloudConnectionLastPingDuration.Set(v)
260+
metricCloudConnectionPingDuration.Observe(v)
261+
}))
262+
160263
err := c.Ping(runCtx)
264+
161265
if err != nil {
162266
cloudLogger.Warnf("websocket ping error: %v", err)
163267
cancelRun()
164268
return
165269
}
270+
271+
// dont use `defer` here because we want to observe the duration of the ping
272+
timer.ObserveDuration()
273+
274+
metricCloudConnectionTotalPingCount.Inc()
275+
metricCloudConnectionLastPingTimestamp.SetToCurrentTime()
166276
}
167277
}()
168278
for {
@@ -184,6 +294,8 @@ func runWebsocketClient() error {
184294
cloudLogger.Infof("new session request: %v", req.OidcGoogle)
185295
cloudLogger.Tracef("session request info: %v", req)
186296

297+
metricCloudConnectionSessionRequestCount.Inc()
298+
metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime()
187299
err = handleSessionRequest(runCtx, c, req)
188300
if err != nil {
189301
cloudLogger.Infof("error starting new session: %v", err)
@@ -193,6 +305,12 @@ func runWebsocketClient() error {
193305
}
194306

195307
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
308+
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
309+
metricCloudConnectionLastSessionRequestDuration.Set(v)
310+
metricCloudConnectionSessionRequestDuration.Observe(v)
311+
}))
312+
defer timer.ObserveDuration()
313+
196314
oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout)
197315
defer cancelOIDC()
198316
provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com")
@@ -253,9 +371,34 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
253371

254372
func RunWebsocketClient() {
255373
for {
374+
// reset the metrics when we start the websocket client.
375+
cloudResetMetrics(false)
376+
377+
// If the cloud token is not set, we don't need to run the websocket client.
378+
if config.CloudToken == "" {
379+
time.Sleep(5 * time.Second)
380+
continue
381+
}
382+
383+
// If the network is not up, well, we can't connect to the cloud.
384+
if !networkState.Up {
385+
cloudLogger.Warn("waiting for network to be up, will retry in 3 seconds")
386+
time.Sleep(3 * time.Second)
387+
continue
388+
}
389+
390+
// If the system time is not synchronized, the API request will fail anyway because the TLS handshake will fail.
391+
if isTimeSyncNeeded() && !timeSyncSuccess {
392+
cloudLogger.Warn("system time is not synced, will retry in 3 seconds")
393+
time.Sleep(3 * time.Second)
394+
continue
395+
}
396+
256397
err := runWebsocketClient()
257398
if err != nil {
258399
cloudLogger.Errorf("websocket client error: %v", err)
400+
metricCloudConnectionStatus.Set(0)
401+
metricCloudConnectionFailureCount.Inc()
259402
time.Sleep(5 * time.Second)
260403
}
261404
}

main.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,9 @@ func Main() {
7272
if config.TLSMode != "" {
7373
go RunWebSecureServer()
7474
}
75-
// If the cloud token isn't set, the client won't be started by default.
76-
// However, if the user adopts the device via the web interface, handleCloudRegister will start the client.
77-
if config.CloudToken != "" {
78-
go RunWebsocketClient()
79-
}
75+
// As websocket client already checks if the cloud token is set, we can start it here.
76+
go RunWebsocketClient()
77+
8078
initSerialPort()
8179
sigs := make(chan os.Signal, 1)
8280
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

ntp.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"net/http"
77
"os/exec"
8+
"strconv"
89
"time"
910

1011
"github.com/beevik/ntp"
@@ -20,13 +21,41 @@ const (
2021
)
2122

2223
var (
24+
builtTimestamp string
2325
timeSyncRetryInterval = 0 * time.Second
26+
timeSyncSuccess = false
2427
defaultNTPServers = []string{
2528
"time.cloudflare.com",
2629
"time.apple.com",
2730
}
2831
)
2932

33+
func isTimeSyncNeeded() bool {
34+
if builtTimestamp == "" {
35+
logger.Warnf("Built timestamp is not set, time sync is needed")
36+
return true
37+
}
38+
39+
ts, err := strconv.Atoi(builtTimestamp)
40+
if err != nil {
41+
logger.Warnf("Failed to parse built timestamp: %v", err)
42+
return true
43+
}
44+
45+
// builtTimestamp is UNIX timestamp in seconds
46+
builtTime := time.Unix(int64(ts), 0)
47+
now := time.Now()
48+
49+
logger.Tracef("Built time: %v, now: %v", builtTime, now)
50+
51+
if now.Sub(builtTime) < 0 {
52+
logger.Warnf("System time is behind the built time, time sync is needed")
53+
return true
54+
}
55+
56+
return false
57+
}
58+
3059
func TimeSyncLoop() {
3160
for {
3261
if !networkState.checked {
@@ -40,6 +69,9 @@ func TimeSyncLoop() {
4069
continue
4170
}
4271

72+
// check if time sync is needed, but do nothing for now
73+
isTimeSyncNeeded()
74+
4375
logger.Infof("Syncing system time")
4476
start := time.Now()
4577
err := SyncSystemTime()
@@ -56,6 +88,7 @@ func TimeSyncLoop() {
5688

5789
continue
5890
}
91+
timeSyncSuccess = true
5992
logger.Infof("Time sync successful, now is: %v, time taken: %v", time.Now(), time.Since(start))
6093
time.Sleep(timeSyncInterval) // after the first sync is done
6194
}

0 commit comments

Comments
 (0)