Skip to content

Commit

Permalink
Apply feedback, #3.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Sep 28, 2021
1 parent ae7aad5 commit 40dcf49
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;

public final class KafkaSingletons {
public static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11.javaagent";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
KafkaUtils.buildProducerInstrumenter(INSTRUMENTATION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.slf4j.LoggerFactory;

public class KafkaTracing {
public static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11.library";

private static final Logger logger = LoggerFactory.getLogger(KafkaTracing.class);

private static final TextMapGetter<Headers> GETTER = new KafkaHeadersGetter();
Expand All @@ -42,7 +40,13 @@ public class KafkaTracing {
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
}

public static KafkaTracingBuilder create(OpenTelemetry openTelemetry) {
/** Returns a new {@link KafkaTracing} configured with the given {@link OpenTelemetry}. */
public static KafkaTracing create(OpenTelemetry openTelemetry) {
return newBuilder(openTelemetry).build();
}

/** Returns a new {@link KafkaTracingBuilder} configured with the given {@link OpenTelemetry}. */
public static KafkaTracingBuilder newBuilder(OpenTelemetry openTelemetry) {
return new KafkaTracingBuilder(openTelemetry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,39 @@
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTracingBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>> consumerProcessExtractors =
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>>
consumerProcessAttributesExtractors = new ArrayList<>();

KafkaTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = Objects.requireNonNull(openTelemetry);
}

public void addProducerExtractors(AttributesExtractor<ProducerRecord<?, ?>, Void> extractor) {
producerExtractors.add(extractor);
public void addProducerAttributesExtractors(
AttributesExtractor<ProducerRecord<?, ?>, Void> extractor) {
producerAttributesExtractors.add(extractor);
}

public void addConsumerProcessExtractors(
public void addConsumerAttributesProcessExtractors(
AttributesExtractor<ConsumerRecord<?, ?>, Void> extractor) {
consumerProcessExtractors.add(extractor);
consumerProcessAttributesExtractors.add(extractor);
}

@SuppressWarnings("unchecked")
public KafkaTracing build() {
return new KafkaTracing(
KafkaUtils.buildProducerInstrumenter(
KafkaTracing.INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME,
openTelemetry,
producerExtractors.toArray(new AttributesExtractor[0])),
producerAttributesExtractors.toArray(new AttributesExtractor[0])),
KafkaUtils.buildConsumerOperationInstrumenter(
KafkaTracing.INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME,
openTelemetry,
MessageOperation.RECEIVE,
consumerProcessExtractors.toArray(new AttributesExtractor[0])));
consumerProcessAttributesExtractors.toArray(new AttributesExtractor[0])));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ abstract class KafkaTracingHolder {

public synchronized KafkaTracing getTracing() {
if (tracing == null) {
tracing = KafkaTracing.create(GlobalOpenTelemetry.get()).build();
tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
}
return tracing;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,8 @@

package io.opentelemetry.javaagent.instrumentation.kafkastreams;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerAttributesExtractor;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.instrumentation.kafka.KafkaUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class KafkaStreamsSingletons {
Expand All @@ -28,30 +16,7 @@ public final class KafkaStreamsSingletons {
private static final Instrumenter<ConsumerRecord<?, ?>, Void> INSTRUMENTER = buildInstrumenter();

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor());
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}

if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaConsumerRecordGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaConsumerRecordGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return KafkaUtils.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
Expand Down

0 comments on commit 40dcf49

Please sign in to comment.