Start a CONSUMER span for Kafka poll(); and refactor spring-kafka... #4041

Merged · 5 commits · Sep 8, 2021
Changes from all commits

@@ -12,6 +12,9 @@ muzzle {
}

dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))

library("org.apache.kafka:kafka-clients:0.11.0.0")
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import java.util.Iterator;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerRecordsInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.ConsumerRecords");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, String.class))
.and(returns(Iterable.class)),
ConsumerRecordsInstrumentation.class.getName() + "$IterableAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
.and(returns(List.class)),
ConsumerRecordsInstrumentation.class.getName() + "$ListAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("iterator"))
.and(takesArguments(0))
.and(returns(Iterator.class)),
ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice");
}

@SuppressWarnings("unused")
public static class IterableAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
ContextStore<ConsumerRecords, SpanContext> consumerRecordsSpan =
InstrumentationContext.get(ConsumerRecords.class, SpanContext.class);
iterable = new TracingIterable(iterable);
}
}
}

@SuppressWarnings("unused")
public static class ListAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(@Advice.Return(readOnly = false) List<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
iterable = new TracingList(iterable);
}
}
}

@SuppressWarnings("unused")
public static class IteratorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<?, ?>> iterator) {
if (iterator != null) {
iterator = new TracingIterator(iterator);
}
}
}
}
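The TracingIterable, TracingList and TracingIterator wrappers installed by this advice are not shown in this hunk. As a rough illustration of the wrapping idea (not the PR's actual classes), an iterator wrapper that keeps a PROCESS span open while each record is being handled could look like the sketch below, assuming the process instrumenter exposed by KafkaSingletons further down; parenting to the stored receive span is sketched separately after the KafkaSingletons hunk:

```java
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch only: wraps the consumer's iterator so that each record
// gets a PROCESS span that stays open while the caller handles that record.
final class TracingIteratorSketch<K, V> implements Iterator<ConsumerRecord<K, V>> {

  private final Iterator<ConsumerRecord<K, V>> delegate;
  private final Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter;

  private ConsumerRecord<K, V> currentRecord;
  private Context currentContext;
  private Scope currentScope;

  TracingIteratorSketch(
      Iterator<ConsumerRecord<K, V>> delegate,
      Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter) {
    this.delegate = delegate;
    this.processInstrumenter = processInstrumenter;
  }

  @Override
  public boolean hasNext() {
    // the previous record's span ends once iteration moves on
    closeCurrentSpan();
    return delegate.hasNext();
  }

  @Override
  public ConsumerRecord<K, V> next() {
    closeCurrentSpan();
    ConsumerRecord<K, V> record = delegate.next();
    Context parentContext = Context.current();
    if (processInstrumenter.shouldStart(parentContext, record)) {
      currentRecord = record;
      currentContext = processInstrumenter.start(parentContext, record);
      currentScope = currentContext.makeCurrent();
    }
    return record;
  }

  private void closeCurrentSpan() {
    if (currentScope != null) {
      currentScope.close();
      processInstrumenter.end(currentContext, currentRecord, null, null);
      currentScope = null;
      currentContext = null;
      currentRecord = null;
    }
  }

  @Override
  public void remove() {
    delegate.remove();
  }
}
```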
@@ -20,6 +20,9 @@ public KafkaClientsInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new KafkaConsumerInstrumentation(), new KafkaProducerInstrumentation());
return asList(
new KafkaProducerInstrumentation(),
new KafkaConsumerInstrumentation(),
new ConsumerRecordsInstrumentation());
}
}
@@ -5,86 +5,76 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
import java.util.List;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import java.time.Duration;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.ConsumerRecords");
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
named("poll")
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, String.class))
.and(returns(Iterable.class)),
KafkaConsumerInstrumentation.class.getName() + "$IterableAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
.and(returns(List.class)),
KafkaConsumerInstrumentation.class.getName() + "$ListAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("iterator"))
.and(takesArguments(0))
.and(returns(Iterator.class)),
KafkaConsumerInstrumentation.class.getName() + "$IteratorAdvice");
.and(takesArguments(1))
.and(takesArgument(0, long.class).or(takesArgument(0, Duration.class)))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
this.getClass().getName() + "$PollAdvice");
}

@SuppressWarnings("unused")
public static class IterableAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
iterable = new TracingIterable(iterable);
}
public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Timer onEnter() {
return Timer.start();
}
}

@SuppressWarnings("unused")
public static class ListAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(
@Advice.Enter Timer timer,
@Advice.Return ConsumerRecords<?, ?> records,
@Advice.Thrown Throwable error) {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(@Advice.Return(readOnly = false) List<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
iterable = new TracingList(iterable);
// don't create spans when no records were received
if (records == null || records.isEmpty()) {
return;
}
}
}

@SuppressWarnings("unused")
public static class IteratorAdvice {
Context parentContext = currentContext();
ReceivedRecords receivedRecords = ReceivedRecords.create(records, timer);
if (consumerReceiveInstrumenter().shouldStart(parentContext, receivedRecords)) {
Context context = consumerReceiveInstrumenter().start(parentContext, receivedRecords);
consumerReceiveInstrumenter().end(context, receivedRecords, null, error);

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<?, ?>> iterator) {
if (iterator != null) {
iterator = new TracingIterator(iterator, consumerInstrumenter());
// we're storing the context of the receive span so that process spans can use it as parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
ContextStore<ConsumerRecords, SpanContext> consumerRecordsSpan =
InstrumentationContext.get(ConsumerRecords.class, SpanContext.class);
consumerRecordsSpan.put(records, spanFromContext(context).getSpanContext());
}
}
}
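Because the receive span is only created on poll() exit, and only when records actually came back, its start timestamp has to be captured on entry. The Timer returned by onEnter() above is what carries that timestamp into setTimeExtractors(...) in KafkaSingletons below. A minimal sketch of such a helper, assuming it pairs a wall-clock start with a monotonic clock (the PR's real Timer class is not shown in this diff), could be:

```java
import java.time.Instant;

// Sketch only: captures the poll() start time so the receive span can be
// created retroactively at poll() exit with the correct start timestamp.
public final class Timer {

  public static Timer start() {
    return new Timer(Instant.now(), System.nanoTime());
  }

  private final Instant startTime;
  private final long startNanos;

  private Timer(Instant startTime, long startNanos) {
    this.startTime = startTime;
    this.startNanos = startNanos;
  }

  public Instant startTime() {
    return startTime;
  }

  public Instant now() {
    // derive "now" from the monotonic clock so span duration is not skewed by wall-clock jumps
    return startTime.plusNanos(System.nanoTime() - startNanos);
  }
}
```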
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
@@ -13,70 +13,71 @@
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class BatchConsumerAttributesExtractor
extends MessagingAttributesExtractor<BatchRecords<?, ?>, Void> {
public final class KafkaReceiveAttributesExtractor
extends MessagingAttributesExtractor<ReceivedRecords, Void> {

@Override
protected String system(BatchRecords<?, ?> batchRecords) {
protected String system(ReceivedRecords receivedRecords) {
return "kafka";
}

@Override
protected String destinationKind(BatchRecords<?, ?> batchRecords) {
protected String destinationKind(ReceivedRecords receivedRecords) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Override
protected @Nullable String destination(BatchRecords<?, ?> batchRecords) {
protected @Nullable String destination(ReceivedRecords receivedRecords) {
Set<String> topics =
batchRecords.records().partitions().stream()
receivedRecords.records().partitions().stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());
// only return topic when there's exactly one in the batch
return topics.size() == 1 ? topics.iterator().next() : null;
}

@Override
protected boolean temporaryDestination(BatchRecords<?, ?> batchRecords) {
protected boolean temporaryDestination(ReceivedRecords receivedRecords) {
return false;
}

@Override
protected @Nullable String protocol(BatchRecords<?, ?> batchRecords) {
protected @Nullable String protocol(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable String protocolVersion(BatchRecords<?, ?> batchRecords) {
protected @Nullable String protocolVersion(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable String url(BatchRecords<?, ?> batchRecords) {
protected @Nullable String url(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable String conversationId(BatchRecords<?, ?> batchRecords) {
protected @Nullable String conversationId(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable Long messagePayloadSize(BatchRecords<?, ?> batchRecords) {
protected @Nullable Long messagePayloadSize(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable Long messagePayloadCompressedSize(BatchRecords<?, ?> batchRecords) {
protected @Nullable Long messagePayloadCompressedSize(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected MessageOperation operation(BatchRecords<?, ?> batchRecords) {
return MessageOperation.PROCESS;
protected MessageOperation operation(ReceivedRecords receivedRecords) {
return MessageOperation.RECEIVE;
}

@Override
protected @Nullable String messageId(BatchRecords<?, ?> batchRecords, @Nullable Void unused) {
protected @Nullable String messageId(ReceivedRecords receivedRecords, @Nullable Void unused) {
return null;
}
}
@@ -24,8 +24,10 @@ public final class KafkaSingletons {

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
buildProducerInstrumenter();
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_INSTRUMENTER =
buildConsumerInstrumenter();
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
buildConsumerReceiveInstrumenter();
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
buildConsumerProcessInstrumenter();

private static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter() {
KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
@@ -39,7 +41,19 @@ public final class KafkaSingletons {
.newInstrumenter(SpanKindExtractor.alwaysProducer());
}

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerInstrumenter() {
private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter() {
KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
SpanNameExtractor<ReceivedRecords> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

return Instrumenter.<ReceivedRecords, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.setTimeExtractors(ReceivedRecords::startTime, (request, response) -> request.now())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
@@ -62,8 +76,12 @@ public final class KafkaSingletons {
return PRODUCER_INSTRUMENTER;
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> consumerInstrumenter() {
return CONSUMER_INSTRUMENTER;
public static Instrumenter<ReceivedRecords, Void> consumerReceiveInstrumenter() {
return CONSUMER_RECEIVE_INSTRUMENTER;
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter() {
return CONSUMER_PROCESS_INSTRUMENTER;
}

private KafkaSingletons() {}
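Splitting the old consumer instrumenter into consumerReceiveInstrumenter() and consumerProcessInstrumenter() is what lets per-record PROCESS spans be parented to the already-ended RECEIVE span via the SpanContext stored in PollAdvice. The sketch below illustrates that wiring under assumptions: it is not the PR's code, the class and method names are hypothetical, and real agent code would normally make the InstrumentationContext call from advice rather than from a helper class:

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Sketch only: start a PROCESS span whose parent is the receive span stored
// for this batch, matching the spec's batch-receive guidance referenced above.
final class ProcessSpanParentingSketch {

  static Context startProcessSpan(
      ConsumerRecords<?, ?> batch,
      ConsumerRecord<?, ?> record,
      Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter) {

    SpanContext receiveSpanContext =
        InstrumentationContext.get(ConsumerRecords.class, SpanContext.class).get(batch);

    // fall back to the current context when no receive span was recorded for this batch
    Context parentContext =
        receiveSpanContext != null
            ? Context.root().with(Span.wrap(receiveSpanContext))
            : Context.current();

    return processInstrumenter.start(parentContext, record);
  }

  private ProcessSpanParentingSketch() {}
}
```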