diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts index c5bfd9b75c40..acf4944cfe3d 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts @@ -12,6 +12,9 @@ muzzle { } dependencies { + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) library("org.apache.kafka:kafka-clients:0.11.0.0") diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java new file mode 100644 index 000000000000..5893dc12a42a --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java @@ -0,0 +1,96 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkaclients; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; +import java.util.Iterator; +import java.util.List; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public class ConsumerRecordsInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.kafka.clients.consumer.ConsumerRecords"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, String.class)) + .and(returns(Iterable.class)), + ConsumerRecordsInstrumentation.class.getName() + "$IterableAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition"))) + .and(returns(List.class)), + ConsumerRecordsInstrumentation.class.getName() + "$ListAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("iterator")) + .and(takesArguments(0)) + .and(returns(Iterator.class)), + ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice"); + } + + @SuppressWarnings("unused") + public static class IterableAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) Iterable> iterable) { + if (iterable != null) { + ContextStore consumerRecordsSpan = + InstrumentationContext.get(ConsumerRecords.class, SpanContext.class); + iterable = new TracingIterable(iterable); + } + } + } + + @SuppressWarnings("unused") + public static class ListAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap(@Advice.Return(readOnly = false) List> iterable) { + if (iterable != null) { + iterable = new TracingList(iterable); + } + } + } + + @SuppressWarnings("unused") + public static class IteratorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) Iterator> iterator) { + if (iterator != null) { + iterator = new TracingIterator(iterator); + } + } + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java index 26ab13581ca2..3d53c81d4393 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaClientsInstrumentationModule.java @@ -20,6 +20,9 @@ public KafkaClientsInstrumentationModule() { @Override public List typeInstrumentations() { - return asList(new KafkaConsumerInstrumentation(), new KafkaProducerInstrumentation()); + return asList( + new KafkaProducerInstrumentation(), + new KafkaConsumerInstrumentation(), + new ConsumerRecordsInstrumentation()); } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java index b69822cbb09b..8674438ce77f 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java @@ -5,86 +5,76 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerInstrumenter; -import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext; +import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext; +import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.Iterator; -import java.util.List; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; +import java.time.Duration; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; public class KafkaConsumerInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("org.apache.kafka.clients.consumer.ConsumerRecords"); + return named("org.apache.kafka.clients.consumer.KafkaConsumer"); } @Override public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( - isMethod() + named("poll") .and(isPublic()) - .and(named("records")) - .and(takesArgument(0, String.class)) - .and(returns(Iterable.class)), - KafkaConsumerInstrumentation.class.getName() + "$IterableAdvice"); - transformer.applyAdviceToMethod( - isMethod() - .and(isPublic()) - .and(named("records")) - .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition"))) - .and(returns(List.class)), - KafkaConsumerInstrumentation.class.getName() + "$ListAdvice"); - transformer.applyAdviceToMethod( - isMethod() - .and(isPublic()) - .and(named("iterator")) - .and(takesArguments(0)) - .and(returns(Iterator.class)), - KafkaConsumerInstrumentation.class.getName() + "$IteratorAdvice"); + .and(takesArguments(1)) + .and(takesArgument(0, long.class).or(takesArgument(0, Duration.class))) + .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), + this.getClass().getName() + "$PollAdvice"); } @SuppressWarnings("unused") - public static class IterableAdvice { - - @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( - @Advice.Return(readOnly = false) Iterable> iterable) { - if (iterable != null) { - iterable = new TracingIterable(iterable); - } + public static class PollAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Timer onEnter() { + return Timer.start(); } - } - @SuppressWarnings("unused") - public static class ListAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter Timer timer, + @Advice.Return ConsumerRecords records, + @Advice.Thrown Throwable error) { - @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap(@Advice.Return(readOnly = false) List> iterable) { - if (iterable != null) { - iterable = new TracingList(iterable); + // don't create spans when no records were received + if (records == null || records.isEmpty()) { + return; } - } - } - @SuppressWarnings("unused") - public static class IteratorAdvice { + Context parentContext = currentContext(); + ReceivedRecords receivedRecords = ReceivedRecords.create(records, timer); + if (consumerReceiveInstrumenter().shouldStart(parentContext, receivedRecords)) { + Context context = consumerReceiveInstrumenter().start(parentContext, receivedRecords); + consumerReceiveInstrumenter().end(context, receivedRecords, null, error); - @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( - @Advice.Return(readOnly = false) Iterator> iterator) { - if (iterator != null) { - iterator = new TracingIterator(iterator, consumerInstrumenter()); + // we're storing the context of the receive span so that process spans can use it as parent + // context even though the span has ended + // this is the suggested behavior according to the spec batch receive scenario: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving + ContextStore consumerRecordsSpan = + InstrumentationContext.get(ConsumerRecords.class, SpanContext.class); + consumerRecordsSpan.put(records, spanFromContext(context).getSpanContext()); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaReceiveAttributesExtractor.java similarity index 51% rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaReceiveAttributesExtractor.java index 48af8a3537ac..cd5db4e382bd 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaReceiveAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.spring.kafka; +package io.opentelemetry.javaagent.instrumentation.kafkaclients; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; @@ -13,22 +13,23 @@ import org.apache.kafka.common.TopicPartition; import org.checkerframework.checker.nullness.qual.Nullable; -public final class BatchConsumerAttributesExtractor - extends MessagingAttributesExtractor, Void> { +public final class KafkaReceiveAttributesExtractor + extends MessagingAttributesExtractor { + @Override - protected String system(BatchRecords batchRecords) { + protected String system(ReceivedRecords receivedRecords) { return "kafka"; } @Override - protected String destinationKind(BatchRecords batchRecords) { + protected String destinationKind(ReceivedRecords receivedRecords) { return SemanticAttributes.MessagingDestinationKindValues.TOPIC; } @Override - protected @Nullable String destination(BatchRecords batchRecords) { + protected @Nullable String destination(ReceivedRecords receivedRecords) { Set topics = - batchRecords.records().partitions().stream() + receivedRecords.records().partitions().stream() .map(TopicPartition::topic) .collect(Collectors.toSet()); // only return topic when there's exactly one in the batch @@ -36,47 +37,47 @@ protected String destinationKind(BatchRecords batchRecords) { } @Override - protected boolean temporaryDestination(BatchRecords batchRecords) { + protected boolean temporaryDestination(ReceivedRecords receivedRecords) { return false; } @Override - protected @Nullable String protocol(BatchRecords batchRecords) { + protected @Nullable String protocol(ReceivedRecords receivedRecords) { return null; } @Override - protected @Nullable String protocolVersion(BatchRecords batchRecords) { + protected @Nullable String protocolVersion(ReceivedRecords receivedRecords) { return null; } @Override - protected @Nullable String url(BatchRecords batchRecords) { + protected @Nullable String url(ReceivedRecords receivedRecords) { return null; } @Override - protected @Nullable String conversationId(BatchRecords batchRecords) { + protected @Nullable String conversationId(ReceivedRecords receivedRecords) { return null; } @Override - protected @Nullable Long messagePayloadSize(BatchRecords batchRecords) { + protected @Nullable Long messagePayloadSize(ReceivedRecords receivedRecords) { return null; } @Override - protected @Nullable Long messagePayloadCompressedSize(BatchRecords batchRecords) { + protected @Nullable Long messagePayloadCompressedSize(ReceivedRecords receivedRecords) { return null; } @Override - protected MessageOperation operation(BatchRecords batchRecords) { - return MessageOperation.PROCESS; + protected MessageOperation operation(ReceivedRecords receivedRecords) { + return MessageOperation.RECEIVE; } @Override - protected @Nullable String messageId(BatchRecords batchRecords, @Nullable Void unused) { + protected @Nullable String messageId(ReceivedRecords receivedRecords, @Nullable Void unused) { return null; } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index 709485e84bef..d644b126ca2f 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -24,8 +24,10 @@ public final class KafkaSingletons { private static final Instrumenter, Void> PRODUCER_INSTRUMENTER = buildProducerInstrumenter(); - private static final Instrumenter, Void> CONSUMER_INSTRUMENTER = - buildConsumerInstrumenter(); + private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER = + buildConsumerReceiveInstrumenter(); + private static final Instrumenter, Void> CONSUMER_PROCESS_INSTRUMENTER = + buildConsumerProcessInstrumenter(); private static Instrumenter, Void> buildProducerInstrumenter() { KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor(); @@ -39,7 +41,19 @@ public final class KafkaSingletons { .newInstrumenter(SpanKindExtractor.alwaysProducer()); } - private static Instrumenter, Void> buildConsumerInstrumenter() { + private static Instrumenter buildConsumerReceiveInstrumenter() { + KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor(); + SpanNameExtractor spanNameExtractor = + MessagingSpanNameExtractor.create(attributesExtractor); + + return Instrumenter.newBuilder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + .addAttributesExtractor(attributesExtractor) + .setTimeExtractors(ReceivedRecords::startTime, (request, response) -> request.now()) + .newInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + private static Instrumenter, Void> buildConsumerProcessInstrumenter() { KafkaConsumerAttributesExtractor attributesExtractor = new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS); SpanNameExtractor> spanNameExtractor = @@ -62,8 +76,12 @@ public final class KafkaSingletons { return PRODUCER_INSTRUMENTER; } - public static Instrumenter, Void> consumerInstrumenter() { - return CONSUMER_INSTRUMENTER; + public static Instrumenter consumerReceiveInstrumenter() { + return CONSUMER_RECEIVE_INSTRUMENTER; + } + + public static Instrumenter, Void> consumerProcessInstrumenter() { + return CONSUMER_PROCESS_INSTRUMENTER; } private KafkaSingletons() {} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ReceivedRecords.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ReceivedRecords.java new file mode 100644 index 000000000000..260457c32d94 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ReceivedRecords.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkaclients; + +import com.google.auto.value.AutoValue; +import java.time.Instant; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +@AutoValue +public abstract class ReceivedRecords { + + public static ReceivedRecords create(ConsumerRecords records, Timer timer) { + return new AutoValue_ReceivedRecords(records, timer); + } + + public abstract ConsumerRecords records(); + + abstract Timer timer(); + + public Instant startTime() { + return timer().startTime(); + } + + public Instant now() { + return timer().now(); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/Timer.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/Timer.java new file mode 100644 index 000000000000..42b3bc819910 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/Timer.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkaclients; + +import java.time.Instant; + +public final class Timer { + + public static Timer start() { + return new Timer(Instant.now(), System.nanoTime()); + } + + private final Instant startTime; + private final long startNanoTime; + + private Timer(Instant startTime, long startNanoTime) { + this.startTime = startTime; + this.startNanoTime = startNanoTime; + } + + public Instant startTime() { + return startTime; + } + + public Instant now() { + long durationNanos = System.nanoTime() - startNanoTime; + return startTime().plusNanos(durationNanos); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java index b5644b36cb01..19c5e467e1bc 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java @@ -5,8 +5,6 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerInstrumenter; - import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -25,7 +23,7 @@ public Iterator> iterator() { // However, this is not thread-safe, but usually the first (hopefully only) traversal of // ConsumerRecords is performed in the same thread that called poll() if (firstIterator) { - it = new TracingIterator<>(delegate.iterator(), consumerInstrumenter()); + it = new TracingIterator<>(delegate.iterator()); firstIterator = false; } else { it = delegate.iterator(); diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java index e1ae7b6a9d2b..d4ca812d50ab 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java @@ -5,9 +5,10 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; +import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerProcessInstrumenter; + import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -17,23 +18,22 @@ public class TracingIterator implements Iterator>, KafkaConsumerIteratorWrapper { private final Iterator> delegateIterator; - private final Instrumenter, Void> instrumenter; + // TODO: use the context extracted from ConsumerRecords (receive context) as the parent span + // for that to work properly we'd have to modify the consumer span suppression strategy to + // differentiate between receive and process consumer spans - right now if we were to pass the + // receive context to this instrumentation it'd be suppressed private final Context parentContext; - /** + /* * Note: this may potentially create problems if this iterator is used from different threads. But * at the moment we cannot do much about this. */ @Nullable private ConsumerRecord currentRequest; - @Nullable private Context currentContext; @Nullable private Scope currentScope; - public TracingIterator( - Iterator> delegateIterator, - Instrumenter, Void> instrumenter) { + public TracingIterator(Iterator> delegateIterator) { this.delegateIterator = delegateIterator; - this.instrumenter = instrumenter; parentContext = Context.current(); } @@ -49,9 +49,9 @@ public ConsumerRecord next() { closeScopeAndEndSpan(); ConsumerRecord next = delegateIterator.next(); - if (next != null && instrumenter.shouldStart(parentContext, next)) { + if (next != null && consumerProcessInstrumenter().shouldStart(parentContext, next)) { currentRequest = next; - currentContext = instrumenter.start(parentContext, currentRequest); + currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest); currentScope = currentContext.makeCurrent(); } return next; @@ -60,7 +60,7 @@ public ConsumerRecord next() { private void closeScopeAndEndSpan() { if (currentScope != null) { currentScope.close(); - instrumenter.end(currentContext, currentRequest, null, null); + consumerProcessInstrumenter().end(currentContext, currentRequest, null, null); currentScope = null; currentRequest = null; currentContext = null; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy index d923ce8e9583..04722dfbf06e 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationDisabledTest.groovy @@ -54,7 +54,7 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { // check that the message was received records.poll(5, TimeUnit.SECONDS) != null - assertTraces(2) { + assertTraces(3) { trace(0, 1) { span(0) { name SHARED_TOPIC + " send" @@ -68,6 +68,19 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { } } trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } + trace(2, 1) { span(0) { name SHARED_TOPIC + " process" kind CONSUMER @@ -84,7 +97,6 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { } } } - } cleanup: diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy index 54b97804bd71..0383f6fa2321 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy @@ -4,9 +4,9 @@ */ import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.PRODUCER -import io.opentelemetry.api.trace.SpanKind import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -80,11 +80,13 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { received.value() == greeting received.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) + trace(0, 4) { span(0) { name "parent" - kind SpanKind.INTERNAL + kind INTERNAL hasNoParent() } span(1) { @@ -114,10 +116,23 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } span(3) { name "producer callback" - kind SpanKind.INTERNAL + kind INTERNAL childOf span(0) } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: @@ -176,11 +191,13 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { received.value() == greeting received.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) + trace(0, 4) { span(0) { name "parent" - kind SpanKind.INTERNAL + kind INTERNAL hasNoParent() } span(1) { @@ -210,10 +227,23 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } span(3) { name "producer callback" - kind SpanKind.INTERNAL + kind INTERNAL childOf span(0) } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: @@ -265,7 +295,9 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { received.value() == null received.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) + trace(0, 2) { // PRODUCER span 0 span(0) { @@ -297,6 +329,19 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: @@ -339,7 +384,9 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { first.value() == greeting first.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) + trace(0, 2) { span(0) { name SHARED_TOPIC + " send" @@ -368,6 +415,19 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: diff --git a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy index d923ce8e9583..3b283fa20538 100644 --- a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationDisabledTest.groovy @@ -54,7 +54,7 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { // check that the message was received records.poll(5, TimeUnit.SECONDS) != null - assertTraces(2) { + assertTraces(3) { trace(0, 1) { span(0) { name SHARED_TOPIC + " send" @@ -68,6 +68,19 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest { } } trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } + trace(2, 1) { span(0) { name SHARED_TOPIC + " process" kind CONSUMER diff --git a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy index 54b97804bd71..0383f6fa2321 100644 --- a/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-2.4.0-testing/src/test/groovy/KafkaClientPropagationEnabledTest.groovy @@ -4,9 +4,9 @@ */ import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.PRODUCER -import io.opentelemetry.api.trace.SpanKind import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -80,11 +80,13 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { received.value() == greeting received.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) + trace(0, 4) { span(0) { name "parent" - kind SpanKind.INTERNAL + kind INTERNAL hasNoParent() } span(1) { @@ -114,10 +116,23 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } span(3) { name "producer callback" - kind SpanKind.INTERNAL + kind INTERNAL childOf span(0) } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: @@ -176,11 +191,13 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { received.value() == greeting received.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) + trace(0, 4) { span(0) { name "parent" - kind SpanKind.INTERNAL + kind INTERNAL hasNoParent() } span(1) { @@ -210,10 +227,23 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } span(3) { name "producer callback" - kind SpanKind.INTERNAL + kind INTERNAL childOf span(0) } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: @@ -265,7 +295,9 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { received.value() == null received.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) + trace(0, 2) { // PRODUCER span 0 span(0) { @@ -297,6 +329,19 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: @@ -339,7 +384,9 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { first.value() == greeting first.key() == null - assertTraces(1) { + assertTraces(2) { + traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) + trace(0, 2) { span(0) { name SHARED_TOPIC + " send" @@ -368,6 +415,19 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest { } } } + trace(1, 1) { + span(0) { + name SHARED_TOPIC + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } cleanup: diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy index dba1f88d44dc..b6f5a7856998 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy @@ -140,7 +140,12 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { received.value() == greeting.toLowerCase() received.key() == null - assertTraces(1) { + assertTraces(3) { + traces.sort(orderByRootSpanName( + STREAM_PENDING + " send", + STREAM_PENDING + " receive", + STREAM_PROCESSED + " receive")) + trace(0, 5) { // PRODUCER span 0 span(0) { @@ -213,6 +218,32 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { } } } + trace(1, 1) { + span(0) { + name STREAM_PENDING + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } + trace(2, 1) { + span(0) { + name STREAM_PROCESSED + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } def headers = received.headers() @@ -233,7 +264,8 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { } }) def spanContext = Span.fromContext(context).getSpanContext() - def streamSendSpan = traces[0][3] + def streamTrace = traces.find { it.size() == 5 } + def streamSendSpan = streamTrace[3] spanContext.traceId == streamSendSpan.traceId spanContext.spanId == streamSendSpan.spanId diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy index 670b3c210fd6..ce2cb0cff985 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy @@ -140,7 +140,12 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { received.value() == greeting.toLowerCase() received.key() == null - assertTraces(1) { + assertTraces(3) { + traces.sort(orderByRootSpanName( + STREAM_PENDING + " send", + STREAM_PENDING + " receive", + STREAM_PROCESSED + " receive")) + trace(0, 5) { // PRODUCER span 0 span(0) { @@ -213,6 +218,32 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { } } } + trace(1, 1) { + span(0) { + name STREAM_PENDING + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } + trace(2, 1) { + span(0) { + name STREAM_PROCESSED + " receive" + kind CONSUMER + hasNoParent() + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + } } def headers = received.headers() @@ -233,7 +264,8 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { } }) def spanContext = Span.fromContext(context).getSpanContext() - def streamSendSpan = traces[0][3] + def streamTrace = traces.find { it.size() == 5 } + def streamSendSpan = streamTrace[3] spanContext.traceId == streamSendSpan.traceId spanContext.spanId == streamSendSpan.spanId diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java deleted file mode 100644 index 18b3924c48ca..000000000000 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.spring.kafka; - -import com.google.auto.value.AutoValue; -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; -import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerRecords; - -@AutoValue -public abstract class BatchRecords { - - public static BatchRecords create( - ConsumerRecords consumerRecords, List linkedReceiveSpans) { - return new AutoValue_BatchRecords<>(consumerRecords, linkedReceiveSpans); - } - - public abstract ConsumerRecords records(); - - public abstract List linkedReceiveSpans(); - - public static SpanLinksExtractor> spanLinksExtractor() { - return (spanLinks, parentContext, batchRecords) -> { - batchRecords.linkedReceiveSpans().forEach(spanLinks::addLink); - }; - } - - BatchRecords() {} -} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java index 8068c558d72e..e076ecb626a8 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java @@ -6,19 +6,11 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter; -import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.receiveInstrumenter; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.instrumentation.api.ContextStore; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.kafka.listener.BatchInterceptor; @@ -35,48 +27,18 @@ public InstrumentedBatchInterceptor( } @Override - public ConsumerRecords intercept( - ConsumerRecords consumerRecords, Consumer consumer) { + public ConsumerRecords intercept(ConsumerRecords records, Consumer consumer) { + // TODO: use the receive spanContext that's linked to records Context parentContext = Context.current(); - // create spans for all records received in a batch - List receiveSpanContexts = traceReceivingRecords(parentContext, consumerRecords); - - // then start a span for processing that links all those receive spans - BatchRecords batchRecords = BatchRecords.create(consumerRecords, receiveSpanContexts); - if (processInstrumenter().shouldStart(parentContext, batchRecords)) { - Context context = processInstrumenter().start(parentContext, batchRecords); + if (processInstrumenter().shouldStart(parentContext, records)) { + Context context = processInstrumenter().start(parentContext, records); Scope scope = context.makeCurrent(); - contextStore.put(consumerRecords, State.create(batchRecords, context, scope)); - } - - return decorated == null ? consumerRecords : decorated.intercept(consumerRecords, consumer); - } - - private List traceReceivingRecords( - Context parentContext, ConsumerRecords records) { - List receiveSpanContexts = new ArrayList<>(); - - Iterator> it = records.iterator(); - // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though there's - // no current CONSUMER span - // this instrumentation will create CONSUMER receive spans for each record instead of - // kafka-clients - if (it instanceof KafkaConsumerIteratorWrapper) { - it = ((KafkaConsumerIteratorWrapper) it).unwrap(); - } - - while (it.hasNext()) { - ConsumerRecord record = it.next(); - if (receiveInstrumenter().shouldStart(parentContext, record)) { - Context context = receiveInstrumenter().start(parentContext, record); - receiveSpanContexts.add(Span.fromContext(context).getSpanContext()); - receiveInstrumenter().end(context, record, null, null); - } + contextStore.put(records, State.create(records, context, scope)); } - return receiveSpanContexts; + return decorated == null ? records : decorated.intercept(records, consumer); } @Override diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessAttributesExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessAttributesExtractor.java new file mode 100644 index 000000000000..1d813de517c0 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessAttributesExtractor.java @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class KafkaBatchProcessAttributesExtractor + extends MessagingAttributesExtractor, Void> { + @Override + protected String system(ConsumerRecords records) { + return "kafka"; + } + + @Override + protected String destinationKind(ConsumerRecords records) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Override + protected @Nullable String destination(ConsumerRecords records) { + Set topics = + records.partitions().stream().map(TopicPartition::topic).collect(Collectors.toSet()); + // only return topic when there's exactly one in the batch + return topics.size() == 1 ? topics.iterator().next() : null; + } + + @Override + protected boolean temporaryDestination(ConsumerRecords records) { + return false; + } + + @Override + protected @Nullable String protocol(ConsumerRecords records) { + return null; + } + + @Override + protected @Nullable String protocolVersion(ConsumerRecords records) { + return null; + } + + @Override + protected @Nullable String url(ConsumerRecords records) { + return null; + } + + @Override + protected @Nullable String conversationId(ConsumerRecords records) { + return null; + } + + @Override + protected @Nullable Long messagePayloadSize(ConsumerRecords records) { + return null; + } + + @Override + protected @Nullable Long messagePayloadCompressedSize(ConsumerRecords records) { + return null; + } + + @Override + protected MessageOperation operation(ConsumerRecords records) { + return MessageOperation.PROCESS; + } + + @Override + protected @Nullable String messageId(ConsumerRecords records, @Nullable Void unused) { + return null; + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java new file mode 100644 index 000000000000..3896bbabb9e5 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; +import java.util.Iterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public class KafkaBatchProcessSpanLinksExtractor + implements SpanLinksExtractor> { + + private final SpanLinksExtractor> singleRecordLinkExtractor; + + public KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) { + this.singleRecordLinkExtractor = + SpanLinksExtractor.fromUpstreamRequest(contextPropagators, new KafkaHeadersGetter()); + } + + @Override + public void extract( + SpanLinksBuilder spanLinks, Context parentContext, ConsumerRecords records) { + + Iterator> it = records.iterator(); + + // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though there's + // no current CONSUMER span + // this instrumentation will create CONSUMER receive spans for each record instead of + // kafka-clients + if (it instanceof KafkaConsumerIteratorWrapper) { + it = ((KafkaConsumerIteratorWrapper) it).unwrap(); + } + + while (it.hasNext()) { + ConsumerRecord record = it.next(); + // explicitly passing root to avoid situation where context propagation is turned off and the + // parent (CONSUMER receive) span is linked + singleRecordLinkExtractor.extract(spanLinks, Context.root(), record); + } + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java index 7e836c2cbd55..8148037134c5 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java @@ -7,63 +7,39 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; public final class SpringKafkaSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; - private static final Instrumenter, Void> RECEIVE_INSTRUMENTER = - buildReceiveInstrumenter(); - private static final Instrumenter, Void> PROCESS_INSTRUMENTER = + private static final Instrumenter, Void> PROCESS_INSTRUMENTER = buildProcessInstrumenter(); - private static Instrumenter, Void> buildReceiveInstrumenter() { - KafkaConsumerAttributesExtractor consumerAttributesExtractor = - new KafkaConsumerAttributesExtractor(MessageOperation.RECEIVE); - SpanNameExtractor> spanNameExtractor = - MessagingSpanNameExtractor.create(consumerAttributesExtractor); - - InstrumenterBuilder, Void> builder = - Instrumenter., Void>newBuilder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) - .addAttributesExtractor(consumerAttributesExtractor) - .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor()); - - if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) { - builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); - } - - return builder.newConsumerInstrumenter(new KafkaHeadersGetter()); - } - - private static Instrumenter, Void> buildProcessInstrumenter() { - BatchConsumerAttributesExtractor attributesExtractor = new BatchConsumerAttributesExtractor(); - SpanNameExtractor> spanNameExtractor = + private static Instrumenter, Void> buildProcessInstrumenter() { + KafkaBatchProcessAttributesExtractor attributesExtractor = + new KafkaBatchProcessAttributesExtractor(); + SpanNameExtractor> spanNameExtractor = MessagingSpanNameExtractor.create(attributesExtractor); - return Instrumenter., Void>newBuilder( + return Instrumenter., Void>newBuilder( GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) .addAttributesExtractor(attributesExtractor) - .addSpanLinksExtractor(BatchRecords.spanLinksExtractor()) + .addSpanLinksExtractor( + new KafkaBatchProcessSpanLinksExtractor(GlobalOpenTelemetry.getPropagators())) .setErrorCauseExtractor(new KafkaBatchErrorCauseExtractor()) .newInstrumenter(SpanKindExtractor.alwaysConsumer()); } public static Instrumenter, Void> receiveInstrumenter() { - return RECEIVE_INSTRUMENTER; + return null; } - public static Instrumenter, Void> processInstrumenter() { + public static Instrumenter, Void> processInstrumenter() { return PROCESS_INSTRUMENTER; } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java index 2162b5521f95..589526180e04 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java @@ -8,16 +8,17 @@ import com.google.auto.value.AutoValue; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import org.apache.kafka.clients.consumer.ConsumerRecords; @AutoValue public abstract class State { public static State create( - BatchRecords request, Context context, Scope scope) { + ConsumerRecords request, Context context, Scope scope) { return new AutoValue_State<>(request, context, scope); } - public abstract BatchRecords request(); + public abstract ConsumerRecords request(); public abstract Context context(); diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index a9113200d6e3..c13b8b08efd6 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -69,10 +69,12 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } then: - assertTraces(2) { - SpanData consumer1, consumer2 + assertTraces(3) { + traces.sort(orderByRootSpanName("producer", "testTopic receive", "testTopic process")) - trace(0, 5) { + SpanData producer1, producer2 + + trace(0, 3) { span(0) { name "producer" } @@ -87,21 +89,6 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } } span(2) { - name "testTopic receive" - kind CONSUMER - childOf span(1) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" 0 - "kafka.offset" Long - "kafka.record.queue_time_ms" Long - } - } - span(3) { name "testTopic send" kind PRODUCER childOf span(0) @@ -111,31 +98,29 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } - span(4) { + + producer1 = span(1) + producer2 = span(2) + } + trace(1, 1) { + span(0) { name "testTopic receive" kind CONSUMER - childOf span(3) + hasNoParent() attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" 0 - "kafka.offset" Long - "kafka.record.queue_time_ms" Long } } - - consumer1 = span(2) - consumer2 = span(4) } - trace(1, 2) { + trace(2, 2) { span(0) { name "testTopic process" kind CONSUMER - hasLink consumer1 - hasLink consumer2 + hasLink producer1 + hasLink producer2 attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" @@ -163,10 +148,12 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } then: - assertTraces(2) { - SpanData consumer + assertTraces(3) { + traces.sort(orderByRootSpanName("producer", "testTopic receive", "testTopic process")) - trace(0, 3) { + SpanData producer + + trace(0, 2) { span(0) { name "producer" } @@ -180,29 +167,27 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } - span(2) { + + producer = span(1) + } + trace(1, 1) { + span(0) { name "testTopic receive" kind CONSUMER - childOf span(1) + hasNoParent() attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" 0 - "kafka.offset" Long - "kafka.record.queue_time_ms" Long } } - - consumer = span(2) } - trace(1, 2) { + trace(2, 2) { span(0) { name "testTopic process" kind CONSUMER - hasLink consumer + hasLink producer status ERROR errorEvent IllegalArgumentException, "boom" attributes {