From 83f3eef4802bf100dc4a2ed6bc778a927d6088ac Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Wed, 9 Jun 2021 14:15:43 +0300 Subject: [PATCH] Support eventing metrics (#688) * support eventing metrics * lint * imports * update with latest deps --- .../dispatcher/consumer_message_handler.go | 26 +++++++++++++++++-- .../consolidated/dispatcher/dispatcher.go | 10 +++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/consumer_message_handler.go b/pkg/channel/consolidated/dispatcher/consumer_message_handler.go index 5b01e6b916..80aef9d536 100644 --- a/pkg/channel/consolidated/dispatcher/consumer_message_handler.go +++ b/pkg/channel/consolidated/dispatcher/consumer_message_handler.go @@ -23,10 +23,13 @@ import ( "github.com/Shopify/sarama" protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/buffering" "go.uber.org/zap" "knative.dev/eventing-kafka/pkg/common/consumer" "knative.dev/eventing-kafka/pkg/common/tracing" eventingchannels "knative.dev/eventing/pkg/channel" + fanout "knative.dev/eventing/pkg/channel/fanout" + "knative.dev/eventing/pkg/kncloudevents" ) type consumerMessageHandler struct { @@ -35,6 +38,8 @@ type consumerMessageHandler struct { dispatcher *eventingchannels.MessageDispatcherImpl kafkaSubscription *KafkaSubscription consumerGroup string + reporter eventingchannels.StatsReporter + channelNs string } var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil) @@ -69,9 +74,24 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar ctx, span := tracing.StartTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic) defer span.End() - _, err := c.dispatcher.DispatchMessageWithRetries( + te := kncloudevents.TypeExtractorTransformer("") + + bufferedMessage, err := buffering.CopyMessage(ctx, message, &te) + + if err != nil { + return false, err + } + + args := eventingchannels.ReportArgs{ + Ns: c.channelNs, + EventType: string(te), + } + + _ = message.Finish(nil) + + dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries( ctx, - message, + bufferedMessage, nil, c.sub.Subscriber, c.sub.Reply, @@ -79,6 +99,8 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar c.sub.RetryConfig, ) + _ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args) + // NOTE: only return `true` here if DispatchMessage actually delivered the message. return err == nil, err } diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index a3927949e1..f872619165 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -23,6 +23,7 @@ import ( "sync" "sync/atomic" + "github.com/Shopify/sarama" protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -32,14 +33,15 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + eventingchannels "knative.dev/eventing/pkg/channel" + "knative.dev/pkg/kmeta" + "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" "knative.dev/eventing-kafka/pkg/common/client" "knative.dev/eventing-kafka/pkg/common/consumer" "knative.dev/eventing-kafka/pkg/common/tracing" - eventingchannels "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/pkg/kmeta" ) const ( @@ -65,6 +67,7 @@ type KafkaDispatcher struct { receiver *eventingchannels.MessageReceiver dispatcher *eventingchannels.MessageDispatcherImpl + reporter eventingchannels.StatsReporter kafkaSyncProducer sarama.SyncProducer channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription @@ -124,6 +127,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat return nil, err } reporter := eventingchannels.NewStatsReporter(containerName, kmeta.ChildName(podName, uuid.New().String())) + dispatcher.reporter = reporter receiverFunc, err := eventingchannels.NewMessageReceiver( func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error { kafkaProducerMessage := sarama.ProducerMessage{ @@ -265,6 +269,8 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference d.dispatcher, d.channelSubscriptions[channelRef], groupID, + d.reporter, + channelRef.Namespace, } d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))