Skip to content

Commit

Permalink
contrib/segmentio/kafka.go.v0: refactor tracing code (#2885)
Browse files Browse the repository at this point in the history
  • Loading branch information
rarguelloF authored Oct 16, 2024
1 parent 93311db commit 88722fc
Show file tree
Hide file tree
Showing 18 changed files with 770 additions and 457 deletions.
28 changes: 16 additions & 12 deletions .github/workflows/unit-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,24 @@ jobs:
image: memcached:1.5.9
ports:
- 11211:11211
zookeeper:
image: bitnami/zookeeper:latest
env:
ALLOW_ANONYMOUS_LOGIN: "yes"
ports:
- 2181:2181
kafka:
image: darccio/kafka:2.13-2.8.1
image: confluentinc/confluent-local:7.5.0
env:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1
KAFKA_BROKER_ID: 1
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092"
KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_BROKER_ID: "1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1"
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
KAFKA_NODE_ID: "1"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
ports:
- 9092:9092
localstack:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation.
//
// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub.
//
// The motivation of this package is to support orchestrion, which cannot use the main package because it imports
// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself,
// this would cause an import cycle.
package tracing

import (
Expand Down
24 changes: 0 additions & 24 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) {
}
}

/*
to run the integration test locally:
docker network create confluent
docker run --rm \
--name zookeeper \
--network confluent \
-p 2181:2181 \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.0.0
docker run --rm \
--name kafka \
--network confluent \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_CREATE_TOPICS=gotest:1:1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.0.0
*/

func TestConsumerFunctional(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down
24 changes: 0 additions & 24 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) {
}
}

/*
to run the integration test locally:
docker network create confluent
docker run --rm \
--name zookeeper \
--network confluent \
-p 2181:2181 \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.0.0
docker run --rm \
--name kafka \
--network confluent \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_CREATE_TOPICS=gotest:1:1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.0.0
*/

func TestConsumerFunctional(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion contrib/segmentio/kafka.go.v0/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"
)

func ExampleWriter() {
Expand Down
43 changes: 4 additions & 39 deletions contrib/segmentio/kafka.go.v0/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,14 @@
package kafka

import (
"github.com/segmentio/kafka-go"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/segmentio/kafka-go"
)

// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
type messageCarrier struct {
msg *kafka.Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*messageCarrier)(nil)

// ForeachKey conforms to the TextMapReader interface.
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.Headers {
err := handler(h.Key, string(h.Value))
if err != nil {
return err
}
}
return nil
}

// Set implements TextMapWriter
func (c messageCarrier) Set(key, val string) {
// ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}

// ExtractSpanContext retrieves the SpanContext from a kafka.Message
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) {
return tracer.Extract(messageCarrier{&msg})
return tracer.Extract(tracing.NewMessageCarrier(wrapMessage(&msg)))
}
86 changes: 86 additions & 0 deletions contrib/segmentio/kafka.go.v0/internal/tracing/dsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracing

import (
"context"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) {
if !tr.dataStreamsEnabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.GetTopic(), "type:kafka"}
if tr.kafkaCfg.ConsumerGroupID != "" {
edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID)
}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)},
edges...,
)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if tr.kafkaCfg.ConsumerGroupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset())
}
}

func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) {
if !tr.dataStreamsEnabled || msg == nil {
return
}

var topic string
if writer.GetTopic() != "" {
topic = writer.GetTopic()
} else {
topic = msg.GetTopic()
}

edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
carrier := MessageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)},
edges...,
)
if !ok {
return
}

// Headers will be dropped if the current protocol does not support them
datastreams.InjectToBase64Carrier(ctx, carrier)
}

func getProducerMsgSize(msg Message) (size int64) {
for _, header := range msg.GetHeaders() {
size += int64(len(header.GetKey()) + len(header.GetValue()))
}
if msg.GetValue() != nil {
size += int64(len(msg.GetValue()))
}
if msg.GetKey() != nil {
size += int64(len(msg.GetKey()))
}
return size
}

func getConsumerMsgSize(msg Message) (size int64) {
for _, header := range msg.GetHeaders() {
size += int64(len(header.GetKey()) + len(header.GetValue()))
}
return size + int64(len(msg.GetValue())+len(msg.GetKey()))
}
52 changes: 52 additions & 0 deletions contrib/segmentio/kafka.go.v0/internal/tracing/message_carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracing

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
type MessageCarrier struct {
msg Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*MessageCarrier)(nil)

// ForeachKey conforms to the TextMapReader interface.
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.GetHeaders() {
err := handler(h.GetKey(), string(h.GetValue()))
if err != nil {
return err
}
}
return nil
}

// Set implements TextMapWriter
func (c MessageCarrier) Set(key, val string) {
headers := c.msg.GetHeaders()
// ensure uniqueness of keys
for i := 0; i < len(headers); i++ {
if headers[i].GetKey() == key {
headers = append(headers[:i], headers[i+1:]...)
i--
}
}
headers = append(headers, KafkaHeader{
Key: key,
Value: []byte(val),
})
c.msg.SetHeaders(headers)
}

func NewMessageCarrier(msg Message) MessageCarrier {
return MessageCarrier{msg: msg}
}
Loading

0 comments on commit 88722fc

Please sign in to comment.