From 452827fd207e9bcfaa0d9a691f67ffa8ec792dea Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 25 Aug 2021 16:35:04 +0200 Subject: [PATCH] Add OpenTelemetry support. Signed-off-by: Ales Justin --- bin/kafka_bridge_run.sh | 5 + config/application.properties | 6 +- pom.xml | 45 ++++ .../io/strimzi/kafka/bridge/Application.java | 47 ++-- .../strimzi/kafka/bridge/MetricsReporter.java | 16 +- .../kafka/bridge/SinkBridgeEndpoint.java | 13 +- .../kafka/bridge/SourceBridgeEndpoint.java | 9 +- .../kafka/bridge/config/BridgeConfig.java | 9 + .../bridge/converter/DefaultDeserializer.java | 3 +- .../bridge/converter/DefaultSerializer.java | 5 +- .../bridge/http/HttpSinkBridgeEndpoint.java | 46 +--- .../bridge/http/HttpSourceBridgeEndpoint.java | 58 +---- .../kafka/bridge/http/HttpTracingUtils.java | 24 -- .../bridge/tracing/OpenTelemetryHandle.java | 212 ++++++++++++++++++ .../bridge/tracing/OpenTracingHandle.java | 190 ++++++++++++++++ .../bridge/tracing/SpanBuilderHandle.java | 18 ++ .../kafka/bridge/tracing/SpanHandle.java | 18 ++ .../bridge/tracing/TracingConstants.java | 23 ++ .../kafka/bridge/tracing/TracingHandle.java | 26 +++ .../kafka/bridge/tracing/TracingUtil.java | 110 +++++++++ .../bridge/tracing/OpenTelemetryTest.java | 25 +++ .../kafka/bridge/tracing/TracingTestBase.java | 98 ++++++++ 22 files changed, 852 insertions(+), 154 deletions(-) delete mode 100644 src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java create mode 100644 src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java create mode 100644 src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java diff --git a/bin/kafka_bridge_run.sh b/bin/kafka_bridge_run.sh index 3b57cd720..b7ec89e12 100755 --- a/bin/kafka_bridge_run.sh +++ b/bin/kafka_bridge_run.sh @@ -13,4 +13,9 @@ fi # Make sure that we use /dev/urandom JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom" +# enabling OpenTelemetry with Jaeger +if [ -n "$OTEL_SERVICE_NAME" ]; then + export OTEL_TRACES_EXPORTER="jaeger" +fi + exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@" \ No newline at end of file diff --git a/config/application.properties b/config/application.properties index 9bfed2bbe..aadd4cf03 100644 --- a/config/application.properties +++ b/config/application.properties @@ -1,7 +1,11 @@ #Bridge related settings bridge.id=my-bridge -# uncomment the following line to enable Jaeger tracing, check the documentation how to configure the tracer +# uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer +# OpenTracing support #bridge.tracing=jaeger +# OpenTelemetry support +#bridge.tracing=jaeger-otel +#bridge.tracing.service-name=strimzi-kafka-bridge #Apache Kafka common kafka.bootstrap.servers=localhost:9092 diff --git a/pom.xml b/pom.xml index c7bd6a0b6..ce506682d 100644 --- a/pom.xml +++ b/pom.xml @@ -106,6 +106,9 @@ 1.6.0 0.33.0 0.1.15 + 1.7.0-alpha + 1.7.0 + 1.41.0 1.3.9 0.12.0 1.4 @@ -203,6 +206,42 @@ opentracing-kafka-client ${opentracing-kafka-client.version} + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-semconv + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-trace + ${opentelemetry-stable.version} + + + io.opentelemetry.instrumentation + opentelemetry-kafka-clients-2.6 + ${opentelemetry.version} + + + io.opentelemetry.instrumentation + opentelemetry-kafka-clients-common + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-jaeger + ${opentelemetry-stable.version} + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + io.micrometer micrometer-registry-prometheus @@ -295,6 +334,12 @@ ${vertx.version} test + + io.vertx + vertx-opentelemetry + ${vertx.version} + test + org.mockito mockito-core diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9f3934e0e..b1bcd8342 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -5,13 +5,11 @@ package io.strimzi.kafka.bridge; -import io.jaegertracing.Configuration; import io.micrometer.core.instrument.MeterRegistry; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.http.HttpBridge; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; import io.vertx.config.ConfigRetrieverOptions; import io.vertx.config.ConfigStoreOptions; @@ -67,7 +65,7 @@ public static void main(String[] args) { try { VertxOptions vertxOptions = new VertxOptions(); JmxCollectorRegistry jmxCollectorRegistry = null; - if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { + if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { log.info("Metrics enabled and exposed on the /metrics endpoint"); // setup Micrometer metrics options vertxOptions.setMetricsOptions(metricsOptions()); @@ -82,17 +80,17 @@ public static void main(String[] args) { CommandLine commandLine = new DefaultParser().parse(generateOptions(), args); ConfigStoreOptions fileStore = new ConfigStoreOptions() - .setType("file") - .setFormat("properties") - .setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true)); + .setType("file") + .setFormat("properties") + .setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true)); ConfigStoreOptions envStore = new ConfigStoreOptions() - .setType("env") - .setConfig(new JsonObject().put("raw-data", true)); + .setType("env") + .setConfig(new JsonObject().put("raw-data", true)); ConfigRetrieverOptions options = new ConfigRetrieverOptions() - .addStore(fileStore) - .addStore(envStore); + .addStore(fileStore) + .addStore(envStore); ConfigRetriever retriever = ConfigRetriever.create(vertx, options); retriever.getConfig(ar -> { @@ -126,23 +124,16 @@ public static void main(String[] args) { } } + // register Jaeger tracer - if set, etc + TracingUtil.initialize(bridgeConfig); + // when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well, // so no need for a standalone embedded HTTP server if (!bridgeConfig.getHttpConfig().isEnabled()) { EmbeddedHttpServer embeddedHttpServer = - new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); + new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); embeddedHttpServer.start(); } - - // register OpenTracing Jaeger tracer - if ("jaeger".equals(bridgeConfig.getTracing())) { - if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) { - Tracer tracer = Configuration.fromEnv().getTracer(); - GlobalTracer.registerIfAbsent(tracer); - } else { - log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME); - } - } } }); } else { @@ -150,19 +141,21 @@ public static void main(String[] args) { System.exit(1); } }); - } catch (Exception ex) { - log.error("Error starting the bridge", ex); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + log.error("Error starting the bridge", e); System.exit(1); } } /** * Set up the Vert.x metrics options - * + * * @return instance of the MicrometerMetricsOptions on Vert.x */ private static MicrometerMetricsOptions metricsOptions() { - Set set = new HashSet(); + Set set = new HashSet<>(); set.add(MetricsDomain.NAMED_POOLS.name()); set.add(MetricsDomain.VERTICLES.name()); return new MicrometerMetricsOptions() @@ -218,7 +211,7 @@ private static Future deployHttpBridge(Vertx vertx, BridgeConfig bri if (bridgeConfig.getHttpConfig().isEnabled()) { HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter); - + vertx.deployVerticle(httpBridge, done -> { if (done.succeeded()) { log.info("HTTP verticle instance deployed [{}]", done.result()); diff --git a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java index 4e8e33bad..ab8e2f315 100644 --- a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java @@ -28,13 +28,15 @@ public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry this.jmxCollectorRegistry = jmxCollectorRegistry; this.meterRegistry = meterRegistry; if (this.meterRegistry instanceof PrometheusMeterRegistry) { - this.meterRegistry.config().namingConvention(new PrometheusNamingConvention() { - @Override - public String name(String name, Meter.Type type, String baseUnit) { - String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name; - return super.name(metricName, type, baseUnit); - } - }); + this.meterRegistry.config().namingConvention(new MetricsNamingConvention()); + } + } + + private static class MetricsNamingConvention extends PrometheusNamingConvention { + @Override + public String name(String name, Meter.Type type, String baseUnit) { + String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name; + return super.name(metricName, type, baseUnit); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java index 481ed2d95..4dae9892f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java @@ -5,9 +5,10 @@ package io.strimzi.kafka.bridge; -import io.opentracing.contrib.kafka.TracingConsumerInterceptor; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.strimzi.kafka.bridge.tracker.OffsetTracker; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; @@ -167,9 +168,9 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config) props.putAll(kafkaConfig.getConfig()); props.putAll(kafkaConfig.getConsumerConfig().getConfig()); props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); - if (this.bridgeConfig.getTracing() != null) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); - } + + TracingHandle tracing = TracingUtil.getTracing(); + tracing.kafkaConsumerConfig(props); if (config != null) props.putAll(config); @@ -200,7 +201,7 @@ protected void subscribe(boolean shouldAttachHandler) { this.subscribed = true; this.setPartitionsAssignmentHandlers(); - Set topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet()); + Set topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet()); this.consumer.subscribe(topics, this::subscribeHandler); } @@ -710,7 +711,7 @@ protected void consume(Handler>> consumeH this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler); } - protected void commit(Map offsetsData, + protected void commit(Map offsetsData, Handler>> commitOffsetsHandler) { this.consumer.commit(offsetsData, commitOffsetsHandler); } diff --git a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java index 3d8595605..00fc231cc 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java @@ -5,9 +5,10 @@ package io.strimzi.kafka.bridge; -import io.opentracing.contrib.kafka.TracingProducerInterceptor; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -103,9 +104,9 @@ public void open() { Properties props = new Properties(); props.putAll(kafkaConfig.getConfig()); props.putAll(kafkaConfig.getProducerConfig().getConfig()); - if (this.bridgeConfig.getTracing() != null) { - props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); - } + + TracingHandle tracing = TracingUtil.getTracing(); + tracing.kafkaProducerConfig(props); this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java index b1c08166c..ab5a56a94 100644 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java @@ -20,6 +20,7 @@ public class BridgeConfig extends AbstractConfig { public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id"; public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing"; + public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name"; private KafkaConfig kafkaConfig; private AmqpConfig amqpConfig; @@ -103,4 +104,12 @@ public String getTracing() { return config.get(BridgeConfig.TRACING_TYPE).toString(); } } + + public String getTracingServiceName() { + if (config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE) == null) { + return null; + } else { + return config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE).toString(); + } + } } diff --git a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java index 3216bfc9f..04314f5bf 100644 --- a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java +++ b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java @@ -8,6 +8,7 @@ import org.apache.kafka.common.serialization.Deserializer; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.util.Map; @@ -26,7 +27,7 @@ public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream b = new ByteArrayInputStream(data); ObjectInputStream o = new ObjectInputStream(b)) { return (T) o.readObject(); - } catch (Exception e) { + } catch (IOException | ClassNotFoundException e) { throw new SerializationException("Error when deserializing", e); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java index e6c62cc45..3e53809f7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java +++ b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java @@ -10,6 +10,7 @@ import org.apache.kafka.common.serialization.Serializer; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.ObjectOutputStream; import java.util.Map; @@ -20,7 +21,7 @@ public void configure(Map configs, boolean isKey) { } - @SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS") + @SuppressFBWarnings({"PZLA_PREFER_ZERO_LENGTH_ARRAYS"}) @Override public byte[] serialize(String topic, T data) { @@ -30,7 +31,7 @@ public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { o.writeObject(data); return b.toByteArray(); - } catch (Exception e) { + } catch (IOException e) { throw new SerializationException("Error when serializing", e); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 8d832b379..92502d0cb 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -6,20 +6,10 @@ package io.strimzi.kafka.bridge.http; import io.netty.handler.codec.http.HttpResponseStatus; -import io.opentracing.References; -import io.opentracing.Span; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.Tracer.SpanBuilder; -import io.opentracing.propagation.Format; -import io.opentracing.propagation.TextMap; -import io.opentracing.propagation.TextMapAdapter; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Endpoint; -import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.SinkBridgeEndpoint; import io.strimzi.kafka.bridge.SinkTopicSubscription; import io.strimzi.kafka.bridge.config.BridgeConfig; @@ -27,6 +17,10 @@ import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter; import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; +import io.strimzi.kafka.bridge.tracing.SpanBuilderHandle; +import io.strimzi.kafka.bridge.tracing.SpanHandle; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; @@ -42,13 +36,11 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.producer.KafkaHeader; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -247,6 +239,7 @@ private void doDeleteConsumer(RoutingContext routingContext) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null); } + @SuppressWarnings("checkstyle:NPathComplexity") // tracing abstraction adds new npath complexity ... private void doPoll(RoutingContext routingContext) { if (topicSubscriptionsPattern == null && topicSubscriptions.isEmpty()) { HttpBridgeError error = new HttpBridgeError( @@ -274,9 +267,9 @@ private void doPoll(RoutingContext routingContext) { this.consume(records -> { if (records.succeeded()) { - Tracer tracer = GlobalTracer.get(); + TracingHandle tracing = TracingUtil.getTracing(); + SpanBuilderHandle builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString()); - SpanBuilder spanBuilder = tracer.buildSpan(HttpOpenApiOperations.POLL.toString()); for (int i = 0; i < records.result().size(); i++) { KafkaConsumerRecord record = records.result().recordAt(i); @@ -285,25 +278,11 @@ private void doPoll(RoutingContext routingContext) { headers.put(header.key(), header.value().toString()); } - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan != null) { - spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); - } + builder.addRef(headers); } - Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); - HttpTracingUtils.setCommonTags(span, routingContext); + SpanHandle span = builder.span(routingContext); - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { - @Override - public void put(String key, String value) { - routingContext.response().headers().add(key, value); - } - - @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); - } - }); + span.inject(routingContext); HttpResponseStatus responseStatus; try { @@ -332,8 +311,7 @@ public Iterator> iterator() { HttpUtils.sendResponse(routingContext, responseStatus.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); } - Tags.HTTP_STATUS.set(span, responseStatus.code()); - span.finish(); + span.finish(responseStatus.code()); } else { HttpBridgeError error = new HttpBridgeError( diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index bd333a4a8..13cc129a7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -6,15 +6,6 @@ package io.strimzi.kafka.bridge.http; import io.netty.handler.codec.http.HttpResponseStatus; -import io.opentracing.Span; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.Tracer.SpanBuilder; -import io.opentracing.propagation.Format; -import io.opentracing.propagation.TextMap; -import io.opentracing.propagation.TextMapAdapter; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.BridgeContentType; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Endpoint; @@ -25,10 +16,12 @@ import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; import io.strimzi.kafka.bridge.http.model.HttpBridgeResult; +import io.strimzi.kafka.bridge.tracing.SpanHandle; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -41,12 +34,8 @@ import org.apache.kafka.common.serialization.Serializer; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; public class HttpSourceBridgeEndpoint extends SourceBridgeEndpoint { @@ -88,24 +77,10 @@ public void handle(Endpoint endpoint) { } } - Tracer tracer = GlobalTracer.get(); - - MultiMap httpHeaders = routingContext.request().headers(); - Map headers = new HashMap<>(); - for (Entry header: httpHeaders.entries()) { - headers.put(header.getKey(), header.getValue()); - } - String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString(); - SpanBuilder spanBuilder; - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan == null) { - spanBuilder = tracer.buildSpan(operationName); - } else { - spanBuilder = tracer.buildSpan(operationName).asChildOf(parentSpan); - } - Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); - HttpTracingUtils.setCommonTags(span, routingContext); + + TracingHandle tracing = TracingUtil.getTracing(); + SpanHandle span = tracing.span(routingContext, operationName); try { if (messageConverter == null) { @@ -114,24 +89,13 @@ public void handle(Endpoint endpoint) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); - span.finish(); + span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); return; } records = messageConverter.toKafkaRecords(topic, partition, routingContext.getBody()); for (KafkaProducerRecord record :records) { - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { - @Override - public void put(String key, String value) { - record.addHeader(key, value); - } - - @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); - } - }); + span.inject(record); } } catch (Exception e) { HttpBridgeError error = new HttpBridgeError( @@ -140,8 +104,7 @@ public Iterator> iterator() { HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); - span.finish(); + span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); return; } List> results = new ArrayList<>(records.size()); @@ -172,8 +135,7 @@ public Iterator> iterator() { } } - Tags.HTTP_STATUS.set(span, HttpResponseStatus.OK.code()); - span.finish(); + span.finish(HttpResponseStatus.OK.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java deleted file mode 100644 index 64d481178..000000000 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge.http; - -import io.opentracing.Span; -import io.opentracing.tag.Tags; -import io.vertx.ext.web.RoutingContext; - -public class HttpTracingUtils { - - public static final String COMPONENT = "strimzi-kafka-bridge"; - public static final String KAFKA_SERVICE = "kafka"; - - - public static void setCommonTags(Span span, RoutingContext routingContext) { - Tags.COMPONENT.set(span, HttpTracingUtils.COMPONENT); - Tags.PEER_SERVICE.set(span, HttpTracingUtils.KAFKA_SERVICE); - Tags.HTTP_METHOD.set(span, routingContext.request().method().name()); - Tags.HTTP_URL.set(span, routingContext.request().uri()); - } -} \ No newline at end of file diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java new file mode 100644 index 000000000..d0a06ff7b --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -0,0 +1,212 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.jaegertracing.Configuration; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor; +import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor; +import io.opentelemetry.sdk.autoconfigure.OpenTelemetrySdkAutoConfiguration; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.jetbrains.annotations.Nullable; + +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_ENV_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY; + +/** + * OpenTelemetry implementation of Tracing. + */ +class OpenTelemetryHandle implements TracingHandle { + + static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { + builder.setAttribute("component", COMPONENT); + builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); + builder.setAttribute(SemanticAttributes.HTTP_METHOD, routingContext.request().method().name()); + builder.setAttribute(SemanticAttributes.HTTP_URL, routingContext.request().uri()); + } + + @Override + public String envName() { + return OPENTELEMETRY_SERVICE_NAME_ENV_KEY; + } + + @Override + public String serviceName(BridgeConfig config) { + String serviceName = System.getenv(envName()); + if (serviceName == null) { + serviceName = System.getProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY); + if (serviceName == null) { + // legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set) + serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME); + if (serviceName == null) { + serviceName = config.getTracingServiceName(); + } + if (serviceName != null) { + System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, serviceName); + } + } + } + return serviceName; + } + + @Override + public void initialize() { + OpenTelemetrySdkAutoConfiguration.initialize(); + } + + private static Tracer get() { + return GlobalOpenTelemetry.getTracer(COMPONENT); + } + + private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operationName) { + Tracer tracer = get(); + SpanBuilder spanBuilder; + Context parentContext = propagator().extract(Context.current(), routingContext, RCG); + if (parentContext == null) { + spanBuilder = tracer.spanBuilder(operationName); + } else { + spanBuilder = tracer.spanBuilder(operationName).setParent(parentContext); + } + return spanBuilder; + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName); + return new OTelSpanBuilderHandle<>(spanBuilder); + } + + private static TextMapPropagator propagator() { + return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + } + + private static final TextMapGetter RCG = new TextMapGetter() { + @Override + public Iterable keys(RoutingContext rc) { + return rc.request().headers().names(); + } + + @Nullable + @Override + public String get(@Nullable RoutingContext rc, String key) { + if (rc == null) { + return null; + } + return rc.request().headers().get(key); + } + }; + + private static final TextMapGetter> MG = new TextMapGetter>() { + @Override + public Iterable keys(Map map) { + return map.keySet(); + } + + @Nullable + @Override + public String get(@Nullable Map map, String key) { + return map != null ? map.get(key) : null; + } + }; + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return buildSpan(getSpanBuilder(routingContext, operationName), routingContext); + } + + private static SpanHandle buildSpan(SpanBuilder spanBuilder, RoutingContext routingContext) { + spanBuilder.setSpanKind(SpanKind.SERVER); + setCommonAttributes(spanBuilder, routingContext); + return new OTelSpanHandle<>(spanBuilder.startSpan()); + } + + private static class OTelSpanBuilderHandle implements SpanBuilderHandle { + private final SpanBuilder spanBuilder; + + public OTelSpanBuilderHandle(SpanBuilder spanBuilder) { + this.spanBuilder = spanBuilder; + } + + @Override + public void addRef(Map headers) { + Context parentContext = propagator().extract(Context.current(), headers, MG); + if (parentContext != null) { + Span parentSpan = Span.fromContext(parentContext); + SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null; + if (psc != null) { + spanBuilder.addLink(psc); + } + } + } + + @Override + public SpanHandle span(RoutingContext routingContext) { + return buildSpan(spanBuilder, routingContext); + } + } + + @Override + public void kafkaConsumerConfig(Properties props) { + props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + } + + @Override + public void kafkaProducerConfig(Properties props) { + props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + } + + private static final class OTelSpanHandle implements SpanHandle { + private final Span span; + private final Scope scope; + + public OTelSpanHandle(Span span) { + this.span = span; + this.scope = span.makeCurrent(); + } + + @Override + public void inject(KafkaProducerRecord record) { + propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader); + } + + @Override + public void inject(RoutingContext routingContext) { + propagator().inject(Context.current(), routingContext, (rc, key, value) -> rc.response().headers().add(key, value)); + } + + @Override + public void finish(int code) { + try { + span.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, code); + span.setStatus(code == HttpResponseStatus.OK.code() ? StatusCode.OK : StatusCode.ERROR); + scope.close(); + } finally { + span.end(); + } + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java new file mode 100644 index 000000000..20c6ba1ad --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -0,0 +1,190 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.jaegertracing.Configuration; +import io.opentracing.References; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.contrib.kafka.TracingConsumerInterceptor; +import io.opentracing.contrib.kafka.TracingProducerInterceptor; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMap; +import io.opentracing.propagation.TextMapAdapter; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; + +/** + * OpenTracing implementation of TracingHandle. + */ +class OpenTracingHandle implements TracingHandle { + + static void setCommonTags(Span span, RoutingContext routingContext) { + Tags.COMPONENT.set(span, COMPONENT); + Tags.PEER_SERVICE.set(span, KAFKA_SERVICE); + Tags.HTTP_METHOD.set(span, routingContext.request().method().name()); + Tags.HTTP_URL.set(span, routingContext.request().uri()); + } + + @Override + public String envName() { + return Configuration.JAEGER_SERVICE_NAME; + } + + @Override + public String serviceName(BridgeConfig config) { + String serviceName = System.getenv(envName()); + if (serviceName == null) { + serviceName = config.getTracingServiceName(); + } + return serviceName; + } + + @Override + public void initialize() { + Tracer tracer = Configuration.fromEnv().getTracer(); + GlobalTracer.registerIfAbsent(tracer); + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + return new OTSpanBuilderHandle<>(getSpanBuilder(routingContext, operationName)); + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + Tracer.SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName); + return buildSpan(spanBuilder, routingContext); + } + + private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) { + Tracer tracer = GlobalTracer.get(); + Tracer.SpanBuilder spanBuilder; + SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request())); + if (parentSpan == null) { + spanBuilder = tracer.buildSpan(operationName); + } else { + spanBuilder = tracer.buildSpan(operationName).asChildOf(parentSpan); + } + return spanBuilder; + } + + private static class RequestTextMap implements TextMap { + private final HttpServerRequest request; + + public RequestTextMap(HttpServerRequest request) { + this.request = request; + } + + @Override + public Iterator> iterator() { + return request.headers().iterator(); + } + + @Override + public void put(String key, String value) { + request.headers().add(key, value); + } + } + + private static SpanHandle buildSpan(Tracer.SpanBuilder spanBuilder, RoutingContext routingContext) { + Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); + setCommonTags(span, routingContext); + return new OTSpanHandle(span); + } + + private static class OTSpanBuilderHandle implements SpanBuilderHandle { + private final Tracer.SpanBuilder spanBuilder; + + public OTSpanBuilderHandle(Tracer.SpanBuilder spanBuilder) { + this.spanBuilder = spanBuilder; + } + + @Override + public void addRef(Map headers) { + Tracer tracer = GlobalTracer.get(); + SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); + if (parentSpan != null) { + spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); + } + } + + @Override + public SpanHandle span(RoutingContext routingContext) { + return buildSpan(spanBuilder, routingContext); + } + } + + @Override + public void kafkaConsumerConfig(Properties props) { + props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + } + + @Override + public void kafkaProducerConfig(Properties props) { + props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + } + + private static final class OTSpanHandle implements SpanHandle { + private final Span span; + + public OTSpanHandle(Span span) { + this.span = span; + } + + @Override + public void inject(KafkaProducerRecord record) { + Tracer tracer = GlobalTracer.get(); + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { + @Override + public void put(String key, String value) { + record.addHeader(key, value); + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); + } + }); + } + + @Override + public void inject(RoutingContext routingContext) { + Tracer tracer = GlobalTracer.get(); + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { + @Override + public void put(String key, String value) { + routingContext.response().headers().add(key, value); + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); + } + }); + } + + @Override + public void finish(int code) { + Tags.HTTP_STATUS.set(span, code); + span.finish(); + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java new file mode 100644 index 000000000..daf2f62ab --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java @@ -0,0 +1,18 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.ext.web.RoutingContext; + +import java.util.Map; + +/** + * Simple SpanBuilder handle. + */ +public interface SpanBuilderHandle { + void addRef(Map headers); + SpanHandle span(RoutingContext routingContext); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java new file mode 100644 index 000000000..7f1ed468e --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -0,0 +1,18 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; + +/** + * Simple Span handle. + */ +public interface SpanHandle { + void inject(KafkaProducerRecord record); + void inject(RoutingContext routingContext); + void finish(int code); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java new file mode 100644 index 000000000..2bea1ecc6 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java @@ -0,0 +1,23 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +/** + * Tracing constants. + */ +public final class TracingConstants { + public static final String COMPONENT = "strimzi-kafka-bridge"; + public static final String KAFKA_SERVICE = "kafka"; + + public static final String JAEGER = "jaeger"; + + public static final String JAEGER_OPENTRACING = JAEGER; + public static final String JAEGER_OPENTELEMETRY = JAEGER + "-otel"; + + public static final String OPENTELEMETRY_SERVICE_NAME_ENV_KEY = "OTEL_SERVICE_NAME"; + public static final String OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY = "otel.service.name"; + public static final String OPENTELEMETRY_TRACES_EXPORTER_KEY = "otel.traces.exporter"; +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java new file mode 100644 index 000000000..3ab981b16 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -0,0 +1,26 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; + +import java.util.Properties; + +/** + * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. + */ +public interface TracingHandle { + String envName(); + String serviceName(BridgeConfig config); + void initialize(); + + SpanBuilderHandle builder(RoutingContext routingContext, String operationName); + SpanHandle span(RoutingContext routingContext, String operationName); + + void kafkaConsumerConfig(Properties props); + void kafkaProducerConfig(Properties props); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java new file mode 100644 index 000000000..a335e01a5 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java @@ -0,0 +1,110 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER_OPENTELEMETRY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER_OPENTRACING; + +/** + * Tracing util to hold app's Tracing instance. + */ +public class TracingUtil { + private static final Logger log = LoggerFactory.getLogger(TracingUtil.class); + private static TracingHandle tracing = new NoopTracing(); + + public static TracingHandle getTracing() { + return tracing; + } + + public static void initialize(BridgeConfig config) { + String tracingConfig = config.getTracing(); + if (tracingConfig != null && (tracingConfig.equals(JAEGER_OPENTRACING) || tracingConfig.equals(JAEGER_OPENTELEMETRY))) { + boolean isOpenTelemetry = JAEGER_OPENTELEMETRY.equals(tracingConfig); + TracingHandle instance = isOpenTelemetry ? new OpenTelemetryHandle() : new OpenTracingHandle(); + + String serviceName = instance.serviceName(config); + if (serviceName != null) { + log.info( + "Initializing Jaeger ({}) tracingConfig with service name {}", + isOpenTelemetry ? "OpenTelemetry" : "OpenTracing", + serviceName + ); + instance.initialize(); + tracing = instance; + } else { + log.error("Jaeger tracingConfig cannot be initialized because {} environment variable is not defined", instance.envName()); + } + } + } + + private static final class NoopTracing implements TracingHandle { + @Override + public String envName() { + return null; + } + + @Override + public String serviceName(BridgeConfig config) { + return null; + } + + @Override + public void initialize() { + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + return new NoopSpanBuilderHandle<>(); + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return new NoopSpanHandle<>(); + } + + @Override + public void kafkaConsumerConfig(Properties props) { + } + + @Override + public void kafkaProducerConfig(Properties props) { + } + } + + private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { + @Override + public void addRef(Map headers) { + } + + @Override + public SpanHandle span(RoutingContext routingContext) { + return new NoopSpanHandle<>(); + } + } + + private static final class NoopSpanHandle implements SpanHandle { + @Override + public void inject(KafkaProducerRecord record) { + } + + @Override + public void inject(RoutingContext routingContext) { + } + + @Override + public void finish(int code) { + } + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java new file mode 100644 index 000000000..047368b13 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java @@ -0,0 +1,25 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.core.tracing.TracingOptions; +import io.vertx.tracing.opentelemetry.OpenTelemetryOptions; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_KEY; + +/** + * OpenTelemetry tests + */ +public class OpenTelemetryTest extends TracingTestBase { + @Override + protected TracingOptions tracingOptions() { + System.setProperty(OPENTELEMETRY_TRACES_EXPORTER_KEY, JAEGER); + System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, "strimzi-kafka-bridge-test"); + return new OpenTelemetryOptions(); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java new file mode 100644 index 000000000..c149d376c --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java @@ -0,0 +1,98 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.http.services.ProducerService; +import io.strimzi.kafka.bridge.utils.Urls; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.core.tracing.TracingOptions; +import io.vertx.core.tracing.TracingPolicy; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * Base for OpenTracing and OpenTelemetry (manual) tests. + *

