Commit

merge 1.69.1-rc.4 with v2 (#2953)

hannahkm authored Oct 29, 2024
1 parent 8445590 commit 729a1be
Showing 22 changed files with 790 additions and 268 deletions.
28 changes: 16 additions & 12 deletions .github/workflows/unit-integration-tests.yml
@@ -180,20 +180,24 @@ jobs:
        image: memcached:1.5.9
        ports:
        - 11211:11211
-      zookeeper:
-        image: bitnami/zookeeper:latest
-        env:
-          ALLOW_ANONYMOUS_LOGIN: "yes"
-        ports:
-        - 2181:2181
      kafka:
-        image: darccio/kafka:2.13-2.8.1
+        image: confluentinc/confluent-local:7.5.0
        env:
-          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
-          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
-          KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1
-          KAFKA_BROKER_ID: 1
+          KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094"
+          KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092"
+          KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"
+          KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094"
+          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
+          KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
+          KAFKA_BROKER_ID: "1"
+          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
+          KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1"
+          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
+          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
+          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
+          KAFKA_NODE_ID: "1"
+          KAFKA_PROCESS_ROLES: "broker,controller"
+          KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
        ports:
        - 9092:9092
      localstack:
@@ -3,6 +3,13 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

+// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation.
+//
+// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub.
+//
+// The motivation of this package is to support orchestrion, which cannot use the main package because it imports
+// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself,
+// this would cause an import cycle.
package tracing

import (
1 change: 0 additions & 1 deletion contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
@@ -208,7 +208,6 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er
			span.Finish(tracer.WithError(err))
		}
	}
-
	return err
}

@@ -281,6 +281,8 @@ func TestCustomTags(t *testing.T) {
	// assert.Equal(t, []byte("key1"), s.Tag("key"))
}

+type consumerActionFn func(c *Consumer) (*kafka.Message, error)
+
// Test we don't leak goroutines and properly close the span when Produce returns an error.
func TestProduceError(t *testing.T) {
	defer func() {
@@ -327,8 +329,6 @@ func TestProduceError(t *testing.T) {
	assert.Len(t, spans, 1)
}

-type consumerActionFn func(c *Consumer) (*kafka.Message, error)
-
func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]*mocktracer.Span, *kafka.Message) {
	if _, ok := os.LookupEnv("INTEGRATION"); !ok {
		t.Skip("to enable integration test, set the INTEGRATION environment variable")
1 change: 0 additions & 1 deletion contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -207,7 +207,6 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er
			span.Finish(tracer.WithError(err))
		}
	}
-
	return err
}

86 changes: 86 additions & 0 deletions contrib/segmentio/kafka-go/dsm.go
@@ -0,0 +1,86 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package kafka

import (
	"context"

	"github.com/DataDog/dd-trace-go/v2/datastreams"
	"github.com/DataDog/dd-trace-go/v2/datastreams/options"
	"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
)

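// SetConsumeDSMCheckpoint sets a Data Streams Monitoring consume checkpoint
// for msg and writes the updated pathway context back into the message
// headers (base64-encoded). When a consumer group is configured, it also
// reports the committed offset so Kafka lag can be tracked.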
func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) {
	if !tr.cfg.dataStreamsEnabled || msg == nil {
		return
	}
	edges := []string{"direction:in", "topic:" + msg.GetTopic(), "type:kafka"}
	if tr.kafkaCfg.ConsumerGroupID != "" {
		edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID)
	}
	carrier := NewMessageCarrier(msg)
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
		datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
		options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)},
		edges...,
	)
	if !ok {
		return
	}
	datastreams.InjectToBase64Carrier(ctx, carrier)
	if tr.kafkaCfg.ConsumerGroupID != "" {
		// Only track Kafka lag if a consumer group is set.
		// Since there is no ack mechanism, we consider that messages read are committed right away.
		tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset())
	}
}

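// SetProduceDSMCheckpoint sets a Data Streams Monitoring produce checkpoint
// for msg, preferring the writer's topic over the message's, and injects the
// pathway context into the message headers for downstream consumers.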
func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) {
	if !tr.cfg.dataStreamsEnabled || msg == nil {
		return
	}

	var topic string
	if writer.GetTopic() != "" {
		topic = writer.GetTopic()
	} else {
		topic = msg.GetTopic()
	}

	edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
	carrier := MessageCarrier{msg}
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
		datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
		options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)},
		edges...,
	)
	if !ok {
		return
	}

	// Headers will be dropped if the current protocol does not support them
	datastreams.InjectToBase64Carrier(ctx, carrier)
}

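// getProducerMsgSize reports the payload size used for the produce
// checkpoint: all header keys and values plus the message key and value.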
func getProducerMsgSize(msg Message) (size int64) {
	for _, header := range msg.GetHeaders() {
		size += int64(len(header.GetKey()) + len(header.GetValue()))
	}
	if msg.GetValue() != nil {
		size += int64(len(msg.GetValue()))
	}
	if msg.GetKey() != nil {
		size += int64(len(msg.GetKey()))
	}
	return size
}

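// getConsumerMsgSize is the consume-side equivalent; len returns 0 for a nil
// key or value, so no nil checks are needed here.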
func getConsumerMsgSize(msg Message) (size int64) {
	for _, header := range msg.GetHeaders() {
		size += int64(len(header.GetKey()) + len(header.GetValue()))
	}
	return size + int64(len(msg.GetValue())+len(msg.GetKey()))
}
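For orientation, the checkpoint pattern in dsm.go above boils down to a short, self-contained sketch. This is illustrative only, not part of this commit: mapCarrier is a hypothetical stand-in for the package's MessageCarrier (anything satisfying the datastreams carrier contract of ForeachKey and Set is assumed to work), and the WithDataStreams start option is an assumption of this sketch; the datastreams and tracer calls themselves are the ones used in the code above.

package main

import (
	"context"
	"fmt"

	"github.com/DataDog/dd-trace-go/v2/datastreams"
	"github.com/DataDog/dd-trace-go/v2/datastreams/options"
	"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
)

// mapCarrier is a hypothetical stand-in for MessageCarrier: a key/value
// store the pathway context is extracted from and injected into
// (base64-encoded), backed by a plain map instead of Kafka message headers.
type mapCarrier map[string]string

func (c mapCarrier) Set(key, val string) { c[key] = val }

func (c mapCarrier) ForeachKey(handler func(key, val string) error) error {
	for k, v := range c {
		if err := handler(k, v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Assumption: Data Streams Monitoring is enabled on the tracer.
	tracer.Start(tracer.WithDataStreams())
	defer tracer.Stop()

	headers := mapCarrier{} // stands in for the Kafka message headers

	// Produce side: set a checkpoint, then inject the updated pathway into
	// the headers so a downstream consumer can continue it.
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
		datastreams.ExtractFromBase64Carrier(context.Background(), headers),
		options.CheckpointParams{PayloadSize: 42},
		"direction:out", "topic:gosegtest", "type:kafka",
	)
	if ok {
		datastreams.InjectToBase64Carrier(ctx, headers)
	}
	fmt.Println(headers)
}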
2 changes: 1 addition & 1 deletion contrib/segmentio/kafka-go/example_test.go
@@ -13,7 +13,7 @@ import (
	kafkatrace "github.com/DataDog/dd-trace-go/contrib/segmentio/kafka-go/v2"
	"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"

-	kafka "github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go"
)

func ExampleWriter() {