Skip to content

Commit

Permalink
Add OpenTelemetry support (#633)
Browse files Browse the repository at this point in the history
* Add OpenTelemetry support.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Fix async scope/context handling in OTel.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Fix relationship handling.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Upgrade libs, and consume test
Remove OTel span propagation hack.
Simplify executor service adapt.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Remove OTel executor service workaround.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Add javadoc

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Remove code setup of required config via env vars / sys props.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Remove gRPC impl dependency - not needed.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Upgrade Vert.x to 4.3.3.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Update OTel version, remove unused configuration.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Return dependency -- it's needed after all.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Reduce span builder usage.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Ignore OTel metrics in OpenTracing test.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

* Revert some changes, add more docs.

Signed-off-by: Ales Justin <ales.justin@gmail.com>

Signed-off-by: Ales Justin <ales.justin@gmail.com>
  • Loading branch information
alesj authored Aug 11, 2022
1 parent f3b2e2c commit ad42485
Show file tree
Hide file tree
Showing 22 changed files with 970 additions and 216 deletions.
5 changes: 5 additions & 0 deletions bin/kafka_bridge_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ fi
# Make sure that we use /dev/urandom
JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom"

# enabling OpenTelemetry with Jaeger by default
if [ -n "$OTEL_SERVICE_NAME" ] && [ -z "$OTEL_TRACES_EXPORTER" ]; then
export OTEL_TRACES_EXPORTER="jaeger"
fi

exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@"
5 changes: 4 additions & 1 deletion config/application.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#Bridge related settings
bridge.id=my-bridge
# uncomment the following line to enable Jaeger tracing, check the documentation how to configure the tracer
# uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer
# OpenTracing support
#bridge.tracing=jaeger
# OpenTelemetry support
#bridge.tracing=opentelemetry

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
Expand Down
53 changes: 52 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<slf4j.version>1.7.21</slf4j.version>
<vertx.version>4.3.1</vertx.version>
<vertx.version>4.3.3</vertx.version>
<netty.version>4.1.77.Final</netty.version>
<kafka.version>3.2.0</kafka.version>
<qpid-proton.version>0.33.10</qpid-proton.version>
Expand All @@ -110,6 +110,8 @@
<jaeger.version>1.8.1</jaeger.version>
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka-client.version>0.1.15</opentracing-kafka-client.version>
<opentelemetry.alpha-version>1.16.0-alpha</opentelemetry.alpha-version>
<opentelemetry.version>1.16.0</opentelemetry.version>
<micrometer.version>1.3.9</micrometer.version>
<jmx-prometheus-collector.version>0.12.0</jmx-prometheus-collector.version>
<prometheus-simpleclient.version>0.7.0</prometheus-simpleclient.version>
Expand Down Expand Up @@ -267,6 +269,46 @@
<artifactId>opentracing-kafka-client</artifactId>
<version>${opentracing-kafka-client.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-opentelemetry</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
Expand Down Expand Up @@ -349,6 +391,12 @@
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-opentracing</artifactId>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down Expand Up @@ -498,6 +546,9 @@
<ignoredUnusedDeclaredDependency>org.apache.logging.log4j:log4j-slf4j-impl</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.jaegertracing:jaeger-client</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.yaml:snakeyaml</ignoredUnusedDeclaredDependency> <!-- CVE override -->
<ignoredUnusedDeclaredDependency>org.apache.tomcat.embed:tomcat-embed-core</ignoredUnusedDeclaredDependency> <!-- CVE override -->
<!-- OpenTelemetry - used via classpath configuration -->
<ignoredUnusedDeclaredDependency>io.opentelemetry:opentelemetry-exporter-jaeger</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
Expand Down
40 changes: 16 additions & 24 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

package io.strimzi.kafka.bridge;

import io.jaegertracing.Configuration;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import io.strimzi.kafka.bridge.amqp.AmqpBridge;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.http.HttpBridge;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
Expand All @@ -30,6 +28,7 @@
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,7 +66,7 @@ public static void main(String[] args) {
try {
VertxOptions vertxOptions = new VertxOptions();
JmxCollectorRegistry jmxCollectorRegistry = null;
if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
log.info("Metrics enabled and exposed on the /metrics endpoint");
// setup Micrometer metrics options
vertxOptions.setMetricsOptions(metricsOptions());
Expand All @@ -82,13 +81,13 @@ public static void main(String[] args) {
CommandLine commandLine = new DefaultParser().parse(generateOptions(), args);

ConfigStoreOptions fileStore = new ConfigStoreOptions()
.setType("file")
.setFormat("properties")
.setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true));
.setType("file")
.setFormat("properties")
.setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true));

ConfigStoreOptions envStore = new ConfigStoreOptions()
.setType("env")
.setConfig(new JsonObject().put("raw-data", true));
.setType("env")
.setConfig(new JsonObject().put("raw-data", true));

ConfigRetrieverOptions options = new ConfigRetrieverOptions()
.addStore(fileStore)
Expand Down Expand Up @@ -126,43 +125,36 @@ public static void main(String[] args) {
}
}