+ * Test will only run if the bridge AND tracing server are up-n-running. + */ +@ExtendWith(VertxExtension.class) +public abstract class TracingTestBase { + Logger log = LoggerFactory.getLogger(getClass()); + + private void assumeServer(String url) { + try { + new URL(url).openConnection().getInputStream(); + } catch (Exception e) { + log.info("Cannot connect to server", e); + Assumptions.assumeTrue(false, "Server is not running: " + url); + } + } + + Handler>> verifyOK(VertxTestContext context) { + return ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + }); + context.completeNow(); + }; + } + + @BeforeEach + public void setUp() { + assumeServer(String.format("http://%s:%s", Urls.BRIDGE_HOST, Urls.BRIDGE_PORT)); // bridge + assumeServer("http://localhost:16686"); // jaeger + } + + protected abstract TracingOptions tracingOptions(); + + @Test + public void testSmoke(VertxTestContext context) { + Vertx vertx = Vertx.vertx(new VertxOptions().setTracingOptions(tracingOptions())); + + WebClient client = WebClient.create(vertx, (WebClientOptions) new WebClientOptions() + .setDefaultHost(Urls.BRIDGE_HOST) + .setDefaultPort(Urls.BRIDGE_PORT) + .setTracingPolicy(TracingPolicy.ALWAYS) + ); + + String value = "message-value"; + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + ProducerService.getInstance(client) + .sendRecordsRequest("mytopic", root, BridgeContentType.KAFKA_JSON_JSON) + .sendJsonObject(root, verifyOK(context)); + } +}