@@ -35,8 +35,8 @@ const (
3535 // CloudOidcRequestTimeout is the timeout for OIDC token verification requests
3636 // should be lower than the websocket response timeout set in cloud-api
3737 CloudOidcRequestTimeout = 10 * time .Second
38- // CloudWebSocketPingInterval is the interval at which the websocket client sends ping messages to the cloud
39- CloudWebSocketPingInterval = 15 * time .Second
38+ // WebsocketPingInterval is the interval at which the websocket client sends ping messages to the cloud
39+ WebsocketPingInterval = 15 * time .Second
4040)
4141
4242var (
@@ -52,59 +52,67 @@ var (
5252 Help : "The timestamp when the cloud connection was established" ,
5353 },
5454 )
55- metricCloudConnectionLastPingTimestamp = promauto .NewGauge (
55+ metricConnectionLastPingTimestamp = promauto .NewGaugeVec (
5656 prometheus.GaugeOpts {
57- Name : "jetkvm_cloud_connection_last_ping_timestamp " ,
57+ Name : "jetkvm_connection_last_ping_timestamp " ,
5858 Help : "The timestamp when the last ping response was received" ,
5959 },
60+ []string {"type" , "source" },
6061 )
61- metricCloudConnectionLastPingDuration = promauto .NewGauge (
62+ metricConnectionLastPingDuration = promauto .NewGaugeVec (
6263 prometheus.GaugeOpts {
63- Name : "jetkvm_cloud_connection_last_ping_duration " ,
64+ Name : "jetkvm_connection_last_ping_duration " ,
6465 Help : "The duration of the last ping response" ,
6566 },
67+ []string {"type" , "source" },
6668 )
67- metricCloudConnectionPingDuration = promauto .NewHistogram (
69+ metricConnectionPingDuration = promauto .NewHistogramVec (
6870 prometheus.HistogramOpts {
69- Name : "jetkvm_cloud_connection_ping_duration " ,
71+ Name : "jetkvm_connection_ping_duration " ,
7072 Help : "The duration of the ping response" ,
7173 Buckets : []float64 {
7274 0.1 , 0.5 , 1 , 10 ,
7375 },
7476 },
77+ []string {"type" , "source" },
7578 )
76- metricCloudConnectionTotalPingCount = promauto .NewCounter (
79+ metricConnectionTotalPingCount = promauto .NewCounterVec (
7780 prometheus.CounterOpts {
78- Name : "jetkvm_cloud_connection_total_ping_count " ,
79- Help : "The total number of pings sent to the cloud " ,
81+ Name : "jetkvm_connection_total_ping_count " ,
82+ Help : "The total number of pings sent to the connection " ,
8083 },
84+ []string {"type" , "source" },
8185 )
82- metricCloudConnectionSessionRequestCount = promauto .NewCounter (
86+ metricConnectionSessionRequestCount = promauto .NewCounterVec (
8387 prometheus.CounterOpts {
84- Name : "jetkvm_cloud_connection_session_total_request_count " ,
85- Help : "The total number of session requests received from the cloud " ,
88+ Name : "jetkvm_connection_session_total_request_count " ,
89+ Help : "The total number of session requests received" ,
8690 },
91+ []string {"type" , "source" },
8792 )
88- metricCloudConnectionSessionRequestDuration = promauto .NewHistogram (
93+ metricConnectionSessionRequestDuration = promauto .NewHistogramVec (
8994 prometheus.HistogramOpts {
90- Name : "jetkvm_cloud_connection_session_request_duration " ,
95+ Name : "jetkvm_connection_session_request_duration " ,
9196 Help : "The duration of session requests" ,
9297 Buckets : []float64 {
9398 0.1 , 0.5 , 1 , 10 ,
9499 },
95100 },
101+ []string {"type" , "source" },
96102 )
97- metricCloudConnectionLastSessionRequestTimestamp = promauto .NewGauge (
103+ metricConnectionLastSessionRequestTimestamp = promauto .NewGaugeVec (
98104 prometheus.GaugeOpts {
99- Name : "jetkvm_cloud_connection_last_session_request_timestamp " ,
105+ Name : "jetkvm_connection_last_session_request_timestamp " ,
100106 Help : "The timestamp of the last session request" ,
101107 },
108+ []string {"type" , "source" },
102109 )
103- metricCloudConnectionLastSessionRequestDuration = promauto .NewGauge (
110+ metricConnectionLastSessionRequestDuration = promauto .NewGaugeVec (
104111 prometheus.GaugeOpts {
105- Name : "jetkvm_cloud_connection_last_session_request_duration " ,
112+ Name : "jetkvm_connection_last_session_request_duration " ,
106113 Help : "The duration of the last session request" ,
107114 },
115+ []string {"type" , "source" },
108116 )
109117 metricCloudConnectionFailureCount = promauto .NewCounter (
110118 prometheus.CounterOpts {
@@ -119,12 +127,16 @@ var (
119127 cloudDisconnectLock = & sync.Mutex {}
120128)
121129
122- func cloudResetMetrics (established bool ) {
123- metricCloudConnectionLastPingTimestamp .Set (- 1 )
124- metricCloudConnectionLastPingDuration .Set (- 1 )
130+ func wsResetMetrics (established bool , sourceType string , source string ) {
131+ metricConnectionLastPingTimestamp . WithLabelValues ( sourceType , source ) .Set (- 1 )
132+ metricConnectionLastPingDuration . WithLabelValues ( sourceType , source ) .Set (- 1 )
125133
126- metricCloudConnectionLastSessionRequestTimestamp .Set (- 1 )
127- metricCloudConnectionLastSessionRequestDuration .Set (- 1 )
134+ metricConnectionLastSessionRequestTimestamp .WithLabelValues (sourceType , source ).Set (- 1 )
135+ metricConnectionLastSessionRequestDuration .WithLabelValues (sourceType , source ).Set (- 1 )
136+
137+ if sourceType != "cloud" {
138+ return
139+ }
128140
129141 if established {
130142 metricCloudConnectionEstablishedTimestamp .SetToCurrentTime ()
@@ -256,6 +268,7 @@ func runWebsocketClient() error {
256268
257269 header := http.Header {}
258270 header .Set ("X-Device-ID" , GetDeviceID ())
271+ header .Set ("X-App-Version" , builtAppVersion )
259272 header .Set ("Authorization" , "Bearer " + config .CloudToken )
260273 dialCtx , cancelDial := context .WithTimeout (context .Background (), CloudWebSocketConnectTimeout )
261274
@@ -270,88 +283,13 @@ func runWebsocketClient() error {
270283 cloudLogger .Infof ("websocket connected to %s" , wsURL )
271284
272285 // set the metrics when we successfully connect to the cloud.
273- cloudResetMetrics (true )
274-
275- runCtx , cancelRun := context .WithCancel (context .Background ())
276- defer cancelRun ()
277- go func () {
278- for {
279- time .Sleep (CloudWebSocketPingInterval )
280-
281- // set the timer for the ping duration
282- timer := prometheus .NewTimer (prometheus .ObserverFunc (func (v float64 ) {
283- metricCloudConnectionLastPingDuration .Set (v )
284- metricCloudConnectionPingDuration .Observe (v )
285- }))
286-
287- err := c .Ping (runCtx )
288-
289- if err != nil {
290- cloudLogger .Warnf ("websocket ping error: %v" , err )
291- cancelRun ()
292- return
293- }
294-
295- // dont use `defer` here because we want to observe the duration of the ping
296- timer .ObserveDuration ()
297-
298- metricCloudConnectionTotalPingCount .Inc ()
299- metricCloudConnectionLastPingTimestamp .SetToCurrentTime ()
300- }
301- }()
302-
303- // create a channel to receive the disconnect event, once received, we cancelRun
304- cloudDisconnectChan = make (chan error )
305- defer func () {
306- close (cloudDisconnectChan )
307- cloudDisconnectChan = nil
308- }()
309- go func () {
310- for err := range cloudDisconnectChan {
311- if err == nil {
312- continue
313- }
314- cloudLogger .Infof ("disconnecting from cloud due to: %v" , err )
315- cancelRun ()
316- }
317- }()
318-
319- for {
320- typ , msg , err := c .Read (runCtx )
321- if err != nil {
322- return err
323- }
324- if typ != websocket .MessageText {
325- // ignore non-text messages
326- continue
327- }
328- var req WebRTCSessionRequest
329- err = json .Unmarshal (msg , & req )
330- if err != nil {
331- cloudLogger .Warnf ("unable to parse ws message: %v" , string (msg ))
332- continue
333- }
286+ wsResetMetrics (true , "cloud" , "" )
334287
335- cloudLogger .Infof ("new session request: %v" , req .OidcGoogle )
336- cloudLogger .Tracef ("session request info: %v" , req )
337-
338- metricCloudConnectionSessionRequestCount .Inc ()
339- metricCloudConnectionLastSessionRequestTimestamp .SetToCurrentTime ()
340- err = handleSessionRequest (runCtx , c , req )
341- if err != nil {
342- cloudLogger .Infof ("error starting new session: %v" , err )
343- continue
344- }
345- }
288+ // we don't have a source for the cloud connection
289+ return handleWebRTCSignalWsMessages (c , true , "" )
346290}
347291
348- func handleSessionRequest (ctx context.Context , c * websocket.Conn , req WebRTCSessionRequest ) error {
349- timer := prometheus .NewTimer (prometheus .ObserverFunc (func (v float64 ) {
350- metricCloudConnectionLastSessionRequestDuration .Set (v )
351- metricCloudConnectionSessionRequestDuration .Observe (v )
352- }))
353- defer timer .ObserveDuration ()
354-
292+ func authenticateSession (ctx context.Context , c * websocket.Conn , req WebRTCSessionRequest ) error {
355293 oidcCtx , cancelOIDC := context .WithTimeout (ctx , CloudOidcRequestTimeout )
356294 defer cancelOIDC ()
357295 provider , err := oidc .NewProvider (oidcCtx , "https://accounts.google.com" )
@@ -379,10 +317,35 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
379317 return fmt .Errorf ("google identity mismatch" )
380318 }
381319
320+ return nil
321+ }
322+
323+ func handleSessionRequest (ctx context.Context , c * websocket.Conn , req WebRTCSessionRequest , isCloudConnection bool , source string ) error {
324+ var sourceType string
325+ if isCloudConnection {
326+ sourceType = "cloud"
327+ } else {
328+ sourceType = "local"
329+ }
330+
331+ timer := prometheus .NewTimer (prometheus .ObserverFunc (func (v float64 ) {
332+ metricConnectionLastSessionRequestDuration .WithLabelValues (sourceType , source ).Set (v )
333+ metricConnectionSessionRequestDuration .WithLabelValues (sourceType , source ).Observe (v )
334+ }))
335+ defer timer .ObserveDuration ()
336+
337+ // If the message is from the cloud, we need to authenticate the session.
338+ if isCloudConnection {
339+ if err := authenticateSession (ctx , c , req ); err != nil {
340+ return err
341+ }
342+ }
343+
382344 session , err := newSession (SessionConfig {
383- ICEServers : req .ICEServers ,
345+ ws : c ,
346+ IsCloud : isCloudConnection ,
384347 LocalIP : req .IP ,
385- IsCloud : true ,
348+ ICEServers : req . ICEServers ,
386349 })
387350 if err != nil {
388351 _ = wsjson .Write (context .Background (), c , gin.H {"error" : err })
@@ -406,14 +369,14 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
406369 cloudLogger .Info ("new session accepted" )
407370 cloudLogger .Tracef ("new session accepted: %v" , session )
408371 currentSession = session
409- _ = wsjson .Write (context .Background (), c , gin.H {"sd " : sd })
372+ _ = wsjson .Write (context .Background (), c , gin.H {"type" : "answer" , "data " : sd })
410373 return nil
411374}
412375
413376func RunWebsocketClient () {
414377 for {
415378 // reset the metrics when we start the websocket client.
416- cloudResetMetrics (false )
379+ wsResetMetrics (false , "cloud" , "" )
417380
418381 // If the cloud token is not set, we don't need to run the websocket client.
419382 if config .CloudToken == "" {
0 commit comments