Skip to content

Commit 18633ba

Browse files
committed
chore(websocket): use MetricVec instead of Metric to store metrics
1 parent b763703 commit 18633ba

File tree

2 files changed

+90
-45
lines changed

2 files changed

+90
-45
lines changed

cloud.go

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,59 +51,67 @@ var (
5151
Help: "The timestamp when the cloud connection was established",
5252
},
5353
)
54-
metricConnectionLastPingTimestamp = promauto.NewGauge(
54+
metricConnectionLastPingTimestamp = promauto.NewGaugeVec(
5555
prometheus.GaugeOpts{
5656
Name: "jetkvm_connection_last_ping_timestamp",
5757
Help: "The timestamp when the last ping response was received",
5858
},
59+
[]string{"type", "source"},
5960
)
60-
metricConnectionLastPingDuration = promauto.NewGauge(
61+
metricConnectionLastPingDuration = promauto.NewGaugeVec(
6162
prometheus.GaugeOpts{
6263
Name: "jetkvm_connection_last_ping_duration",
6364
Help: "The duration of the last ping response",
6465
},
66+
[]string{"type", "source"},
6567
)
66-
metricConnectionPingDuration = promauto.NewHistogram(
68+
metricConnectionPingDuration = promauto.NewHistogramVec(
6769
prometheus.HistogramOpts{
6870
Name: "jetkvm_connection_ping_duration",
6971
Help: "The duration of the ping response",
7072
Buckets: []float64{
7173
0.1, 0.5, 1, 10,
7274
},
7375
},
76+
[]string{"type", "source"},
7477
)
75-
metricConnectionTotalPingCount = promauto.NewCounter(
78+
metricConnectionTotalPingCount = promauto.NewCounterVec(
7679
prometheus.CounterOpts{
7780
Name: "jetkvm_connection_total_ping_count",
7881
Help: "The total number of pings sent to the connection",
7982
},
83+
[]string{"type", "source"},
8084
)
81-
metricConnectionSessionRequestCount = promauto.NewCounter(
85+
metricConnectionSessionRequestCount = promauto.NewCounterVec(
8286
prometheus.CounterOpts{
8387
Name: "jetkvm_connection_session_total_request_count",
84-
Help: "The total number of session requests received from the",
88+
Help: "The total number of session requests received",
8589
},
90+
[]string{"type", "source"},
8691
)
87-
metricConnectionSessionRequestDuration = promauto.NewHistogram(
92+
metricConnectionSessionRequestDuration = promauto.NewHistogramVec(
8893
prometheus.HistogramOpts{
8994
Name: "jetkvm_connection_session_request_duration",
9095
Help: "The duration of session requests",
9196
Buckets: []float64{
9297
0.1, 0.5, 1, 10,
9398
},
9499
},
100+
[]string{"type", "source"},
95101
)
96-
metricConnectionLastSessionRequestTimestamp = promauto.NewGauge(
102+
metricConnectionLastSessionRequestTimestamp = promauto.NewGaugeVec(
97103
prometheus.GaugeOpts{
98104
Name: "jetkvm_connection_last_session_request_timestamp",
99105
Help: "The timestamp of the last session request",
100106
},
107+
[]string{"type", "source"},
101108
)
102-
metricConnectionLastSessionRequestDuration = promauto.NewGauge(
109+
metricConnectionLastSessionRequestDuration = promauto.NewGaugeVec(
103110
prometheus.GaugeOpts{
104111
Name: "jetkvm_connection_last_session_request_duration",
105112
Help: "The duration of the last session request",
106113
},
114+
[]string{"type", "source"},
107115
)
108116
metricCloudConnectionFailureCount = promauto.NewCounter(
109117
prometheus.CounterOpts{
@@ -113,12 +121,16 @@ var (
113121
)
114122
)
115123

116-
func cloudResetMetrics(established bool) {
117-
metricConnectionLastPingTimestamp.Set(-1)
118-
metricConnectionLastPingDuration.Set(-1)
124+
func wsResetMetrics(established bool, sourceType string, source string) {
125+
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).Set(-1)
126+
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(-1)
119127

120-
metricConnectionLastSessionRequestTimestamp.Set(-1)
121-
metricConnectionLastSessionRequestDuration.Set(-1)
128+
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).Set(-1)
129+
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(-1)
130+
131+
if sourceType != "cloud" {
132+
return
133+
}
122134

123135
if established {
124136
metricCloudConnectionEstablishedTimestamp.SetToCurrentTime()
@@ -246,9 +258,10 @@ func runWebsocketClient() error {
246258
cloudLogger.Infof("websocket connected to %s", wsURL)
247259

248260
// set the metrics when we successfully connect to the cloud.
249-
cloudResetMetrics(true)
261+
wsResetMetrics(true, "cloud", "")
250262

251-
return handleWebRTCSignalWsMessages(c, true)
263+
// we don't have a source for the cloud connection
264+
return handleWebRTCSignalWsMessages(c, true, "")
252265
}
253266

254267
func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
@@ -282,10 +295,17 @@ func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessi
282295
return nil
283296
}
284297

285-
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool) error {
298+
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool, source string) error {
299+
var sourceType string
300+
if isCloudConnection {
301+
sourceType = "cloud"
302+
} else {
303+
sourceType = "local"
304+
}
305+
286306
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
287-
metricConnectionLastSessionRequestDuration.Set(v)
288-
metricConnectionSessionRequestDuration.Observe(v)
307+
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(v)
308+
metricConnectionSessionRequestDuration.WithLabelValues(sourceType, source).Observe(v)
289309
}))
290310
defer timer.ObserveDuration()
291311

@@ -331,7 +351,7 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
331351
func RunWebsocketClient() {
332352
for {
333353
// reset the metrics when we start the websocket client.
334-
cloudResetMetrics(false)
354+
wsResetMetrics(false, "cloud", "")
335355

336356
// If the cloud token is not set, we don't need to run the websocket client.
337357
if config.CloudToken == "" {

web.go

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -138,54 +138,79 @@ func handleLocalWebRTCSignal(c *gin.Context) {
138138
return
139139
}
140140

141+
// get the source from the request
142+
source := c.ClientIP()
143+
141144
// Now use conn for websocket operations
142145
defer wsCon.Close(websocket.StatusNormalClosure, "")
143-
err = handleWebRTCSignalWsMessages(wsCon, false)
146+
err = handleWebRTCSignalWsMessages(wsCon, false, source)
144147
if err != nil {
145148
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
146149
return
147150
}
148151
}
149152

150-
func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool) error {
153+
func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool, source string) error {
151154
runCtx, cancelRun := context.WithCancel(context.Background())
152155
defer cancelRun()
153156

154157
// Add connection tracking to detect reconnections
155158
connectionID := uuid.New().String()
156159
cloudLogger.Infof("new websocket connection established with ID: %s", connectionID)
157160

161+
// connection type
162+
var sourceType string
163+
if isCloudConnection {
164+
sourceType = "cloud"
165+
} else {
166+
sourceType = "local"
167+
}
168+
169+
// probably we can use a better logging framework here
170+
logInfof := func(format string, args ...interface{}) {
171+
args = append(args, source, sourceType)
172+
websocketLogger.Infof(format+", source: %s, sourceType: %s", args...)
173+
}
174+
logWarnf := func(format string, args ...interface{}) {
175+
args = append(args, source, sourceType)
176+
websocketLogger.Warnf(format+", source: %s, sourceType: %s", args...)
177+
}
178+
logTracef := func(format string, args ...interface{}) {
179+
args = append(args, source, sourceType)
180+
websocketLogger.Tracef(format+", source: %s, sourceType: %s", args...)
181+
}
182+
158183
go func() {
159184
for {
160185
time.Sleep(WebsocketPingInterval)
161186

162187
// set the timer for the ping duration
163188
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
164-
metricConnectionLastPingDuration.Set(v)
165-
metricConnectionPingDuration.Observe(v)
189+
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v)
190+
metricConnectionPingDuration.WithLabelValues(sourceType, source).Observe(v)
166191
}))
167192

168-
cloudLogger.Infof("pinging websocket")
193+
logInfof("pinging websocket")
169194
err := wsCon.Ping(runCtx)
170195

171196
if err != nil {
172-
cloudLogger.Warnf("websocket ping error: %v", err)
197+
logWarnf("websocket ping error: %v", err)
173198
cancelRun()
174199
return
175200
}
176201

177202
// dont use `defer` here because we want to observe the duration of the ping
178203
timer.ObserveDuration()
179204

180-
metricConnectionTotalPingCount.Inc()
181-
metricConnectionLastPingTimestamp.SetToCurrentTime()
205+
metricConnectionTotalPingCount.WithLabelValues(sourceType, source).Inc()
206+
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
182207
}
183208
}()
184209

185210
for {
186211
typ, msg, err := wsCon.Read(runCtx)
187212
if err != nil {
188-
websocketLogger.Warnf("websocket read error: %v", err)
213+
logWarnf("websocket read error: %v", err)
189214
return err
190215
}
191216
if typ != websocket.MessageText {
@@ -200,54 +225,54 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool)
200225

201226
err = json.Unmarshal(msg, &message)
202227
if err != nil {
203-
websocketLogger.Warnf("unable to parse ws message: %v", string(msg))
228+
logWarnf("unable to parse ws message: %v", err)
204229
continue
205230
}
206231

207232
if message.Type == "offer" {
208-
websocketLogger.Infof("new session request received")
233+
logInfof("new session request received")
209234
var req WebRTCSessionRequest
210235
err = json.Unmarshal(message.Data, &req)
211236
if err != nil {
212-
websocketLogger.Warnf("unable to parse session request data: %v", string(message.Data))
237+
logWarnf("unable to parse session request data: %v", err)
213238
continue
214239
}
215240

216-
websocketLogger.Infof("new session request: %v", req.OidcGoogle)
217-
websocketLogger.Tracef("session request info: %v", req)
241+
logInfof("new session request: %v", req.OidcGoogle)
242+
logTracef("session request info: %v", req)
218243

219-
metricConnectionSessionRequestCount.Inc()
220-
metricConnectionLastSessionRequestTimestamp.SetToCurrentTime()
221-
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection)
244+
metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc()
245+
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
246+
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source)
222247
if err != nil {
223-
websocketLogger.Infof("error starting new session: %v", err)
248+
logWarnf("error starting new session: %v", err)
224249
continue
225250
}
226251
} else if message.Type == "new-ice-candidate" {
227-
websocketLogger.Infof("The client sent us a new ICE candidate: %v", string(message.Data))
252+
logInfof("The client sent us a new ICE candidate: %v", string(message.Data))
228253
var candidate webrtc.ICECandidateInit
229254

230255
// Attempt to unmarshal as a ICECandidateInit
231256
if err := json.Unmarshal(message.Data, &candidate); err != nil {
232-
websocketLogger.Warnf("unable to parse incoming ICE candidate data: %v", string(message.Data))
257+
logWarnf("unable to parse incoming ICE candidate data: %v", string(message.Data))
233258
continue
234259
}
235260

236261
if candidate.Candidate == "" {
237-
websocketLogger.Warnf("empty incoming ICE candidate, skipping")
262+
logWarnf("empty incoming ICE candidate, skipping")
238263
continue
239264
}
240265

241-
websocketLogger.Infof("unmarshalled incoming ICE candidate: %v", candidate)
266+
logInfof("unmarshalled incoming ICE candidate: %v", candidate)
242267

243268
if currentSession == nil {
244-
websocketLogger.Infof("no current session, skipping incoming ICE candidate")
269+
logInfof("no current session, skipping incoming ICE candidate")
245270
continue
246271
}
247272

248-
websocketLogger.Infof("adding incoming ICE candidate to current session: %v", candidate)
273+
logInfof("adding incoming ICE candidate to current session: %v", candidate)
249274
if err = currentSession.peerConnection.AddICECandidate(candidate); err != nil {
250-
websocketLogger.Warnf("failed to add incoming ICE candidate to our peer connection: %v", err)
275+
logWarnf("failed to add incoming ICE candidate to our peer connection: %v", err)
251276
}
252277
}
253278
}

0 commit comments

Comments
 (0)