Commit 4018b72

merge 1.69.1-rc.4 with v2

hannahkm committed Oct 29, 2024
1 parent 8445590 commit 4018b72
Showing 28 changed files with 1,419 additions and 184 deletions.
28 changes: 16 additions & 12 deletions .github/workflows/unit-integration-tests.yml
@@ -180,20 +180,24 @@ jobs:
         image: memcached:1.5.9
         ports:
           - 11211:11211
-      zookeeper:
-        image: bitnami/zookeeper:latest
-        env:
-          ALLOW_ANONYMOUS_LOGIN: "yes"
-        ports:
-          - 2181:2181
       kafka:
-        image: darccio/kafka:2.13-2.8.1
+        image: confluentinc/confluent-local:7.5.0
         env:
-          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
-          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
-          KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1
-          KAFKA_BROKER_ID: 1
+          KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094"
+          KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092"
+          KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"
+          KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094"
+          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
+          KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
+          KAFKA_BROKER_ID: "1"
+          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
+          KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1"
+          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
+          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
+          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
+          KAFKA_NODE_ID: "1"
+          KAFKA_PROCESS_ROLES: "broker,controller"
+          KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
         ports:
           - 9092:9092
       localstack:
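Note: the replacement image runs a single-node broker of Confluent Platform 7.5 in KRaft mode (KAFKA_PROCESS_ROLES: "broker,controller"), removing the Zookeeper sidecar entirely; the BROKER listener is advertised on localhost:9092, the only port the job maps. A minimal sketch of how a test could reach this broker, assuming confluent-kafka-go v2 — the topic name is illustrative, and since the old KAFKA_CREATE_TOPICS pre-creation is gone, it relies on topic auto-creation or prior setup:

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
    )

    func main() {
    	// The workflow maps only 9092, the advertised BROKER listener.
    	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer p.Close()

    	topic := "gotest" // illustrative; assumes auto-creation or prior setup
    	if err := p.Produce(&kafka.Message{
    		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    		Value:          []byte("hello"),
    	}, nil); err != nil {
    		log.Fatal(err)
    	}
    	p.Flush(5000) // wait up to 5s for delivery
    	fmt.Println("message flushed")
    }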
@@ -3,6 +3,13 @@
 // This product includes software developed at Datadog (https://www.datadoghq.com/).
 // Copyright 2024 Datadog, Inc.
 
+// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation.
+//
+// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub.
+//
+// The motivation of this package is to support orchestrion, which cannot use the main package because it imports
+// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself,
+// this would cause an import cycle.
 package tracing
 
 import (
@@ -0,0 +1,85 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"math"

"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
)

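// WrapConsumeEventsChannel wraps the events channel so a consume span is
// started for every Kafka message read from it; each span stays open until
// the next event arrives (see PrevSpan), approximating processing time.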
func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E {
// in will be nil when consuming via the events channel is not enabled
if in == nil {
return nil
}

out := make(chan E, 1)
go func() {
defer close(out)
for evt := range in {
tEvt := translateFn(evt)
var next *tracer.Span

// only trace messages
if msg, ok := tEvt.KafkaMessage(); ok {
next = tr.StartConsumeSpan(msg)
tr.SetConsumeCheckpoint(msg)
} else if offset, ok := tEvt.KafkaOffsetsCommitted(); ok {
tr.TrackCommitOffsets(offset.GetOffsets(), offset.GetError())
tr.TrackHighWatermarkOffset(offset.GetOffsets(), consumer)
}

out <- evt

if tr.PrevSpan != nil {
tr.PrevSpan.Finish()
}
tr.PrevSpan = next
}
// finish any remaining span
if tr.PrevSpan != nil {
tr.PrevSpan.Finish()
tr.PrevSpan = nil
}
}()
return out
}

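// StartConsumeSpan starts a consumer span for msg, using any span context
// propagated in the message headers as its parent, and re-injects the new
// span's context into the headers.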
func (tr *KafkaTracer) StartConsumeSpan(msg Message) *tracer.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(tr.consumerServiceName),
tracer.ResourceName("Consume Topic " + msg.GetTopicPartition().GetTopic()),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag(ext.MessagingKafkaPartition, msg.GetTopicPartition().GetPartition()),
tracer.Tag("offset", msg.GetTopicPartition().GetOffset()),
tracer.Tag(ext.Component, ComponentName(tr.ckgoVersion)),
tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
tracer.Measured(),
}
if tr.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.bootstrapServers))
}
if tr.tagFns != nil {
for key, tagFn := range tr.tagFns {
opts = append(opts, tracer.Tag(key, tagFn(msg)))
}
}
if !math.IsNaN(tr.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate))
}
// kafka supports headers, so try to extract a span context
carrier := MessageCarrier{msg: msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
tracer.Inject(span.Context(), carrier)
return span
}
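The goroutine in WrapConsumeEventsChannel finishes the previous message's span only after the next event is read, so each span approximates the time the caller spent handling that message before asking for another. A self-contained sketch of this deferred-finish pattern, with illustrative names standing in for the tracer types:

    package main

    import "fmt"

    // wrapChannel mirrors in on a new channel, starting a "span" per item and
    // finishing it only after the next item is read — the same deferred-finish
    // pattern WrapConsumeEventsChannel implements via tr.PrevSpan.
    func wrapChannel[E any](in chan E, start func(E) func()) chan E {
    	if in == nil {
    		return nil
    	}
    	out := make(chan E, 1)
    	go func() {
    		defer close(out)
    		var prevFinish func()
    		for evt := range in {
    			finish := start(evt)
    			out <- evt
    			if prevFinish != nil {
    				prevFinish()
    			}
    			prevFinish = finish
    		}
    		if prevFinish != nil {
    			prevFinish() // finish the last open span
    		}
    	}()
    	return out
    }

    func main() {
    	in := make(chan int, 2)
    	out := wrapChannel(in, func(v int) func() {
    		fmt.Println("start span for", v)
    		return func() { fmt.Println("finish span for", v) }
    	})
    	in <- 1
    	in <- 2
    	close(in)
    	for v := range out {
    		fmt.Println("consumed", v)
    	}
    }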
88 changes: 88 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/internal/tracing/dsm.go
@@ -0,0 +1,88 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"context"

"github.com/DataDog/dd-trace-go/v2/datastreams"
"github.com/DataDog/dd-trace-go/v2/datastreams/options"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
)

func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error) {
if err != nil || tr.groupID == "" || !tr.dsmEnabled {
return
}
for _, tp := range offsets {
tracer.TrackKafkaCommitOffset(tr.groupID, tp.GetTopic(), tp.GetPartition(), tp.GetOffset())
}
}

func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer) {
if !tr.dsmEnabled {
return
}
for _, tp := range offsets {
if _, high, err := consumer.GetWatermarkOffsets(tp.GetTopic(), tp.GetPartition()); err == nil {
tracer.TrackKafkaHighWatermarkOffset("", tp.GetTopic(), tp.GetPartition(), high)
}
}
}

func (tr *KafkaTracer) TrackProduceOffsets(msg Message) {
err := msg.GetTopicPartition().GetError()
if err != nil || !tr.dsmEnabled || msg.GetTopicPartition().GetTopic() == "" {
return
}
tp := msg.GetTopicPartition()
tracer.TrackKafkaProduceOffset(tp.GetTopic(), tp.GetPartition(), tp.GetOffset())
}

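// SetConsumeCheckpoint sets a Data Streams Monitoring checkpoint on the
// consume path and re-injects the updated pathway into the message headers.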
func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message) {
if !tr.dsmEnabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"}
if tr.groupID != "" {
edges = append(edges, "group:"+tr.groupID)
}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getMsgSize(msg)},
edges...,
)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
}

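// SetProduceCheckpoint sets a Data Streams Monitoring checkpoint on the
// produce path, injecting the pathway into the message headers when the
// client's librdkafka is new enough to support them (0.11.4+).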
func (tr *KafkaTracer) SetProduceCheckpoint(msg Message) {
if !tr.dsmEnabled || msg == nil {
return
}
edges := []string{"direction:out", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getMsgSize(msg)},
edges...,
)
if !ok || tr.librdKafkaVersion < 0x000b0400 {
// message headers are only supported in librdkafka 0.11.4 and newer
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
}

func getMsgSize(msg Message) (size int64) {
for _, header := range msg.GetHeaders() {
size += int64(len(header.GetKey()) + len(header.GetValue()))
}
return size + int64(len(msg.GetValue())+len(msg.GetKey()))
}
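A hypothetical illustration of how the two checkpoints chain across a produce/consume hop, assuming the message's headers survive transport:

    // pathwayRoundTrip is illustrative only, not part of this package's API.
    func pathwayRoundTrip(tr *KafkaTracer, msg Message) {
    	// Producer side: sets a "direction:out" checkpoint and writes the
    	// base64-encoded pathway into the message headers.
    	tr.SetProduceCheckpoint(msg)

    	// ...the message travels through Kafka with headers intact...

    	// Consumer side: reads that pathway back from the headers, sets a
    	// "direction:in" checkpoint, and re-injects the updated pathway.
    	tr.SetConsumeCheckpoint(msg)
    }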
@@ -0,0 +1,142 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"context"
"math"
"net"
"strings"

"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal"
"github.com/DataDog/dd-trace-go/v2/internal/namingschema"
)

const defaultServiceName = "kafka"

type KafkaTracer struct {
PrevSpan *tracer.Span
ctx context.Context
consumerServiceName string
producerServiceName string
consumerSpanName string
producerSpanName string
analyticsRate float64
bootstrapServers string
groupID string
tagFns map[string]func(msg Message) interface{}
dsmEnabled bool
ckgoVersion CKGoVersion
librdKafkaVersion int
}

func (tr *KafkaTracer) DSMEnabled() bool {
return tr.dsmEnabled
}

// An Option customizes the KafkaTracer.
type Option func(tr *KafkaTracer)

func NewKafkaTracer(ckgoVersion CKGoVersion, librdKafkaVersion int, opts ...Option) *KafkaTracer {
tr := &KafkaTracer{
ctx: context.Background(),
// analyticsRate: globalconfig.AnalyticsRate(),
analyticsRate: math.NaN(),
ckgoVersion: ckgoVersion,
librdKafkaVersion: librdKafkaVersion,
}
tr.dsmEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)
if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) {
tr.analyticsRate = 1.0
}

tr.consumerServiceName = namingschema.ServiceName(defaultServiceName)
tr.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
tr.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
tr.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)

for _, opt := range opts {
opt(tr)
}
return tr
}

// WithContext sets the config context to ctx.
// Deprecated: This option is deprecated in favor of passing the context
// via the message headers.
func WithContext(ctx context.Context) Option {
return func(tr *KafkaTracer) {
tr.ctx = ctx
}
}

// WithServiceName sets the config service name to serviceName.
func WithServiceName(serviceName string) Option {
return func(tr *KafkaTracer) {
tr.consumerServiceName = serviceName
tr.producerServiceName = serviceName
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(tr *KafkaTracer) {
if on {
tr.analyticsRate = 1.0
} else {
tr.analyticsRate = math.NaN()
}
}
}

// WithAnalyticsRate sets the sampling rate for Trace Analytics events
// correlated to started spans.
func WithAnalyticsRate(rate float64) Option {
return func(tr *KafkaTracer) {
if rate >= 0.0 && rate <= 1.0 {
tr.analyticsRate = rate
} else {
tr.analyticsRate = math.NaN()
}
}
}

// WithCustomTag will cause the given tagFn to be evaluated for each message
// and attach the result to the span under the given tag key.
func WithCustomTag(tag string, tagFn func(msg Message) interface{}) Option {
return func(tr *KafkaTracer) {
if tr.tagFns == nil {
tr.tagFns = make(map[string]func(msg Message) interface{})
}
tr.tagFns[tag] = tagFn
}
}

// WithConfig extracts the group ID and bootstrap servers from the client's configuration so they can be set as span tags.
func WithConfig(cg ConfigMap) Option {
return func(tr *KafkaTracer) {
if groupID, err := cg.Get("group.id", ""); err == nil {
tr.groupID = groupID.(string)
}
if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
for _, addr := range strings.Split(bs.(string), ",") {
host, _, err := net.SplitHostPort(addr)
if err == nil {
tr.bootstrapServers = host
return
}
}
}
}
}

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(tr *KafkaTracer) {
tr.dsmEnabled = true
}
}
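For reference, a hypothetical sketch of how an integration might assemble these options; the helper name, service name, and tag are illustrative, and the version arguments come from the caller (librdkafka reports its version via confluent-kafka-go's kafka.LibraryVersion function):

    // newTracerSketch is hypothetical, not part of this package's API.
    func newTracerSketch(ckgoVersion CKGoVersion, librdVersion int) *KafkaTracer {
    	return NewKafkaTracer(ckgoVersion, librdVersion,
    		WithDataStreams(),                 // enable DSM even if DD_DATA_STREAMS_ENABLED is unset
    		WithServiceName("orders-service"), // overrides both consumer and producer service names
    		WithCustomTag("kafka.topic", func(msg Message) interface{} {
    			return msg.GetTopicPartition().GetTopic()
    		}),
    	)
    }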