Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -59,6 +60,13 @@ var (
},
[]string{"type", "source"},
)
metricConnectionLastPingReceivedTimestamp = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_connection_last_ping_received_timestamp",
Help: "The timestamp when the last ping request was received",
},
[]string{"type", "source"},
)
metricConnectionLastPingDuration = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_connection_last_ping_duration",
Expand All @@ -76,16 +84,23 @@ var (
},
[]string{"type", "source"},
)
metricConnectionTotalPingCount = promauto.NewCounterVec(
metricConnectionTotalPingSentCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_connection_total_ping_count",
Name: "jetkvm_connection_total_ping_sent",
Help: "The total number of pings sent to the connection",
},
[]string{"type", "source"},
)
metricConnectionTotalPingReceivedCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_connection_total_ping_received",
Help: "The total number of pings received from the connection",
},
[]string{"type", "source"},
)
metricConnectionSessionRequestCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_connection_session_total_request_count",
Name: "jetkvm_connection_session_total_requests",
Help: "The total number of session requests received",
},
[]string{"type", "source"},
Expand Down Expand Up @@ -131,6 +146,8 @@ func wsResetMetrics(established bool, sourceType string, source string) {
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).Set(-1)
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(-1)

metricConnectionLastPingReceivedTimestamp.WithLabelValues(sourceType, source).Set(-1)

metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).Set(-1)
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(-1)

Expand Down Expand Up @@ -277,20 +294,29 @@ func runWebsocketClient() error {
HTTPHeader: header,
OnPingReceived: func(ctx context.Context, payload []byte) bool {
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: cloud", payload, wsURL.Host)

metricConnectionTotalPingReceivedCount.WithLabelValues("cloud", wsURL.Host).Inc()
metricConnectionLastPingReceivedTimestamp.WithLabelValues("cloud", wsURL.Host).SetToCurrentTime()

return true
},
})
// if the context is canceled, we don't want to return an error
if err != nil {
if errors.Is(err, context.Canceled) {
cloudLogger.Infof("websocket connection canceled")
return nil
}
return err
}
defer c.CloseNow() //nolint:errcheck
cloudLogger.Infof("websocket connected to %s", wsURL)

// set the metrics when we successfully connect to the cloud.
wsResetMetrics(true, "cloud", "")
wsResetMetrics(true, "cloud", wsURL.Host)

// we don't have a source for the cloud connection
return handleWebRTCSignalWsMessages(c, true, "")
return handleWebRTCSignalWsMessages(c, true, wsURL.Host)
}

func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
Expand Down Expand Up @@ -379,9 +405,6 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess

func RunWebsocketClient() {
for {
// reset the metrics when we start the websocket client.
wsResetMetrics(false, "cloud", "")

// If the cloud token is not set, we don't need to run the websocket client.
if config.CloudToken == "" {
time.Sleep(5 * time.Second)
Expand Down
20 changes: 19 additions & 1 deletion web.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"embed"
"encoding/json"
"errors"
"fmt"
"io/fs"
"net/http"
Expand Down Expand Up @@ -190,6 +191,10 @@ func handleLocalWebRTCSignal(c *gin.Context) {
InsecureSkipVerify: true, // Allow connections from any origin
OnPingReceived: func(ctx context.Context, payload []byte) bool {
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: local", payload, source)

metricConnectionTotalPingReceivedCount.WithLabelValues("local", source).Inc()
metricConnectionLastPingReceivedTimestamp.WithLabelValues("local", source).SetToCurrentTime()

return true
},
}
Expand Down Expand Up @@ -251,6 +256,15 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
for {
time.Sleep(WebsocketPingInterval)

if ctxErr := runCtx.Err(); ctxErr != nil {
if !errors.Is(ctxErr, context.Canceled) {
logWarnf("websocket connection closed: %v", ctxErr)
} else {
logTracef("websocket connection closed as the context was canceled: %v")
}
return
}

// set the timer for the ping duration
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v)
Expand All @@ -269,7 +283,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
// dont use `defer` here because we want to observe the duration of the ping
duration := timer.ObserveDuration()

metricConnectionTotalPingCount.WithLabelValues(sourceType, source).Inc()
metricConnectionTotalPingSentCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()

logTracef("received pong frame, duration: %v", duration)
Expand Down Expand Up @@ -317,6 +331,10 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
logWarnf("unable to write pong message: %v", err)
return err
}

metricConnectionTotalPingReceivedCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastPingReceivedTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()

continue
}

Expand Down