Skip to content

Commit 50d4374

Browse files
committed
chore(websocket): use MetricVec instead of Metric to store metrics
1 parent 5b80ef9 commit 50d4374

File tree

2 files changed

+89
-50
lines changed

2 files changed

+89
-50
lines changed

cloud.go

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"net/http"
99
"net/url"
10-
"sync"
1110
"time"
1211

1312
"github.com/coder/websocket/wsjson"
@@ -52,59 +51,67 @@ var (
5251
Help: "The timestamp when the cloud connection was established",
5352
},
5453
)
55-
metricConnectionLastPingTimestamp = promauto.NewGauge(
54+
metricConnectionLastPingTimestamp = promauto.NewGaugeVec(
5655
prometheus.GaugeOpts{
5756
Name: "jetkvm_connection_last_ping_timestamp",
5857
Help: "The timestamp when the last ping response was received",
5958
},
59+
[]string{"type", "source"},
6060
)
61-
metricConnectionLastPingDuration = promauto.NewGauge(
61+
metricConnectionLastPingDuration = promauto.NewGaugeVec(
6262
prometheus.GaugeOpts{
6363
Name: "jetkvm_connection_last_ping_duration",
6464
Help: "The duration of the last ping response",
6565
},
66+
[]string{"type", "source"},
6667
)
67-
metricConnectionPingDuration = promauto.NewHistogram(
68+
metricConnectionPingDuration = promauto.NewHistogramVec(
6869
prometheus.HistogramOpts{
6970
Name: "jetkvm_connection_ping_duration",
7071
Help: "The duration of the ping response",
7172
Buckets: []float64{
7273
0.1, 0.5, 1, 10,
7374
},
7475
},
76+
[]string{"type", "source"},
7577
)
76-
metricConnectionTotalPingCount = promauto.NewCounter(
78+
metricConnectionTotalPingCount = promauto.NewCounterVec(
7779
prometheus.CounterOpts{
7880
Name: "jetkvm_connection_total_ping_count",
7981
Help: "The total number of pings sent to the connection",
8082
},
83+
[]string{"type", "source"},
8184
)
82-
metricConnectionSessionRequestCount = promauto.NewCounter(
85+
metricConnectionSessionRequestCount = promauto.NewCounterVec(
8386
prometheus.CounterOpts{
8487
Name: "jetkvm_connection_session_total_request_count",
85-
Help: "The total number of session requests received from the",
88+
Help: "The total number of session requests received",
8689
},
90+
[]string{"type", "source"},
8791
)
88-
metricConnectionSessionRequestDuration = promauto.NewHistogram(
92+
metricConnectionSessionRequestDuration = promauto.NewHistogramVec(
8993
prometheus.HistogramOpts{
9094
Name: "jetkvm_connection_session_request_duration",
9195
Help: "The duration of session requests",
9296
Buckets: []float64{
9397
0.1, 0.5, 1, 10,
9498
},
9599
},
100+
[]string{"type", "source"},
96101
)
97-
metricConnectionLastSessionRequestTimestamp = promauto.NewGauge(
102+
metricConnectionLastSessionRequestTimestamp = promauto.NewGaugeVec(
98103
prometheus.GaugeOpts{
99104
Name: "jetkvm_connection_last_session_request_timestamp",
100105
Help: "The timestamp of the last session request",
101106
},
107+
[]string{"type", "source"},
102108
)
103-
metricConnectionLastSessionRequestDuration = promauto.NewGauge(
109+
metricConnectionLastSessionRequestDuration = promauto.NewGaugeVec(
104110
prometheus.GaugeOpts{
105111
Name: "jetkvm_connection_last_session_request_duration",
106112
Help: "The duration of the last session request",
107113
},
114+
[]string{"type", "source"},
108115
)
109116
metricCloudConnectionFailureCount = promauto.NewCounter(
110117
prometheus.CounterOpts{
@@ -114,17 +121,16 @@ var (
114121
)
115122
)
116123

117-
var (
118-
cloudDisconnectChan chan error
119-
cloudDisconnectLock = &sync.Mutex{}
120-
)
124+
func wsResetMetrics(established bool, sourceType string, source string) {
125+
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).Set(-1)
126+
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(-1)
121127

122-
func cloudResetMetrics(established bool) {
123-
metricConnectionLastPingTimestamp.Set(-1)
124-
metricConnectionLastPingDuration.Set(-1)
128+
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).Set(-1)
129+
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(-1)
125130

126-
metricConnectionLastSessionRequestTimestamp.Set(-1)
127-
metricConnectionLastSessionRequestDuration.Set(-1)
131+
if sourceType != "cloud" {
132+
return
133+
}
128134

129135
if established {
130136
metricCloudConnectionEstablishedTimestamp.SetToCurrentTime()
@@ -270,9 +276,10 @@ func runWebsocketClient() error {
270276
cloudLogger.Infof("websocket connected to %s", wsURL)
271277

272278
// set the metrics when we successfully connect to the cloud.
273-
cloudResetMetrics(true)
279+
wsResetMetrics(true, "cloud", "")
274280

275-
return handleWebRTCSignalWsMessages(c, true)
281+
// we don't have a source for the cloud connection
282+
return handleWebRTCSignalWsMessages(c, true, "")
276283
}
277284

278285
func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
@@ -306,10 +313,17 @@ func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessi
306313
return nil
307314
}
308315

309-
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool) error {
316+
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool, source string) error {
317+
var sourceType string
318+
if isCloudConnection {
319+
sourceType = "cloud"
320+
} else {
321+
sourceType = "local"
322+
}
323+
310324
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
311-
metricConnectionLastSessionRequestDuration.Set(v)
312-
metricConnectionSessionRequestDuration.Observe(v)
325+
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(v)
326+
metricConnectionSessionRequestDuration.WithLabelValues(sourceType, source).Observe(v)
313327
}))
314328
defer timer.ObserveDuration()
315329

@@ -355,7 +369,7 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
355369
func RunWebsocketClient() {
356370
for {
357371
// reset the metrics when we start the websocket client.
358-
cloudResetMetrics(false)
372+
wsResetMetrics(false, "cloud", "")
359373

360374
// If the cloud token is not set, we don't need to run the websocket client.
361375
if config.CloudToken == "" {

web.go

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -138,54 +138,79 @@ func handleLocalWebRTCSignal(c *gin.Context) {
138138
return
139139
}
140140

141+
// get the source from the request
142+
source := c.ClientIP()
143+
141144
// Now use conn for websocket operations
142145
defer wsCon.Close(websocket.StatusNormalClosure, "")
143-
err = handleWebRTCSignalWsMessages(wsCon, false)
146+
err = handleWebRTCSignalWsMessages(wsCon, false, source)
144147
if err != nil {
145148
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
146149
return
147150
}
148151
}
149152

150-
func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool) error {
153+
func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool, source string) error {
151154
runCtx, cancelRun := context.WithCancel(context.Background())
152155
defer cancelRun()
153156

154157
// Add connection tracking to detect reconnections
155158
connectionID := uuid.New().String()
156159
cloudLogger.Infof("new websocket connection established with ID: %s", connectionID)
157160

161+
// connection type
162+
var sourceType string
163+
if isCloudConnection {
164+
sourceType = "cloud"
165+
} else {
166+
sourceType = "local"
167+
}
168+
169+
// probably we can use a better logging framework here
170+
logInfof := func(format string, args ...interface{}) {
171+
args = append(args, source, sourceType)
172+
websocketLogger.Infof(format+", source: %s, sourceType: %s", args...)
173+
}
174+
logWarnf := func(format string, args ...interface{}) {
175+
args = append(args, source, sourceType)
176+
websocketLogger.Warnf(format+", source: %s, sourceType: %s", args...)
177+
}
178+
logTracef := func(format string, args ...interface{}) {
179+
args = append(args, source, sourceType)
180+
websocketLogger.Tracef(format+", source: %s, sourceType: %s", args...)
181+
}
182+
158183
go func() {
159184
for {
160185
time.Sleep(WebsocketPingInterval)
161186

162187
// set the timer for the ping duration
163188
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
164-
metricConnectionLastPingDuration.Set(v)
165-
metricConnectionPingDuration.Observe(v)
189+
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v)
190+
metricConnectionPingDuration.WithLabelValues(sourceType, source).Observe(v)
166191
}))
167192

168-
cloudLogger.Infof("pinging websocket")
193+
logInfof("pinging websocket")
169194
err := wsCon.Ping(runCtx)
170195

171196
if err != nil {
172-
cloudLogger.Warnf("websocket ping error: %v", err)
197+
logWarnf("websocket ping error: %v", err)
173198
cancelRun()
174199
return
175200
}
176201

177202
// dont use `defer` here because we want to observe the duration of the ping
178203
timer.ObserveDuration()
179204

180-
metricConnectionTotalPingCount.Inc()
181-
metricConnectionLastPingTimestamp.SetToCurrentTime()
205+
metricConnectionTotalPingCount.WithLabelValues(sourceType, source).Inc()
206+
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
182207
}
183208
}()
184209

185210
for {
186211
typ, msg, err := wsCon.Read(runCtx)
187212
if err != nil {
188-
websocketLogger.Warnf("websocket read error: %v", err)
213+
logWarnf("websocket read error: %v", err)
189214
return err
190215
}
191216
if typ != websocket.MessageText {
@@ -200,54 +225,54 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool)
200225

201226
err = json.Unmarshal(msg, &message)
202227
if err != nil {
203-
websocketLogger.Warnf("unable to parse ws message: %v", string(msg))
228+
logWarnf("unable to parse ws message: %v", err)
204229
continue
205230
}
206231

207232
if message.Type == "offer" {
208-
websocketLogger.Infof("new session request received")
233+
logInfof("new session request received")
209234
var req WebRTCSessionRequest
210235
err = json.Unmarshal(message.Data, &req)
211236
if err != nil {
212-
websocketLogger.Warnf("unable to parse session request data: %v", string(message.Data))
237+
logWarnf("unable to parse session request data: %v", err)
213238
continue
214239
}
215240

216-
websocketLogger.Infof("new session request: %v", req.OidcGoogle)
217-
websocketLogger.Tracef("session request info: %v", req)
241+
logInfof("new session request: %v", req.OidcGoogle)
242+
logTracef("session request info: %v", req)
218243

219-
metricConnectionSessionRequestCount.Inc()
220-
metricConnectionLastSessionRequestTimestamp.SetToCurrentTime()
221-
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection)
244+
metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc()
245+
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
246+
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source)
222247
if err != nil {
223-
websocketLogger.Infof("error starting new session: %v", err)
248+
logWarnf("error starting new session: %v", err)
224249
continue
225250
}
226251
} else if message.Type == "new-ice-candidate" {
227-
websocketLogger.Infof("The client sent us a new ICE candidate: %v", string(message.Data))
252+
logInfof("The client sent us a new ICE candidate: %v", string(message.Data))
228253
var candidate webrtc.ICECandidateInit
229254

230255
// Attempt to unmarshal as a ICECandidateInit
231256
if err := json.Unmarshal(message.Data, &candidate); err != nil {
232-
websocketLogger.Warnf("unable to parse incoming ICE candidate data: %v", string(message.Data))
257+
logWarnf("unable to parse incoming ICE candidate data: %v", string(message.Data))
233258
continue
234259
}
235260

236261
if candidate.Candidate == "" {
237-
websocketLogger.Warnf("empty incoming ICE candidate, skipping")
262+
logWarnf("empty incoming ICE candidate, skipping")
238263
continue
239264
}
240265

241-
websocketLogger.Infof("unmarshalled incoming ICE candidate: %v", candidate)
266+
logInfof("unmarshalled incoming ICE candidate: %v", candidate)
242267

243268
if currentSession == nil {
244-
websocketLogger.Infof("no current session, skipping incoming ICE candidate")
269+
logInfof("no current session, skipping incoming ICE candidate")
245270
continue
246271
}
247272

248-
websocketLogger.Infof("adding incoming ICE candidate to current session: %v", candidate)
273+
logInfof("adding incoming ICE candidate to current session: %v", candidate)
249274
if err = currentSession.peerConnection.AddICECandidate(candidate); err != nil {
250-
websocketLogger.Warnf("failed to add incoming ICE candidate to our peer connection: %v", err)
275+
logWarnf("failed to add incoming ICE candidate to our peer connection: %v", err)
251276
}
252277
}
253278
}

0 commit comments

Comments
 (0)