diff --git a/.github/workflows/unit-integration-tests.yml b/.github/workflows/unit-integration-tests.yml index cf0996709e..a379877448 100644 --- a/.github/workflows/unit-integration-tests.yml +++ b/.github/workflows/unit-integration-tests.yml @@ -180,20 +180,24 @@ jobs: image: memcached:1.5.9 ports: - 11211:11211 - zookeeper: - image: bitnami/zookeeper:latest - env: - ALLOW_ANONYMOUS_LOGIN: "yes" - ports: - - 2181:2181 kafka: - image: darccio/kafka:2.13-2.8.1 + image: confluentinc/confluent-local:7.5.0 env: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1 - KAFKA_BROKER_ID: 1 + KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092" + KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_BROKER_ID: "1" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1" + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1" + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + KAFKA_NODE_ID: "1" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" ports: - 9092:9092 localstack: diff --git a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go index 38d9be9795..ce82caa401 100644 --- a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go +++ b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go @@ -3,6 +3,13 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2024 Datadog, Inc. +// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation. +// +// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub. +// +// The motivation of this package is to support orchestrion, which cannot use the main package because it imports +// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself, +// this would cause an import cycle. package tracing import ( diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go index 7f1420f6af..2bd892a52e 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go @@ -208,7 +208,6 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er span.Finish(tracer.WithError(err)) } } - return err } diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go index 382824534f..42b85800b2 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go @@ -281,6 +281,8 @@ func TestCustomTags(t *testing.T) { // assert.Equal(t, []byte("key1"), s.Tag("key")) } +type consumerActionFn func(c *Consumer) (*kafka.Message, error) + // Test we don't leak goroutines and properly close the span when Produce returns an error. 
func TestProduceError(t *testing.T) { defer func() { @@ -327,8 +329,6 @@ func TestProduceError(t *testing.T) { assert.Len(t, spans, 1) } -type consumerActionFn func(c *Consumer) (*kafka.Message, error) - func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]*mocktracer.Span, *kafka.Message) { if _, ok := os.LookupEnv("INTEGRATION"); !ok { t.Skip("to enable integration test, set the INTEGRATION environment variable") diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go index e3edbbb1cc..545ae2dc59 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go @@ -207,7 +207,6 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er span.Finish(tracer.WithError(err)) } } - return err } diff --git a/contrib/segmentio/kafka-go/dsm.go b/contrib/segmentio/kafka-go/dsm.go new file mode 100644 index 0000000000..6a37869b8e --- /dev/null +++ b/contrib/segmentio/kafka-go/dsm.go @@ -0,0 +1,86 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package kafka + +import ( + "context" + + "github.com/DataDog/dd-trace-go/v2/datastreams" + "github.com/DataDog/dd-trace-go/v2/datastreams/options" + "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" +) + +func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) { + if !tr.cfg.dataStreamsEnabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + msg.GetTopic(), "type:kafka"} + if tr.kafkaCfg.ConsumerGroupID != "" { + edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID) + } + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams( + datastreams.ExtractFromBase64Carrier(context.Background(), carrier), + options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, + edges..., + ) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) + if tr.kafkaCfg.ConsumerGroupID != "" { + // only track Kafka lag if a consumer group is set. + // since there is no ack mechanism, we consider that messages read are committed right away. 
+ tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset()) + } +} + +func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) { + if !tr.cfg.dataStreamsEnabled || msg == nil { + return + } + + var topic string + if writer.GetTopic() != "" { + topic = writer.GetTopic() + } else { + topic = msg.GetTopic() + } + + edges := []string{"direction:out", "topic:" + topic, "type:kafka"} + carrier := MessageCarrier{msg} + ctx, ok := tracer.SetDataStreamsCheckpointWithParams( + datastreams.ExtractFromBase64Carrier(context.Background(), carrier), + options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, + edges..., + ) + if !ok { + return + } + + // Headers will be dropped if the current protocol does not support them + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func getProducerMsgSize(msg Message) (size int64) { + for _, header := range msg.GetHeaders() { + size += int64(len(header.GetKey()) + len(header.GetValue())) + } + if msg.GetValue() != nil { + size += int64(len(msg.GetValue())) + } + if msg.GetKey() != nil { + size += int64(len(msg.GetKey())) + } + return size +} + +func getConsumerMsgSize(msg Message) (size int64) { + for _, header := range msg.GetHeaders() { + size += int64(len(header.GetKey()) + len(header.GetValue())) + } + return size + int64(len(msg.GetValue())+len(msg.GetKey())) +} diff --git a/contrib/segmentio/kafka-go/example_test.go b/contrib/segmentio/kafka-go/example_test.go index 777945d832..8c6d49e99e 100644 --- a/contrib/segmentio/kafka-go/example_test.go +++ b/contrib/segmentio/kafka-go/example_test.go @@ -13,7 +13,7 @@ import ( kafkatrace "github.com/DataDog/dd-trace-go/contrib/segmentio/kafka-go/v2" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" - kafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go" ) func ExampleWriter() { diff --git a/contrib/segmentio/kafka-go/kafka.go b/contrib/segmentio/kafka-go/kafka.go index 64a47c8775..106e657f87 100644 --- a/contrib/segmentio/kafka-go/kafka.go +++ b/contrib/segmentio/kafka-go/kafka.go @@ -7,12 +7,8 @@ package kafka // import "github.com/DataDog/dd-trace-go/contrib/segmentio/kafka- import ( "context" - "math" "strings" - "github.com/DataDog/dd-trace-go/v2/datastreams" - "github.com/DataDog/dd-trace-go/v2/datastreams/options" - "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/DataDog/dd-trace-go/v2/instrumentation" @@ -25,79 +21,35 @@ func init() { instr = instrumentation.Load(instrumentation.PackageSegmentioKafkaGo) } +// A Reader wraps a kafka.Reader. +type Reader struct { + *kafka.Reader + tracer *Tracer + prev *tracer.Span +} + // NewReader calls kafka.NewReader and wraps the resulting Consumer. func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader { return WrapReader(kafka.NewReader(conf), opts...) } -// NewWriter calls kafka.NewWriter and wraps the resulting Producer. -func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer { - return WrapWriter(kafka.NewWriter(conf), opts...) -} - // WrapReader wraps a kafka.Reader so that any consumed events are traced. 
func WrapReader(c *kafka.Reader, opts ...Option) *Reader { wrapped := &Reader{ Reader: c, - cfg: newConfig(opts...), } - + cfg := KafkaConfig{} if c.Config().Brokers != nil { - wrapped.bootstrapServers = strings.Join(c.Config().Brokers, ",") + cfg.BootstrapServers = strings.Join(c.Config().Brokers, ",") } - if c.Config().GroupID != "" { - wrapped.groupID = c.Config().GroupID + cfg.ConsumerGroupID = c.Config().GroupID } - - instr.Logger().Debug("contrib/segmentio/kafka-go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg) + wrapped.tracer = NewTracer(cfg, opts...) + instr.Logger().Debug("contrib/segmentio/kafka-go/kafka: Wrapping Reader: %#v", wrapped.tracer.cfg) return wrapped } -// A kafkaConfig struct holds information from the kafka config for span tags -type kafkaConfig struct { - bootstrapServers string - groupID string -} - -// A Reader wraps a kafka.Reader. -type Reader struct { - *kafka.Reader - kafkaConfig - cfg *config - prev *tracer.Span -} - -func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) *tracer.Span { - opts := []tracer.StartSpanOption{ - tracer.ServiceName(r.cfg.consumerServiceName), - tracer.ResourceName("Consume Topic " + msg.Topic), - tracer.SpanType(ext.SpanTypeMessageConsumer), - tracer.Tag(ext.MessagingKafkaPartition, msg.Partition), - tracer.Tag("offset", msg.Offset), - tracer.Tag(ext.Component, instrumentation.PackageSegmentioKafkaGo), - tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - tracer.Tag(ext.KafkaBootstrapServers, r.bootstrapServers), - tracer.Measured(), - } - - if !math.IsNaN(r.cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, r.cfg.analyticsRate)) - } - // kafka supports headers, so try to extract a span context - carrier := messageCarrier{msg} - if spanctx, err := tracer.Extract(carrier); err == nil { - opts = append(opts, tracer.ChildOf(spanctx)) - } - span, _ := tracer.StartSpanFromContext(ctx, r.cfg.consumerSpanName, opts...) - // reinject the span context so consumers can pick it up - if err := tracer.Inject(span.Context(), carrier); err != nil { - instr.Logger().Debug("contrib/segmentio/kafka-go: Failed to inject span context into carrier in reader, %v", err) - } - return span -} - // Close calls the underlying Reader.Close and if polling is enabled, finishes // any remaining span. 
func (r *Reader) Close() error { @@ -119,8 +71,9 @@ func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) { if err != nil { return kafka.Message{}, err } - r.prev = r.startSpan(ctx, &msg) - setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupID, &msg) + tMsg := wrapMessage(&msg) + r.prev = r.tracer.StartConsumeSpan(ctx, tMsg) + r.tracer.SetConsumeDSMCheckpoint(tMsg) return msg, nil } @@ -134,147 +87,51 @@ func (r *Reader) FetchMessage(ctx context.Context) (kafka.Message, error) { if err != nil { return msg, err } - r.prev = r.startSpan(ctx, &msg) - setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupID, &msg) + tMsg := wrapMessage(&msg) + r.prev = r.tracer.StartConsumeSpan(ctx, tMsg) + r.tracer.SetConsumeDSMCheckpoint(tMsg) return msg, nil } -func setConsumeCheckpoint(enabled bool, groupID string, msg *kafka.Message) { - if !enabled || msg == nil { - return - } - edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"} - if groupID != "" { - edges = append(edges, "group:"+groupID) - } - carrier := messageCarrier{msg} - ctx, ok := tracer.SetDataStreamsCheckpointWithParams( - datastreams.ExtractFromBase64Carrier(context.Background(), carrier), - options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, - edges..., - ) - if !ok { - return - } - datastreams.InjectToBase64Carrier(ctx, carrier) - if groupID != "" { - // only track Kafka lag if a consumer group is set. - // since there is no ack mechanism, we consider that messages read are committed right away. - tracer.TrackKafkaCommitOffset(groupID, msg.Topic, int32(msg.Partition), msg.Offset) - } +// KafkaWriter wraps a kafka.Writer with tracing config data. +type KafkaWriter struct { + *kafka.Writer + tracer *Tracer +} + +// NewWriter calls kafka.NewWriter and wraps the resulting Producer. +func NewWriter(conf kafka.WriterConfig, opts ...Option) *KafkaWriter { + return WrapWriter(kafka.NewWriter(conf), opts...) } // WrapWriter wraps a kafka.Writer so requests are traced. -func WrapWriter(w *kafka.Writer, opts ...Option) *Writer { - writer := &Writer{ +func WrapWriter(w *kafka.Writer, opts ...Option) *KafkaWriter { + writer := &KafkaWriter{ Writer: w, - cfg: newConfig(opts...), } - + cfg := KafkaConfig{} if w.Addr.String() != "" { - writer.bootstrapServers = w.Addr.String() + cfg.BootstrapServers = w.Addr.String() } - instr.Logger().Debug("contrib/segmentio/kafka-go: Wrapping Writer: %#v", writer.cfg) + writer.tracer = NewTracer(cfg, opts...) 
+ instr.Logger().Debug("contrib/segmentio/kafka-go: Wrapping Writer: %#v", writer.tracer.kafkaCfg) return writer } -// Writer wraps a kafka.Writer with tracing config data -type Writer struct { - *kafka.Writer - kafkaConfig - cfg *config -} - -func (w *Writer) startSpan(ctx context.Context, msg *kafka.Message) *tracer.Span { - opts := []tracer.StartSpanOption{ - tracer.ServiceName(w.cfg.producerServiceName), - tracer.SpanType(ext.SpanTypeMessageProducer), - tracer.Tag(ext.Component, instrumentation.PackageSegmentioKafkaGo), - tracer.Tag(ext.SpanKind, ext.SpanKindProducer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - tracer.Tag(ext.KafkaBootstrapServers, w.bootstrapServers), - } - if w.Writer.Topic != "" { - opts = append(opts, tracer.ResourceName("Produce Topic "+w.Writer.Topic)) - } else { - opts = append(opts, tracer.ResourceName("Produce Topic "+msg.Topic)) - } - if !math.IsNaN(w.cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate)) - } - carrier := messageCarrier{msg} - span, _ := tracer.StartSpanFromContext(ctx, w.cfg.producerSpanName, opts...) - if err := tracer.Inject(span.Context(), carrier); err != nil { - instr.Logger().Debug("contrib/segmentio/kafka-go: Failed to inject span context into carrier in writer, %v", err) - } - return span -} - -func finishSpan(span *tracer.Span, partition int, offset int64, err error) { - span.SetTag(ext.MessagingKafkaPartition, partition) - span.SetTag("offset", offset) - span.Finish(tracer.WithError(err)) -} - // WriteMessages calls kafka-go.Writer.WriteMessages and traces the requests. -func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { +func (w *KafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { // although there's only one call made to the SyncProducer, the messages are // treated individually, so we create a span for each one spans := make([]*tracer.Span, len(msgs)) for i := range msgs { - spans[i] = w.startSpan(ctx, &msgs[i]) - setProduceCheckpoint(w.cfg.dataStreamsEnabled, &msgs[i], w.Writer) + tMsg := wrapMessage(&msgs[i]) + tWriter := wrapTracingWriter(w.Writer) + spans[i] = w.tracer.StartProduceSpan(ctx, tWriter, tMsg) + w.tracer.SetProduceDSMCheckpoint(tMsg, tWriter) } err := w.Writer.WriteMessages(ctx, msgs...) 
for i, span := range spans { - finishSpan(span, msgs[i].Partition, msgs[i].Offset, err) + w.tracer.FinishProduceSpan(span, msgs[i].Partition, msgs[i].Offset, err) } return err } - -func setProduceCheckpoint(enabled bool, msg *kafka.Message, writer *kafka.Writer) { - if !enabled || msg == nil { - return - } - - var topic string - if writer.Topic != "" { - topic = writer.Topic - } else { - topic = msg.Topic - } - - edges := []string{"direction:out", "topic:" + topic, "type:kafka"} - carrier := messageCarrier{msg} - ctx, ok := tracer.SetDataStreamsCheckpointWithParams( - datastreams.ExtractFromBase64Carrier(context.Background(), carrier), - options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, - edges..., - ) - if !ok { - return - } - - // Headers will be dropped if the current protocol does not support them - datastreams.InjectToBase64Carrier(ctx, carrier) -} - -func getProducerMsgSize(msg *kafka.Message) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - if msg.Value != nil { - size += int64(len(msg.Value)) - } - if msg.Key != nil { - size += int64(len(msg.Key)) - } - return size -} - -func getConsumerMsgSize(msg *kafka.Message) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - return size + int64(len(msg.Value)+len(msg.Key)) -} diff --git a/contrib/segmentio/kafka-go/kafka_test.go b/contrib/segmentio/kafka-go/kafka_test.go index 836ac02ef6..babe591672 100644 --- a/contrib/segmentio/kafka-go/kafka_test.go +++ b/contrib/segmentio/kafka-go/kafka_test.go @@ -7,7 +7,12 @@ package kafka import ( "context" + "errors" + "fmt" + "log" + "net" "os" + "strconv" "testing" "time" @@ -27,44 +32,139 @@ const ( testReaderMaxWait = 10 * time.Millisecond ) -func skipIntegrationTest(t *testing.T) { - if _, ok := os.LookupEnv("INTEGRATION"); !ok { - t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)") +var ( + // add some dummy values to broker/addr to test bootstrap servers. 
+ kafkaBrokers = []string{"localhost:9092", "localhost:9093", "localhost:9094"} +) + +func TestMain(m *testing.M) { + _, ok := os.LookupEnv("INTEGRATION") + if !ok { + log.Println("🚧 Skipping integration test (INTEGRATION environment variable is not set)") + os.Exit(0) } + cleanup := createTopic() + exitCode := m.Run() + cleanup() + os.Exit(exitCode) } -/* -to setup the integration test locally run: - docker-compose -f local_testing.yaml up -*/ +func testWriter() *kafka.Writer { + return &kafka.Writer{ + Addr: kafka.TCP(kafkaBrokers...), + Topic: testTopic, + RequiredAcks: kafka.RequireOne, + Balancer: &kafka.LeastBytes{}, + } +} -type readerOpFn func(t *testing.T, r *Reader) +func testReader() *kafka.Reader { + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: kafkaBrokers, + GroupID: testGroupID, + Topic: testTopic, + MaxWait: testReaderMaxWait, + MaxBytes: 10e6, // 10MB + }) +} + +func createTopic() func() { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + log.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + log.Fatal(err) + } + if err := controllerConn.DeleteTopics(testTopic); err != nil && !errors.Is(err, kafka.UnknownTopicOrPartition) { + log.Fatalf("failed to delete topic: %v", err) + } + topicConfigs := []kafka.TopicConfig{ + { + Topic: testTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + if err := controllerConn.CreateTopics(topicConfigs...); err != nil { + log.Fatal(err) + } + if err := ensureTopicReady(); err != nil { + log.Fatal(err) + } + return func() { + if err := controllerConn.DeleteTopics(testTopic); err != nil { + log.Printf("failed to delete topic: %v", err) + } + if err := controllerConn.Close(); err != nil { + log.Printf("failed to close controller connection: %v", err) + } + } +} + +func ensureTopicReady() error { + const ( + maxRetries = 10 + retryDelay = 100 * time.Millisecond + ) + writer := testWriter() + defer writer.Close() + reader := testReader() + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var ( + retryCount int + err error + ) + for retryCount < maxRetries { + err = writer.WriteMessages(ctx, kafka.Message{Key: []byte("some-key"), Value: []byte("some-value")}) + if err == nil { + break + } + // This error happens sometimes with brand-new topics, as there is a delay between when the topic is created + // on the broker, and when the topic can actually be written to. 
+ if !errors.Is(err, kafka.UnknownTopicOrPartition) { + return fmt.Errorf("failed to write messages: %w", err) + } + retryCount++ + log.Printf("topic not ready yet, retrying produce in %s (retryCount: %d)\n", retryDelay, retryCount) + time.Sleep(retryDelay) + } + if err != nil { + return fmt.Errorf("timeout waiting for topic to be ready: %w", err) + } + // read the message to ensure we don't pollute tests + _, err = reader.ReadMessage(ctx) + if err != nil { + return err + } + return nil +} -func genIntegrationTestSpans(t *testing.T, mt mocktracer.Tracer, writerOp func(t *testing.T, w *Writer), readerOp readerOpFn, writerOpts []Option, readerOpts []Option) ([]*mocktracer.Span, []kafka.Message) { - skipIntegrationTest(t) +type readerOpFn func(t *testing.T, r *Reader) +func genIntegrationTestSpans(t *testing.T, mt mocktracer.Tracer, writerOp func(t *testing.T, w *KafkaWriter), readerOp readerOpFn, writerOpts []Option, readerOpts []Option) ([]*mocktracer.Span, []kafka.Message) { writtenMessages := []kafka.Message{} - // add some dummy values to broker/addr to test bootstrap servers. - kw := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), - Topic: testTopic, - RequiredAcks: kafka.RequireOne, - Completion: func(messages []kafka.Message, err error) { - writtenMessages = append(writtenMessages, messages...) - }, + kw := testWriter() + kw.Completion = func(messages []kafka.Message, err error) { + writtenMessages = append(writtenMessages, messages...) } w := WrapWriter(kw, writerOpts...) writerOp(t, w) err := w.Close() require.NoError(t, err) - r := NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, - GroupID: testGroupID, - Topic: testTopic, - MaxWait: testReaderMaxWait, - }, readerOpts...) + r := WrapReader(testReader(), readerOpts...) readerOp(t, r) err = r.Close() require.NoError(t, err) @@ -93,7 +193,7 @@ func TestReadMessageFunctional(t *testing.T) { spans, writtenMessages := genIntegrationTestSpans( t, mt, - func(t *testing.T, w *Writer) { + func(t *testing.T, w *KafkaWriter) { err := w.WriteMessages(context.Background(), messagesToWrite...) 
require.NoError(t, err, "Expected to write message to topic") }, @@ -112,8 +212,8 @@ func TestReadMessageFunctional(t *testing.T) { []Option{WithDataStreams()}, ) - assert.Len(t, writtenMessages, len(messagesToWrite)) - assert.Len(t, readMessages, len(messagesToWrite)) + require.Len(t, writtenMessages, len(messagesToWrite)) + require.Len(t, readMessages, len(messagesToWrite)) // producer span s0 := spans[0] @@ -123,12 +223,12 @@ func TestReadMessageFunctional(t *testing.T) { assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s0.Tag(ext.SpanType)) assert.Equal(t, float64(0), s0.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "segmentio/kafka.go.v0", s0.Tag(ext.Component)) + assert.Equal(t, "segmentio/kafka-go", s0.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s0.Tag(ext.KafkaBootstrapServers)) - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]})) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(wrapMessage(&writtenMessages[0])))) assert.True(t, ok) expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) @@ -143,15 +243,19 @@ func TestReadMessageFunctional(t *testing.T) { assert.Equal(t, nil, s1.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s1.Tag(ext.SpanType)) assert.Equal(t, float64(0), s1.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "segmentio/kafka.go.v0", s1.Tag(ext.Component)) + assert.Equal(t, "segmentio/kafka-go", s1.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s1.Tag(ext.KafkaBootstrapServers)) - p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&readMessages[0]})) + // context propagation + assert.Equal(t, s0.SpanID(), s1.ParentID(), "consume span should be child of the produce span") + assert.Equal(t, s0.TraceID(), s1.TraceID(), "spans should have the same trace id") + + p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(wrapMessage(&readMessages[0])))) assert.True(t, ok) expectedCtx, _ = tracer.SetDataStreamsCheckpoint( - datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}), + datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(wrapMessage(&writtenMessages[0]))), "direction:in", "topic:"+testTopic, "type:kafka", "group:"+testGroupID, ) expected, _ = datastreams.PathwayFromContext(expectedCtx) @@ -176,7 +280,7 @@ func TestFetchMessageFunctional(t *testing.T) { spans, writtenMessages := genIntegrationTestSpans( t, mt, - func(t *testing.T, w *Writer) { + func(t *testing.T, w *KafkaWriter) { err := w.WriteMessages(context.Background(), messagesToWrite...) 
require.NoError(t, err, "Expected to write message to topic") }, @@ -203,12 +307,12 @@ func TestFetchMessageFunctional(t *testing.T) { assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s0.Tag(ext.SpanType)) assert.Equal(t, float64(0), s0.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "segmentio/kafka.go.v0", s0.Tag(ext.Component)) + assert.Equal(t, "segmentio/kafka-go", s0.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s0.Tag(ext.KafkaBootstrapServers)) - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]})) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(wrapMessage(&writtenMessages[0])))) assert.True(t, ok) expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) @@ -223,15 +327,18 @@ func TestFetchMessageFunctional(t *testing.T) { assert.Equal(t, nil, s1.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s1.Tag(ext.SpanType)) assert.Equal(t, float64(0), s1.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "segmentio/kafka.go.v0", s1.Tag(ext.Component)) + assert.Equal(t, "segmentio/kafka-go", s1.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s1.Tag(ext.KafkaBootstrapServers)) - p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&readMessages[0]})) + // context propagation + assert.Equal(t, s0.SpanID(), s1.ParentID(), "consume span should be child of the produce span") + + p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(wrapMessage(&readMessages[0])))) assert.True(t, ok) expectedCtx, _ = tracer.SetDataStreamsCheckpoint( - datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}), + datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(wrapMessage(&writtenMessages[0]))), "direction:in", "topic:"+testTopic, "type:kafka", "group:"+testGroupID, ) expected, _ = datastreams.PathwayFromContext(expectedCtx) @@ -239,40 +346,106 @@ func TestFetchMessageFunctional(t *testing.T) { assert.Equal(t, expected.GetHash(), p.GetHash()) } -func BenchmarkReaderStartSpan(b *testing.B) { - r := NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, - GroupID: testGroupID, - Topic: testTopic, - MaxWait: testReaderMaxWait, - }) +func TestProduceMultipleMessages(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + messages := []kafka.Message{ + { + Key: []byte("key1"), + Value: []byte("value1"), + }, + { + Key: []byte("key2"), + Value: []byte("value2"), + }, + { + Key: []byte("key3"), + Value: []byte("value3"), + }, + } + + writer := WrapWriter(testWriter()) + reader := WrapReader(testReader()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := writer.WriteMessages(ctx, messages...) 
+ require.NoError(t, err) + require.NoError(t, writer.Close()) + + curMsg := 0 + for curMsg < len(messages) { + readMsg, err := reader.ReadMessage(ctx) + require.NoError(t, err) + require.Equal(t, string(messages[curMsg].Key), string(readMsg.Key)) + require.Equal(t, string(messages[curMsg].Value), string(readMsg.Value)) + curMsg++ + } + require.NoError(t, reader.Close()) + + spans := mt.FinishedSpans() + require.Len(t, spans, 6) + + produceSpans := spans[0:3] + consumeSpans := spans[3:6] + for i := 0; i < 3; i++ { + ps := produceSpans[i] + cs := consumeSpans[i] + + assert.Equal(t, "kafka.produce", ps.OperationName(), "wrong produce span name") + assert.Equal(t, "kafka.consume", cs.OperationName(), "wrong consume span name") + assert.Equal(t, cs.ParentID(), ps.SpanID(), "consume span should be child of a produce span") + assert.Equal(t, uint64(0), ps.ParentID(), "produce span should not be child of any span") + assert.Equal(t, cs.TraceID(), ps.TraceID(), "spans should be part of the same trace") + } +} + +// benchSpan is a package-level variable used to prevent compiler optimisations in the benchmarks below. +var benchSpan *tracer.Span + +func BenchmarkReaderStartSpan(b *testing.B) { + ctx := context.Background() + kafkaCfg := KafkaConfig{ + BootstrapServers: "localhost:9092,localhost:9093,localhost:9094", + ConsumerGroupID: testGroupID, + } + tr := NewTracer(kafkaCfg) msg := kafka.Message{ Key: []byte("key1"), Value: []byte("value1"), } + var result *tracer.Span b.ResetTimer() for n := 0; n < b.N; n++ { - r.startSpan(nil, &msg) + result = tr.StartConsumeSpan(ctx, wrapMessage(&msg)) } + benchSpan = result } func BenchmarkWriterStartSpan(b *testing.B) { + ctx := context.Background() + kafkaCfg := KafkaConfig{ + BootstrapServers: "localhost:9092,localhost:9093,localhost:9094", + ConsumerGroupID: testGroupID, + } + tr := NewTracer(kafkaCfg) kw := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: testTopic, RequiredAcks: kafka.RequireOne, } - w := WrapWriter(kw) - msg := kafka.Message{ Key: []byte("key1"), Value: []byte("value1"), } + var result *tracer.Span b.ResetTimer() for n := 0; n < b.N; n++ { - w.startSpan(nil, &msg) + result = tr.StartProduceSpan(ctx, wrapTracingWriter(kw), wrapMessage(&msg)) } + benchSpan = result } diff --git a/contrib/segmentio/kafka-go/message_carrier.go b/contrib/segmentio/kafka-go/message_carrier.go new file mode 100644 index 0000000000..45cdaeeeed --- /dev/null +++ b/contrib/segmentio/kafka-go/message_carrier.go @@ -0,0 +1,52 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package kafka + +import ( + "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" +) + +// A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message +type MessageCarrier struct { + msg Message +} + +var _ interface { + tracer.TextMapReader + tracer.TextMapWriter +} = (*MessageCarrier)(nil) + +// ForeachKey conforms to the TextMapReader interface. 
+func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error { + for _, h := range c.msg.GetHeaders() { + err := handler(h.GetKey(), string(h.GetValue())) + if err != nil { + return err + } + } + return nil +} + +// Set implements TextMapWriter +func (c MessageCarrier) Set(key, val string) { + headers := c.msg.GetHeaders() + // ensure uniqueness of keys + for i := 0; i < len(headers); i++ { + if headers[i].GetKey() == key { + headers = append(headers[:i], headers[i+1:]...) + i-- + } + } + headers = append(headers, KafkaHeader{ + Key: key, + Value: []byte(val), + }) + c.msg.SetHeaders(headers) +} + +func NewMessageCarrier(msg Message) MessageCarrier { + return MessageCarrier{msg: msg} +} diff --git a/contrib/segmentio/kafka-go/option_test.go b/contrib/segmentio/kafka-go/option_test.go index 43147f6ee1..effbd7f0b4 100644 --- a/contrib/segmentio/kafka-go/option_test.go +++ b/contrib/segmentio/kafka-go/option_test.go @@ -49,7 +49,7 @@ func TestAnalyticsSettings(t *testing.T) { t.Run("optionOverridesEnv", func(t *testing.T) { t.Setenv("DD_DATA_STREAMS_ENABLED", "false") cfg := newConfig() - WithDataStreams()(cfg) + WithDataStreams().apply(cfg) assert.True(t, cfg.dataStreamsEnabled) }) } diff --git a/contrib/segmentio/kafka-go/tracer.go b/contrib/segmentio/kafka-go/tracer.go new file mode 100644 index 0000000000..6510d7963f --- /dev/null +++ b/contrib/segmentio/kafka-go/tracer.go @@ -0,0 +1,21 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package kafka + +// A Tracer starts and finishes consumer and producer spans, and sets Data Streams Monitoring checkpoints, for kafka messages. +type Tracer struct { + kafkaCfg KafkaConfig + cfg *config +} + +// NewTracer returns a Tracer configured with the given Kafka configuration and options. +func NewTracer(kafkaCfg KafkaConfig, opts ...Option) *Tracer { + tr := &Tracer{ + kafkaCfg: kafkaCfg, + } + tr.cfg = newConfig(opts...) + return tr +} diff --git a/contrib/segmentio/kafka-go/tracer_test.go b/contrib/segmentio/kafka-go/tracer_test.go new file mode 100644 index 0000000000..06d64c9333 --- /dev/null +++ b/contrib/segmentio/kafka-go/tracer_test.go @@ -0,0 +1,55 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
+ +package kafka + +import ( + "math" + "testing" + + "github.com/DataDog/dd-trace-go/v2/instrumentation/testutils" + + "github.com/stretchr/testify/assert" +) + +func TestTracerAnalyticsSettings(t *testing.T) { + t.Run("defaults", func(t *testing.T) { + tr := NewTracer(KafkaConfig{}) + assert.True(t, math.IsNaN(tr.cfg.analyticsRate)) + }) + + t.Run("global", func(t *testing.T) { + t.Skip("global flag disabled") + testutils.SetGlobalAnalyticsRate(t, 0.4) + + tr := NewTracer(KafkaConfig{}) + assert.Equal(t, 0.4, tr.cfg.analyticsRate) + }) + + t.Run("enabled", func(t *testing.T) { + tr := NewTracer(KafkaConfig{}, WithAnalytics(true)) + assert.Equal(t, 1.0, tr.cfg.analyticsRate) + }) + + t.Run("override", func(t *testing.T) { + testutils.SetGlobalAnalyticsRate(t, 0.4) + + tr := NewTracer(KafkaConfig{}, WithAnalyticsRate(0.2)) + assert.Equal(t, 0.2, tr.cfg.analyticsRate) + }) + + t.Run("withEnv", func(t *testing.T) { + t.Setenv("DD_DATA_STREAMS_ENABLED", "true") + tr := NewTracer(KafkaConfig{}) + assert.True(t, tr.cfg.dataStreamsEnabled) + }) + + t.Run("optionOverridesEnv", func(t *testing.T) { + t.Setenv("DD_DATA_STREAMS_ENABLED", "false") + tr := NewTracer(KafkaConfig{}) + WithDataStreams().apply(tr.cfg) + assert.True(t, tr.cfg.dataStreamsEnabled) + }) +} diff --git a/contrib/segmentio/kafka-go/tracing.go b/contrib/segmentio/kafka-go/tracing.go new file mode 100644 index 0000000000..5cb014b0d4 --- /dev/null +++ b/contrib/segmentio/kafka-go/tracing.go @@ -0,0 +1,161 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package kafka + +import ( + "context" + "math" + + "github.com/segmentio/kafka-go" + + "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" + "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" +) + +const componentName = "segmentio/kafka-go" + +func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) *tracer.Span { + opts := []tracer.StartSpanOption{ + tracer.ServiceName(tr.cfg.consumerServiceName), + tracer.ResourceName("Consume Topic " + msg.GetTopic()), + tracer.SpanType(ext.SpanTypeMessageConsumer), + tracer.Tag(ext.MessagingKafkaPartition, msg.GetPartition()), + tracer.Tag("offset", msg.GetOffset()), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + tracer.Measured(), + } + if tr.kafkaCfg.BootstrapServers != "" { + opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.kafkaCfg.BootstrapServers)) + } + if !math.IsNaN(tr.cfg.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.cfg.analyticsRate)) + } + // kafka supports headers, so try to extract a span context + carrier := NewMessageCarrier(msg) + if spanctx, err := tracer.Extract(carrier); err == nil { + opts = append(opts, tracer.ChildOf(spanctx)) + } + span, _ := tracer.StartSpanFromContext(ctx, tr.cfg.consumerSpanName, opts...) 
+ // reinject the span context so consumers can pick it up + if err := tracer.Inject(span.Context(), carrier); err != nil { + instr.Logger().Debug("contrib/segmentio/kafka-go: Failed to inject span context into carrier in reader, %v", err) + } + return span +} + +func (tr *Tracer) StartProduceSpan(ctx context.Context, writer Writer, msg Message, spanOpts ...tracer.StartSpanOption) *tracer.Span { + opts := []tracer.StartSpanOption{ + tracer.ServiceName(tr.cfg.producerServiceName), + tracer.SpanType(ext.SpanTypeMessageProducer), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindProducer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + } + if tr.kafkaCfg.BootstrapServers != "" { + opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.kafkaCfg.BootstrapServers)) + } + if writer.GetTopic() != "" { + opts = append(opts, tracer.ResourceName("Produce Topic "+writer.GetTopic())) + } else { + opts = append(opts, tracer.ResourceName("Produce Topic "+msg.GetTopic())) + } + if !math.IsNaN(tr.cfg.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.cfg.analyticsRate)) + } + opts = append(opts, spanOpts...) + carrier := NewMessageCarrier(msg) + span, _ := tracer.StartSpanFromContext(ctx, tr.cfg.producerSpanName, opts...) + if err := tracer.Inject(span.Context(), carrier); err != nil { + instr.Logger().Debug("contrib/segmentio/kafka-go: Failed to inject span context into carrier in writer, %v", err) + } + return span +} + +func (*Tracer) FinishProduceSpan(span *tracer.Span, partition int, offset int64, err error) { + span.SetTag(ext.MessagingKafkaPartition, partition) + span.SetTag("offset", offset) + span.Finish(tracer.WithError(err)) +} + +type wMessage struct { + *kafka.Message +} + +func wrapMessage(msg *kafka.Message) Message { + if msg == nil { + return nil + } + return &wMessage{msg} +} + +func (w *wMessage) GetValue() []byte { + return w.Value +} + +func (w *wMessage) GetKey() []byte { + return w.Key +} + +func (w *wMessage) GetHeaders() []Header { + hs := make([]Header, 0, len(w.Headers)) + for _, h := range w.Headers { + hs = append(hs, wrapHeader(h)) + } + return hs +} + +func (w *wMessage) SetHeaders(headers []Header) { + hs := make([]kafka.Header, 0, len(headers)) + for _, h := range headers { + hs = append(hs, kafka.Header{ + Key: h.GetKey(), + Value: h.GetValue(), + }) + } + w.Message.Headers = hs +} + +func (w *wMessage) GetTopic() string { + return w.Topic +} + +func (w *wMessage) GetPartition() int { + return w.Partition +} + +func (w *wMessage) GetOffset() int64 { + return w.Offset +} + +type wHeader struct { + kafka.Header +} + +func wrapHeader(h kafka.Header) Header { + return &wHeader{h} +} + +func (w wHeader) GetKey() string { + return w.Key +} + +func (w wHeader) GetValue() []byte { + return w.Value +} + +type wWriter struct { + *kafka.Writer +} + +func (w *wWriter) GetTopic() string { + return w.Topic +} + +func wrapTracingWriter(w *kafka.Writer) Writer { + return &wWriter{w} +} diff --git a/contrib/segmentio/kafka-go/types.go b/contrib/segmentio/kafka-go/types.go new file mode 100644 index 0000000000..ef4a83e64a --- /dev/null +++ b/contrib/segmentio/kafka-go/types.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. 
+ +package kafka + +type Header interface { + GetKey() string + GetValue() []byte +} + +type KafkaHeader struct { + Key string + Value []byte +} + +func (h KafkaHeader) GetKey() string { + return h.Key +} + +func (h KafkaHeader) GetValue() []byte { + return h.Value +} + +type Writer interface { + GetTopic() string +} + +type Message interface { + GetValue() []byte + GetKey() []byte + GetHeaders() []Header + SetHeaders([]Header) + GetTopic() string + GetPartition() int + GetOffset() int64 +} + +// KafkaConfig holds information from the kafka config for span tags. +type KafkaConfig struct { + BootstrapServers string + ConsumerGroupID string +} diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index f7a6059ec3..814a089c86 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -1365,6 +1365,10 @@ func setHeaderTags(headerAsTags []string) bool { globalconfig.ClearHeaderTags() for _, h := range headerAsTags { header, tag := normalizer.HeaderTag(h) + if len(header) == 0 || len(tag) == 0 { + log.Debug("Header-tag input is in unsupported format; dropping input value %v", h) + continue + } globalconfig.SetHeaderTag(header, tag) } return true diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index 28bd88b89f..1f3689f4e0 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -1566,6 +1566,27 @@ func TestWithHeaderTags(t *testing.T) { assert.Equal(ext.HTTPRequestHeaders+".2_h_e_a_d_e_r", globalconfig.HeaderTag("2.h.e.a.d.e.r")) }) + t.Run("envvar-invalid", func(t *testing.T) { + defer globalconfig.ClearHeaderTags() + t.Setenv("DD_TRACE_HEADER_TAGS", "header1:") + + assert := assert.New(t) + newConfig() + + assert.Equal(0, globalconfig.HeaderTagsLen()) + }) + + t.Run("envvar-partially-invalid", func(t *testing.T) { + defer globalconfig.ClearHeaderTags() + t.Setenv("DD_TRACE_HEADER_TAGS", "header1,header2:") + + assert := assert.New(t) + newConfig() + + assert.Equal(1, globalconfig.HeaderTagsLen()) + assert.Equal(ext.HTTPRequestHeaders+".header1", globalconfig.HeaderTag("Header1")) + }) + t.Run("env-override", func(t *testing.T) { defer globalconfig.ClearHeaderTags() assert := assert.New(t) diff --git a/docker-compose.yaml b/docker-compose.yaml index 03242034e5..a246ee8423 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -145,6 +145,24 @@ services: KAFKA_GROUP_ID: "gotest" depends_on: - zookeeper + kafka: + image: confluentinc/confluent-local:7.5.0 + environment: + KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092" + KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_BROKER_ID: "1" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1" + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1" + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + KAFKA_NODE_ID: "1" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" ports: - 
"9092:9092" localstack: diff --git a/instrumentation/internal/namingschematest/segmentio_kafkago.go b/instrumentation/internal/namingschematest/segmentio_kafkago.go index 60fb01f0a5..61df865d12 100644 --- a/instrumentation/internal/namingschematest/segmentio_kafkago.go +++ b/instrumentation/internal/namingschematest/segmentio_kafkago.go @@ -27,7 +27,7 @@ const ( type readerOpFn func(t *testing.T, r *segmentiotracer.Reader) -func genIntegrationTestSpans(t *testing.T, mt mocktracer.Tracer, writerOp func(t *testing.T, w *segmentiotracer.Writer), readerOp readerOpFn, writerOpts []segmentiotracer.Option, readerOpts []segmentiotracer.Option) ([]*mocktracer.Span, []kafka.Message) { +func genIntegrationTestSpans(t *testing.T, mt mocktracer.Tracer, writerOp func(t *testing.T, w *segmentiotracer.KafkaWriter), readerOp readerOpFn, writerOpts []segmentiotracer.Option, readerOpts []segmentiotracer.Option) ([]*mocktracer.Span, []kafka.Message) { writtenMessages := []kafka.Message{} // add some dummy values to broker/addr to test bootstrap servers. @@ -81,7 +81,7 @@ func segmentioKafkaGoGenSpans() harness.GenSpansFn { spans, _ := genIntegrationTestSpans( t, mt, - func(t *testing.T, w *segmentiotracer.Writer) { + func(t *testing.T, w *segmentiotracer.KafkaWriter) { err := w.WriteMessages(context.Background(), messagesToWrite...) require.NoError(t, err, "Expected to write message to topic") }, diff --git a/instrumentation/packages.go b/instrumentation/packages.go index e42270b723..a1702ff31e 100644 --- a/instrumentation/packages.go +++ b/instrumentation/packages.go @@ -52,7 +52,7 @@ const ( PackageSyndtrGoLevelDB Package = "syndtr/goleveldb/leveldb" PackageSirupsenLogrus Package = "sirupsen/logrus" PackageShopifySarama Package = "Shopify/sarama" - PackageSegmentioKafkaGo Package = "segmentio/kafka.go.v0" + PackageSegmentioKafkaGo Package = "segmentio/kafka-go" PackageRedisGoRedisV9 Package = "redis/go-redis.v9" PackageOlivereElasticV5 Package = "olivere/elastic" PackageMiekgDNS Package = "miekg/dns" diff --git a/internal/civisibility/integrations/gotesting/instrumentation_orchestrion.go b/internal/civisibility/integrations/gotesting/instrumentation_orchestrion.go index 2b3345549b..cd715f812f 100644 --- a/internal/civisibility/integrations/gotesting/instrumentation_orchestrion.go +++ b/internal/civisibility/integrations/gotesting/instrumentation_orchestrion.go @@ -14,6 +14,7 @@ import ( "sync/atomic" "testing" "time" + _ "unsafe" "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" "github.com/DataDog/dd-trace-go/v2/internal/civisibility/constants" @@ -29,6 +30,8 @@ import ( // ****************************************************************************************************************** // instrumentTestingM helper function to instrument internalTests and internalBenchmarks in a `*testing.M` instance. 
+// +//go:linkname instrumentTestingM func instrumentTestingM(m *testing.M) func(exitCode int) { // Check if CI Visibility was disabled using the kill switch before trying to initialize it atomic.StoreInt32(&ciVisibilityEnabledValue, -1) @@ -72,6 +75,8 @@ func instrumentTestingM(m *testing.M) func(exitCode int) { } // instrumentTestingTFunc helper function to instrument a testing function func(*testing.T) +// +//go:linkname instrumentTestingTFunc func instrumentTestingTFunc(f func(*testing.T)) func(*testing.T) { // Check if CI Visibility was disabled using the kill switch before instrumenting if !isCiVisibilityEnabled() { @@ -188,6 +193,8 @@ func instrumentTestingTFunc(f func(*testing.T)) func(*testing.T) { } // instrumentSetErrorInfo helper function to set an error in the `*testing.T, *testing.B, *testing.common` CI Visibility span +// +//go:linkname instrumentSetErrorInfo func instrumentSetErrorInfo(tb testing.TB, errType string, errMessage string, skip int) { // Check if CI Visibility was disabled using the kill switch before if !isCiVisibilityEnabled() { @@ -202,6 +209,8 @@ func instrumentSetErrorInfo(tb testing.TB, errType string, errMessage string, sk } // instrumentCloseAndSkip helper function to close and skip with a reason a `*testing.T, *testing.B, *testing.common` CI Visibility span +// +//go:linkname instrumentCloseAndSkip func instrumentCloseAndSkip(tb testing.TB, skipReason string) { // Check if CI Visibility was disabled using the kill switch before if !isCiVisibilityEnabled() { @@ -216,6 +225,8 @@ func instrumentCloseAndSkip(tb testing.TB, skipReason string) { } // instrumentSkipNow helper function to close and skip a `*testing.T, *testing.B, *testing.common` CI Visibility span +// +//go:linkname instrumentSkipNow func instrumentSkipNow(tb testing.TB) { // Check if CI Visibility was disabled using the kill switch before if !isCiVisibilityEnabled() { @@ -230,6 +241,8 @@ func instrumentSkipNow(tb testing.TB) { } // instrumentTestingBFunc helper function to instrument a benchmark function func(*testing.B) +// +//go:linkname instrumentTestingBFunc func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (string, func(*testing.B)) { // Check if CI Visibility was disabled using the kill switch before instrumenting if !isCiVisibilityEnabled() { diff --git a/internal/civisibility/integrations/gotesting/testcontroller_test.go b/internal/civisibility/integrations/gotesting/testcontroller_test.go index 84f5197638..8cb9d04fb9 100644 --- a/internal/civisibility/integrations/gotesting/testcontroller_test.go +++ b/internal/civisibility/integrations/gotesting/testcontroller_test.go @@ -96,6 +96,7 @@ func runFlakyTestRetriesTests(m *testing.M) { // 1 TestMyTest01 // 1 TestMyTest02 + 2 subtests // 1 Test_Foo + 3 subtests + // 1 TestWithExternalCalls + 2 subtests // 1 TestSkip // 1 TestRetryWithPanic + 3 retry tests from testing_test.go // 1 TestRetryWithFail + 3 retry tests from testing_test.go @@ -115,6 +116,9 @@ func runFlakyTestRetriesTests(m *testing.M) { checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/yellow_should_return_color", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/banana_should_return_fruit", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/duck_should_return_animal", 1) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls", 1) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls/default", 1) + checkSpansByResourceName(finishedSpans, 
"testing_test.go.TestWithExternalCalls/custom-name", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.TestSkip", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.TestRetryWithPanic", 4) checkSpansByResourceName(finishedSpans, "testing_test.go.TestRetryWithFail", 4) @@ -127,12 +131,12 @@ func runFlakyTestRetriesTests(m *testing.M) { // check spans by type checkSpansByType(finishedSpans, - 39, + 44, 1, 1, 2, - 35, - 0) + 38, + 2) os.Exit(0) } @@ -174,6 +178,7 @@ func runEarlyFlakyTestDetectionTests(m *testing.M) { // 11 TestMyTest01 // 11 TestMyTest02 + 22 subtests // 11 Test_Foo + 33 subtests + // 11 TestWithExternalCalls + 22 subtests // 11 TestSkip // 11 TestRetryWithPanic // 11 TestRetryWithFail @@ -194,6 +199,9 @@ func runEarlyFlakyTestDetectionTests(m *testing.M) { checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/yellow_should_return_color", 11) checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/banana_should_return_fruit", 11) checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/duck_should_return_animal", 11) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls", 11) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls/default", 11) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls/custom-name", 11) checkSpansByResourceName(finishedSpans, "testing_test.go.TestSkip", 11) checkSpansByResourceName(finishedSpans, "testing_test.go.TestRetryWithPanic", 11) checkSpansByResourceName(finishedSpans, "testing_test.go.TestRetryWithFail", 11) @@ -202,17 +210,17 @@ func runEarlyFlakyTestDetectionTests(m *testing.M) { checkSpansByResourceName(finishedSpans, "testing_test.go.TestEarlyFlakeDetection", 11) // check spans by tag - checkSpansByTagName(finishedSpans, constants.TestIsNew, 154) - checkSpansByTagName(finishedSpans, constants.TestIsRetry, 140) + checkSpansByTagName(finishedSpans, constants.TestIsNew, 187) + checkSpansByTagName(finishedSpans, constants.TestIsRetry, 170) // check spans by type checkSpansByType(finishedSpans, - 163, + 218, 1, 1, 2, - 159, - 0) + 192, + 22) os.Exit(0) } @@ -289,6 +297,9 @@ func runFlakyTestRetriesWithEarlyFlakyTestDetectionTests(m *testing.M) { checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/yellow_should_return_color", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/banana_should_return_fruit", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.Test_Foo/duck_should_return_animal", 1) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls", 1) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls/default", 1) + checkSpansByResourceName(finishedSpans, "testing_test.go.TestWithExternalCalls/custom-name", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.TestSkip", 1) checkSpansByResourceName(finishedSpans, "testing_test.go.TestRetryWithPanic", 4) checkSpansByResourceName(finishedSpans, "testing_test.go.TestRetryWithFail", 4) @@ -302,12 +313,12 @@ func runFlakyTestRetriesWithEarlyFlakyTestDetectionTests(m *testing.M) { // check spans by type checkSpansByType(finishedSpans, - 59, + 64, 1, 1, 2, - 55, - 0) + 58, + 2) os.Exit(0) } @@ -426,11 +437,6 @@ func setUpHttpServer(flakyRetriesEnabled bool, earlyFlakyDetectionEnabled bool, fmt.Printf("MockApi sending response: %v\n", response) json.NewEncoder(w).Encode(&response) - } else if r.URL.Path == "/api/v2/git/repository/search_commits" { - 
w.Header().Set("Content-Type", "application/json") - w.Write([]byte("{}")) - } else if r.URL.Path == "/api/v2/git/repository/packfile" { - w.WriteHeader(http.StatusAccepted) } else { http.NotFound(w, r) }