Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Support eventing metrics (#688)
Browse files Browse the repository at this point in the history
* support eventing metrics

* lint

* imports

* update with latest deps
  • Loading branch information
skonto authored Jun 9, 2021
1 parent afd4ed3 commit 3a1ef91
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
26 changes: 24 additions & 2 deletions pkg/channel/consolidated/dispatcher/consumer_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/buffering"
"go.uber.org/zap"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
fanout "knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
)

type consumerMessageHandler struct {
Expand All @@ -35,6 +38,8 @@ type consumerMessageHandler struct {
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
consumerGroup string
reporter eventingchannels.StatsReporter
channelNs string
}

var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil)
Expand Down Expand Up @@ -69,16 +74,33 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
ctx, span := tracing.StartTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

_, err := c.dispatcher.DispatchMessageWithRetries(
te := kncloudevents.TypeExtractorTransformer("")

bufferedMessage, err := buffering.CopyMessage(ctx, message, &te)

if err != nil {
return false, err
}

args := eventingchannels.ReportArgs{
Ns: c.channelNs,
EventType: string(te),
}

_ = message.Finish(nil)

dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries(
ctx,
message,
bufferedMessage,
nil,
c.sub.Subscriber,
c.sub.Reply,
c.sub.DeadLetter,
c.sub.RetryConfig,
)

_ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args)

// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
}
10 changes: 7 additions & 3 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ package dispatcher
import (
"context"
"fmt"
nethttp "net/http"
"strings"
"sync"

"knative.dev/eventing-kafka/pkg/common/config"

"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -32,11 +29,14 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/pkg/logging"

eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"

nethttp "net/http"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
Expand All @@ -58,6 +58,7 @@ type KafkaDispatcherArgs struct {
type KafkaDispatcher struct {
receiver *eventingchannels.MessageReceiver
dispatcher *eventingchannels.MessageDispatcherImpl
reporter eventingchannels.StatsReporter

// Receiver data structures
// map[string]eventingchannels.ChannelReference
Expand Down Expand Up @@ -112,6 +113,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
return nil, err
}
reporter := eventingchannels.NewStatsReporter(containerName, kmeta.ChildName(podName, uuid.New().String()))
dispatcher.reporter = reporter
receiverFunc, err := eventingchannels.NewMessageReceiver(
func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error {
kafkaProducerMessage := sarama.ProducerMessage{
Expand Down Expand Up @@ -303,6 +305,8 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
d.dispatcher,
kafkaSubscription,
groupID,
d.reporter,
channelRef.Namespace,
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
Expand Down

0 comments on commit 3a1ef91

Please sign in to comment.