Skip to content

Commit

Permalink
Support eventing metrics (knative-extensions#688)
Browse files Browse the repository at this point in the history
* support eventing metrics

* lint

* imports

* update with latest deps
  • Loading branch information
skonto committed Jun 10, 2021
1 parent b4d38ca commit 83f3eef
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
26 changes: 24 additions & 2 deletions pkg/channel/consolidated/dispatcher/consumer_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/buffering"
"go.uber.org/zap"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
fanout "knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
)

type consumerMessageHandler struct {
Expand All @@ -35,6 +38,8 @@ type consumerMessageHandler struct {
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
consumerGroup string
reporter eventingchannels.StatsReporter
channelNs string
}

var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil)
Expand Down Expand Up @@ -69,16 +74,33 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
ctx, span := tracing.StartTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

_, err := c.dispatcher.DispatchMessageWithRetries(
te := kncloudevents.TypeExtractorTransformer("")

bufferedMessage, err := buffering.CopyMessage(ctx, message, &te)

if err != nil {
return false, err
}

args := eventingchannels.ReportArgs{
Ns: c.channelNs,
EventType: string(te),
}

_ = message.Finish(nil)

dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries(
ctx,
message,
bufferedMessage,
nil,
c.sub.Subscriber,
c.sub.Reply,
c.sub.DeadLetter,
c.sub.RetryConfig,
)

_ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args)

// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
}
10 changes: 8 additions & 2 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"sync/atomic"


"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -32,14 +33,15 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/kmeta"
)

const (
Expand All @@ -65,6 +67,7 @@ type KafkaDispatcher struct {

receiver *eventingchannels.MessageReceiver
dispatcher *eventingchannels.MessageDispatcherImpl
reporter eventingchannels.StatsReporter

kafkaSyncProducer sarama.SyncProducer
channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription
Expand Down Expand Up @@ -124,6 +127,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
return nil, err
}
reporter := eventingchannels.NewStatsReporter(containerName, kmeta.ChildName(podName, uuid.New().String()))
dispatcher.reporter = reporter
receiverFunc, err := eventingchannels.NewMessageReceiver(
func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error {
kafkaProducerMessage := sarama.ProducerMessage{
Expand Down Expand Up @@ -265,6 +269,8 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
d.dispatcher,
d.channelSubscriptions[channelRef],
groupID,
d.reporter,
channelRef.Namespace,
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
Expand Down

0 comments on commit 83f3eef

Please sign in to comment.