Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement kafka client id and consumer id attributes #7860

Merged
merged 3 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

Expand Down Expand Up @@ -70,10 +69,8 @@ public static <K, V> void wrap(
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
Consumer<K, V> consumer =
VirtualField.find(ConsumerRecords.class, Consumer.class).get(records);
iterable = TracingIterable.wrap(iterable, receiveContext, consumer);
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
iterable = TracingIterable.wrap(iterable, consumerContext);
}
}

Expand All @@ -90,10 +87,8 @@ public static <K, V> void wrap(
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
Consumer<K, V> consumer =
VirtualField.find(ConsumerRecords.class, Consumer.class).get(records);
list = TracingList.wrap(list, receiveContext, consumer);
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
list = TracingList.wrap(list, consumerContext);
}
}

Expand All @@ -110,10 +105,8 @@ public static <K, V> void wrap(
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
Consumer<K, V> consumer =
VirtualField.find(ConsumerRecords.class, Consumer.class).get(records);
iterator = TracingIterator.wrap(iterator, receiveContext, consumer);
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
iterator = TracingIterator.wrap(iterator, consumerContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafka.internal.Timer;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand Down Expand Up @@ -94,21 +94,15 @@ public static void onExit(
return;
}

// we're attaching the consumer to the records to be able to retrieve things like consumer
// group or clientId later
VirtualField<ConsumerRecords<?, ?>, Consumer<?, ?>> consumerRecordsConsumer =
VirtualField.find(ConsumerRecords.class, Consumer.class);
consumerRecordsConsumer.set(records, consumer);

Context parentContext = currentContext();
ConsumerAndRecord<ConsumerRecords<?, ?>> request =
ConsumerAndRecord.create(consumer, records);

if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) {
// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
Context context =
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumer);

// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
Context context = null;
if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) {
context =
InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter(),
parentContext,
Expand All @@ -117,23 +111,21 @@ public static void onExit(
error,
timer.startTime(),
timer.now());
}

// we're storing the context of the receive span so that process spans can use it as
// parent context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
// we're attaching the consumer to the records to be able to retrieve things like consumer
// group or clientId later
KafkaConsumerContextUtil.set(records, context, consumer);

// we're storing the context of the receive span so that process spans can use it as
// parent context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);

VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
consumerRecordContext.set(record, context);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
for (ConsumerRecord<?, ?> record : records) {
KafkaConsumerContextUtil.set(record, context, consumer);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaPropagation;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand Down Expand Up @@ -74,32 +75,35 @@ public static void onEnter(@Advice.Argument(0) Properties config) {
public static class SendAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
public static KafkaProducerRequest onEnter(
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
@Advice.FieldValue("clientId") String clientId,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
@Advice.Argument(value = 1, readOnly = false) Callback callback,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
Context parentContext = Java8BytecodeBridge.currentContext();
if (!producerInstrumenter().shouldStart(parentContext, record)) {
return;
if (!producerInstrumenter().shouldStart(parentContext, request)) {
return null;
}

context = producerInstrumenter().start(parentContext, record);
context = producerInstrumenter().start(parentContext, request);
scope = context.makeCurrent();

if (KafkaSingletons.isProducerPropagationEnabled()
&& KafkaPropagation.shouldPropagate(apiVersions)) {
record = KafkaPropagation.propagateContext(context, record);
}

callback = new ProducerCallback(callback, parentContext, context, record);
callback = new ProducerCallback(callback, parentContext, context, request);
return request;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
@Advice.Enter KafkaProducerRequest request,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Expand All @@ -109,7 +113,7 @@ public static void stopSpan(
scope.close();

if (throwable != null) {
producerInstrumenter().end(context, record, null, throwable);
producerInstrumenter().end(context, request, null, throwable);
}
// span finished by ProducerCallback
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public final class KafkaSingletons {
Expand All @@ -34,11 +33,9 @@ public final class KafkaSingletons {
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);

private static final Instrumenter<ProducerRecord<?, ?>, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<ConsumerAndRecord<ConsumerRecords<?, ?>>, Void>
CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<ConsumerAndRecord<ConsumerRecord<?, ?>>, Void>
CONSUMER_PROCESS_INSTRUMENTER;
private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER;

static {
KafkaInstrumenterFactory instrumenterFactory =
Expand All @@ -58,17 +55,15 @@ public static boolean isProducerPropagationEnabled() {
return PRODUCER_PROPAGATION_ENABLED;
}

public static Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter() {
public static Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
}

public static Instrumenter<ConsumerAndRecord<ConsumerRecords<?, ?>>, Void>
consumerReceiveInstrumenter() {
public static Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter() {
return CONSUMER_RECEIVE_INSTRUMENTER;
}

public static Instrumenter<ConsumerAndRecord<ConsumerRecord<?, ?>>, Void>
consumerProcessInstrumenter() {
public static Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter() {
return CONSUMER_PROCESS_INSTRUMENTER;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerCallback implements Callback {
private final Callback callback;
private final Context parentContext;
private final Context context;
private final ProducerRecord<?, ?> request;
private final KafkaProducerRequest request;

public ProducerCallback(
Callback callback, Context parentContext, Context context, ProducerRecord<?, ?> request) {
Callback callback, Context parentContext, Context context, KafkaProducerRequest request) {
this.callback = callback;
this.parentContext = parentContext;
this.context = context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,26 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
private final Iterable<ConsumerRecord<K, V>> delegate;
@Nullable private final Context receiveContext;
private final Consumer<K, V> consumer;
private final KafkaConsumerContext consumerContext;
private boolean firstIterator = true;

protected TracingIterable(
Iterable<ConsumerRecord<K, V>> delegate,
@Nullable Context receiveContext,
Consumer<K, V> consumer) {
Iterable<ConsumerRecord<K, V>> delegate, KafkaConsumerContext consumerContext) {
this.delegate = delegate;
this.receiveContext = receiveContext;
this.consumer = consumer;
this.consumerContext = consumerContext;
}

public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
Iterable<ConsumerRecord<K, V>> delegate,
@Nullable Context receiveContext,
Consumer<K, V> consumer) {
Iterable<ConsumerRecord<K, V>> delegate, KafkaConsumerContext consumerContext) {
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
return new TracingIterable<>(delegate, receiveContext, consumer);
return new TracingIterable<>(delegate, consumerContext);
}
return delegate;
}
Expand All @@ -44,7 +36,7 @@ public Iterator<ConsumerRecord<K, V>> iterator() {
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// ConsumerRecords is performed in the same thread that called poll()
if (firstIterator) {
it = TracingIterator.wrap(delegate.iterator(), receiveContext, consumer);
it = TracingIterator.wrap(delegate.iterator(), consumerContext);
firstIterator = false;
} else {
it = delegate.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,41 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.kafka.internal.ConsumerAndRecord;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {

private final Iterator<ConsumerRecord<K, V>> delegateIterator;
private final Context parentContext;
private final Consumer<K, V> consumer;
private final KafkaConsumerContext consumerContext;

/*
* Note: this may potentially create problems if this iterator is used from different threads. But
* at the moment we cannot do much about this.
*/
@Nullable private ConsumerAndRecord<ConsumerRecord<?, ?>> currentRequest;
@Nullable private KafkaProcessRequest currentRequest;
@Nullable private Context currentContext;
@Nullable private Scope currentScope;

private TracingIterator(
Iterator<ConsumerRecord<K, V>> delegateIterator,
@Nullable Context receiveContext,
Consumer<K, V> consumer) {
Iterator<ConsumerRecord<K, V>> delegateIterator, KafkaConsumerContext consumerContext) {
this.delegateIterator = delegateIterator;

Context receiveContext = consumerContext.getContext();
// use the receive CONSUMER as parent if it's available
this.parentContext = receiveContext != null ? receiveContext : Context.current();
this.consumer = consumer;
this.consumerContext = consumerContext;
}

public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
Iterator<ConsumerRecord<K, V>> delegateIterator,
@Nullable Context receiveContext,
Consumer<K, V> consumer) {
Iterator<ConsumerRecord<K, V>> delegateIterator, KafkaConsumerContext consumerContext) {
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
return new TracingIterator<>(delegateIterator, receiveContext, consumer);
return new TracingIterator<>(delegateIterator, consumerContext);
}
return delegateIterator;
}
Expand All @@ -69,7 +66,7 @@ public ConsumerRecord<K, V> next() {
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
ConsumerRecord<K, V> next = delegateIterator.next();
if (next != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
currentRequest = ConsumerAndRecord.create(consumer, next);
currentRequest = KafkaProcessRequest.create(consumerContext, next);
currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest);
currentScope = currentContext.makeCurrent();
}
Expand Down
Loading