diff --git a/pkg/channel/consolidated/dispatcher/consumer_message_handler.go b/pkg/channel/consolidated/dispatcher/consumer_message_handler.go index 5b01e6b916..80aef9d536 100644 --- a/pkg/channel/consolidated/dispatcher/consumer_message_handler.go +++ b/pkg/channel/consolidated/dispatcher/consumer_message_handler.go @@ -23,10 +23,13 @@ import ( "github.com/Shopify/sarama" protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/buffering" "go.uber.org/zap" "knative.dev/eventing-kafka/pkg/common/consumer" "knative.dev/eventing-kafka/pkg/common/tracing" eventingchannels "knative.dev/eventing/pkg/channel" + fanout "knative.dev/eventing/pkg/channel/fanout" + "knative.dev/eventing/pkg/kncloudevents" ) type consumerMessageHandler struct { @@ -35,6 +38,8 @@ type consumerMessageHandler struct { dispatcher *eventingchannels.MessageDispatcherImpl kafkaSubscription *KafkaSubscription consumerGroup string + reporter eventingchannels.StatsReporter + channelNs string } var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil) @@ -69,9 +74,24 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar ctx, span := tracing.StartTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic) defer span.End() - _, err := c.dispatcher.DispatchMessageWithRetries( + te := kncloudevents.TypeExtractorTransformer("") + + bufferedMessage, err := buffering.CopyMessage(ctx, message, &te) + + if err != nil { + return false, err + } + + args := eventingchannels.ReportArgs{ + Ns: c.channelNs, + EventType: string(te), + } + + _ = message.Finish(nil) + + dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries( ctx, - message, + bufferedMessage, nil, c.sub.Subscriber, c.sub.Reply, @@ -79,6 +99,8 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar c.sub.RetryConfig, ) + _ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args) + // NOTE: only return `true` here if DispatchMessage actually delivered the message. return err == nil, err } diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index aa31d7b6e6..8c0053d25b 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -18,12 +18,9 @@ package dispatcher import ( "context" "fmt" - nethttp "net/http" "strings" "sync" - "knative.dev/eventing-kafka/pkg/common/config" - "github.com/Shopify/sarama" protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -32,11 +29,14 @@ import ( "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/pkg/logging" eventingchannels "knative.dev/eventing/pkg/channel" "knative.dev/pkg/kmeta" + nethttp "net/http" + "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" "knative.dev/eventing-kafka/pkg/common/consumer" @@ -58,6 +58,7 @@ type KafkaDispatcherArgs struct { type KafkaDispatcher struct { receiver *eventingchannels.MessageReceiver dispatcher *eventingchannels.MessageDispatcherImpl + reporter eventingchannels.StatsReporter // Receiver data structures // map[string]eventingchannels.ChannelReference @@ -112,6 +113,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat return nil, err } reporter := eventingchannels.NewStatsReporter(containerName, kmeta.ChildName(podName, uuid.New().String())) + dispatcher.reporter = reporter receiverFunc, err := eventingchannels.NewMessageReceiver( func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error { kafkaProducerMessage := sarama.ProducerMessage{ @@ -303,6 +305,8 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri d.dispatcher, kafkaSubscription, groupID, + d.reporter, + channelRef.Namespace, } d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))