Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func runWebsocketClient() error {
defer cancelDial()
c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{
HTTPHeader: header,
OnPingReceived: func(ctx context.Context, payload []byte) bool {
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: cloud", payload, wsURL.Host)
return true
},
})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.21.1
require (
github.com/Masterminds/semver/v3 v3.3.0
github.com/beevik/ntp v1.3.1
github.com/coder/websocket v1.8.12
github.com/coder/websocket v1.8.13
github.com/coreos/go-oidc/v3 v3.11.0
github.com/creack/pty v1.1.23
github.com/gin-gonic/gin v1.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/coreos/go-oidc/v3 v3.11.0 h1:Ia3MxdwpSw702YW0xgfmP1GVCMA9aEFWu12XUZ3/OtI=
github.com/coreos/go-oidc/v3 v3.11.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0=
github.com/creack/goselect v0.1.2 h1:2DNy14+JPjRBgPzAd1thbQp4BSIihxcBf0IXhQXDRa0=
Expand Down
53 changes: 39 additions & 14 deletions web.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kvm

import (
"bytes"
"context"
"embed"
"encoding/json"
Expand Down Expand Up @@ -173,11 +174,24 @@ func handleWebRTCSession(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"sd": sd})
}

var (
pingMessage = []byte("ping")
pongMessage = []byte("pong")
)

func handleLocalWebRTCSignal(c *gin.Context) {
cloudLogger.Infof("new websocket connection established")

// get the source from the request
source := c.ClientIP()

// Create WebSocket options with InsecureSkipVerify to bypass origin check
wsOptions := &websocket.AcceptOptions{
InsecureSkipVerify: true, // Allow connections from any origin
OnPingReceived: func(ctx context.Context, payload []byte) bool {
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: local", payload, source)
return true
},
}

wsCon, err := websocket.Accept(c.Writer, c.Request, wsOptions)
Expand All @@ -186,9 +200,6 @@ func handleLocalWebRTCSignal(c *gin.Context) {
return
}

// get the source from the request
source := c.ClientIP()

// Now use conn for websocket operations
defer wsCon.Close(websocket.StatusNormalClosure, "")

Expand All @@ -211,7 +222,6 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,

// Add connection tracking to detect reconnections
connectionID := uuid.New().String()
cloudLogger.Infof("new websocket connection established with ID: %s", connectionID)

// connection type
var sourceType string
Expand All @@ -223,18 +233,20 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,

// probably we can use a better logging framework here
logInfof := func(format string, args ...interface{}) {
args = append(args, source, sourceType)
websocketLogger.Infof(format+", source: %s, sourceType: %s", args...)
args = append(args, source, sourceType, connectionID)
websocketLogger.Infof(format+", source: %s, sourceType: %s, id: %s", args...)
}
logWarnf := func(format string, args ...interface{}) {
args = append(args, source, sourceType)
websocketLogger.Warnf(format+", source: %s, sourceType: %s", args...)
args = append(args, source, sourceType, connectionID)
websocketLogger.Warnf(format+", source: %s, sourceType: %s, id: %s", args...)
}
logTracef := func(format string, args ...interface{}) {
args = append(args, source, sourceType)
websocketLogger.Tracef(format+", source: %s, sourceType: %s", args...)
args = append(args, source, sourceType, connectionID)
websocketLogger.Tracef(format+", source: %s, sourceType: %s, id: %s", args...)
}

logInfof("new websocket connection established")

go func() {
for {
time.Sleep(WebsocketPingInterval)
Expand All @@ -245,7 +257,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
metricConnectionPingDuration.WithLabelValues(sourceType, source).Observe(v)
}))

logInfof("pinging websocket")
logTracef("sending ping frame")
err := wsCon.Ping(runCtx)

if err != nil {
Expand All @@ -255,10 +267,12 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
}

// dont use `defer` here because we want to observe the duration of the ping
timer.ObserveDuration()
duration := timer.ObserveDuration()

metricConnectionTotalPingCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()

logTracef("received pong frame, duration: %v", duration)
}
}()

Expand Down Expand Up @@ -296,6 +310,16 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
Data json.RawMessage `json:"data"`
}

if bytes.Equal(msg, pingMessage) {
logInfof("ping message received: %s", string(msg))
err = wsCon.Write(context.Background(), websocket.MessageText, pongMessage)
if err != nil {
logWarnf("unable to write pong message: %v", err)
Copy link

Copilot AI Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error during pong message write may abruptly terminate the connection if encountered. Consider handling the error more gracefully (e.g., logging the error and possibly attempting a retry or closing the connection in a controlled manner).

Suggested change
logWarnf("unable to write pong message: %v", err)
logWarnf("unable to write pong message: %v", err)
closeErr := wsCon.Close(websocket.StatusInternalError, "error writing pong message")
if closeErr != nil {
logWarnf("error closing websocket connection: %v", closeErr)
}

Copilot uses AI. Check for mistakes.
return err
}
continue
}

err = json.Unmarshal(msg, &message)
if err != nil {
logWarnf("unable to parse ws message: %v", err)
Expand All @@ -311,8 +335,9 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
continue
}

logInfof("new session request: %v", req.OidcGoogle)
logTracef("session request info: %v", req)
if req.OidcGoogle != "" {
logInfof("new session request with OIDC Google: %v", req.OidcGoogle)
}

metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
Expand Down