
Commit

Add kafka connect instrumentation
piochelepiotr committed Dec 11, 2024

1 parent 7afe313 commit e64755b
Showing 6 changed files with 582 additions and 0 deletions.
37 changes: 37 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle
@@ -0,0 +1,37 @@
muzzle {
  pass {
    group = "org.apache.kafka"
    module = "connect-runtime"
    versions = "[0.11.0.0,)"
    assertInverse = true
  }
}

apply from: "$rootDir/gradle/java.gradle"

dependencies {
  compileOnly group: 'org.apache.kafka', name: 'connect-runtime', version: '0.11.0.0'

  testImplementation(testFixtures(project(':dd-java-agent:agent-iast')))
  testRuntimeOnly project(':dd-java-agent:instrumentation:iast-instrumenter')
  testRuntimeOnly project(':dd-java-agent:instrumentation:java-lang')
  testRuntimeOnly project(':dd-java-agent:instrumentation:java-io')
  testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
  testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core:jackson-core-2.8')
  testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10')
  testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
  testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
  testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
  testImplementation 'org.apache.kafka:connect-api:2.7.0' // Pinned so the test classpath is reproducible
  testImplementation 'org.apache.kafka:connect-runtime:2.7.0'
  testImplementation 'org.apache.kafka:connect-file:2.7.0' // Provides FileStreamSourceConnector
  testImplementation 'org.apache.kafka:kafka-clients:2.7.0'
  // Spring Kafka test utilities (EmbeddedKafkaBroker); 2.7.x matches Kafka 2.7.x
  testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.9'
  testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11')
}

configurations.testRuntimeClasspath {
  // spock-core pulls in an assertj version that is not compatible with kafka-clients
  resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
}
258 changes: 258 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/gradle.lockfile

(Diff not rendered: generated Gradle dependency lockfile.)

61 changes: 61 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/ConnectWorkerInstrumentation.java
@@ -0,0 +1,61 @@
package datadog.trace.instrumentation.kafka_connect;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.connect.runtime.TaskStatus.Listener;
import org.apache.kafka.connect.util.ConnectorTaskId;

@AutoService(InstrumenterModule.class)
public final class ConnectWorkerInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForTypeHierarchy {

  static final String TARGET_TYPE = "org.apache.kafka.connect.runtime.WorkerTask";

  public ConnectWorkerInstrumentation() {
    super("kafka", "kafka-connect");
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".TaskListener",
    };
  }

  @Override
  public String hierarchyMarkerType() {
    return TARGET_TYPE;
  }

  @Override
  public ElementMatcher<TypeDescription> hierarchyMatcher() {
    return extendsClass(named(hierarchyMarkerType()));
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    transformer.applyAdvice(
        isConstructor()
            .and(takesArgument(0, named("org.apache.kafka.connect.util.ConnectorTaskId")))
            .and(takesArgument(1, named("org.apache.kafka.connect.runtime.TaskStatus$Listener"))),
        ConnectWorkerInstrumentation.class.getName() + "$ConstructorAdvice");
  }

  public static class ConstructorAdvice {

    // Wraps the task's status listener so Data Streams Monitoring can tag
    // checkpoints with the connector name. readOnly = false on the second
    // argument lets Byte Buddy write the reassigned value back into the
    // constructor's actual parameter.
    @Advice.OnMethodEnter(suppress = Throwable.class)
    public static void wrap(
        @Advice.Argument(value = 0, readOnly = true) ConnectorTaskId id,
        @Advice.Argument(value = 1, readOnly = false) Listener statusListener) {
      statusListener = new TaskListener(statusListener);
    }
  }
}
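To make the advice concrete, here is a conceptual sketch (not part of the commit) of what the matched constructors effectively do once Byte Buddy inlines ConstructorAdvice; the WorkerTask signature shown is abbreviated and any remaining parameters are elided.

// Conceptual sketch only: after instrumentation, every matched WorkerTask
// subclass constructor behaves as if it began with
//
//   WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, ...) {
//     statusListener = new TaskListener(statusListener); // injected at entry
//     // ...original constructor body, now using the wrapped listener...
//   }
//
// because readOnly = false instructs Byte Buddy to write the advice's
// reassignment back into the constructor's actual argument.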
43 changes: 43 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java
@@ -0,0 +1,43 @@
package datadog.trace.instrumentation.kafka_connect;

import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import org.apache.kafka.connect.runtime.TaskStatus.Listener;
import org.apache.kafka.connect.util.ConnectorTaskId;

/**
 * Delegating {@link Listener} that keeps the Data Streams Monitoring thread-local service name in
 * sync with the task lifecycle: set to the connector name while the task runs, cleared when it
 * pauses, fails, or shuts down.
 */
public class TaskListener implements Listener {
  private final Listener delegate;

  public TaskListener(Listener delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onStartup(ConnectorTaskId connectorTaskId) {
    AgentTracer.get().getDataStreamsMonitoring().setThreadServiceName(connectorTaskId.connector());
    delegate.onStartup(connectorTaskId);
  }

  @Override
  public void onPause(ConnectorTaskId connectorTaskId) {
    delegate.onPause(connectorTaskId);
    AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
  }

  @Override
  public void onResume(ConnectorTaskId connectorTaskId) {
    delegate.onResume(connectorTaskId);
    AgentTracer.get().getDataStreamsMonitoring().setThreadServiceName(connectorTaskId.connector());
  }

  @Override
  public void onFailure(ConnectorTaskId connectorTaskId, Throwable throwable) {
    delegate.onFailure(connectorTaskId, throwable);
    AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
  }

  @Override
  public void onShutdown(ConnectorTaskId connectorTaskId) {
    delegate.onShutdown(connectorTaskId);
    AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
  }
}
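A minimal usage sketch of the wrapper's contract (hypothetical, not part of the commit); it assumes Mockito, which this module already pulls in for tests, to provide a no-op delegate:

import static org.mockito.Mockito.mock;

import datadog.trace.instrumentation.kafka_connect.TaskListener;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;

class TaskListenerSketch {
  public static void main(String[] args) {
    TaskStatus.Listener delegate = mock(TaskStatus.Listener.class); // stand-in for the worker's real listener
    TaskStatus.Listener listener = new TaskListener(delegate);
    ConnectorTaskId taskId = new ConnectorTaskId("file-source-connector", 0);

    // Tags Data Streams checkpoints on this thread with the connector name,
    // then forwards the callback to the real listener.
    listener.onStartup(taskId);

    // Forwards first, then clears the thread-local service name so later
    // work on this thread is not attributed to the connector.
    listener.onShutdown(taskId);
  }
}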
182 changes: 182 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy
@@ -0,0 +1,182 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.core.datastreams.StatsGroup
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.DescribeClusterResult
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.utils.Time
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
import org.apache.kafka.connect.runtime.Herder
import org.apache.kafka.connect.runtime.Worker
import org.apache.kafka.connect.runtime.WorkerConfig
import org.apache.kafka.connect.runtime.isolation.Plugins
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder
import org.apache.kafka.connect.storage.FileOffsetBackingStore
import org.apache.kafka.connect.util.Callback
import org.springframework.kafka.test.EmbeddedKafkaBroker
import spock.lang.Shared

import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

class ConnectWorkerInstrumentationTest extends AgentTestRunner {
  @Shared
  EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false, 1, 'test-topic')

  def setupSpec() {
    embeddedKafka.afterPropertiesSet() // Initializes the broker
  }

  def cleanupSpec() {
    embeddedKafka.destroy()
  }

  @Override
  void configurePreAgent() {
    super.configurePreAgent()
  }

  def "test kafka-connect instrumentation"() {
    given:
    // Kafka bootstrap servers from the embedded broker
    String bootstrapServers = embeddedKafka.getBrokersAsString()

    // Retrieve the Kafka cluster ID through an AdminClient
    Properties adminProps = new Properties()
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    String clusterId = null
    try (AdminClient adminClient = AdminClient.create(adminProps)) {
      DescribeClusterResult describeClusterResult = adminClient.describeCluster()
      clusterId = describeClusterResult.clusterId().get()
    }
    assert clusterId != null : "Cluster ID is null"

    // Create a temporary file for the source connector to tail
    File tempFile = File.createTempFile("test-message", ".txt")

    // Worker properties
    Properties workerProps = new Properties()
    workerProps.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
    workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
    workerProps.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/connect.offsets")
    workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
    workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
    workerProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "") // Required, but may be empty for built-in connectors
    workerProps.put("plugin.scan.classpath", "true")

    Map<String, String> workerPropsMap = workerProps.stringPropertyNames()
      .collectEntries { [(it): workerProps.getProperty(it)] }

    // Create the Connect worker
    Time time = Time.SYSTEM
    Plugins plugins = new Plugins(workerPropsMap)
    plugins.compareAndSwapWithDelegatingLoader()
    String workerId = "worker-1"

    FileOffsetBackingStore offsetBackingStore = new FileOffsetBackingStore()
    WorkerConfig workerConfig = new StandaloneConfig(workerPropsMap)
    offsetBackingStore.configure(workerConfig)
    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy()
    Worker worker = new Worker(workerId, time, plugins, workerConfig, offsetBackingStore, connectorClientConfigOverridePolicy)
    Herder herder = new StandaloneHerder(worker, clusterId, connectorClientConfigOverridePolicy)

    // Start worker and herder
    worker.start()
    herder.start()

    // Connector configuration
    Map<String, String> connectorProps = [
      'name'           : 'file-source-connector',
      'connector.class': 'org.apache.kafka.connect.file.FileStreamSourceConnector',
      'tasks.max'      : '1',
      'file'           : tempFile.getAbsolutePath(),
      'topic'          : 'test-topic'
    ]

    // Latch to wait for the connector to be added
    CountDownLatch connectorAddedLatch = new CountDownLatch(1)
    Callback<Herder.Created<ConnectorInfo>> addConnectorCallback = new Callback<Herder.Created<ConnectorInfo>>() {
      @Override
      void onCompletion(Throwable error, Herder.Created<ConnectorInfo> result) {
        if (error != null) {
          error.printStackTrace()
        } else {
          println "Connector added successfully."
        }
        connectorAddedLatch.countDown()
      }
    }

    when:
    // Add the connector to the herder
    herder.putConnectorConfig("file-source-connector", connectorProps, false, addConnectorCallback)

    // Wait for the connector to be added
    boolean connectorAdded = connectorAddedLatch.await(10, TimeUnit.SECONDS)
    assert connectorAdded : "Connector was not added in time"

    tempFile.write("Hello Kafka\n")

    // Consume the message from Kafka
    Properties consumerProps = new Properties()
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)
    consumer.subscribe(['test-topic'])

    String receivedMessage = null
    for (int i = 0; i < 10; i++) { // Poll for up to 10 seconds
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1))
      if (!records.isEmpty()) {
        receivedMessage = records.iterator().next().value()
        break
      }
    }
    TEST_DATA_STREAMS_WRITER.waitForGroups(2)

    then:
    receivedMessage == "Hello Kafka"

    StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
    verifyAll(first) {
      edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:test-topic", "type:kafka"]
      edgeTags.size() == 4
    }

    StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
    verifyAll(second) {
      edgeTags == [
        "direction:in",
        "group:test-consumer-group",
        "kafka_cluster_id:$clusterId",
        "topic:test-topic",
        "type:kafka"
      ]
      edgeTags.size() == 5
    }
    TEST_DATA_STREAMS_WRITER.getServices().contains('file-source-connector')

    cleanup:
    consumer?.close()
    herder?.stop()
    worker?.stop()
    tempFile?.delete()
  }

  @Override
  protected boolean isDataStreamsEnabled() {
    return true
  }
}
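Assuming the repository's standard per-module Gradle tasks (a hypothetical invocation, not stated in the commit), the new test should be runnable on its own:

./gradlew :dd-java-agent:instrumentation:kafka-connect-0.11:test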
1 change: 1 addition & 0 deletions settings.gradle
@@ -349,6 +349,7 @@ include ':dd-java-agent:instrumentation:kafka-clients-0.11'
include 'dd-java-agent:instrumentation:kafka-clients-3.8'
include ':dd-java-agent:instrumentation:kafka-streams-0.11'
include ':dd-java-agent:instrumentation:kafka-streams-1.0'
include ':dd-java-agent:instrumentation:kafka-connect-0.11'
include ':dd-java-agent:instrumentation:karate'
include ':dd-java-agent:instrumentation:kotlin-coroutines'
include ':dd-java-agent:instrumentation:kotlin-coroutines:coroutines-1.3'
