Skip to content

Commit

Permalink
Add instrumentation example for Kafka (#153)
Browse files Browse the repository at this point in the history
* Add instrumentation for Kafka consumer group of Sarama

* Add example for Sarama consumer and producer

* Update CHANGELOG

* Update comments

* Add exclusive go mod for example

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
XSAM and MrAlias authored Jul 30, 2020
1 parent 166cf68 commit d5b98d8
Show file tree
Hide file tree
Showing 14 changed files with 750 additions and 46 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Create a detector that collects resources from GKE machines. (#139)
- Create a detector that collects resources from GCE machines. (#132)
- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134)
- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134, #153)
- Add links and status message for mock span. (#134)


Expand Down
50 changes: 5 additions & 45 deletions instrumentation/github.com/Shopify/sarama/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,31 @@
package sarama

import (
"context"
"strconv"

"github.com/Shopify/sarama"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
)

type partitionConsumer struct {
sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
dispatcher consumerMessagesDispatcher
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
return pc.dispatcher.Messages()
}

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(serviceName, opts...)

dispatcher := newConsumerMessagesDispatcherWrapper(pc, cfg)
go dispatcher.Run()
wrapped := &partitionConsumer{
PartitionConsumer: pc,
messages: make(chan *sarama.ConsumerMessage),
dispatcher: dispatcher,
}
go func() {
msgs := pc.Messages()

for msg := range msgs {
// Extract a span context from message to link.
carrier := NewConsumerMessageCarrier(msg)
parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)),
kafkaPartitionKey.Int32(msg.Partition),
}
opts := []trace.StartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...)

// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(newCtx, cfg.Propagators, carrier)

// Send messages back to user.
wrapped.messages <- msg

span.End()
}
close(wrapped.messages)
}()
return wrapped
}

Expand Down
59 changes: 59 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
"github.com/Shopify/sarama"
)

type consumerGroupHandler struct {
sarama.ConsumerGroupHandler

cfg config
}

// ConsumeClaim wraps the session and claim to add instruments for messages.
// It implements parts of `ConsumerGroupHandler`.
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// Wrap claim
dispatcher := newConsumerMessagesDispatcherWrapper(claim, h.cfg)
go dispatcher.Run()
claim = &consumerGroupClaim{
ConsumerGroupClaim: claim,
dispatcher: dispatcher,
}

return h.ConsumerGroupHandler.ConsumeClaim(session, claim)
}

// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received
// message to be traced.
func WrapConsumerGroupHandler(serviceName string, handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := newConfig(serviceName, opts...)

return &consumerGroupHandler{
ConsumerGroupHandler: handler,
cfg: cfg,
}
}

type consumerGroupClaim struct {
sarama.ConsumerGroupClaim
dispatcher consumerMessagesDispatcher
}

func (c *consumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
return c.dispatcher.Messages()
}
20 changes: 20 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

// TODO: add test for consumer group
// Currently, sarama does not have a mock consumer group, so it's hard to
// write a unit test.
// Related PR: https://github.com/Shopify/sarama/pull/1750
87 changes: 87 additions & 0 deletions instrumentation/github.com/Shopify/sarama/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
"context"
"strconv"

"github.com/Shopify/sarama"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
)

type consumerMessagesDispatcher interface {
Messages() <-chan *sarama.ConsumerMessage
}

type consumerMessagesDispatcherWrapper struct {
d consumerMessagesDispatcher
messages chan *sarama.ConsumerMessage

cfg config
}

func newConsumerMessagesDispatcherWrapper(d consumerMessagesDispatcher, cfg config) *consumerMessagesDispatcherWrapper {
return &consumerMessagesDispatcherWrapper{
d: d,
messages: make(chan *sarama.ConsumerMessage),
cfg: cfg,
}
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (w *consumerMessagesDispatcherWrapper) Messages() <-chan *sarama.ConsumerMessage {
return w.messages
}

func (w *consumerMessagesDispatcherWrapper) Run() {
msgs := w.d.Messages()

for msg := range msgs {
// Extract a span context from message to link.
carrier := NewConsumerMessageCarrier(msg)
parentSpanContext := propagation.ExtractHTTP(context.Background(), w.cfg.Propagators, carrier)

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(w.cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)),
kafkaPartitionKey.Int32(msg.Partition),
}
opts := []trace.StartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
newCtx, span := w.cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...)

// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(newCtx, w.cfg.Propagators, carrier)

// Send messages back to user.
w.messages <- msg

span.End()
}
close(w.messages)
}
31 changes: 31 additions & 0 deletions instrumentation/github.com/Shopify/sarama/example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kafka Sarama instrumentation example

A Kafka producer and consumer using Sarama with instrumentation.

These instructions expect you have
[docker-compose](https://docs.docker.com/compose/) installed.

Bring up the `Kafka` and `ZooKeeper` services to run the
example:

```sh
docker-compose up -d zoo kafka
```

Then up the `kafka-producer` service to produce a message into Kafka:

```sh
docker-compose up kafka-producer
```

At last, up the `kafka-consumer` service to consume messages from Kafka:

```sh
docker-compose up kafka-consumer
```

Shut down the services when you are finished with the example:

```sh
docker-compose down
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:alpine AS base
COPY . /src/
WORKDIR /src/instrumentation/github.com/Shopify/sarama

FROM base AS kafka-consumer
RUN go install ./example/consumer/consumer.go
CMD ["/go/bin/consumer"]

Loading

0 comments on commit d5b98d8

Please sign in to comment.