Skip to content

Commit

Permalink
Add OpenTelemetry support.
Browse files Browse the repository at this point in the history
Signed-off-by: Ales Justin <ales.justin@gmail.com>
  • Loading branch information
alesj committed Nov 22, 2021
1 parent 882be99 commit 0929c8c
Show file tree
Hide file tree
Showing 18 changed files with 738 additions and 122 deletions.
5 changes: 5 additions & 0 deletions bin/kafka_bridge_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ fi
# Make sure that we use /dev/urandom
JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom"

# enabling OpenTelemetry with Jaeger:
# a non-empty OTEL_SERVICE_NAME is the signal that tracing should be on;
# we then select the Jaeger exporter via OTEL_TRACES_EXPORTER
# (presumably consumed by the OpenTelemetry SDK autoconfiguration — confirm)
if [ -n "$OTEL_SERVICE_NAME" ]; then
export OTEL_TRACES_EXPORTER="jaeger"
fi

exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@"
3 changes: 2 additions & 1 deletion config/application.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#Bridge related settings
bridge.id=my-bridge
# uncomment the following line to enable Jaeger tracing, check the documentation on how to configure the tracer
#bridge.tracing=jaeger
#bridge.tracing=jaeger-otel
#bridge.tracing.service-name=strimzi-kafka-bridge

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
Expand Down
39 changes: 39 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@
<jaeger.version>1.6.0</jaeger.version>
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka-client.version>0.1.15</opentracing-kafka-client.version>
<opentelemetry.version>1.7.0-alpha</opentelemetry.version>
<opentelemetry-stable.version>1.7.0</opentelemetry-stable.version>
<grpc.version>1.41.0</grpc.version>
<micrometer.version>1.3.9</micrometer.version>
<jmx-prometheus-collector.version>0.12.0</jmx-prometheus-collector.version>
<commons-cli.version>1.4</commons-cli.version>
Expand Down Expand Up @@ -202,6 +205,36 @@
<artifactId>opentracing-kafka-client</artifactId>
<version>${opentracing-kafka-client.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-common</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>${opentelemetry-stable.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
Expand Down Expand Up @@ -294,6 +327,12 @@
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-opentelemetry</artifactId>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
19 changes: 5 additions & 14 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

package io.strimzi.kafka.bridge;

import io.jaegertracing.Configuration;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import io.strimzi.kafka.bridge.amqp.AmqpBridge;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.http.HttpBridge;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
Expand Down Expand Up @@ -67,7 +65,7 @@ public static void main(String[] args) {
try {
VertxOptions vertxOptions = new VertxOptions();
JmxCollectorRegistry jmxCollectorRegistry = null;
if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
log.info("Metrics enabled and exposed on the /metrics endpoint");
// setup Micrometer metrics options
vertxOptions.setMetricsOptions(metricsOptions());
Expand Down Expand Up @@ -126,23 +124,16 @@ public static void main(String[] args) {
}
}

// register Jaeger tracer - if set, etc
TracingUtil.initialize(bridgeConfig);

// when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well,
// so no need for a standalone embedded HTTP server
if (!bridgeConfig.getHttpConfig().isEnabled()) {
EmbeddedHttpServer embeddedHttpServer =
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
embeddedHttpServer.start();
}

// register OpenTracing Jaeger tracer
if ("jaeger".equals(bridgeConfig.getTracing())) {
if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) {
Tracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.registerIfAbsent(tracer);
} else {
log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME);
}
}
}
});
} else {
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.strimzi.kafka.bridge.tracker.OffsetTracker;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
Expand Down Expand Up @@ -167,9 +168,9 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config)
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getConsumerConfig().getConfig());
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
if (this.bridgeConfig.getTracing() != null) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

TracingHandle tracing = TracingUtil.getTracing();
tracing.kafkaConsumerConfig(props);

if (config != null)
props.putAll(config);
Expand Down Expand Up @@ -200,7 +201,7 @@ protected void subscribe(boolean shouldAttachHandler) {
this.subscribed = true;
this.setPartitionsAssignmentHandlers();

Set<String> topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet());
Set<String> topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet());
this.consumer.subscribe(topics, this::subscribeHandler);
}

Expand Down Expand Up @@ -710,7 +711,7 @@ protected void consume(Handler<AsyncResult<KafkaConsumerRecords<K, V>>> consumeH
this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler);
}

protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
/**
 * Commits the given per-partition offsets on the underlying Kafka consumer.
 * Thin delegation to the Vert.x Kafka consumer's asynchronous commit.
 *
 * @param offsetsData offsets (with metadata) to commit, keyed by topic-partition
 * @param commitOffsetsHandler async handler invoked with the committed offsets
 *        (or a failure) once the commit completes
 */
protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
Handler<AsyncResult<Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata>>> commitOffsetsHandler) {
this.consumer.commit(offsetsData, commitOffsetsHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingProducerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -103,9 +104,9 @@ public void open() {
Properties props = new Properties();
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getProducerConfig().getConfig());
if (this.bridgeConfig.getTracing() != null) {
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
}

TracingHandle tracing = TracingUtil.getTracing();
tracing.kafkaProducerConfig(props);

this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BridgeConfig extends AbstractConfig {

public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id";
public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing";
public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name";

private KafkaConfig kafkaConfig;
private AmqpConfig amqpConfig;
Expand Down Expand Up @@ -103,4 +104,12 @@ public String getTracing() {
return config.get(BridgeConfig.TRACING_TYPE).toString();
}
}

/**
 * Returns the configured tracing service name
 * ({@code bridge.tracing.service-name}), or {@code null} when not set.
 *
 * @return the tracing service name, or {@code null} if the property is absent
 */
public String getTracingServiceName() {
    // look the value up once instead of hitting the config map twice
    Object serviceName = config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE);
    return serviceName == null ? null : serviceName.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,21 @@
package io.strimzi.kafka.bridge.http;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import io.strimzi.kafka.bridge.BridgeContentType;
import io.strimzi.kafka.bridge.ConsumerInstanceId;
import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Endpoint;
import io.strimzi.kafka.bridge.ConsumerInstanceId;
import io.strimzi.kafka.bridge.SinkBridgeEndpoint;
import io.strimzi.kafka.bridge.SinkTopicSubscription;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.tracing.SpanBuilderHandle;
import io.strimzi.kafka.bridge.tracing.SpanHandle;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
Expand All @@ -42,13 +36,11 @@
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.producer.KafkaHeader;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -247,6 +239,7 @@ private void doDeleteConsumer(RoutingContext routingContext) {
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
}

@SuppressWarnings("checkstyle:NPathComplexity") // tracing abstraction adds new npath complexity ...
private void doPoll(RoutingContext routingContext) {
if (topicSubscriptionsPattern == null && topicSubscriptions.isEmpty()) {
HttpBridgeError error = new HttpBridgeError(
Expand Down Expand Up @@ -274,9 +267,9 @@ private void doPoll(RoutingContext routingContext) {
this.consume(records -> {
if (records.succeeded()) {

Tracer tracer = GlobalTracer.get();
TracingHandle tracing = TracingUtil.getTracing();
SpanBuilderHandle<K, V> builder = tracing.builder(HttpOpenApiOperations.POLL.toString());

SpanBuilder spanBuilder = tracer.buildSpan(HttpOpenApiOperations.POLL.toString());
for (int i = 0; i < records.result().size(); i++) {
KafkaConsumerRecord<K, V> record = records.result().recordAt(i);

Expand All @@ -285,25 +278,11 @@ private void doPoll(RoutingContext routingContext) {
headers.put(header.key(), header.value().toString());
}

SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers));
if (parentSpan != null) {
spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan);
}
builder.addRef(headers);
}
Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start();
HttpTracingUtils.setCommonTags(span, routingContext);
SpanHandle<K, V> span = builder.span(routingContext);

tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() {
@Override
public void put(String key, String value) {
routingContext.response().headers().add(key, value);
}

@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()");
}
});
span.inject(routingContext);

HttpResponseStatus responseStatus;
try {
Expand Down Expand Up @@ -332,8 +311,7 @@ public Iterator<Map.Entry<String, String>> iterator() {
HttpUtils.sendResponse(routingContext, responseStatus.code(),
BridgeContentType.KAFKA_JSON, error.toJson().toBuffer());
}
Tags.HTTP_STATUS.set(span, responseStatus.code());
span.finish();
span.finish(responseStatus.code());

} else {
HttpBridgeError error = new HttpBridgeError(
Expand Down
Loading

0 comments on commit 0929c8c

Please sign in to comment.