Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Support eventing metrics #688

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions pkg/channel/consolidated/dispatcher/consumer_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/buffering"
"go.uber.org/zap"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
fanout "knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
)

type consumerMessageHandler struct {
Expand All @@ -35,6 +38,8 @@ type consumerMessageHandler struct {
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
consumerGroup string
reporter eventingchannels.StatsReporter
channelNs string
}

var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil)
Expand Down Expand Up @@ -69,16 +74,33 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
ctx, span := tracing.StartTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

_, err := c.dispatcher.DispatchMessageWithRetries(
te := kncloudevents.TypeExtractorTransformer("")

bufferedMessage, err := buffering.CopyMessage(ctx, message, &te)

if err != nil {
return false, err
}

args := eventingchannels.ReportArgs{
Ns: c.channelNs,
EventType: string(te),
}

_ = message.Finish(nil)

dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries(
ctx,
message,
bufferedMessage,
nil,
c.sub.Subscriber,
c.sub.Reply,
c.sub.DeadLetter,
c.sub.RetryConfig,
)

_ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args)

// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
}
10 changes: 7 additions & 3 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ package dispatcher
import (
"context"
"fmt"
nethttp "net/http"
"strings"
"sync"

"knative.dev/eventing-kafka/pkg/common/config"

"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -32,11 +29,14 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/pkg/logging"

eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"

nethttp "net/http"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
Expand All @@ -58,6 +58,7 @@ type KafkaDispatcherArgs struct {
type KafkaDispatcher struct {
receiver *eventingchannels.MessageReceiver
dispatcher *eventingchannels.MessageDispatcherImpl
reporter eventingchannels.StatsReporter

// Receiver data structures
// map[string]eventingchannels.ChannelReference
Expand Down Expand Up @@ -112,6 +113,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
return nil, err
}
reporter := eventingchannels.NewStatsReporter(containerName, kmeta.ChildName(podName, uuid.New().String()))
dispatcher.reporter = reporter
receiverFunc, err := eventingchannels.NewMessageReceiver(
func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error {
kafkaProducerMessage := sarama.ProducerMessage{
Expand Down Expand Up @@ -303,6 +305,8 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
d.dispatcher,
kafkaSubscription,
groupID,
d.reporter,
channelRef.Namespace,
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
Expand Down