From 85c541ef92cfdba9af1ef1d66c0610a62dbe7eed Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 21 Feb 2023 17:31:28 +0200 Subject: [PATCH] Implement kafka client id and consumer id attributes (#7860) Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7771 Adds `messaging.consumer_id` and `messaging.kafka.client_id` attributes to kafka spans. --- .../v0_11/ConsumerRecordsInstrumentation.java | 23 ++-- .../v0_11/KafkaConsumerInstrumentation.java | 54 ++++----- .../v0_11/KafkaProducerInstrumentation.java | 18 +-- .../kafkaclients/v0_11/KafkaSingletons.java | 23 ++-- .../kafkaclients/v0_11/ProducerCallback.java | 6 +- .../kafkaclients/v0_11/TracingIterable.java | 22 ++-- .../kafkaclients/v0_11/TracingIterator.java | 23 ++-- .../kafkaclients/v0_11/TracingList.java | 17 +-- .../kafka/internal/KafkaClientBaseTest.java | 19 ++- .../kafkaclients/v2_6/KafkaTelemetry.java | 49 +++++--- .../v2_6/KafkaTelemetryBuilder.java | 15 ++- .../v2_6/TracingConsumerInterceptor.java | 10 +- .../v2_6/TracingProducerInterceptor.java | 9 +- .../kafkaclients/v2_6/InterceptorsTest.java | 21 +++- .../kafkaclients/v2_6/WrapperTest.java | 100 ++++++++++------ .../AbstractKafkaConsumerRequest.java | 40 +++++++ .../kafka/internal/ConsumerAndRecord.java | 73 ------------ .../KafkaBatchProcessSpanLinksExtractor.java | 15 +-- .../KafkaConsumerAttributesExtractor.java | 22 ++-- .../KafkaConsumerAttributesGetter.java | 38 +++--- .../kafka/internal/KafkaConsumerContext.java | 29 +++++ .../internal/KafkaConsumerContextUtil.java | 55 +++++++++ ...nsumerExperimentalAttributesExtractor.java | 13 +- .../internal/KafkaConsumerRecordGetter.java | 11 +- .../internal/KafkaInstrumenterFactory.java | 35 +++--- .../kafka/internal/KafkaProcessRequest.java | 41 +++++++ .../KafkaProducerAttributesExtractor.java | 14 ++- .../KafkaProducerAttributesGetter.java | 31 +++-- .../kafka/internal/KafkaProducerRequest.java | 56 +++++++++ .../KafkaReceiveAttributesExtractor.java | 22 ++-- .../KafkaReceiveAttributesGetter.java | 36 +++--- .../kafka/internal/KafkaReceiveRequest.java | 44 +++++++ .../kafka/internal/KafkaUtil.java | 112 ++++++++++++++++++ .../kafkastreams/KafkaStreamsSingletons.java | 7 +- .../PartitionGroupInstrumentation.java | 15 ++- .../RecordDeserializerInstrumentation.java | 7 +- ...NodeRecordDeserializerInstrumentation.java | 7 +- .../kafkastreams/StateHolder.java | 12 +- .../StreamTaskInstrumentation.java | 4 +- .../groovy/KafkaStreamsDefaultTest.groovy | 13 ++ ...afkaStreamsSuppressReceiveSpansTest.groovy | 5 + .../kafka/KafkaIntegrationTest.java | 12 +- .../spring/kafka/v2_7/SpringKafkaTest.java | 93 +++++++++++++-- .../v2_7/InstrumentedBatchInterceptor.java | 33 +++--- .../v2_7/InstrumentedRecordInterceptor.java | 44 +++---- .../kafka/v2_7/SpringKafkaTelemetry.java | 14 +-- .../spring/kafka/v2_7/State.java | 8 +- ...ractSpringKafkaNoReceiveTelemetryTest.java | 51 +++++++- .../v3_6/InstrumentedBatchRecordsHandler.java | 30 ++--- .../v3_6/InstrumentedSingleRecordHandler.java | 30 ++--- .../KafkaReadStreamImplInstrumentation.java | 13 +- .../kafka/v3_6/VertxKafkaSingletons.java | 16 +-- .../kafka/v3_6/AbstractVertxKafkaTest.java | 19 ++- 53 files changed, 992 insertions(+), 537 deletions(-) create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractKafkaConsumerRequest.java delete mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/ConsumerAndRecord.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContext.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProcessRequest.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerRequest.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveRequest.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaUtil.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index 1de5e1e4b405..5ba44989bedc 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -12,8 +12,8 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Iterator; @@ -21,7 +21,6 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -70,10 +69,8 @@ public static void wrap( // leak the context and so there may be a leaked consumer span in the context, in which // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records); - Consumer consumer = - VirtualField.find(ConsumerRecords.class, Consumer.class).get(records); - iterable = TracingIterable.wrap(iterable, receiveContext, consumer); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + iterable = TracingIterable.wrap(iterable, consumerContext); } } @@ -90,10 +87,8 @@ public static void wrap( // leak the context and so there may be a leaked consumer span in the context, in which // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records); - Consumer consumer = - VirtualField.find(ConsumerRecords.class, Consumer.class).get(records); - list = TracingList.wrap(list, receiveContext, consumer); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + list = TracingList.wrap(list, consumerContext); } } @@ -110,10 +105,8 @@ public static void wrap( // leak the context and so there may be a leaked consumer span in the context, in which // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records); - Consumer consumer = - VirtualField.find(ConsumerRecords.class, Consumer.class).get(records); - iterator = TracingIterator.wrap(iterator, receiveContext, consumer); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + iterator = TracingIterator.wrap(iterator, consumerContext); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java index 709026772546..b3cbfa866de5 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java @@ -17,8 +17,8 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; -import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import io.opentelemetry.instrumentation.kafka.internal.Timer; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -94,21 +94,15 @@ public static void onExit( return; } - // we're attaching the consumer to the records to be able to retrieve things like consumer - // group or clientId later - VirtualField, Consumer> consumerRecordsConsumer = - VirtualField.find(ConsumerRecords.class, Consumer.class); - consumerRecordsConsumer.set(records, consumer); - Context parentContext = currentContext(); - ConsumerAndRecord> request = - ConsumerAndRecord.create(consumer, records); - - if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) { - // disable process tracing and store the receive span for each individual record too - boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false); - try { - Context context = + KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumer); + + // disable process tracing and store the receive span for each individual record too + boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false); + try { + Context context = null; + if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) { + context = InstrumenterUtil.startAndEnd( consumerReceiveInstrumenter(), parentContext, @@ -117,23 +111,21 @@ public static void onExit( error, timer.startTime(), timer.now()); + } + + // we're storing the context of the receive span so that process spans can use it as + // parent context even though the span has ended + // this is the suggested behavior according to the spec batch receive scenario: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving + // we're attaching the consumer to the records to be able to retrieve things like consumer + // group or clientId later + KafkaConsumerContextUtil.set(records, context, consumer); - // we're storing the context of the receive span so that process spans can use it as - // parent context even though the span has ended - // this is the suggested behavior according to the spec batch receive scenario: - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving - VirtualField, Context> consumerRecordsContext = - VirtualField.find(ConsumerRecords.class, Context.class); - consumerRecordsContext.set(records, context); - - VirtualField, Context> consumerRecordContext = - VirtualField.find(ConsumerRecord.class, Context.class); - for (ConsumerRecord record : records) { - consumerRecordContext.set(record, context); - } - } finally { - KafkaClientsConsumerProcessTracing.setEnabled(previousValue); + for (ConsumerRecord record : records) { + KafkaConsumerContextUtil.set(record, context, consumer); } + } finally { + KafkaClientsConsumerProcessTracing.setEnabled(previousValue); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java index e7b8ed0c286d..4ce176d3897c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java @@ -15,6 +15,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; import io.opentelemetry.instrumentation.kafka.internal.KafkaPropagation; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -74,19 +75,21 @@ public static void onEnter(@Advice.Argument(0) Properties config) { public static class SendAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( + public static KafkaProducerRequest onEnter( @Advice.FieldValue("apiVersions") ApiVersions apiVersions, + @Advice.FieldValue("clientId") String clientId, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback, @Advice.Local("otelContext") Context context, @Advice.Local("otelScope") Scope scope) { + KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); Context parentContext = Java8BytecodeBridge.currentContext(); - if (!producerInstrumenter().shouldStart(parentContext, record)) { - return; + if (!producerInstrumenter().shouldStart(parentContext, request)) { + return null; } - context = producerInstrumenter().start(parentContext, record); + context = producerInstrumenter().start(parentContext, request); scope = context.makeCurrent(); if (KafkaSingletons.isProducerPropagationEnabled() @@ -94,12 +97,13 @@ public static void onEnter( record = KafkaPropagation.propagateContext(context, record); } - callback = new ProducerCallback(callback, parentContext, context, record); + callback = new ProducerCallback(callback, parentContext, context, request); + return request; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, + @Advice.Enter KafkaProducerRequest request, @Advice.Thrown Throwable throwable, @Advice.Local("otelContext") Context context, @Advice.Local("otelScope") Scope scope) { @@ -109,7 +113,7 @@ public static void stopSpan( scope.close(); if (throwable != null) { - producerInstrumenter().end(context, record, null, throwable); + producerInstrumenter().end(context, request, null, throwable); } // span finished by ProducerCallback } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java index 72df89de0ca0..27bc234545da 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java @@ -7,8 +7,10 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; @@ -16,9 +18,6 @@ import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public final class KafkaSingletons { @@ -34,11 +33,9 @@ public final class KafkaSingletons { InstrumentationConfig.get() .getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true); - private static final Instrumenter, RecordMetadata> PRODUCER_INSTRUMENTER; - private static final Instrumenter>, Void> - CONSUMER_RECEIVE_INSTRUMENTER; - private static final Instrumenter>, Void> - CONSUMER_PROCESS_INSTRUMENTER; + private static final Instrumenter PRODUCER_INSTRUMENTER; + private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER; + private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; static { KafkaInstrumenterFactory instrumenterFactory = @@ -58,17 +55,15 @@ public static boolean isProducerPropagationEnabled() { return PRODUCER_PROPAGATION_ENABLED; } - public static Instrumenter, RecordMetadata> producerInstrumenter() { + public static Instrumenter producerInstrumenter() { return PRODUCER_INSTRUMENTER; } - public static Instrumenter>, Void> - consumerReceiveInstrumenter() { + public static Instrumenter consumerReceiveInstrumenter() { return CONSUMER_RECEIVE_INSTRUMENTER; } - public static Instrumenter>, Void> - consumerProcessInstrumenter() { + public static Instrumenter consumerProcessInstrumenter() { return CONSUMER_PROCESS_INSTRUMENTER; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java index e9b575e83821..2514b293a450 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java @@ -9,18 +9,18 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class ProducerCallback implements Callback { private final Callback callback; private final Context parentContext; private final Context context; - private final ProducerRecord request; + private final KafkaProducerRequest request; public ProducerCallback( - Callback callback, Context parentContext, Context context, ProducerRecord request) { + Callback callback, Context parentContext, Context context, KafkaProducerRequest request) { this.callback = callback; this.parentContext = parentContext; this.context = context; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java index 45294758483e..425a9e2d189d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java @@ -5,34 +5,26 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; -import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import java.util.Iterator; -import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; public class TracingIterable implements Iterable> { private final Iterable> delegate; - @Nullable private final Context receiveContext; - private final Consumer consumer; + private final KafkaConsumerContext consumerContext; private boolean firstIterator = true; protected TracingIterable( - Iterable> delegate, - @Nullable Context receiveContext, - Consumer consumer) { + Iterable> delegate, KafkaConsumerContext consumerContext) { this.delegate = delegate; - this.receiveContext = receiveContext; - this.consumer = consumer; + this.consumerContext = consumerContext; } public static Iterable> wrap( - Iterable> delegate, - @Nullable Context receiveContext, - Consumer consumer) { + Iterable> delegate, KafkaConsumerContext consumerContext) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterable<>(delegate, receiveContext, consumer); + return new TracingIterable<>(delegate, consumerContext); } return delegate; } @@ -44,7 +36,7 @@ public Iterator> iterator() { // However, this is not thread-safe, but usually the first (hopefully only) traversal of // ConsumerRecords is performed in the same thread that called poll() if (firstIterator) { - it = TracingIterator.wrap(delegate.iterator(), receiveContext, consumer); + it = TracingIterator.wrap(delegate.iterator(), consumerContext); firstIterator = false; } else { it = delegate.iterator(); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java index 222a5009796a..957439a4e721 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java @@ -9,44 +9,41 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import java.util.Iterator; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; public class TracingIterator implements Iterator> { private final Iterator> delegateIterator; private final Context parentContext; - private final Consumer consumer; + private final KafkaConsumerContext consumerContext; /* * Note: this may potentially create problems if this iterator is used from different threads. But * at the moment we cannot do much about this. */ - @Nullable private ConsumerAndRecord> currentRequest; + @Nullable private KafkaProcessRequest currentRequest; @Nullable private Context currentContext; @Nullable private Scope currentScope; private TracingIterator( - Iterator> delegateIterator, - @Nullable Context receiveContext, - Consumer consumer) { + Iterator> delegateIterator, KafkaConsumerContext consumerContext) { this.delegateIterator = delegateIterator; + Context receiveContext = consumerContext.getContext(); // use the receive CONSUMER as parent if it's available this.parentContext = receiveContext != null ? receiveContext : Context.current(); - this.consumer = consumer; + this.consumerContext = consumerContext; } public static Iterator> wrap( - Iterator> delegateIterator, - @Nullable Context receiveContext, - Consumer consumer) { + Iterator> delegateIterator, KafkaConsumerContext consumerContext) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterator<>(delegateIterator, receiveContext, consumer); + return new TracingIterator<>(delegateIterator, consumerContext); } return delegateIterator; } @@ -69,7 +66,7 @@ public ConsumerRecord next() { // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) ConsumerRecord next = delegateIterator.next(); if (next != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - currentRequest = ConsumerAndRecord.create(consumer, next); + currentRequest = KafkaProcessRequest.create(consumerContext, next); currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest); currentScope = currentContext.makeCurrent(); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java index bf295cd5f98c..7d326978fcc6 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java @@ -5,32 +5,25 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; -import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import java.util.Collection; import java.util.List; import java.util.ListIterator; -import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; public class TracingList extends TracingIterable implements List> { private final List> delegate; - private TracingList( - List> delegate, - @Nullable Context receiveContext, - Consumer consumer) { - super(delegate, receiveContext, consumer); + private TracingList(List> delegate, KafkaConsumerContext consumerContext) { + super(delegate, consumerContext); this.delegate = delegate; } public static List> wrap( - List> delegate, - @Nullable Context receiveContext, - Consumer consumer) { + List> delegate, KafkaConsumerContext consumerContext) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingList<>(delegate, receiveContext, consumer); + return new TracingList<>(delegate, consumerContext); } return delegate; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java index 935e840bc19d..e98a8c6a89a9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java @@ -158,6 +158,9 @@ protected static List sendAttributes( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -186,10 +189,17 @@ protected static List receiveAttributes(boolean testHeaders) equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"))); + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")))); // consumer group id is not available in version 0.11 if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + assertions.add( + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); } if (testHeaders) { assertions.add( @@ -209,6 +219,9 @@ protected static List processAttributes( equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, AbstractLongAssert::isNotNegative), @@ -221,6 +234,10 @@ protected static List processAttributes( // consumer group id is not available in version 0.11 if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + assertions.add( + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); } if (messageKey != null) { assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey)); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java index 73e85122bd1e..c299baba2c49 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java @@ -13,8 +13,10 @@ import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaUtil; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; import java.lang.reflect.InvocationTargetException; @@ -44,15 +46,14 @@ public final class KafkaTelemetry { private static final TextMapSetter SETTER = KafkaHeadersSetter.INSTANCE; private final OpenTelemetry openTelemetry; - private final Instrumenter, RecordMetadata> producerInstrumenter; - private final Instrumenter>, Void> - consumerProcessInstrumenter; + private final Instrumenter producerInstrumenter; + private final Instrumenter consumerProcessInstrumenter; private final boolean producerPropagationEnabled; KafkaTelemetry( OpenTelemetry openTelemetry, - Instrumenter, RecordMetadata> producerInstrumenter, - Instrumenter>, Void> consumerProcessInstrumenter, + Instrumenter producerInstrumenter, + Instrumenter consumerProcessInstrumenter, boolean producerPropagationEnabled) { this.openTelemetry = openTelemetry; this.producerInstrumenter = producerInstrumenter; @@ -95,7 +96,7 @@ public Producer wrap(Producer producer) { && method.getParameterTypes()[1] == Callback.class ? (Callback) args[1] : null; - return buildAndInjectSpan(record, callback, producer::send); + return buildAndInjectSpan(record, producer, callback, producer::send); } try { return method.invoke(producer, args); @@ -122,7 +123,7 @@ public Consumer wrap(Consumer consumer) { // ConsumerRecords poll(long timeout) // ConsumerRecords poll(Duration duration) if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) { - buildAndFinishSpan(consumer, (ConsumerRecords) result); + buildAndFinishSpan((ConsumerRecords) result, consumer); } return result; }); @@ -173,14 +174,15 @@ public Consumer wrap(Consumer consumer) { * * @param record the producer record to inject span info. */ - void buildAndInjectSpan(ProducerRecord record) { + void buildAndInjectSpan(ProducerRecord record, String clientId) { Context parentContext = Context.current(); - if (!producerInstrumenter.shouldStart(parentContext, record)) { + KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); + if (!producerInstrumenter.shouldStart(parentContext, request)) { return; } - Context context = producerInstrumenter.start(parentContext, record); + Context context = producerInstrumenter.start(parentContext, request); if (producerPropagationEnabled) { try { propagator().inject(context, record.headers(), SETTER); @@ -189,7 +191,7 @@ void buildAndInjectSpan(ProducerRecord record) { logger.log(WARNING, "failed to inject span context. sending record second time?", t); } } - producerInstrumenter.end(context, record, null, null); + producerInstrumenter.end(context, request, null, null); } /** @@ -201,25 +203,34 @@ void buildAndInjectSpan(ProducerRecord record) { */ Future buildAndInjectSpan( ProducerRecord record, + Producer producer, Callback callback, BiFunction, Callback, Future> sendFn) { Context parentContext = Context.current(); - if (!producerInstrumenter.shouldStart(parentContext, record)) { + + KafkaProducerRequest request = KafkaProducerRequest.create(record, producer); + if (!producerInstrumenter.shouldStart(parentContext, request)) { return sendFn.apply(record, callback); } - Context context = producerInstrumenter.start(parentContext, record); + Context context = producerInstrumenter.start(parentContext, request); try (Scope ignored = context.makeCurrent()) { propagator().inject(context, record.headers(), SETTER); - callback = new ProducerCallback(callback, parentContext, context, record); + callback = new ProducerCallback(callback, parentContext, context, request); return sendFn.apply(record, callback); } } - void buildAndFinishSpan(Consumer consumer, ConsumerRecords records) { + private void buildAndFinishSpan(ConsumerRecords records, Consumer consumer) { + buildAndFinishSpan( + records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer)); + } + + void buildAndFinishSpan( + ConsumerRecords records, String consumerGroup, String clientId) { Context parentContext = Context.current(); for (ConsumerRecord record : records) { - ConsumerAndRecord> request = ConsumerAndRecord.create(consumer, record); + KafkaProcessRequest request = KafkaProcessRequest.create(record, consumerGroup, clientId); if (!consumerProcessInstrumenter.shouldStart(parentContext, request)) { continue; } @@ -233,10 +244,10 @@ private class ProducerCallback implements Callback { private final Callback callback; private final Context parentContext; private final Context context; - private final ProducerRecord request; + private final KafkaProducerRequest request; public ProducerCallback( - Callback callback, Context parentContext, Context context, ProducerRecord request) { + Callback callback, Context parentContext, Context context, KafkaProducerRequest request) { this.callback = callback; this.parentContext = parentContext; this.context = context; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java index 5324b3606567..87937be28d7e 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java @@ -11,23 +11,22 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public final class KafkaTelemetryBuilder { static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-2.6"; private final OpenTelemetry openTelemetry; - private final List, RecordMetadata>> + private final List> producerAttributesExtractors = new ArrayList<>(); - private final List>, Void>> - consumerAttributesExtractors = new ArrayList<>(); + private final List> consumerAttributesExtractors = + new ArrayList<>(); private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes = false; private boolean propagationEnabled = true; @@ -38,14 +37,14 @@ public final class KafkaTelemetryBuilder { @CanIgnoreReturnValue public KafkaTelemetryBuilder addProducerAttributesExtractors( - AttributesExtractor, RecordMetadata> extractor) { + AttributesExtractor extractor) { producerAttributesExtractors.add(extractor); return this; } @CanIgnoreReturnValue public KafkaTelemetryBuilder addConsumerAttributesExtractors( - AttributesExtractor>, Void> extractor) { + AttributesExtractor extractor) { consumerAttributesExtractors.add(extractor); return this; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 83519ca48333..b5a2ad0a5ca6 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -7,6 +7,8 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import java.util.Map; +import java.util.Objects; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -21,9 +23,12 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor onConsume(ConsumerRecords records) { - telemetry.buildAndFinishSpan(null, records); + telemetry.buildAndFinishSpan(records, consumerGroup, clientId); return records; } @@ -35,6 +40,9 @@ public void close() {} @Override public void configure(Map configs) { + consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null); + clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null); + // TODO: support experimental attributes config } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java index a17d105327ef..3c331241ecf8 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java @@ -7,6 +7,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -20,9 +23,11 @@ public class TracingProducerInterceptor implements ProducerInterceptor onSend(ProducerRecord producerRecord) { - telemetry.buildAndInjectSpan(producerRecord); + telemetry.buildAndInjectSpan(producerRecord, clientId); return producerRecord; } @@ -34,6 +39,8 @@ public void close() {} @Override public void configure(Map map) { + clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null); + // TODO: support experimental attributes config } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index ea86a73bec19..1fc0635baa35 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -9,7 +9,6 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; @@ -83,16 +82,19 @@ void testInterceptors() throws InterruptedException { span.hasName(SHARED_TOPIC + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) - .hasAttributesSatisfying( + .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")); + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer"))); }, span -> { span.hasName(SHARED_TOPIC + " receive") .hasKind(SpanKind.CONSUMER) .hasParent(trace.getSpan(1)) - .hasAttributesSatisfying( + .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), @@ -104,8 +106,15 @@ void testInterceptors() throws InterruptedException { SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, AbstractLongAssert::isNotNegative), satisfies( - AttributeKey.longKey("messaging.kafka.message.offset"), - AbstractLongAssert::isNotNegative)); + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); }); }, trace -> { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index 952c88a657df..58d0e61043b3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -15,10 +15,14 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -87,44 +91,13 @@ void testWrappers(boolean testHeaders) throws InterruptedException { span.hasName(SHARED_TOPIC + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) - .hasAttributesSatisfying( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative)); - if (testHeaders) { - span.hasAttributesSatisfying( - equalTo( - AttributeKey.stringArrayKey("messaging.header.test_message_header"), - Collections.singletonList("test"))); - } + .hasAttributesSatisfyingExactly(sendAttributes(testHeaders)); }, span -> { span.hasName(SHARED_TOPIC + " receive") .hasKind(SpanKind.CONSUMER) .hasParent(trace.getSpan(1)) - .hasAttributesSatisfying( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), - equalTo( - SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, - greeting.getBytes(StandardCharsets.UTF_8).length), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, - AbstractLongAssert::isNotNegative), - satisfies( - AttributeKey.longKey("messaging.kafka.message.offset"), - AbstractLongAssert::isNotNegative)); - if (testHeaders) { - span.hasAttributesSatisfying( - equalTo( - AttributeKey.stringArrayKey("messaging.header.test_message_header"), - Collections.singletonList("test"))); - } + .hasAttributesSatisfyingExactly(receiveAttributes(greeting, testHeaders)); }, span -> { span.hasName("producer callback") @@ -133,4 +106,65 @@ void testWrappers(boolean testHeaders) throws InterruptedException { }); }); } + + protected static List sendAttributes(boolean testHeaders) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative))); + if (testHeaders) { + assertions.add( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + return assertions; + } + + private static List receiveAttributes(String greeting, boolean testHeaders) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer")))); + if (testHeaders) { + assertions.add( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + return assertions; + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractKafkaConsumerRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractKafkaConsumerRequest.java new file mode 100644 index 000000000000..c8040478c91e --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractKafkaConsumerRequest.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import javax.annotation.Nullable; + +abstract class AbstractKafkaConsumerRequest { + + @Nullable private final String consumerGroup; + @Nullable private final String clientId; + + AbstractKafkaConsumerRequest(String consumerGroup, String clientId) { + this.consumerGroup = consumerGroup; + this.clientId = clientId; + } + + @Nullable + public String getConsumerGroup() { + return consumerGroup; + } + + @Nullable + public String getClientId() { + return clientId; + } + + @Nullable + public String getConsumerId() { + if (consumerGroup != null) { + if (clientId != null) { + return consumerGroup + " - " + clientId; + } + return consumerGroup; + } + return null; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/ConsumerAndRecord.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/ConsumerAndRecord.java deleted file mode 100644 index 3eeaef9b2b7a..000000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/ConsumerAndRecord.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafka.internal; - -import com.google.auto.value.AutoValue; -import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.MethodType; -import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.Consumer; - -/** - * This class is internal and is hence not for public use. Its APIs are unstable and can change at - * any time. - */ -@AutoValue -public abstract class ConsumerAndRecord { - - public static ConsumerAndRecord create(@Nullable Consumer consumer, R record) { - return new AutoValue_ConsumerAndRecord<>(consumer, record); - } - - @Nullable - public abstract Consumer consumer(); - - public abstract R record(); - - private static final MethodHandle GET_GROUP_METADATA; - private static final MethodHandle GET_GROUP_ID; - - static { - MethodHandle getGroupMetadata; - MethodHandle getGroupId; - - try { - Class consumerGroupMetadata = - Class.forName("org.apache.kafka.clients.consumer.ConsumerGroupMetadata"); - - MethodHandles.Lookup lookup = MethodHandles.publicLookup(); - getGroupMetadata = - lookup.findVirtual( - Consumer.class, "groupMetadata", MethodType.methodType(consumerGroupMetadata)); - getGroupId = - lookup.findVirtual(consumerGroupMetadata, "groupId", MethodType.methodType(String.class)); - } catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException ignored) { - getGroupMetadata = null; - getGroupId = null; - } - - GET_GROUP_METADATA = getGroupMetadata; - GET_GROUP_ID = getGroupId; - } - - @Nullable - String consumerGroup() { - if (GET_GROUP_METADATA == null || GET_GROUP_ID == null) { - return null; - } - Consumer consumer = consumer(); - if (consumer == null) { - return null; - } - try { - Object metadata = GET_GROUP_METADATA.invoke(consumer); - return (String) GET_GROUP_ID.invoke(metadata); - } catch (Throwable e) { - return null; - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java index 22b7130e831b..d50c5a583020 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java @@ -11,13 +11,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -final class KafkaBatchProcessSpanLinksExtractor - implements SpanLinksExtractor>> { +final class KafkaBatchProcessSpanLinksExtractor implements SpanLinksExtractor { - private final SpanLinksExtractor>> - singleRecordLinkExtractor; + private final SpanLinksExtractor singleRecordLinkExtractor; KafkaBatchProcessSpanLinksExtractor(TextMapPropagator propagator) { this.singleRecordLinkExtractor = @@ -26,17 +23,15 @@ final class KafkaBatchProcessSpanLinksExtractor @Override public void extract( - SpanLinksBuilder spanLinks, - Context parentContext, - ConsumerAndRecord> consumerAndRecords) { + SpanLinksBuilder spanLinks, Context parentContext, KafkaReceiveRequest request) { - for (ConsumerRecord record : consumerAndRecords.record()) { + for (ConsumerRecord record : request.getRecords()) { // explicitly passing root to avoid situation where context propagation is turned off and the // parent (CONSUMER receive) span is linked singleRecordLinkExtractor.extract( spanLinks, Context.root(), - ConsumerAndRecord.create(consumerAndRecords.consumer(), record)); + KafkaProcessRequest.create(record, request.getConsumerGroup(), request.getClientId())); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java index 4146e05777ec..cedad1c5e484 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java @@ -14,15 +14,13 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; final class KafkaConsumerAttributesExtractor - implements AttributesExtractor>, Void> { + implements AttributesExtractor { @Override public void onStart( - AttributesBuilder attributes, - Context parentContext, - ConsumerAndRecord> consumerAndRecord) { + AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) { - ConsumerRecord record = consumerAndRecord.record(); + ConsumerRecord record = request.getRecord(); attributes.put(SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, (long) record.partition()); attributes.put(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset()); @@ -35,10 +33,20 @@ public void onStart( attributes.put(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true); } - String consumerGroup = consumerAndRecord.consumerGroup(); + String consumerGroup = request.getConsumerGroup(); if (consumerGroup != null) { attributes.put(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, consumerGroup); } + + String clientId = request.getClientId(); + if (clientId != null) { + attributes.put(SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, clientId); + } + + String consumerId = request.getConsumerId(); + if (consumerId != null) { + attributes.put(SemanticAttributes.MESSAGING_CONSUMER_ID, consumerId); + } } private static boolean canSerialize(Class keyClass) { @@ -51,7 +59,7 @@ private static boolean canSerialize(Class keyClass) { public void onEnd( AttributesBuilder attributes, Context context, - ConsumerAndRecord> consumerAndRecord, + KafkaProcessRequest request, @Nullable Void unused, @Nullable Throwable error) {} } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java index 2bef3a4c449b..ad1ca0c3bbfd 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java @@ -12,80 +12,74 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.ConsumerRecord; -enum KafkaConsumerAttributesGetter - implements MessagingAttributesGetter>, Void> { +enum KafkaConsumerAttributesGetter implements MessagingAttributesGetter { INSTANCE; @Override - public String getSystem(ConsumerAndRecord> consumerAndRecord) { + public String getSystem(KafkaProcessRequest request) { return "kafka"; } @Override - public String getDestinationKind(ConsumerAndRecord> consumerAndRecord) { + public String getDestinationKind(KafkaProcessRequest request) { return SemanticAttributes.MessagingDestinationKindValues.TOPIC; } @Override - public String getDestination(ConsumerAndRecord> consumerAndRecord) { - return consumerAndRecord.record().topic(); + public String getDestination(KafkaProcessRequest request) { + return request.getRecord().topic(); } @Override - public boolean isTemporaryDestination(ConsumerAndRecord> consumerAndRecord) { + public boolean isTemporaryDestination(KafkaProcessRequest request) { return false; } @Override @Nullable - public String getProtocol(ConsumerAndRecord> consumerAndRecord) { + public String getProtocol(KafkaProcessRequest request) { return null; } @Override @Nullable - public String getProtocolVersion(ConsumerAndRecord> consumerAndRecord) { + public String getProtocolVersion(KafkaProcessRequest request) { return null; } @Override @Nullable - public String getUrl(ConsumerAndRecord> consumerAndRecord) { + public String getUrl(KafkaProcessRequest request) { return null; } @Override @Nullable - public String getConversationId(ConsumerAndRecord> consumerAndRecord) { + public String getConversationId(KafkaProcessRequest request) { return null; } @Override - public Long getMessagePayloadSize(ConsumerAndRecord> consumerAndRecord) { - return (long) consumerAndRecord.record().serializedValueSize(); + public Long getMessagePayloadSize(KafkaProcessRequest request) { + return (long) request.getRecord().serializedValueSize(); } @Override @Nullable - public Long getMessagePayloadCompressedSize( - ConsumerAndRecord> consumerAndRecord) { + public Long getMessagePayloadCompressedSize(KafkaProcessRequest request) { return null; } @Override @Nullable - public String getMessageId( - ConsumerAndRecord> consumerAndRecord, @Nullable Void unused) { + public String getMessageId(KafkaProcessRequest request, @Nullable Void unused) { return null; } @Override - public List getMessageHeader( - ConsumerAndRecord> consumerAndRecord, String name) { - return StreamSupport.stream( - consumerAndRecord.record().headers().headers(name).spliterator(), false) + public List getMessageHeader(KafkaProcessRequest request, String name) { + return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContext.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContext.java new file mode 100644 index 000000000000..44ad04ebcba5 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContext.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.context.Context; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.Consumer; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +@AutoValue +public abstract class KafkaConsumerContext { + + static KafkaConsumerContext create(@Nullable Context context, @Nullable Consumer consumer) { + return new AutoValue_KafkaConsumerContext(context, consumer); + } + + @Nullable + public abstract Context getContext(); + + @Nullable + abstract Consumer getConsumer(); +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java new file mode 100644 index 000000000000..28aaa82c174f --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java @@ -0,0 +1,55 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.util.VirtualField; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class KafkaConsumerContextUtil { + private static final VirtualField, Context> recordContextField = + VirtualField.find(ConsumerRecord.class, Context.class); + private static final VirtualField, Consumer> recordConsumerField = + VirtualField.find(ConsumerRecord.class, Consumer.class); + private static final VirtualField, Context> recordsContextField = + VirtualField.find(ConsumerRecords.class, Context.class); + private static final VirtualField, Consumer> recordsConsumerField = + VirtualField.find(ConsumerRecords.class, Consumer.class); + + public static KafkaConsumerContext get(ConsumerRecord records) { + Context receiveContext = recordContextField.get(records); + Consumer consumer = recordConsumerField.get(records); + return KafkaConsumerContext.create(receiveContext, consumer); + } + + public static KafkaConsumerContext get(ConsumerRecords records) { + Context receiveContext = recordsContextField.get(records); + Consumer consumer = recordsConsumerField.get(records); + return KafkaConsumerContext.create(receiveContext, consumer); + } + + public static void set(ConsumerRecord record, Context context, Consumer consumer) { + recordContextField.set(record, context); + recordConsumerField.set(record, consumer); + } + + public static void set(ConsumerRecord record, KafkaConsumerContext consumerContext) { + set(record, consumerContext.getContext(), consumerContext.getConsumer()); + } + + public static void set(ConsumerRecords records, Context context, Consumer consumer) { + recordsContextField.set(records, context); + recordsConsumerField.set(records, consumer); + } + + private KafkaConsumerContextUtil() {} +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerExperimentalAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerExperimentalAttributesExtractor.java index b87b07a42eca..8aefb139c168 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerExperimentalAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerExperimentalAttributesExtractor.java @@ -16,20 +16,19 @@ import org.apache.kafka.common.record.TimestampType; final class KafkaConsumerExperimentalAttributesExtractor - implements AttributesExtractor>, Void> { + implements AttributesExtractor { private static final AttributeKey KAFKA_RECORD_QUEUE_TIME_MS = longKey("kafka.record.queue_time_ms"); @Override public void onStart( - AttributesBuilder attributes, - Context parentContext, - ConsumerAndRecord> consumerAndRecord) { + AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) { + ConsumerRecord record = request.getRecord(); // don't record a duration if the message was sent from an old Kafka client - if (consumerAndRecord.record().timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { - long produceTime = consumerAndRecord.record().timestamp(); + if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { + long produceTime = record.timestamp(); // this attribute shows how much time elapsed between the producer and the consumer of this // message, which can be helpful for identifying queue bottlenecks attributes.put( @@ -41,7 +40,7 @@ public void onStart( public void onEnd( AttributesBuilder attributes, Context context, - ConsumerAndRecord> consumerAndRecord, + KafkaProcessRequest request, @Nullable Void unused, @Nullable Throwable error) {} } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java index 98ddecf4837f..714719165101 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java @@ -10,23 +10,22 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; -enum KafkaConsumerRecordGetter implements TextMapGetter>> { +enum KafkaConsumerRecordGetter implements TextMapGetter { INSTANCE; @Override - public Iterable keys(ConsumerAndRecord> carrier) { - return StreamSupport.stream(carrier.record().headers().spliterator(), false) + public Iterable keys(KafkaProcessRequest carrier) { + return StreamSupport.stream(carrier.getRecord().headers().spliterator(), false) .map(Header::key) .collect(Collectors.toList()); } @Nullable @Override - public String get(@Nullable ConsumerAndRecord> carrier, String key) { - Header header = carrier.record().headers().lastHeader(key); + public String get(@Nullable KafkaProcessRequest carrier, String key) { + Header header = carrier.getRecord().headers().lastHeader(key); if (header == null) { return null; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java index 95391cd034d4..216fb00c9645 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java @@ -21,9 +21,6 @@ import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import java.util.Collections; import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; /** @@ -81,17 +78,17 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( return this; } - public Instrumenter, RecordMetadata> createProducerInstrumenter() { + public Instrumenter createProducerInstrumenter() { return createProducerInstrumenter(Collections.emptyList()); } - public Instrumenter, RecordMetadata> createProducerInstrumenter( - Iterable, RecordMetadata>> extractors) { + public Instrumenter createProducerInstrumenter( + Iterable> extractors) { KafkaProducerAttributesGetter getter = KafkaProducerAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.SEND; - return Instrumenter., RecordMetadata>builder( + return Instrumenter.builder( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) @@ -103,12 +100,11 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( .buildInstrumenter(SpanKindExtractor.alwaysProducer()); } - public Instrumenter>, Void> - createConsumerReceiveInstrumenter() { + public Instrumenter createConsumerReceiveInstrumenter() { KafkaReceiveAttributesGetter getter = KafkaReceiveAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.RECEIVE; - return Instrumenter.>, Void>builder( + return Instrumenter.builder( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) @@ -120,20 +116,18 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } - public Instrumenter>, Void> - createConsumerProcessInstrumenter() { + public Instrumenter createConsumerProcessInstrumenter() { return createConsumerOperationInstrumenter(MessageOperation.PROCESS, Collections.emptyList()); } - public Instrumenter>, Void> - createConsumerOperationInstrumenter( - MessageOperation operation, - Iterable>, Void>> extractors) { + public Instrumenter createConsumerOperationInstrumenter( + MessageOperation operation, + Iterable> extractors) { KafkaConsumerAttributesGetter getter = KafkaConsumerAttributesGetter.INSTANCE; - InstrumenterBuilder>, Void> builder = - Instrumenter.>, Void>builder( + InstrumenterBuilder builder = + Instrumenter.builder( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) @@ -157,12 +151,11 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( } } - public Instrumenter>, Void> - createBatchProcessInstrumenter() { + public Instrumenter createBatchProcessInstrumenter() { KafkaReceiveAttributesGetter getter = KafkaReceiveAttributesGetter.INSTANCE; MessageOperation operation = MessageOperation.PROCESS; - return Instrumenter.>, Void>builder( + return Instrumenter.builder( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProcessRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProcessRequest.java new file mode 100644 index 000000000000..cc50c49f395a --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProcessRequest.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public class KafkaProcessRequest extends AbstractKafkaConsumerRequest { + + private final ConsumerRecord record; + + public static KafkaProcessRequest create(ConsumerRecord record, Consumer consumer) { + return create(record, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer)); + } + + public static KafkaProcessRequest create( + KafkaConsumerContext consumerContext, ConsumerRecord record) { + return create(record, consumerContext != null ? consumerContext.getConsumer() : null); + } + + public static KafkaProcessRequest create( + ConsumerRecord record, String consumerGroup, String clientId) { + return new KafkaProcessRequest(record, consumerGroup, clientId); + } + + public KafkaProcessRequest(ConsumerRecord record, String consumerGroup, String clientId) { + super(consumerGroup, clientId); + this.record = record; + } + + public ConsumerRecord getRecord() { + return record; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java index 7b75ce94a805..e6ccfb973966 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java @@ -11,23 +11,25 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.nio.ByteBuffer; import javax.annotation.Nullable; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; final class KafkaProducerAttributesExtractor - implements AttributesExtractor, RecordMetadata> { + implements AttributesExtractor { @Override public void onStart( - AttributesBuilder attributes, Context parentContext, ProducerRecord record) { + AttributesBuilder attributes, Context parentContext, KafkaProducerRequest request) { - Object key = record.key(); + Object key = request.getRecord().key(); if (key != null && canSerialize(key.getClass())) { attributes.put(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, key.toString()); } - if (record.value() == null) { + if (request.getRecord().value() == null) { attributes.put(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true); } + if (request.getClientId() != null) { + attributes.put(SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, request.getClientId()); + } } private static boolean canSerialize(Class keyClass) { @@ -40,7 +42,7 @@ private static boolean canSerialize(Class keyClass) { public void onEnd( AttributesBuilder attributes, Context context, - ProducerRecord producerRecord, + KafkaProducerRequest request, @Nullable RecordMetadata recordMetadata, @Nullable Throwable error) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java index 2370d33e21b8..ee6a8c86c9b3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java @@ -12,7 +12,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; /** @@ -20,75 +19,75 @@ * any time. */ enum KafkaProducerAttributesGetter - implements MessagingAttributesGetter, RecordMetadata> { + implements MessagingAttributesGetter { INSTANCE; @Override - public String getSystem(ProducerRecord producerRecord) { + public String getSystem(KafkaProducerRequest request) { return "kafka"; } @Override - public String getDestinationKind(ProducerRecord producerRecord) { + public String getDestinationKind(KafkaProducerRequest request) { return SemanticAttributes.MessagingDestinationKindValues.TOPIC; } @Override - public String getDestination(ProducerRecord producerRecord) { - return producerRecord.topic(); + public String getDestination(KafkaProducerRequest request) { + return request.getRecord().topic(); } @Override - public boolean isTemporaryDestination(ProducerRecord producerRecord) { + public boolean isTemporaryDestination(KafkaProducerRequest request) { return false; } @Override @Nullable - public String getProtocol(ProducerRecord producerRecord) { + public String getProtocol(KafkaProducerRequest request) { return null; } @Override @Nullable - public String getProtocolVersion(ProducerRecord producerRecord) { + public String getProtocolVersion(KafkaProducerRequest request) { return null; } @Override @Nullable - public String getUrl(ProducerRecord producerRecord) { + public String getUrl(KafkaProducerRequest request) { return null; } @Override @Nullable - public String getConversationId(ProducerRecord producerRecord) { + public String getConversationId(KafkaProducerRequest request) { return null; } @Override @Nullable - public Long getMessagePayloadSize(ProducerRecord producerRecord) { + public Long getMessagePayloadSize(KafkaProducerRequest request) { return null; } @Override @Nullable - public Long getMessagePayloadCompressedSize(ProducerRecord producerRecord) { + public Long getMessagePayloadCompressedSize(KafkaProducerRequest request) { return null; } @Override @Nullable public String getMessageId( - ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata) { + KafkaProducerRequest request, @Nullable RecordMetadata recordMetadata) { return null; } @Override - public List getMessageHeader(ProducerRecord producerRecord, String name) { - return StreamSupport.stream(producerRecord.headers().headers(name).spliterator(), false) + public List getMessageHeader(KafkaProducerRequest request, String name) { + return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerRequest.java new file mode 100644 index 000000000000..b9ad79bf2fc9 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerRequest.java @@ -0,0 +1,56 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import java.util.Iterator; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class KafkaProducerRequest { + + private final ProducerRecord record; + @Nullable private final String clientId; + + public static KafkaProducerRequest create(ProducerRecord record, Producer producer) { + return create(record, extractClientId(producer)); + } + + public static KafkaProducerRequest create(ProducerRecord record, String clientId) { + return new KafkaProducerRequest(record, clientId); + } + + private KafkaProducerRequest(ProducerRecord record, String clientId) { + this.record = record; + this.clientId = clientId; + } + + public ProducerRecord getRecord() { + return record; + } + + public String getClientId() { + return clientId; + } + + private static String extractClientId(Producer producer) { + try { + Map metrics = producer.metrics(); + Iterator metricIterator = metrics.keySet().iterator(); + return metricIterator.hasNext() ? metricIterator.next().tags().get("client-id") : null; + } catch (RuntimeException exception) { + // ExceptionHandlingTest uses a Producer that throws exception on every method call + return null; + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesExtractor.java index baa9fb33f743..c75f0945240d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesExtractor.java @@ -10,29 +10,35 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.ConsumerRecords; -enum KafkaReceiveAttributesExtractor - implements AttributesExtractor>, Void> { +enum KafkaReceiveAttributesExtractor implements AttributesExtractor { INSTANCE; @Override public void onStart( - AttributesBuilder attributes, - Context parentContext, - ConsumerAndRecord> consumerAndRecords) { + AttributesBuilder attributes, Context parentContext, KafkaReceiveRequest request) { - String consumerGroup = consumerAndRecords.consumerGroup(); + String consumerGroup = request.getConsumerGroup(); if (consumerGroup != null) { attributes.put(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, consumerGroup); } + + String clientId = request.getClientId(); + if (clientId != null) { + attributes.put(SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, clientId); + } + + String consumerId = request.getConsumerId(); + if (consumerId != null) { + attributes.put(SemanticAttributes.MESSAGING_CONSUMER_ID, consumerId); + } } @Override public void onEnd( AttributesBuilder attributes, Context context, - ConsumerAndRecord> request, + KafkaReceiveRequest request, @Nullable Void unused, @Nullable Throwable error) {} } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java index 6a4d73bff1e8..2bb19b0999b0 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java @@ -13,28 +13,26 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; -enum KafkaReceiveAttributesGetter - implements MessagingAttributesGetter>, Void> { +enum KafkaReceiveAttributesGetter implements MessagingAttributesGetter { INSTANCE; @Override - public String getSystem(ConsumerAndRecord> consumerAndRecords) { + public String getSystem(KafkaReceiveRequest request) { return "kafka"; } @Override - public String getDestinationKind(ConsumerAndRecord> consumerAndRecords) { + public String getDestinationKind(KafkaReceiveRequest request) { return SemanticAttributes.MessagingDestinationKindValues.TOPIC; } @Override @Nullable - public String getDestination(ConsumerAndRecord> consumerAndRecords) { + public String getDestination(KafkaReceiveRequest request) { Set topics = - consumerAndRecords.record().partitions().stream() + request.getRecords().partitions().stream() .map(TopicPartition::topic) .collect(Collectors.toSet()); // only return topic when there's exactly one in the batch @@ -42,59 +40,55 @@ public String getDestination(ConsumerAndRecord> consumerAn } @Override - public boolean isTemporaryDestination( - ConsumerAndRecord> consumerAndRecords) { + public boolean isTemporaryDestination(KafkaReceiveRequest request) { return false; } @Override @Nullable - public String getProtocol(ConsumerAndRecord> consumerAndRecords) { + public String getProtocol(KafkaReceiveRequest request) { return null; } @Override @Nullable - public String getProtocolVersion(ConsumerAndRecord> consumerAndRecords) { + public String getProtocolVersion(KafkaReceiveRequest request) { return null; } @Override @Nullable - public String getUrl(ConsumerAndRecord> consumerAndRecords) { + public String getUrl(KafkaReceiveRequest request) { return null; } @Override @Nullable - public String getConversationId(ConsumerAndRecord> consumerAndRecords) { + public String getConversationId(KafkaReceiveRequest request) { return null; } @Override @Nullable - public Long getMessagePayloadSize(ConsumerAndRecord> consumerAndRecords) { + public Long getMessagePayloadSize(KafkaReceiveRequest request) { return null; } @Override @Nullable - public Long getMessagePayloadCompressedSize( - ConsumerAndRecord> consumerAndRecords) { + public Long getMessagePayloadCompressedSize(KafkaReceiveRequest request) { return null; } @Override @Nullable - public String getMessageId( - ConsumerAndRecord> consumerAndRecords, @Nullable Void unused) { + public String getMessageId(KafkaReceiveRequest request, @Nullable Void unused) { return null; } @Override - public List getMessageHeader( - ConsumerAndRecord> consumerAndRecords, String name) { - return StreamSupport.stream(consumerAndRecords.record().spliterator(), false) + public List getMessageHeader(KafkaReceiveRequest request, String name) { + return StreamSupport.stream(request.getRecords().spliterator(), false) .flatMap( consumerRecord -> StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false)) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveRequest.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveRequest.java new file mode 100644 index 000000000000..d95ffb90e037 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveRequest.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public class KafkaReceiveRequest extends AbstractKafkaConsumerRequest { + + private final ConsumerRecords records; + + public static KafkaReceiveRequest create( + ConsumerRecords records, @Nullable Consumer consumer) { + return create(records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer)); + } + + public static KafkaReceiveRequest create( + KafkaConsumerContext consumerContext, ConsumerRecords records) { + return create(records, consumerContext != null ? consumerContext.getConsumer() : null); + } + + public static KafkaReceiveRequest create( + ConsumerRecords records, String consumerGroup, String clientId) { + return new KafkaReceiveRequest(records, consumerGroup, clientId); + } + + private KafkaReceiveRequest( + ConsumerRecords records, String consumerGroup, String clientId) { + super(consumerGroup, clientId); + this.records = records; + } + + public ConsumerRecords getRecords() { + return records; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaUtil.java new file mode 100644 index 000000000000..2d1d2f7d1ac7 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaUtil.java @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import io.opentelemetry.instrumentation.api.util.VirtualField; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class KafkaUtil { + + private static final String CONSUMER_GROUP = "consumer_group"; + private static final String CLIENT_ID = "client_id"; + + private static final VirtualField, Map> consumerInfoField = + VirtualField.find(Consumer.class, Map.class); + + private static final MethodHandle GET_GROUP_METADATA; + private static final MethodHandle GET_GROUP_ID; + + static { + MethodHandle getGroupMetadata; + MethodHandle getGroupId; + + try { + Class consumerGroupMetadata = + Class.forName("org.apache.kafka.clients.consumer.ConsumerGroupMetadata"); + + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + getGroupMetadata = + lookup.findVirtual( + Consumer.class, "groupMetadata", MethodType.methodType(consumerGroupMetadata)); + getGroupId = + lookup.findVirtual(consumerGroupMetadata, "groupId", MethodType.methodType(String.class)); + } catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException ignored) { + getGroupMetadata = null; + getGroupId = null; + } + + GET_GROUP_METADATA = getGroupMetadata; + GET_GROUP_ID = getGroupId; + } + + @Nullable + public static String getConsumerGroup(Consumer consumer) { + return getConsumerInfo(consumer).get(CONSUMER_GROUP); + } + + @Nullable + public static String getClientId(Consumer consumer) { + return getConsumerInfo(consumer).get(CLIENT_ID); + } + + private static Map getConsumerInfo(Consumer consumer) { + if (consumer == null) { + return Collections.emptyMap(); + } + Map map = consumerInfoField.get(consumer); + if (map == null) { + map = new HashMap<>(); + map.put(CONSUMER_GROUP, extractConsumerGroup(consumer)); + map.put(CLIENT_ID, extractClientId(consumer)); + consumerInfoField.set(consumer, map); + } + return map; + } + + @Nullable + private static String extractConsumerGroup(Consumer consumer) { + if (GET_GROUP_METADATA == null || GET_GROUP_ID == null) { + return null; + } + if (consumer == null) { + return null; + } + try { + Object metadata = GET_GROUP_METADATA.invoke(consumer); + return (String) GET_GROUP_ID.invoke(metadata); + } catch (Throwable e) { + return null; + } + } + + @Nullable + private static String extractClientId(Consumer consumer) { + try { + Map metrics = consumer.metrics(); + Iterator metricIterator = metrics.keySet().iterator(); + return metricIterator.hasNext() ? metricIterator.next().tags().get("client-id") : null; + } catch (RuntimeException exception) { + // ExceptionHandlingTest uses a Consumer that throws exception on every method call + return null; + } + } + + private KafkaUtil() {} +} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java index 98f05d4261af..8abc39e5ea68 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java @@ -7,17 +7,16 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; public final class KafkaStreamsSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-streams-0.11"; - private static final Instrumenter>, Void> INSTRUMENTER = + private static final Instrumenter INSTRUMENTER = new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setCaptureExperimentalSpanAttributes( @@ -27,7 +26,7 @@ public final class KafkaStreamsSingletons { ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) .createConsumerProcessInstrumenter(); - public static Instrumenter>, Void> instrumenter() { + public static Instrumenter instrumenter() { return INSTRUMENTER; } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java index 4c1f9c3dab9b..459bfe1ecc1a 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java @@ -14,14 +14,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.processor.internals.StampedRecord; // the advice applied by this instrumentation actually starts the span @@ -57,19 +57,18 @@ public static void onExit(@Advice.Return StampedRecord record) { return; } - Context receiveContext = - VirtualField.find(ConsumerRecord.class, Context.class).get(record.value); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record.value); + Context receiveContext = consumerContext.getContext(); // use the receive CONSUMER span as parent if it's available Context parentContext = receiveContext != null ? receiveContext : currentContext(); - ConsumerAndRecord> request = - ConsumerAndRecord.create(null, record.value); + KafkaProcessRequest request = KafkaProcessRequest.create(consumerContext, record.value); if (!instrumenter().shouldStart(parentContext, request)) { return; } Context context = instrumenter().start(parentContext, request); - holder.set(record.value, context, context.makeCurrent()); + holder.set(request, context, context.makeCurrent()); } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java index 9b1d5bb54eff..e5dd893efc2a 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java @@ -13,8 +13,7 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; @@ -62,9 +61,7 @@ public static void onExit( } // copy the receive CONSUMER span association - VirtualField, Context> singleRecordReceiveContext = - VirtualField.find(ConsumerRecord.class, Context.class); - singleRecordReceiveContext.set(result, singleRecordReceiveContext.get(incoming)); + KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming)); } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java index 41f110913b84..8523792e9a45 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java @@ -11,8 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; @@ -57,9 +56,7 @@ public static void saveHeaders( } // copy the receive CONSUMER span association - VirtualField, Context> singleRecordReceiveContext = - VirtualField.find(ConsumerRecord.class, Context.class); - singleRecordReceiveContext.set(result, singleRecordReceiveContext.get(incoming)); + KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming)); } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java index b75c133b07ae..13b8a3eb1e6b 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StateHolder.java @@ -7,12 +7,12 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; public final class StateHolder { public static final ThreadLocal HOLDER = new ThreadLocal<>(); - private ConsumerRecord record; + private KafkaProcessRequest request; private Context context; private Scope scope; @@ -20,16 +20,16 @@ public void closeScope() { scope.close(); } - public ConsumerRecord getRecord() { - return record; + public KafkaProcessRequest getRequest() { + return request; } public Context getContext() { return context; } - public void set(ConsumerRecord record, Context context, Scope scope) { - this.record = record; + public void set(KafkaProcessRequest request, Context context, Scope scope) { + this.request = request; this.context = context; this.scope = scope; } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java index eaca099fd2be..96f3d998299f 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java @@ -11,7 +11,6 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; @@ -52,8 +51,7 @@ public static void stopSpan( Context context = holder.getContext(); if (context != null) { holder.closeScope(); - instrumenter() - .end(context, ConsumerAndRecord.create(null, holder.getRecord()), null, throwable); + instrumenter().end(context, holder.getRequest(), null, throwable); } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy index 549f64a7ad11..b45cb51a1c44 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy @@ -100,6 +100,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.startsWith("producer") } "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" @@ -119,8 +120,10 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "receive" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.endsWith("consumer") } if (Boolean.getBoolean("testLatestDeps")) { "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application" + "$SemanticAttributes.MESSAGING_CONSUMER_ID" { it.startsWith("test-application - ") } } } } @@ -135,12 +138,17 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.endsWith("consumer") } "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" "kafka.record.queue_time_ms" { it >= 0 } "asdf" "testing" + if (Boolean.getBoolean("testLatestDeps")) { + "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application" + "$SemanticAttributes.MESSAGING_CONSUMER_ID" { it.startsWith("test-application - ") } + } } } // kafka-clients PRODUCER @@ -152,6 +160,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.endsWith("producer") } "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 } @@ -170,8 +179,10 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "receive" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.startsWith("consumer") } if (Boolean.getBoolean("testLatestDeps")) { "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test" + "$SemanticAttributes.MESSAGING_CONSUMER_ID" { it.startsWith("test - ") } } } } @@ -186,12 +197,14 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.startsWith("consumer") } "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" if (Boolean.getBoolean("testLatestDeps")) { "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test" + "$SemanticAttributes.MESSAGING_CONSUMER_ID" { it.startsWith("test - consumer") } } "kafka.record.queue_time_ms" { it >= 0 } "testing" 123 diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy index 6c017fae976e..c9e440279a3d 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy @@ -95,6 +95,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" "producer-1" "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" @@ -110,6 +111,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" { it.endsWith("consumer") } "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 @@ -130,6 +132,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" String "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 } @@ -144,12 +147,14 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID" "consumer-1" "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" if (Boolean.getBoolean("testLatestDeps")) { "$SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test" + "$SemanticAttributes.MESSAGING_CONSUMER_ID" { it.startsWith("test - consumer") } } "kafka.record.queue_time_ms" { it >= 0 } "testing" 123 diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java index 0b1817e35ab8..cf35a2214817 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java +++ b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java @@ -107,6 +107,9 @@ private static void runShouldInstrumentProducerAndConsumer( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testTopic"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -134,7 +137,14 @@ private static void runShouldInstrumentProducerAndConsumer( AbstractLongAssert::isNotNegative), equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testListener")), + SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java index 5d93215f233b..71c79e7c4b74 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java @@ -75,7 +75,10 @@ void shouldCreateSpansForSingleRecordProcess() { satisfies( SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"))); + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); producer.set(trace.getSpan(1)); }, @@ -93,7 +96,14 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener")), + "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testSingleListener - consumer"))), span -> span.hasName("testSingleTopic process") .hasKind(SpanKind.CONSUMER) @@ -118,6 +128,13 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testSingleListener - consumer")), satisfies( longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative)), @@ -157,7 +174,10 @@ void shouldHandleFailureInKafkaListener() { satisfies( SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"))); + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); producer.set(trace.getSpan(1)); }, @@ -175,7 +195,14 @@ void shouldHandleFailureInKafkaListener() { equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener")), + "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testSingleListener - consumer"))), span -> span.hasName("testSingleTopic process") .hasKind(SpanKind.CONSUMER) @@ -202,6 +229,13 @@ void shouldHandleFailureInKafkaListener() { equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testSingleListener - consumer")), satisfies( longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative)), @@ -237,7 +271,10 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { satisfies( SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10")), + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer"))), span -> span.hasName("testBatchTopic send") .hasKind(SpanKind.PRODUCER) @@ -252,7 +289,10 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { satisfies( SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "20"))); + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "20"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); producer1.set(trace.getSpan(1)); producer2.set(trace.getSpan(2)); @@ -271,7 +311,14 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener")), + "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testBatchListener - consumer"))), span -> span.hasName("testBatchTopic process") .hasKind(SpanKind.CONSUMER) @@ -287,7 +334,14 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener")), + "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testBatchListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } @@ -324,7 +378,10 @@ void shouldHandleFailureInKafkaBatchListener() { satisfies( SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"))); + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")))); producer.set(trace.getSpan(1)); }, @@ -342,7 +399,14 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener")), + "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testBatchListener - consumer"))), span -> span.hasName("testBatchTopic process") .hasKind(SpanKind.CONSUMER) @@ -358,7 +422,14 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener")), + "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testBatchListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } } diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java index 16b0274abfd1..4d30d8c97b15 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java @@ -9,7 +9,9 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -17,17 +19,14 @@ final class InstrumentedBatchInterceptor implements BatchInterceptor { - private static final VirtualField, Context> receiveContextField = - VirtualField.find(ConsumerRecords.class, Context.class); - private static final VirtualField, State> stateField = + private static final VirtualField, State> stateField = VirtualField.find(ConsumerRecords.class, State.class); - private final Instrumenter>, Void> - batchProcessInstrumenter; + private final Instrumenter batchProcessInstrumenter; @Nullable private final BatchInterceptor decorated; InstrumentedBatchInterceptor( - Instrumenter>, Void> batchProcessInstrumenter, + Instrumenter batchProcessInstrumenter, @Nullable BatchInterceptor decorated) { this.batchProcessInstrumenter = batchProcessInstrumenter; this.decorated = decorated; @@ -37,18 +36,19 @@ final class InstrumentedBatchInterceptor implements BatchInterceptor public ConsumerRecords intercept(ConsumerRecords records, Consumer consumer) { Context parentContext = getParentContext(records); - ConsumerAndRecord> request = ConsumerAndRecord.create(consumer, records); + KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumer); if (batchProcessInstrumenter.shouldStart(parentContext, request)) { Context context = batchProcessInstrumenter.start(parentContext, request); Scope scope = context.makeCurrent(); - stateField.set(records, State.create(context, scope)); + stateField.set(records, State.create(request, context, scope)); } return decorated == null ? records : decorated.intercept(records, consumer); } private static Context getParentContext(ConsumerRecords records) { - Context receiveContext = receiveContextField.get(records); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + Context receiveContext = consumerContext.getContext(); // use the receive CONSUMER span as parent if it's available return receiveContext != null ? receiveContext : Context.current(); @@ -56,7 +56,7 @@ private static Context getParentContext(ConsumerRecords records) { @Override public void success(ConsumerRecords records, Consumer consumer) { - end(ConsumerAndRecord.create(consumer, records), null); + end(records, null); if (decorated != null) { decorated.success(records, consumer); } @@ -64,20 +64,19 @@ public void success(ConsumerRecords records, Consumer consumer) { @Override public void failure(ConsumerRecords records, Exception exception, Consumer consumer) { - end(ConsumerAndRecord.create(consumer, records), exception); + end(records, exception); if (decorated != null) { decorated.failure(records, exception, consumer); } } - private void end( - ConsumerAndRecord> consumerAndRecord, @Nullable Throwable error) { - ConsumerRecords records = consumerAndRecord.record(); - State state = stateField.get(records); + private void end(ConsumerRecords records, @Nullable Throwable error) { + State state = stateField.get(records); stateField.set(records, null); if (state != null) { + KafkaReceiveRequest request = state.request(); state.scope().close(); - batchProcessInstrumenter.end(state.context(), consumerAndRecord, null, error); + batchProcessInstrumenter.end(state.context(), request, null, error); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java index 1cb16ceefee1..992a91d93701 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java @@ -9,7 +9,9 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; @@ -18,16 +20,14 @@ final class InstrumentedRecordInterceptor implements RecordInterceptor { - private static final VirtualField, Context> receiveContextField = - VirtualField.find(ConsumerRecord.class, Context.class); - private static final VirtualField, State> stateField = + private static final VirtualField, State> stateField = VirtualField.find(ConsumerRecord.class, State.class); - private final Instrumenter>, Void> processInstrumenter; + private final Instrumenter processInstrumenter; @Nullable private final RecordInterceptor decorated; InstrumentedRecordInterceptor( - Instrumenter>, Void> processInstrumenter, + Instrumenter processInstrumenter, @Nullable RecordInterceptor decorated) { this.processInstrumenter = processInstrumenter; this.decorated = decorated; @@ -38,29 +38,30 @@ final class InstrumentedRecordInterceptor implements RecordInterceptor intercept(ConsumerRecord record) { - start(ConsumerAndRecord.create(null, record)); + start(record, null); return decorated == null ? record : decorated.intercept(record); } @Override public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) { - start(ConsumerAndRecord.create(consumer, record)); + start(record, consumer); return decorated == null ? record : decorated.intercept(record, consumer); } - private void start(ConsumerAndRecord> consumerAndRecord) { - ConsumerRecord record = consumerAndRecord.record(); + private void start(ConsumerRecord record, Consumer consumer) { Context parentContext = getParentContext(record); - if (processInstrumenter.shouldStart(parentContext, consumerAndRecord)) { - Context context = processInstrumenter.start(parentContext, consumerAndRecord); + KafkaProcessRequest request = KafkaProcessRequest.create(record, consumer); + if (processInstrumenter.shouldStart(parentContext, request)) { + Context context = processInstrumenter.start(parentContext, request); Scope scope = context.makeCurrent(); - stateField.set(record, State.create(context, scope)); + stateField.set(record, State.create(request, context, scope)); } } - private static Context getParentContext(ConsumerRecord records) { - Context receiveContext = receiveContextField.get(records); + private static Context getParentContext(ConsumerRecord record) { + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record); + Context receiveContext = consumerContext.getContext(); // use the receive CONSUMER span as parent if it's available return receiveContext != null ? receiveContext : Context.current(); @@ -68,7 +69,7 @@ private static Context getParentContext(ConsumerRecord records) { @Override public void success(ConsumerRecord record, Consumer consumer) { - end(ConsumerAndRecord.create(consumer, record), null); + end(record, null); if (decorated != null) { decorated.success(record, consumer); } @@ -76,20 +77,19 @@ public void success(ConsumerRecord record, Consumer consumer) { @Override public void failure(ConsumerRecord record, Exception exception, Consumer consumer) { - end(ConsumerAndRecord.create(consumer, record), exception); + end(record, exception); if (decorated != null) { decorated.failure(record, exception, consumer); } } - private void end( - ConsumerAndRecord> consumerAndRecord, @Nullable Throwable error) { - ConsumerRecord record = consumerAndRecord.record(); - State state = stateField.get(record); + private void end(ConsumerRecord record, @Nullable Throwable error) { + State state = stateField.get(record); stateField.set(record, null); if (state != null) { + KafkaProcessRequest request = state.request(); state.scope().close(); - processInstrumenter.end(state.context(), consumerAndRecord, null, error); + processInstrumenter.end(state.context(), request, null, error); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java index 8e8a6b73f01a..cfdbc4cfc2be 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java @@ -8,9 +8,8 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.RecordInterceptor; @@ -31,13 +30,12 @@ public static SpringKafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) { return new SpringKafkaTelemetryBuilder(openTelemetry); } - private final Instrumenter>, Void> processInstrumenter; - private final Instrumenter>, Void> - batchProcessInstrumenter; + private final Instrumenter processInstrumenter; + private final Instrumenter batchProcessInstrumenter; SpringKafkaTelemetry( - Instrumenter>, Void> processInstrumenter, - Instrumenter>, Void> batchProcessInstrumenter) { + Instrumenter processInstrumenter, + Instrumenter batchProcessInstrumenter) { this.processInstrumenter = processInstrumenter; this.batchProcessInstrumenter = batchProcessInstrumenter; } diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java index aa9085cd97cf..47dbf0a3758b 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java @@ -10,12 +10,14 @@ import io.opentelemetry.context.Scope; @AutoValue -abstract class State { +abstract class State { - static State create(Context context, Scope scope) { - return new AutoValue_State(context, scope); + static State create(REQUEST request, Context context, Scope scope) { + return new AutoValue_State<>(request, context, scope); } + abstract REQUEST request(); + abstract Context context(); abstract Scope scope(); diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java index 2815aab69bcd..b47638dbe021 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -49,6 +49,9 @@ void shouldCreateSpansForSingleRecordProcess() { SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -79,7 +82,14 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener")), + "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testSingleListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); } @@ -111,6 +121,9 @@ void shouldHandleFailureInKafkaListener() { SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -143,7 +156,14 @@ void shouldHandleFailureInKafkaListener() { equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener")), + "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testSingleListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); } @@ -172,6 +192,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -188,6 +211,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -218,7 +244,14 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener")), + "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testBatchListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(0)))); } @@ -252,6 +285,9 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -280,7 +316,14 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), equalTo( SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener")), + "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> + stringAssert.startsWith("testBatchListener - consumer"))), span -> span.hasName("consumer").hasParent(trace.getSpan(0)))); } } diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java index 656f95890524..829945a766d4 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java @@ -9,35 +9,30 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.vertx.core.Handler; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; public final class InstrumentedBatchRecordsHandler implements Handler> { - private final VirtualField, Context> receiveContextField; - private final Consumer kafkaConsumer; @Nullable private final Handler> delegate; - public InstrumentedBatchRecordsHandler( - VirtualField, Context> receiveContextField, - Consumer kafkaConsumer, - @Nullable Handler> delegate) { - this.receiveContextField = receiveContextField; - this.kafkaConsumer = kafkaConsumer; + public InstrumentedBatchRecordsHandler(@Nullable Handler> delegate) { this.delegate = delegate; } @Override public void handle(ConsumerRecords records) { - Context parentContext = getParentContext(records); - ConsumerAndRecord> request = - ConsumerAndRecord.create(kafkaConsumer, records); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + Context receiveContext = consumerContext.getContext(); + // use the receive CONSUMER span as parent if it's available + Context parentContext = receiveContext != null ? receiveContext : Context.current(); + KafkaReceiveRequest request = KafkaReceiveRequest.create(consumerContext, records); if (!batchProcessInstrumenter().shouldStart(parentContext, request)) { callDelegateHandler(records); return; @@ -61,13 +56,6 @@ public void handle(ConsumerRecords records) { } } - private Context getParentContext(ConsumerRecords records) { - Context receiveContext = receiveContextField.get(records); - - // use the receive CONSUMER span as parent if it's available - return receiveContext != null ? receiveContext : Context.current(); - } - private void callDelegateHandler(ConsumerRecords records) { if (delegate != null) { delegate.handle(records); diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java index 460b33fd0ce3..c0b23b21873e 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java @@ -9,34 +9,29 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; import io.vertx.core.Handler; import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; public final class InstrumentedSingleRecordHandler implements Handler> { - private final VirtualField, Context> receiveContextField; - private final Consumer kafkaConsumer; @Nullable private final Handler> delegate; - public InstrumentedSingleRecordHandler( - VirtualField, Context> receiveContextField, - Consumer kafkaConsumer, - @Nullable Handler> delegate) { - this.receiveContextField = receiveContextField; - this.kafkaConsumer = kafkaConsumer; + public InstrumentedSingleRecordHandler(@Nullable Handler> delegate) { this.delegate = delegate; } @Override public void handle(ConsumerRecord record) { - Context parentContext = getParentContext(record); - ConsumerAndRecord> request = - ConsumerAndRecord.create(kafkaConsumer, record); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record); + Context receiveContext = consumerContext.getContext(); + // use the receive CONSUMER span as parent if it's available + Context parentContext = receiveContext != null ? receiveContext : Context.current(); + KafkaProcessRequest request = KafkaProcessRequest.create(consumerContext, record); if (!processInstrumenter().shouldStart(parentContext, request)) { callDelegateHandler(record); return; @@ -54,13 +49,6 @@ public void handle(ConsumerRecord record) { } } - private Context getParentContext(ConsumerRecord record) { - Context receiveContext = receiveContextField.get(record); - - // use the receive CONSUMER span as parent if it's available - return receiveContext != null ? receiveContext : Context.current(); - } - private void callDelegateHandler(ConsumerRecord record) { if (delegate != null) { delegate.handle(record); diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java index 36484345a2df..24d396cad2bb 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java @@ -10,8 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -20,7 +18,6 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -55,10 +52,7 @@ public static void onEnter( @Advice.This KafkaReadStreamImpl readStream, @Advice.Argument(value = 0, readOnly = false) Handler> handler) { - Consumer consumer = readStream.unwrap(); - VirtualField, Context> receiveContextField = - VirtualField.find(ConsumerRecord.class, Context.class); - handler = new InstrumentedSingleRecordHandler<>(receiveContextField, consumer, handler); + handler = new InstrumentedSingleRecordHandler<>(handler); } } @@ -70,10 +64,7 @@ public static void onEnter( @Advice.This KafkaReadStreamImpl readStream, @Advice.Argument(value = 0, readOnly = false) Handler> handler) { - Consumer consumer = readStream.unwrap(); - VirtualField, Context> receiveContextField = - VirtualField.find(ConsumerRecords.class, Context.class); - handler = new InstrumentedBatchRecordsHandler<>(receiveContextField, consumer, handler); + handler = new InstrumentedBatchRecordsHandler<>(handler); } } diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java index 0926f3351591..5b09afb40110 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java @@ -7,21 +7,18 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; public final class VertxKafkaSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-kafka-client-3.6"; - private static final Instrumenter>, Void> - BATCH_PROCESS_INSTRUMENTER; - private static final Instrumenter>, Void> - PROCESS_INSTRUMENTER; + private static final Instrumenter BATCH_PROCESS_INSTRUMENTER; + private static final Instrumenter PROCESS_INSTRUMENTER; static { KafkaInstrumenterFactory factory = @@ -36,12 +33,11 @@ public final class VertxKafkaSingletons { PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter(); } - public static Instrumenter>, Void> - batchProcessInstrumenter() { + public static Instrumenter batchProcessInstrumenter() { return BATCH_PROCESS_INSTRUMENTER; } - public static Instrumenter>, Void> processInstrumenter() { + public static Instrumenter processInstrumenter() { return PROCESS_INSTRUMENTER; } diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java index 8c88d6142b72..f4c345f075d8 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java @@ -201,6 +201,9 @@ protected static List sendAttributes( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("producer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -229,10 +232,17 @@ private static List batchConsumerAttributes(String topic, St equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, operation))); + equalTo(SemanticAttributes.MESSAGING_OPERATION, operation), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")))); // consumer group id is not available in version 0.11 if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + assertions.add( + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); } return assertions; } @@ -246,6 +256,9 @@ protected static List processAttributes( equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), satisfies( SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, AbstractLongAssert::isNotNegative), @@ -261,6 +274,10 @@ protected static List processAttributes( // consumer group id is not available in version 0.11 if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + assertions.add( + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); } String messageKey = record.key(); if (messageKey != null) {