Skip to content

Commit

Permalink
rename metric, add broadcast_duration_seconds histogram
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 26, 2024
1 parent 5ad3b3d commit 139b8ec
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 17 deletions.
25 changes: 15 additions & 10 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,19 @@ func getDeltaData(sub subInfo, key preparedKey, channel string, deltaPub *protoc
// broadcastPublication sends message to all clients subscribed on a channel.
func (h *subShard) broadcastPublication(channel string, sp StreamPosition, pub, prevPub, localPrevPub *Publication) error {
pubTime := pub.Time
// Check lag in PUB/SUB processing. We use it to notify subscribers with positioning enabled
// about insufficient state in the stream.
var maxLagExceeded bool
now := time.Now()
var timeLagMilli int64
if pubTime > 0 {
timeLagMilli = now.UnixMilli() - pubTime
if h.maxTimeLagMilli > 0 && timeLagMilli > h.maxTimeLagMilli {
maxLagExceeded = true
}
h.metrics.observePubSubDeliveryLag(timeLagMilli)
}

fullPub := pubToProto(pub)
preparedDataByKey := make(map[preparedKey]preparedData)

Expand Down Expand Up @@ -762,16 +775,6 @@ func (h *subShard) broadcastPublication(channel string, sp StreamPosition, pub,
continue
}

// Check lag in PUB/SUB processing. We use it to notify subscribers with positioning enabled
// about insufficient state in the stream.
var maxLagExceeded bool
if pubTime > 0 {
timeLagMilli := time.Now().UnixMilli() - pubTime
h.metrics.observePubSubTimeLag(timeLagMilli)
if h.maxTimeLagMilli > 0 && timeLagMilli > h.maxTimeLagMilli {
maxLagExceeded = true
}
}
_ = sub.client.writePublication(channel, fullPub, prepValue, sp, maxLagExceeded)
}
if jsonEncodeErr != nil && h.logger.enabled(LogLevelWarn) {
Expand All @@ -783,6 +786,8 @@ func (h *subShard) broadcastPublication(channel string, sp StreamPosition, pub,
"error": jsonEncodeErr.error,
}))
}

h.metrics.observeBroadcastDuration(now)
return nil
}

Expand Down
32 changes: 25 additions & 7 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ type metrics struct {
commandDurationSubRefresh prometheus.Observer
commandDurationUnknown prometheus.Observer

pubSubTimeLagHistogram prometheus.Histogram
pubSubDeliveryLagHistogram prometheus.Histogram
broadcastDurationHistogram prometheus.Histogram
}

func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Duration) {
Expand Down Expand Up @@ -119,11 +120,15 @@ func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Du
observer.Observe(d.Seconds())
}

func (m *metrics) observePubSubTimeLag(lagTimeMilli int64) {
func (m *metrics) observePubSubDeliveryLag(lagTimeMilli int64) {
if lagTimeMilli < 0 {
lagTimeMilli = -lagTimeMilli
}
m.pubSubTimeLagHistogram.Observe(float64(lagTimeMilli) / 1000)
m.pubSubDeliveryLagHistogram.Observe(float64(lagTimeMilli) / 1000)
}

// observeBroadcastDuration records how much time has passed since started,
// expressed in seconds, into the broadcast duration histogram.
func (m *metrics) observeBroadcastDuration(started time.Time) {
	elapsed := time.Since(started)
	m.broadcastDurationHistogram.Observe(elapsed.Seconds())
}

func (m *metrics) setBuildInfo(version string) {
Expand Down Expand Up @@ -468,13 +473,23 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
Help: "Size in bytes of messages received from client connections over specific transport.",
}, []string{"transport", "frame_type", "channel_namespace"})

m.pubSubTimeLagHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
m.pubSubDeliveryLagHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "pub_sub_time_lag_seconds",
Help: "Pub sub time lag in seconds",
Name: "pub_sub_delivery_lag_seconds",
Help: "Pub sub delivery lag in seconds",
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.200, 0.500, 1.000, 2.000, 5.000, 10.000},
})
m.broadcastDurationHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "broadcast_duration_seconds",
Help: "Broadcast duration in seconds",
Buckets: []float64{
0.000001, 0.000005, 0.000010, 0.000050, 0.000100, 0.000250, 0.000500, // Microsecond resolution.
0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.250, 0.500, // Millisecond resolution.
1.0, 2.5, 5.0, 10.0, // Second resolution.
}})

m.messagesReceivedCountPublication = m.messagesReceivedCount.WithLabelValues("publication")
m.messagesReceivedCountJoin = m.messagesReceivedCount.WithLabelValues("join")
Expand Down Expand Up @@ -588,7 +603,10 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
if err := registry.Register(m.surveyDurationSummary); err != nil && !errors.As(err, &alreadyRegistered) {
return nil, err
}
if err := registry.Register(m.pubSubTimeLagHistogram); err != nil && !errors.As(err, &alreadyRegistered) {
if err := registry.Register(m.pubSubDeliveryLagHistogram); err != nil && !errors.As(err, &alreadyRegistered) {
return nil, err
}
if err := registry.Register(m.broadcastDurationHistogram); err != nil && !errors.As(err, &alreadyRegistered) {
return nil, err
}
return m, nil
Expand Down

0 comments on commit 139b8ec

Please sign in to comment.