From 392ff11e0a13a1f81bc1a98c32a18652523a2c3b Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Aug 2024 10:12:35 +0200 Subject: [PATCH 1/5] refactor: use int64 histograms They are working better with datadog. --- openmeter/entitlement/balanceworker/recalculate.go | 10 +++++----- openmeter/sink/flushhandler/handler.go | 2 +- openmeter/sink/flushhandler/meters.go | 4 ++-- openmeter/watermill/router/metrics.go | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/openmeter/entitlement/balanceworker/recalculate.go b/openmeter/entitlement/balanceworker/recalculate.go index 34704f221..076b38c93 100644 --- a/openmeter/entitlement/balanceworker/recalculate.go +++ b/openmeter/entitlement/balanceworker/recalculate.go @@ -29,7 +29,7 @@ const ( defaultLRUCacheSize = 10_000 - metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time" + metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time_ms" ) var ( @@ -70,7 +70,7 @@ type Recalculator struct { featureCache *lru.Cache[string, productcatalog.Feature] subjectCache *lru.Cache[string, models.Subject] - metricRecalculationTime metric.Float64Histogram + metricRecalculationTime metric.Int64Histogram } func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) { @@ -88,7 +88,7 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) { return nil, fmt.Errorf("failed to create subject ID cache: %w", err) } - metricRecalculationTime, err := opts.MetricMeter.Float64Histogram( + metricRecalculationTime, err := opts.MetricMeter.Int64Histogram( metricNameRecalculationTime, metric.WithDescription("Entitlement recalculation time"), metric.WithExplicitBucketBoundaries(metricRecalculationBuckets...), @@ -135,7 +135,7 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e } r.metricRecalculationTime.Record(ctx, - time.Since(start).Seconds(), + time.Since(start).Milliseconds(), metric.WithAttributes(recalculationTimeDeleteAttribute)) } else { err := r.sendEntitlementUpdatedEvent(ctx, ent) @@ -144,7 +144,7 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e } r.metricRecalculationTime.Record(ctx, - time.Since(start).Seconds(), + time.Since(start).Milliseconds(), metric.WithAttributes(recalculationTimeUpdateAttribute)) } } diff --git a/openmeter/sink/flushhandler/handler.go b/openmeter/sink/flushhandler/handler.go index 8a64bf26c..35626ff16 100644 --- a/openmeter/sink/flushhandler/handler.go +++ b/openmeter/sink/flushhandler/handler.go @@ -146,7 +146,7 @@ func (f *flushEventHandler) invokeCallback(ctx context.Context, events []models. 
return err } - f.metrics.eventProcessingTime.Record(ctx, time.Since(startTime).Seconds()) + f.metrics.eventProcessingTime.Record(ctx, time.Since(startTime).Milliseconds()) f.metrics.eventsProcessed.Add(ctx, 1) return nil diff --git a/openmeter/sink/flushhandler/meters.go b/openmeter/sink/flushhandler/meters.go index d79d873c4..f16f8b13e 100644 --- a/openmeter/sink/flushhandler/meters.go +++ b/openmeter/sink/flushhandler/meters.go @@ -10,7 +10,7 @@ type metrics struct { eventsReceived metric.Int64Counter eventsProcessed metric.Int64Counter eventsFailed metric.Int64Counter - eventProcessingTime metric.Float64Histogram + eventProcessingTime metric.Int64Histogram eventChannelFull metric.Int64Counter } @@ -35,7 +35,7 @@ func newMetrics(handlerName string, meter metric.Meter) (*metrics, error) { return nil, err } - if r.eventProcessingTime, err = meter.Float64Histogram(fmt.Sprintf("sink.flush_handler.%s.event_processing_time", handlerName)); err != nil { + if r.eventProcessingTime, err = meter.Int64Histogram(fmt.Sprintf("sink.flush_handler.%s.event_processing_time_ms", handlerName)); err != nil { return nil, err } diff --git a/openmeter/watermill/router/metrics.go b/openmeter/watermill/router/metrics.go index f9ec68112..2c9ff45a2 100644 --- a/openmeter/watermill/router/metrics.go +++ b/openmeter/watermill/router/metrics.go @@ -15,13 +15,13 @@ import ( const ( unkonwnEventType = "UNKNOWN" - messageHandlerProcessingTimeMetricName = "message_handler_processing_time_seconds" + messageHandlerProcessingTimeMetricName = "message_handler_processing_time_ms" messageHandlerSuccessCountMetricName = "message_handler_success_count" messageHandlerErrorCountMetricName = "message_handler_error_count" ) func HandlerMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func(message.HandlerFunc) message.HandlerFunc, error) { - messageProcessingTime, err := metricMeter.Float64Histogram( + messageProcessingTime, err := metricMeter.Int64Histogram( fmt.Sprintf("%s.%s", prefix, messageHandlerProcessingTimeMetricName), metric.WithDescription("Time spent by the handler processing a message"), ) @@ -62,7 +62,7 @@ func HandlerMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) ( return resMsg, err } - messageProcessingTime.Record(msg.Context(), time.Since(start).Seconds(), metric.WithAttributeSet( + messageProcessingTime.Record(msg.Context(), time.Since(start).Milliseconds(), metric.WithAttributeSet( attrSet, )) messageProcessed.Add(msg.Context(), 1, metric.WithAttributeSet( From ac8792d40f0b772532c6f4d26cfcd9d7b630fd84 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Aug 2024 10:34:13 +0200 Subject: [PATCH 2/5] refactor: add group handler metrics So that we can report the skipped/ignored messages too. 
--- openmeter/entitlement/balanceworker/worker.go | 10 ++- openmeter/notification/consumer/consumer.go | 23 +++--- .../watermill/grouphandler/grouphandler.go | 75 ++++++++++++++++++- 3 files changed, 95 insertions(+), 13 deletions(-) diff --git a/openmeter/entitlement/balanceworker/worker.go b/openmeter/entitlement/balanceworker/worker.go index d51167275..3f608c938 100644 --- a/openmeter/entitlement/balanceworker/worker.go +++ b/openmeter/entitlement/balanceworker/worker.go @@ -8,6 +8,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" lru "github.com/hashicorp/golang-lru/v2" + "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/openmeter/credit/grant" "github.com/openmeterio/openmeter/openmeter/entitlement" @@ -85,7 +86,11 @@ func New(opts WorkerOptions) (*Worker, error) { worker.router = router - eventHandler := worker.eventHandler() + eventHandler, err := worker.eventHandler(opts.Router.MetricMeter) + if err != nil { + return nil, err + } + router.AddNoPublisherHandler( "balance_worker_system_events", opts.SystemEventsTopic, @@ -105,9 +110,10 @@ func New(opts WorkerOptions) (*Worker, error) { return worker, nil } -func (w *Worker) eventHandler() message.NoPublishHandlerFunc { +func (w *Worker) eventHandler(metricMeter metric.Meter) (message.NoPublishHandlerFunc, error) { return grouphandler.NewNoPublishingHandler( w.opts.EventBus.Marshaler(), + metricMeter, // Entitlement created event grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error { diff --git a/openmeter/notification/consumer/consumer.go b/openmeter/notification/consumer/consumer.go index 892cc9e22..0200651fa 100644 --- a/openmeter/notification/consumer/consumer.go +++ b/openmeter/notification/consumer/consumer.go @@ -50,19 +50,24 @@ func New(opts Options) (*Consumer, error) { balanceThresholdHandler: balanceThresholdEventHandler, } + handler, err := grouphandler.NewNoPublishingHandler(opts.Marshaler, opts.Router.MetricMeter, + grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error { + if event == nil { + return nil + } + + return consumer.balanceThresholdHandler.Handle(ctx, *event) + }), + ) + if err != nil { + return nil, err + } + _ = router.AddNoPublisherHandler( "balance_consumer_system_events", opts.SystemEventsTopic, opts.Router.Subscriber, - grouphandler.NewNoPublishingHandler(opts.Marshaler, - grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error { - if event == nil { - return nil - } - - return consumer.balanceThresholdHandler.Handle(ctx, *event) - }), - ), + handler, ) return consumer, nil diff --git a/openmeter/watermill/grouphandler/grouphandler.go b/openmeter/watermill/grouphandler/grouphandler.go index 3f484220e..77b9c1b78 100644 --- a/openmeter/watermill/grouphandler/grouphandler.go +++ b/openmeter/watermill/grouphandler/grouphandler.go @@ -2,9 +2,23 @@ package grouphandler import ( "context" + "time" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + meterNameHandlerMessageCount = "grouphandler_message_count" + meterNameHandlerProcessingTime = "grouphandler_processing_time_ms" +) + +var ( + meterAttributeStatusIgnored = attribute.String("status", "ignored") + meterAttributeStatusFailed = attribute.String("status", "failed") + meterAttributeStatusSuccess = attribute.String("status", "success") ) type 
GroupEventHandler = cqrs.GroupEventHandler @@ -14,7 +28,12 @@ func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) } // NewNoPublishingHandler creates a NoPublishHandlerFunc that will handle events with the provided GroupEventHandlers. -func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers ...GroupEventHandler) message.NoPublishHandlerFunc { +func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, metricMeter metric.Meter, groupHandlers ...GroupEventHandler) (message.NoPublishHandlerFunc, error) { + meters, err := getMeters(metricMeter) + if err != nil { + return nil, err + } + typeHandlerMap := make(map[string]cqrs.GroupEventHandler) for _, groupHandler := range groupHandlers { event := groupHandler.NewEvent() @@ -24,8 +43,14 @@ func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers return func(msg *message.Message) error { eventName := marshaler.NameFromMessage(msg) + meterAttributeCEType := attribute.String("ce_type", eventName) + groupHandler, ok := typeHandlerMap[eventName] if !ok { + meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusIgnored, + )) return nil } @@ -35,6 +60,52 @@ func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers return err } - return groupHandler.Handle(msg.Context(), event) + startedAt := time.Now() + err := groupHandler.Handle(msg.Context(), event) + if err != nil { + meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusFailed, + )) + meters.handlerProcessingTime.Record(msg.Context(), time.Since(startedAt).Milliseconds(), metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusFailed, + )) + + return err + } + + meters.handlerProcessingTime.Record(msg.Context(), time.Since(startedAt).Milliseconds(), metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusSuccess, + )) + meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusSuccess, + )) + + return nil + }, nil +} + +type meters struct { + handlerMessageCount metric.Int64Counter + handlerProcessingTime metric.Int64Histogram +} + +func getMeters(meter metric.Meter) (*meters, error) { + handlerMessageCount, err := meter.Int64Counter(meterNameHandlerMessageCount) + if err != nil { + return nil, err + } + + handlerProcessingTime, err := meter.Int64Histogram(meterNameHandlerProcessingTime) + if err != nil { + return nil, err } + + return &meters{ + handlerMessageCount: handlerMessageCount, + handlerProcessingTime: handlerProcessingTime, + }, nil } From 85473a9ff7bb859c1c29f14788e87a00cc6894c8 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Aug 2024 10:44:02 +0200 Subject: [PATCH 3/5] refactor: unify router metrics --- openmeter/watermill/router/metrics.go | 89 ++++++++++++++++----------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/openmeter/watermill/router/metrics.go b/openmeter/watermill/router/metrics.go index 2c9ff45a2..5aa0251f6 100644 --- a/openmeter/watermill/router/metrics.go +++ b/openmeter/watermill/router/metrics.go @@ -16,12 +16,16 @@ const ( unkonwnEventType = "UNKNOWN" messageHandlerProcessingTimeMetricName = "message_handler_processing_time_ms" - messageHandlerSuccessCountMetricName = "message_handler_success_count" - messageHandlerErrorCountMetricName = "message_handler_error_count" + messageHandlerMessageCountMetricName = 
"message_handler_message_count" +) + +var ( + meterAttributeStatusFailed = attribute.String("status", "failed") + meterAttributeStatusSuccess = attribute.String("status", "success") ) func HandlerMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func(message.HandlerFunc) message.HandlerFunc, error) { - messageProcessingTime, err := metricMeter.Int64Histogram( + meterMessageProcessingTime, err := metricMeter.Int64Histogram( fmt.Sprintf("%s.%s", prefix, messageHandlerProcessingTimeMetricName), metric.WithDescription("Time spent by the handler processing a message"), ) @@ -29,44 +33,44 @@ func HandlerMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) ( return nil, err } - messageProcessed, err := metricMeter.Int64Counter( - fmt.Sprintf("%s.%s", prefix, messageHandlerSuccessCountMetricName), + meterMessageCount, err := metricMeter.Int64Counter( + fmt.Sprintf("%s.%s", prefix, messageHandlerMessageCountMetricName), metric.WithDescription("Number of messages processed by the handler"), ) if err != nil { return nil, err } - messageProcessingError, err := metricMeter.Int64Counter( - fmt.Sprintf("%s.%s", prefix, messageHandlerErrorCountMetricName), - metric.WithDescription("Number of messages that failed to process by the handler"), - ) - if err != nil { - return nil, err - } - return func(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { start := time.Now() - attrSet := metricAttributesFromMessage(msg) + meterAttributeType := metricAttributeTypeFromMessage(msg) resMsg, err := h(msg) if err != nil { // This should be warning, as it might happen that the kafka message is produced later than the // database commit happens. log.Warn("Message handler failed, will retry later", "error", err, "message_metadata", msg.Metadata, "message_payload", string(msg.Payload)) - messageProcessingError.Add(msg.Context(), 1, metric.WithAttributeSet( - attrSet, + meterMessageCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeType, + meterAttributeStatusFailed, + )) + + meterMessageProcessingTime.Record(msg.Context(), time.Since(start).Milliseconds(), metric.WithAttributes( + meterAttributeType, + meterAttributeStatusFailed, )) return resMsg, err } - messageProcessingTime.Record(msg.Context(), time.Since(start).Milliseconds(), metric.WithAttributeSet( - attrSet, + meterMessageProcessingTime.Record(msg.Context(), time.Since(start).Milliseconds(), metric.WithAttributes( + meterAttributeType, + meterAttributeStatusSuccess, )) - messageProcessed.Add(msg.Context(), 1, metric.WithAttributeSet( - attrSet, + meterMessageCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeType, + meterAttributeStatusSuccess, )) return resMsg, nil } @@ -74,22 +78,22 @@ func HandlerMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) ( } const ( - messageProcessingErrorCountMetricName = "message_processing_error_count" - messageProcessingSuccessCountMetricName = "message_processing_success_count" + messageProcessingCountMetricName = "message_processing_count" + messageProcessingTimeMetricName = "message_processing_time_ms" ) func DLQMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func(message.HandlerFunc) message.HandlerFunc, error) { - messageProcessingErrorCount, err := metricMeter.Int64Counter( - fmt.Sprintf("%s.%s", prefix, messageProcessingErrorCountMetricName), - metric.WithDescription("Number of messages that failed to process"), + meterMessageProcessingCount, err := 
metricMeter.Int64Counter( + fmt.Sprintf("%s.%s", prefix, messageProcessingCountMetricName), + metric.WithDescription("Number of messages processed"), ) if err != nil { return nil, err } - messageProcessingSuccessCount, err := metricMeter.Int64Counter( - fmt.Sprintf("%s.%s", prefix, messageProcessingSuccessCountMetricName), - metric.WithDescription("Number of messages that were successfully processed"), + meterMessageProcessingTime, err := metricMeter.Int64Histogram( + fmt.Sprintf("%s.%s", prefix, messageProcessingTimeMetricName), + metric.WithDescription("Time spent processing a message (including retries)"), ) if err != nil { return nil, err @@ -97,19 +101,33 @@ func DLQMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func return func(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { - attrSet := metricAttributesFromMessage(msg) + start := time.Now() + + meterAttributeCEType := metricAttributeTypeFromMessage(msg) resMsg, err := h(msg) if err != nil { log.Error("Failed to process message, message is going to DLQ", "error", err, "message_metadata", msg.Metadata, "message_payload", string(msg.Payload)) - messageProcessingErrorCount.Add(msg.Context(), 1, metric.WithAttributeSet( - attrSet, + + meterMessageProcessingCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusFailed, )) + meterMessageProcessingTime.Record(msg.Context(), time.Since(start).Milliseconds(), metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusFailed, + )) + return resMsg, err } - messageProcessingSuccessCount.Add(msg.Context(), 1, metric.WithAttributeSet( - attrSet, + meterMessageProcessingCount.Add(msg.Context(), 1, metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusSuccess, + )) + meterMessageProcessingTime.Record(msg.Context(), time.Since(start).Milliseconds(), metric.WithAttributes( + meterAttributeCEType, + meterAttributeStatusSuccess, )) return resMsg, nil @@ -117,12 +135,11 @@ func DLQMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func }, nil } -func metricAttributesFromMessage(msg *message.Message) attribute.Set { +func metricAttributeTypeFromMessage(msg *message.Message) attribute.KeyValue { ce_type := msg.Metadata.Get(marshaler.CloudEventsHeaderType) if ce_type == "" { ce_type = unkonwnEventType } - attrSet := attribute.NewSet(attribute.String("ce_type", ce_type)) - return attrSet + return attribute.String("ce_type", ce_type) } From 929ce55b956c1c255cdb612f71dc40a85a843012 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Aug 2024 10:53:36 +0200 Subject: [PATCH 4/5] refactor: rename sarama metrics Previously it was sarama.publisher.publisher / sarama.publisher.subscriber now the redundant publisher part is dropped. --- openmeter/watermill/driver/kafka/broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmeter/watermill/driver/kafka/broker.go b/openmeter/watermill/driver/kafka/broker.go index 7df01e7c1..1f445e7a0 100644 --- a/openmeter/watermill/driver/kafka/broker.go +++ b/openmeter/watermill/driver/kafka/broker.go @@ -15,7 +15,7 @@ import ( ) const ( - defaultMeterPrefix = "sarama.publisher." + defaultMeterPrefix = "sarama." 
defaultKeepalive = time.Minute ) From 50562c387dba95189d41230cc2ebd69b733575ce Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Aug 2024 11:03:38 +0200 Subject: [PATCH 5/5] refactor: rename metrics to use namespaces --- openmeter/entitlement/balanceworker/recalculate.go | 2 +- openmeter/watermill/grouphandler/grouphandler.go | 4 ++-- openmeter/watermill/router/metrics.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/openmeter/entitlement/balanceworker/recalculate.go b/openmeter/entitlement/balanceworker/recalculate.go index 076b38c93..8db3092e9 100644 --- a/openmeter/entitlement/balanceworker/recalculate.go +++ b/openmeter/entitlement/balanceworker/recalculate.go @@ -29,7 +29,7 @@ const ( defaultLRUCacheSize = 10_000 - metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time_ms" + metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms" ) var ( diff --git a/openmeter/watermill/grouphandler/grouphandler.go b/openmeter/watermill/grouphandler/grouphandler.go index 77b9c1b78..ea2bb538d 100644 --- a/openmeter/watermill/grouphandler/grouphandler.go +++ b/openmeter/watermill/grouphandler/grouphandler.go @@ -11,8 +11,8 @@ import ( ) const ( - meterNameHandlerMessageCount = "grouphandler_message_count" - meterNameHandlerProcessingTime = "grouphandler_processing_time_ms" + meterNameHandlerMessageCount = "watermill.grouphandler.message_count" + meterNameHandlerProcessingTime = "watermill.grouphandler.processing_time_ms" ) var ( diff --git a/openmeter/watermill/router/metrics.go b/openmeter/watermill/router/metrics.go index 5aa0251f6..a4b8ce830 100644 --- a/openmeter/watermill/router/metrics.go +++ b/openmeter/watermill/router/metrics.go @@ -15,8 +15,8 @@ import ( const ( unkonwnEventType = "UNKNOWN" - messageHandlerProcessingTimeMetricName = "message_handler_processing_time_ms" - messageHandlerMessageCountMetricName = "message_handler_message_count" + messageHandlerProcessingTimeMetricName = "watermill.router.message_handler.processing_time_ms" + messageHandlerMessageCountMetricName = "watermill.router.message_handler.message_count" ) var ( @@ -78,8 +78,8 @@ func HandlerMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) ( } const ( - messageProcessingCountMetricName = "message_processing_count" - messageProcessingTimeMetricName = "message_processing_time_ms" + messageProcessingCountMetricName = "watermill.router.message_processing_count" + messageProcessingTimeMetricName = "watermill.router.message_processing_time_ms" ) func DLQMetrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func(message.HandlerFunc) message.HandlerFunc, error) {
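
Note (not part of the patch series): the sketch below illustrates the instrumentation pattern the five patches converge on, namely namespaced Int64 instruments that record durations in milliseconds and report the outcome as a "status" attribute on a single counter/histogram pair rather than separate success and error metrics. All names used here (the meter name, the instrument names, the ce_type value) are illustrative assumptions rather than identifiers from the OpenMeter codebase, and the example assumes an OpenTelemetry meter provider has been configured elsewhere; without one, the global meter is a no-op and the calls simply do nothing.

// Standalone sketch of the metric pattern introduced by these patches.
// Names are illustrative; this is not code from the repository.
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	// Uses the global meter provider; a real service would wire an SDK provider.
	meter := otel.Meter("openmeter.example")

	// One counter and one histogram per handler; the outcome is an attribute,
	// not a separate metric name.
	msgCount, err := meter.Int64Counter(
		"example.handler.message_count",
		metric.WithDescription("Number of messages processed by the handler"),
	)
	if err != nil {
		panic(err)
	}

	procTime, err := meter.Int64Histogram(
		"example.handler.processing_time_ms",
		metric.WithDescription("Time spent processing a message in milliseconds"),
	)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	start := time.Now()

	// ... handle the message here ...

	// Record the result with shared attributes, durations as int64 milliseconds.
	status := attribute.String("status", "success")
	ceType := attribute.String("ce_type", "example.event.v1")

	msgCount.Add(ctx, 1, metric.WithAttributes(ceType, status))
	procTime.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(ceType, status))
}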