// register tracing - if set, etc
TracingUtil.initialize(bridgeConfig);

// when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well,
// so no need for a standalone embedded HTTP server
if (!bridgeConfig.getHttpConfig().isEnabled()) {
EmbeddedHttpServer embeddedHttpServer =
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
embeddedHttpServer.start();
}

// register OpenTracing Jaeger tracer
if ("jaeger".equals(bridgeConfig.getTracing())) {
if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) {
Tracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.registerIfAbsent(tracer);
} else {
log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME);
}
}
}
});
} else {
log.error("Error starting the bridge", ar.cause());
System.exit(1);
}
});
} catch (Exception ex) {
log.error("Error starting the bridge", ex);
} catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) {
log.error("Error starting the bridge", e);
System.exit(1);
}
}

/**
* Set up the Vert.x metrics options
*
*
* @return instance of the MicrometerMetricsOptions on Vert.x
*/
private static MicrometerMetricsOptions metricsOptions() {
Set<String> set = new HashSet();
Set<String> set = new HashSet<>();
set.add(MetricsDomain.NAMED_POOLS.name());
set.add(MetricsDomain.VERTICLES.name());
return new MicrometerMetricsOptions()
Expand Down Expand Up @@ -218,7 +210,7 @@ private static Future<HttpBridge> deployHttpBridge(Vertx vertx, BridgeConfig bri

if (bridgeConfig.getHttpConfig().isEnabled()) {
HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);

vertx.deployVerticle(httpBridge, done -> {
if (done.succeeded()) {
log.info("HTTP verticle instance deployed [{}]", done.result());
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry
this.jmxCollectorRegistry = jmxCollectorRegistry;
this.meterRegistry = meterRegistry;
if (this.meterRegistry instanceof PrometheusMeterRegistry) {
this.meterRegistry.config().namingConvention(new PrometheusNamingConvention() {
@Override
public String name(String name, Meter.Type type, String baseUnit) {
String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
return super.name(metricName, type, baseUnit);
}
});
this.meterRegistry.config().namingConvention(new MetricsNamingConvention());
}
}

private static class MetricsNamingConvention extends PrometheusNamingConvention {
@Override
public String name(String name, Meter.Type type, String baseUnit) {
String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
return super.name(metricName, type, baseUnit);
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracker.OffsetTracker;
Expand Down Expand Up @@ -167,9 +166,6 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config)
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getConsumerConfig().getConfig());
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
if (this.bridgeConfig.getTracing() != null) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

if (config != null)
props.putAll(config);
Expand Down Expand Up @@ -200,7 +196,7 @@ protected void subscribe(boolean shouldAttachHandler) {
this.subscribed = true;
this.setPartitionsAssignmentHandlers();

Set<String> topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet());
Set<String> topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet());
this.consumer.subscribe(topics, this::subscribeHandler);
}

Expand Down Expand Up @@ -710,7 +706,7 @@ protected void consume(Handler<AsyncResult<KafkaConsumerRecords<K, V>>> consumeH
this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler);
}

protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
Handler<AsyncResult<Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata>>> commitOffsetsHandler) {
this.consumer.commit(offsetsData, commitOffsetsHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingProducerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -103,9 +104,9 @@ public void open() {
Properties props = new Properties();
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getProducerConfig().getConfig());
if (this.bridgeConfig.getTracing() != null) {
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
}

TracingHandle tracing = TracingUtil.getTracing();
tracing.addTracingPropsToProducerConfig(props);

this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;

Expand All @@ -26,7 +27,7 @@ public T deserialize(String topic, byte[] data) {

try (ByteArrayInputStream b = new ByteArrayInputStream(data); ObjectInputStream o = new ObjectInputStream(b)) {
return (T) o.readObject();
} catch (Exception e) {
} catch (IOException | ClassNotFoundException e) {
throw new SerializationException("Error when deserializing", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

Expand All @@ -20,7 +21,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {

}

@SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
@SuppressFBWarnings({"PZLA_PREFER_ZERO_LENGTH_ARRAYS"})
@Override
public byte[] serialize(String topic, T data) {

Expand All @@ -30,7 +31,7 @@ public byte[] serialize(String topic, T data) {
try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
o.writeObject(data);
return b.toByteArray();
} catch (Exception e) {
} catch (IOException e) {
throw new SerializationException("Error when serializing", e);
}
}
Expand Down
Loading

0 comments on commit ad42485

Please sign in to comment.