diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java index 28aaa82c174f..6bdc0383bf8c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java @@ -51,5 +51,10 @@ public static void set(ConsumerRecords records, Context context, Consumer< recordsConsumerField.set(records, consumer); } + public static void copy(ConsumerRecord from, ConsumerRecord to) { + recordContextField.set(to, recordContextField.get(from)); + recordConsumerField.set(to, recordConsumerField.get(from)); + } + private KafkaConsumerContextUtil() {} } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts new file mode 100644 index 000000000000..4f4e4a23a4eb --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -0,0 +1,48 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("io.projectreactor.kafka") + module.set("reactor-kafka") + // TODO: add support for 1.3 + versions.set("[1.0.0,1.3.0)") + } +} + +dependencies { + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap")) + + implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) + implementation(project(":instrumentation:reactor:reactor-3.1:library")) + + library("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE") + + testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) + testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent")) + + testImplementation("org.testcontainers:kafka") + + testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE") + + latestDepTestLibrary("io.projectreactor:reactor-core:3.4.+") + // TODO: add support for 1.3 + latestDepTestLibrary("io.projectreactor.kafka:reactor-kafka:1.2.+") +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testing.suites) + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java new file mode 100644 index 000000000000..81c8bf61d996 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/DefaultKafkaReceiverInstrumentation.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.publisher.Flux; + +// handles versions 1.0.0 - 1.2.+ +public class DefaultKafkaReceiverInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("reactor.kafka.receiver.internals.DefaultKafkaReceiver"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("createConsumerFlux").and(returns(named("reactor.core.publisher.Flux"))), + this.getClass().getName() + "$CreateConsumerFluxAdvice"); + } + + @SuppressWarnings("unused") + public static class CreateConsumerFluxAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return(readOnly = false) Flux flux) { + if (!(flux instanceof TracingDisablingKafkaFlux)) { + flux = new TracingDisablingKafkaFlux<>(flux); + } + } + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaFlux.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaFlux.java new file mode 100644 index 000000000000..4f8c64db6908 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaFlux.java @@ -0,0 +1,125 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0.ReactorKafkaSingletons.processInstrumenter; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxOperator; +import reactor.core.publisher.Operators; + +final class InstrumentedKafkaFlux> extends FluxOperator { + + InstrumentedKafkaFlux(Flux source) { + super(source); + } + + @Override + @SuppressWarnings("unchecked") + public void subscribe(CoreSubscriber actual) { + source.subscribe(new InstrumentedSubscriber((CoreSubscriber>) actual)); + } + + static final class InstrumentedSubscriber + implements CoreSubscriber>, Subscription, Scannable { + + private final CoreSubscriber> actual; + private final Context currentContext; + private Subscription subscription; + + InstrumentedSubscriber(CoreSubscriber> actual) { + this.actual = actual; + currentContext = + ContextPropagationOperator.getOpenTelemetryContext( + actual.currentContext(), Context.current()); + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.subscription, s)) { + this.subscription = s; + + actual.onSubscribe(this); + } + } + + @Override + public reactor.util.context.Context currentContext() { + return actual.currentContext(); + } + + @Override + public void onNext(ConsumerRecord record) { + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record); + Context receiveContext = consumerContext.getContext(); + // use the receive CONSUMER span as parent if it's available + Context parentContext = receiveContext != null ? receiveContext : currentContext; + + KafkaProcessRequest request = KafkaProcessRequest.create(consumerContext, record); + if (!processInstrumenter().shouldStart(parentContext, request)) { + actual.onNext(record); + return; + } + + Context context = processInstrumenter().start(parentContext, request); + Throwable error = null; + try (Scope ignored = context.makeCurrent()) { + actual.onNext(record); + } catch (Throwable t) { + error = t; + throw t; + } finally { + processInstrumenter().end(context, request, null, error); + } + } + + @Override + public void onError(Throwable throwable) { + try (Scope ignored = currentContext.makeCurrent()) { + actual.onError(throwable); + } + } + + @Override + public void onComplete() { + try (Scope ignored = currentContext.makeCurrent()) { + actual.onComplete(); + } + } + + @Override + public void request(long l) { + subscription.request(l); + } + + @Override + public void cancel() { + subscription.cancel(); + } + + @SuppressWarnings("rawtypes") // that's how the method is defined + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.ACTUAL) { + return actual; + } + if (key == Attr.PARENT) { + return subscription; + } + return null; + } + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java new file mode 100644 index 000000000000..2c24c0095246 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import java.util.function.Function; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.TransactionManager; + +public final class InstrumentedKafkaReceiver implements KafkaReceiver { + + private final KafkaReceiver actual; + + public InstrumentedKafkaReceiver(KafkaReceiver actual) { + this.actual = actual; + } + + @Override + public Flux> receive() { + return new InstrumentedKafkaFlux<>(actual.receive()); + } + + @Override + public Flux>> receiveAutoAck() { + return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new); + } + + @Override + public Flux> receiveAtmostOnce() { + return new InstrumentedKafkaFlux<>(actual.receiveAtmostOnce()); + } + + @Override + public Flux>> receiveExactlyOnce( + TransactionManager transactionManager) { + return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new); + } + + @Override + public Mono doOnConsumer(Function, ? extends T> function) { + return actual.doOnConsumer(function); + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java new file mode 100644 index 000000000000..fcae8dc7c455 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiverInstrumentation.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.kafka.receiver.KafkaReceiver; + +public class KafkaReceiverInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("reactor.kafka.receiver.KafkaReceiver"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("create").and(isStatic()).and(returns(named("reactor.kafka.receiver.KafkaReceiver"))), + this.getClass().getName() + "$CreateAdvice"); + } + + @SuppressWarnings("unused") + public static class CreateAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver receiver) { + if (!(receiver instanceof InstrumentedKafkaReceiver)) { + receiver = new InstrumentedKafkaReceiver<>(receiver); + } + } + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java new file mode 100644 index 000000000000..43f19a9b1ceb --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class ReactorKafkaInstrumentationModule extends InstrumentationModule { + + public ReactorKafkaInstrumentationModule() { + super("reactor-kafka", "reactor-kafka-1.0"); + } + + @Override + public List typeInstrumentations() { + return asList( + new KafkaReceiverInstrumentation(), + new ReceiverRecordInstrumentation(), + new DefaultKafkaReceiverInstrumentation()); + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaSingletons.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaSingletons.java new file mode 100644 index 000000000000..b5b8e309f58a --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaSingletons.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; + +final class ReactorKafkaSingletons { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-kafka-1.0"; + + private static final Instrumenter PROCESS_INSTRUMENTER = + new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .setCaptureExperimentalSpanAttributes( + InstrumentationConfig.get() + .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) + .setMessagingReceiveInstrumentationEnabled( + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .createConsumerProcessInstrumenter(); + + public static Instrumenter processInstrumenter() { + return PROCESS_INSTRUMENTER; + } + + private ReactorKafkaSingletons() {} +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReceiverRecordInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReceiverRecordInstrumentation.java new file mode 100644 index 000000000000..d49da860be77 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReceiverRecordInstrumentation.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import reactor.kafka.receiver.ReceiverRecord; + +public class ReceiverRecordInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("reactor.kafka.receiver.ReceiverRecord"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerRecord"))), + this.getClass().getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This ReceiverRecord copy, @Advice.Argument(0) ConsumerRecord original) { + KafkaConsumerContextUtil.copy(original, copy); + } + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/TracingDisablingKafkaFlux.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/TracingDisablingKafkaFlux.java new file mode 100644 index 000000000000..c3fb3c409556 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/TracingDisablingKafkaFlux.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxOperator; +import reactor.core.publisher.Operators; + +public final class TracingDisablingKafkaFlux extends FluxOperator { + + public TracingDisablingKafkaFlux(Flux source) { + super(source); + } + + @Override + public void subscribe(CoreSubscriber actual) { + source.subscribe(new TracingDisablingSubscriber<>(actual)); + } + + static final class TracingDisablingSubscriber + implements CoreSubscriber, Subscription, Scannable { + + private final CoreSubscriber actual; + private Subscription subscription; + + TracingDisablingSubscriber(CoreSubscriber actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.subscription, s)) { + this.subscription = s; + + actual.onSubscribe(this); + } + } + + @Override + public reactor.util.context.Context currentContext() { + return actual.currentContext(); + } + + @Override + public void onNext(T record) { + boolean previous = KafkaClientsConsumerProcessTracing.setEnabled(false); + try { + actual.onNext(record); + } finally { + KafkaClientsConsumerProcessTracing.setEnabled(previous); + } + } + + @Override + public void onError(Throwable throwable) { + actual.onError(throwable); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void request(long l) { + subscription.request(l); + } + + @Override + public void cancel() { + subscription.cancel(); + } + + @SuppressWarnings("rawtypes") // that's how the method is defined + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.ACTUAL) { + return actual; + } + if (key == Attr.PARENT) { + return subscription; + } + return null; + } + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java new file mode 100644 index 000000000000..0d9a2d6322f9 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -0,0 +1,171 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Collections.singleton; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; + +abstract class AbstractReactorKafkaTest { + + private static final Logger logger = LoggerFactory.getLogger(AbstractReactorKafkaTest.class); + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static KafkaContainer kafka; + static KafkaSender sender; + static KafkaReceiver receiver; + + @BeforeAll + static void setUpAll() { + kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10")) + .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m") + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) + .withStartupTimeout(Duration.ofMinutes(1)); + kafka.start(); + + sender = KafkaSender.create(SenderOptions.create(producerProps())); + receiver = + KafkaReceiver.create( + ReceiverOptions.create(consumerProps()) + .subscription(singleton("testTopic"))); + } + + @AfterAll + static void tearDownAll() { + if (sender != null) { + sender.close(); + } + kafka.stop(); + } + + private static Properties producerProps() { + Properties props = new Properties(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + props.put("retries", 0); + props.put("key.serializer", StringSerializer.class); + props.put("value.serializer", StringSerializer.class); + return props; + } + + private static Properties consumerProps() { + Properties props = new Properties(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + props.put("group.id", "test"); + props.put("enable.auto.commit", true); + props.put("auto.commit.interval.ms", 10); + props.put("session.timeout.ms", 30000); + props.put("auto.offset.reset", "earliest"); + props.put("key.deserializer", StringDeserializer.class); + props.put("value.deserializer", StringDeserializer.class); + return props; + } + + protected static List sendAttributes(ProducerRecord record) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative))); + String messageKey = record.key(); + if (messageKey != null) { + assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey)); + } + return assertions; + } + + protected static List receiveAttributes(String topic) { + return new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")))); + } + + protected static List processAttributes( + ProducerRecord record) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative))); + if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { + assertions.add( + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + } + String messageKey = record.key(); + if (messageKey != null) { + assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey)); + } + String messageValue = record.value(); + if (messageValue != null) { + assertions.add( + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + messageValue.getBytes(StandardCharsets.UTF_8).length)); + } + return assertions; + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java new file mode 100644 index 000000000000..65a3f77dc2f7 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.kafka.sender.SenderRecord; + +public class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest { + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + @Test + void shouldCreateSpansForSingleRecordProcess() { + Disposable disposable = + receiver.receive().subscribe(record -> testing.runWithSpan("consumer", () -> {})); + cleanup.deferCleanup(disposable::dispose); + + SenderRecord record = + SenderRecord.create("testTopic", 0, null, "10", "testSpan", null); + Flux producer = sender.send(Flux.just(record)); + testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30))); + + AtomicReference producerSpan = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes("testTopic")), + span -> + span.hasName("testTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 8524cfad3f9f..461fb138d703 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -418,6 +418,7 @@ hideFromDependabot(":instrumentation:ratpack:ratpack-1.7:library") hideFromDependabot(":instrumentation:reactor:reactor-3.1:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-3.1:library") hideFromDependabot(":instrumentation:reactor:reactor-3.1:testing") +hideFromDependabot(":instrumentation:reactor:reactor-kafka-1.0:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-0.9:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent-unit-tests")