diff --git a/CHANGELOG.md b/CHANGELOG.md index 6debef5abce..b80a7fa25a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Create a detector that collects resources from GKE machines. (#139) - Create a detector that collects resources from GCE machines. (#132) -- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134) +- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134, #153) - Add links and status message for mock span. (#134) diff --git a/instrumentation/github.com/Shopify/sarama/consumer.go b/instrumentation/github.com/Shopify/sarama/consumer.go index e25aefb1dfe..5be0c5dff41 100644 --- a/instrumentation/github.com/Shopify/sarama/consumer.go +++ b/instrumentation/github.com/Shopify/sarama/consumer.go @@ -15,26 +15,18 @@ package sarama import ( - "context" - "strconv" - "github.com/Shopify/sarama" - - "go.opentelemetry.io/otel/api/kv" - "go.opentelemetry.io/otel/api/propagation" - "go.opentelemetry.io/otel/api/standard" - "go.opentelemetry.io/otel/api/trace" ) type partitionConsumer struct { sarama.PartitionConsumer - messages chan *sarama.ConsumerMessage + dispatcher consumerMessagesDispatcher } // Messages returns the read channel for the messages that are returned by // the broker. func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { - return pc.messages + return pc.dispatcher.Messages() } // WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received @@ -42,44 +34,12 @@ func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { cfg := newConfig(serviceName, opts...) + dispatcher := newConsumerMessagesDispatcherWrapper(pc, cfg) + go dispatcher.Run() wrapped := &partitionConsumer{ PartitionConsumer: pc, - messages: make(chan *sarama.ConsumerMessage), + dispatcher: dispatcher, } - go func() { - msgs := pc.Messages() - - for msg := range msgs { - // Extract a span context from message to link. - carrier := NewConsumerMessageCarrier(msg) - parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier) - - // Create a span. - attrs := []kv.KeyValue{ - standard.ServiceNameKey.String(cfg.ServiceName), - standard.MessagingSystemKey.String("kafka"), - standard.MessagingDestinationKindKeyTopic, - standard.MessagingDestinationKey.String(msg.Topic), - standard.MessagingOperationReceive, - standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)), - kafkaPartitionKey.Int32(msg.Partition), - } - opts := []trace.StartOption{ - trace.WithAttributes(attrs...), - trace.WithSpanKind(trace.SpanKindConsumer), - } - newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...) - - // Inject current span context, so consumers can use it to propagate span. - propagation.InjectHTTP(newCtx, cfg.Propagators, carrier) - - // Send messages back to user. - wrapped.messages <- msg - - span.End() - } - close(wrapped.messages) - }() return wrapped } diff --git a/instrumentation/github.com/Shopify/sarama/consumer_group.go b/instrumentation/github.com/Shopify/sarama/consumer_group.go new file mode 100644 index 00000000000..3f125eb5a02 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/consumer_group.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "github.com/Shopify/sarama" +) + +type consumerGroupHandler struct { + sarama.ConsumerGroupHandler + + cfg config +} + +// ConsumeClaim wraps the session and claim to add instruments for messages. +// It implements parts of `ConsumerGroupHandler`. +func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + // Wrap claim + dispatcher := newConsumerMessagesDispatcherWrapper(claim, h.cfg) + go dispatcher.Run() + claim = &consumerGroupClaim{ + ConsumerGroupClaim: claim, + dispatcher: dispatcher, + } + + return h.ConsumerGroupHandler.ConsumeClaim(session, claim) +} + +// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received +// message to be traced. +func WrapConsumerGroupHandler(serviceName string, handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler { + cfg := newConfig(serviceName, opts...) + + return &consumerGroupHandler{ + ConsumerGroupHandler: handler, + cfg: cfg, + } +} + +type consumerGroupClaim struct { + sarama.ConsumerGroupClaim + dispatcher consumerMessagesDispatcher +} + +func (c *consumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { + return c.dispatcher.Messages() +} diff --git a/instrumentation/github.com/Shopify/sarama/consumer_group_test.go b/instrumentation/github.com/Shopify/sarama/consumer_group_test.go new file mode 100644 index 00000000000..1e9c1a02852 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/consumer_group_test.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +// TODO: add test for consumer group +// Currently, sarama does not have a mock consumer group, so it's hard to +// write a unit test. +// Related PR: https://github.com/Shopify/sarama/pull/1750 diff --git a/instrumentation/github.com/Shopify/sarama/dispatcher.go b/instrumentation/github.com/Shopify/sarama/dispatcher.go new file mode 100644 index 00000000000..8f7854508c5 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/dispatcher.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "context" + "strconv" + + "github.com/Shopify/sarama" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/standard" + "go.opentelemetry.io/otel/api/trace" +) + +type consumerMessagesDispatcher interface { + Messages() <-chan *sarama.ConsumerMessage +} + +type consumerMessagesDispatcherWrapper struct { + d consumerMessagesDispatcher + messages chan *sarama.ConsumerMessage + + cfg config +} + +func newConsumerMessagesDispatcherWrapper(d consumerMessagesDispatcher, cfg config) *consumerMessagesDispatcherWrapper { + return &consumerMessagesDispatcherWrapper{ + d: d, + messages: make(chan *sarama.ConsumerMessage), + cfg: cfg, + } +} + +// Messages returns the read channel for the messages that are returned by +// the broker. +func (w *consumerMessagesDispatcherWrapper) Messages() <-chan *sarama.ConsumerMessage { + return w.messages +} + +func (w *consumerMessagesDispatcherWrapper) Run() { + msgs := w.d.Messages() + + for msg := range msgs { + // Extract a span context from message to link. + carrier := NewConsumerMessageCarrier(msg) + parentSpanContext := propagation.ExtractHTTP(context.Background(), w.cfg.Propagators, carrier) + + // Create a span. + attrs := []kv.KeyValue{ + standard.ServiceNameKey.String(w.cfg.ServiceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(msg.Topic), + standard.MessagingOperationReceive, + standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)), + kafkaPartitionKey.Int32(msg.Partition), + } + opts := []trace.StartOption{ + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindConsumer), + } + newCtx, span := w.cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...) + + // Inject current span context, so consumers can use it to propagate span. + propagation.InjectHTTP(newCtx, w.cfg.Propagators, carrier) + + // Send messages back to user. + w.messages <- msg + + span.End() + } + close(w.messages) +} diff --git a/instrumentation/github.com/Shopify/sarama/example/README.md b/instrumentation/github.com/Shopify/sarama/example/README.md new file mode 100644 index 00000000000..e19ce0fab05 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/README.md @@ -0,0 +1,31 @@ +# Kafka Sarama instrumentation example + +A Kafka producer and consumer using Sarama with instrumentation. + +These instructions expect you have +[docker-compose](https://docs.docker.com/compose/) installed. + +Bring up the `Kafka` and `ZooKeeper` services to run the +example: + +```sh +docker-compose up -d zoo kafka +``` + +Then up the `kafka-producer` service to produce a message into Kafka: + +```sh +docker-compose up kafka-producer +``` + +At last, up the `kafka-consumer` service to consume messages from Kafka: + +```sh +docker-compose up kafka-consumer +``` + +Shut down the services when you are finished with the example: + +```sh +docker-compose down +``` diff --git a/instrumentation/github.com/Shopify/sarama/example/consumer/Dockerfile b/instrumentation/github.com/Shopify/sarama/example/consumer/Dockerfile new file mode 100644 index 00000000000..e4f059eead5 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/consumer/Dockerfile @@ -0,0 +1,21 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +FROM golang:alpine AS base +COPY . /src/ +WORKDIR /src/instrumentation/github.com/Shopify/sarama + +FROM base AS kafka-consumer +RUN go install ./example/consumer/consumer.go +CMD ["/go/bin/consumer"] + diff --git a/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go b/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go new file mode 100644 index 00000000000..f84cbb19bfb --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "log" + "os" + "strings" + "time" + + "github.com/Shopify/sarama" + + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/standard" + "go.opentelemetry.io/otel/api/trace" + + saramatrace "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" + "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/example" +) + +var ( + brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list") +) + +func main() { + example.InitTracer() + flag.Parse() + + if *brokers == "" { + flag.PrintDefaults() + os.Exit(1) + } + + brokerList := strings.Split(*brokers, ",") + log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", ")) + + startConsumerGroup(brokerList) + + select {} +} + +func startConsumerGroup(brokerList []string) { + consumerGroupHandler := Consumer{} + // Wrap instrumentation + handler := saramatrace.WrapConsumerGroupHandler("example-consumer", &consumerGroupHandler) + + config := sarama.NewConfig() + config.Version = sarama.V2_5_0_0 + config.Consumer.Offsets.Initial = sarama.OffsetOldest + + // Create consumer group + consumerGroup, err := sarama.NewConsumerGroup(brokerList, "example", config) + if err != nil { + log.Fatalln("Failed to start sarama consumer group:", err) + } + + err = consumerGroup.Consume(context.Background(), []string{example.KafkaTopic}, handler) + if err != nil { + log.Fatalln("Failed to consume via handler:", err) + } +} + +func printMessage(msg *sarama.ConsumerMessage) { + // Extract tracing info from message + ctx := propagation.ExtractHTTP(context.Background(), global.Propagators(), saramatrace.NewConsumerMessageCarrier(msg)) + + tr := global.Tracer("consumer") + _, span := tr.Start(ctx, "consume message", trace.WithAttributes( + standard.MessagingOperationProcess, + )) + defer span.End() + + // Emulate Work loads + time.Sleep(1 * time.Second) + + log.Println("Successful to read message: ", string(msg.Value)) +} + +// Consumer represents a Sarama consumer group consumer +type Consumer struct { +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + // NOTE: + // Do not move the code below to a goroutine. + // The `ConsumeClaim` itself is called within a goroutine, see: + // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 + for message := range claim.Messages() { + printMessage(message) + session.MarkMessage(message, "") + } + + return nil +} diff --git a/instrumentation/github.com/Shopify/sarama/example/docker-compose.yml b/instrumentation/github.com/Shopify/sarama/example/docker-compose.yml new file mode 100644 index 00000000000..f557a9f1b0c --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/docker-compose.yml @@ -0,0 +1,75 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +version: "3.7" +services: + zoo: + image: zookeeper:3.4.9 + hostname: zoo + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_SERVERS: server.1=zoo:2888:3888 + networks: + - example + kafka: + # Kafka version 2.5.0 + image: confluentinc/cp-kafka:5.5.0 + hostname: kafka + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zoo + networks: + - example + + kafka-producer: + build: + dockerfile: $PWD/producer/Dockerfile + context: ../../../../.. + command: + - "/bin/sh" + - "-c" + - "/go/bin/producer" + environment: + KAFKA_PEERS: kafka:19092 + depends_on: + - kafka + networks: + - example + kafka-consumer: + build: + dockerfile: $PWD/consumer/Dockerfile + context: ../../../../.. + command: + - "/bin/sh" + - "-c" + - "/go/bin/consumer" + environment: + KAFKA_PEERS: kafka:19092 + depends_on: + - kafka + networks: + - example +networks: + example: \ No newline at end of file diff --git a/instrumentation/github.com/Shopify/sarama/example/go.mod b/instrumentation/github.com/Shopify/sarama/example/go.mod new file mode 100644 index 00000000000..65189a0e024 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/go.mod @@ -0,0 +1,14 @@ +module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/example + +go 1.14 + +replace go.opentelemetry.io/contrib => ../../../../.. + +replace go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama => ../ + +require ( + github.com/Shopify/sarama v1.26.4 + go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/otel v0.9.0 + google.golang.org/grpc v1.30.0 +) diff --git a/instrumentation/github.com/Shopify/sarama/example/go.sum b/instrumentation/github.com/Shopify/sarama/example/go.sum new file mode 100644 index 00000000000..7ea537d6fff --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/go.sum @@ -0,0 +1,149 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs= +github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60= +github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8= +github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= +github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= +github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.opentelemetry.io/otel v0.9.0 h1:nsdCDHzQx1Yv8E2nwCPcMXMfg+EMIlx1LBOXNC8qSQ8= +go.opentelemetry.io/otel v0.9.0/go.mod h1:ckxzUEfk7tAkTwEMVdkllBM+YOfE/K9iwg6zYntFYSg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8= +google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE= +google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/instrumentation/github.com/Shopify/sarama/example/init.go b/instrumentation/github.com/Shopify/sarama/example/init.go new file mode 100644 index 00000000000..e2c71fba8e0 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/init.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package example + +import ( + "log" + + otelglobal "go.opentelemetry.io/otel/api/global" + oteltracestdout "go.opentelemetry.io/otel/exporters/trace/stdout" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +const ( + KafkaTopic = "sarama-instrumentation-example" +) + +func InitTracer() { + exporter, err := oteltracestdout.NewExporter(oteltracestdout.Options{PrettyPrint: true}) + if err != nil { + log.Fatal(err) + } + cfg := sdktrace.Config{ + DefaultSampler: sdktrace.AlwaysSample(), + } + tp, err := sdktrace.NewProvider( + sdktrace.WithConfig(cfg), + sdktrace.WithSyncer(exporter), + ) + if err != nil { + log.Fatal(err) + } + otelglobal.SetTraceProvider(tp) +} diff --git a/instrumentation/github.com/Shopify/sarama/example/producer/Dockerfile b/instrumentation/github.com/Shopify/sarama/example/producer/Dockerfile new file mode 100644 index 00000000000..6f661215786 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/producer/Dockerfile @@ -0,0 +1,20 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +FROM golang:alpine AS base +COPY . /src/ +WORKDIR /src/instrumentation/github.com/Shopify/sarama + +FROM base AS kafka-producer +RUN go install ./example/producer/producer.go +CMD ["/go/bin/producer"] diff --git a/instrumentation/github.com/Shopify/sarama/example/producer/producer.go b/instrumentation/github.com/Shopify/sarama/example/producer/producer.go new file mode 100644 index 00000000000..656572797f7 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/example/producer/producer.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/rand" + "os" + "strings" + "time" + + "github.com/Shopify/sarama" + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/propagation" + + saramatrace "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" + "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/example" +) + +var ( + brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list") +) + +func main() { + example.InitTracer() + flag.Parse() + + if *brokers == "" { + flag.PrintDefaults() + os.Exit(1) + } + + brokerList := strings.Split(*brokers, ",") + log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", ")) + + producer := newAccessLogProducer(brokerList) + + rand.Seed(time.Now().Unix()) + + // Create root span + tr := global.Tracer("producer") + ctx, span := tr.Start(context.Background(), "produce message") + defer span.End() + + // Inject tracing info into message + msg := sarama.ProducerMessage{ + Topic: example.KafkaTopic, + Key: sarama.StringEncoder("random_number"), + Value: sarama.StringEncoder(fmt.Sprintf("%d", rand.Intn(1000))), + } + propagation.InjectHTTP(ctx, global.Propagators(), saramatrace.NewProducerMessageCarrier(&msg)) + + producer.Input() <- &msg + successMsg := <-producer.Successes() + log.Println("Successful to write message, offset:", successMsg.Offset) + + err := producer.Close() + if err != nil { + span.SetStatus(codes.Internal, err.Error()) + log.Fatalln("Failed to close producer:", err) + } +} + +func newAccessLogProducer(brokerList []string) sarama.AsyncProducer { + config := sarama.NewConfig() + config.Version = sarama.V2_5_0_0 + // So we can know the partition and offset of messages. + config.Producer.Return.Successes = true + + producer, err := sarama.NewAsyncProducer(brokerList, config) + if err != nil { + log.Fatalln("Failed to start Sarama producer:", err) + } + + // Wrap instrumentation + producer = saramatrace.WrapAsyncProducer("example-producer", config, producer) + + // We will log to STDOUT if we're not able to produce messages. + go func() { + for err := range producer.Errors() { + log.Println("Failed to write message:", err) + } + }() + + return producer +}