Skip to content

Commit

Permalink
Fix relationship handling.
Browse files Browse the repository at this point in the history
Signed-off-by: Ales Justin <ales.justin@gmail.com>
  • Loading branch information
alesj committed Jul 5, 2022
1 parent 896f12a commit 703bff5
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 70 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-common</artifactId>
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public static void main(String[] args) {
.setConfig(new JsonObject().put("raw-data", true));

ConfigRetrieverOptions options = new ConfigRetrieverOptions()
.addStore(fileStore)
.addStore(envStore);
.addStore(fileStore)
.addStore(envStore);

ConfigRetriever retriever = ConfigRetriever.create(vertx, options);
retriever.getConfig(ar -> {
Expand Down Expand Up @@ -125,14 +125,14 @@ public static void main(String[] args) {
}
}

// register Jaeger tracer - if set, etc
// register tracing - if set, etc
TracingUtil.initialize(bridgeConfig);

// when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well,
// so no need for a standalone embedded HTTP server
if (!bridgeConfig.getHttpConfig().isEnabled()) {
EmbeddedHttpServer embeddedHttpServer =
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
embeddedHttpServer.start();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.producer.KafkaHeader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;

Expand Down Expand Up @@ -272,18 +271,12 @@ private void doPoll(RoutingContext routingContext) {

TracingHandle tracing = TracingUtil.getTracing();
SpanBuilderHandle<K, V> builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString());
SpanHandle<K, V> span = builder.span(routingContext);

for (int i = 0; i < records.result().size(); i++) {
KafkaConsumerRecord<K, V> record = records.result().recordAt(i);

Map<String, String> headers = new HashMap<>();
for (KafkaHeader header : record.headers()) {
headers.put(header.key(), header.value().toString());
}

builder.addRef(headers);
tracing.handleRecordSpan(span, record);
}
SpanHandle<K, V> span = builder.span(routingContext);

span.inject(routingContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand All @@ -35,11 +34,8 @@
import org.apache.kafka.common.serialization.Serializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Map.Entry;

public class HttpSourceBridgeEndpoint<K, V> extends SourceBridgeEndpoint<K, V> {

Expand Down Expand Up @@ -89,12 +85,6 @@ public void handle(Endpoint<?> endpoint) {

boolean isAsync = Boolean.parseBoolean(routingContext.queryParams().get("async"));

MultiMap httpHeaders = routingContext.request().headers();
Map<String, String> headers = new HashMap<>();
for (Entry<String, String> header: httpHeaders.entries()) {
headers.put(header.getKey(), header.getValue());
}

String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString();

TracingHandle tracing = TracingUtil.getTracing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.util.Map;
Expand Down Expand Up @@ -112,6 +111,25 @@ public <K, V> SpanBuilderHandle<K, V> builder(RoutingContext routingContext, Str
return new OTelSpanBuilderHandle<>(spanBuilder);
}

@Override
public <K, V> void handleRecordSpan(SpanHandle<K, V> parentSpanHandle, KafkaConsumerRecord<K, V> record) {
String operationName = String.format("%s %s", record.topic(), MessageOperation.RECEIVE);
SpanBuilder spanBuilder = get().spanBuilder(operationName);
Context parentContext = propagator().extract(Context.current(), TracingUtil.toHeaders(record), MG);
if (parentContext != null) {
Span parentSpan = Span.fromContext(parentContext);
SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null;
if (psc != null) {
spanBuilder.addLink(psc);
}
}
spanBuilder
.setSpanKind(SpanKind.CONSUMER)
.setParent(Context.current())
.startSpan()
.end();
}

private static TextMapPropagator propagator() {
return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
}
Expand Down Expand Up @@ -161,18 +179,6 @@ public OTelSpanBuilderHandle(SpanBuilder spanBuilder) {
this.spanBuilder = spanBuilder;
}

@Override
public void addRef(Map<String, String> headers) {
Context parentContext = propagator().extract(Context.current(), headers, MG);
if (parentContext != null) {
Span parentSpan = Span.fromContext(parentContext);
SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null;
if (psc != null) {
spanBuilder.addLink(psc);
}
}
}

@Override
public SpanHandle<K, V> span(RoutingContext routingContext) {
return buildSpan(spanBuilder, routingContext);
Expand All @@ -181,7 +187,6 @@ public SpanHandle<K, V> span(RoutingContext routingContext) {

@Override
public void kafkaConsumerConfig(Properties props) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

@Override
Expand All @@ -201,16 +206,16 @@ public OTelSpanHandle(Span span) {
@Override
public void prepare(KafkaProducerRecord<K, V> record) {
String uuid = UUID.randomUUID().toString();
spans.put(uuid, span);
record.addHeader(_UUID, uuid);
SPANS.put(uuid, span);
record.addHeader(X_UUID, uuid);
}

@Override
public void clean(KafkaProducerRecord<K, V> record) {
Optional<KafkaHeader> oh = record.headers().stream().filter(h -> h.key().equals(_UUID)).findFirst();
Optional<KafkaHeader> oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst();
oh.ifPresent(h -> {
String uuid = h.value().toString();
spans.remove(uuid);
SPANS.remove(uuid);
});
}

Expand All @@ -236,16 +241,16 @@ public void finish(int code) {
}
}

static final String _UUID = "_UUID";
static final Map<String, Span> spans = new ConcurrentHashMap<>();
static final String X_UUID = "_UUID";
static final Map<String, Span> SPANS = new ConcurrentHashMap<>();

public static class ContextAwareTracingProducerInterceptor<K, V> extends TracingProducerInterceptor<K, V> {
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Headers headers = record.headers();
String key = Buffer.buffer(headers.lastHeader(_UUID).value()).toString();
headers.remove(_UUID);
Span span = spans.remove(key);
String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString();
headers.remove(X_UUID);
Span span = SPANS.remove(key);
try (Scope ignored = span.makeCurrent()) {
return super.onSend(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
import io.opentracing.contrib.kafka.TracingKafkaUtils;
import io.opentracing.contrib.kafka.TracingProducerInterceptor;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
Expand All @@ -20,8 +20,8 @@
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Iterator;
Expand Down Expand Up @@ -74,6 +74,20 @@ public <K, V> SpanHandle<K, V> span(RoutingContext routingContext, String operat
return buildSpan(spanBuilder, routingContext);
}

@SuppressWarnings("rawtypes")
@Override
public <K, V> void handleRecordSpan(SpanHandle<K, V> parentSpanHandle, KafkaConsumerRecord<K, V> record) {
Tracer tracer = GlobalTracer.get();
Span span = ((OTSpanHandle) parentSpanHandle).span;
Tracer.SpanBuilder spanBuilder = tracer.buildSpan(TracingKafkaUtils.FROM_PREFIX + record.topic())
.asChildOf(span).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER);
SpanContext parentSpan = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(TracingUtil.toHeaders(record)));
if (parentSpan != null) {
spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan);
}
spanBuilder.start().finish();
}

private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) {
Tracer tracer = GlobalTracer.get();
SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request()));
Expand Down Expand Up @@ -111,15 +125,6 @@ public OTSpanBuilderHandle(Tracer.SpanBuilder spanBuilder) {
this.spanBuilder = spanBuilder;
}

@Override
public void addRef(Map<String, String> headers) {
Tracer tracer = GlobalTracer.get();
SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers));
if (parentSpan != null) {
spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan);
}
}

@Override
public SpanHandle<K, V> span(RoutingContext routingContext) {
return buildSpan(spanBuilder, routingContext);
Expand All @@ -128,7 +133,6 @@ public SpanHandle<K, V> span(RoutingContext routingContext) {

@Override
public void kafkaConsumerConfig(Properties props) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

@Override
Expand Down Expand Up @@ -162,7 +166,7 @@ public Iterator<Map.Entry<String, String>> iterator() {
@Override
public void inject(RoutingContext routingContext) {
Tracer tracer = GlobalTracer.get();
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() {
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() {
@Override
public void put(String key, String value) {
routingContext.response().headers().add(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

import io.vertx.ext.web.RoutingContext;

import java.util.Map;

/**
* Simple SpanBuilder handle.
* SpanBuilder handle - an abstraction over actual span builder implementation.
*/
public interface SpanBuilderHandle<K, V> {
void addRef(Map<String, String> headers);
/**
* Build span handle from underlying span builder implementation.
*
* @param routingContext Vert.x routing context
* @return the span handle
*/
SpanHandle<K, V> span(RoutingContext routingContext);
}
29 changes: 28 additions & 1 deletion src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,43 @@
import io.vertx.kafka.client.producer.KafkaProducerRecord;

/**
* Simple Span handle.
* Span handle, an abstraction over actual span implementation.
*/
public interface SpanHandle<K, V> {
/**
* Prepare Kafka producer record before async send.
*
* @param record Kafka producer record to use as payload
*/
default void prepare(KafkaProducerRecord<K, V> record) {
}

/**
* Clean Kafka producer record after async send.
*
* @param record Kafka producer record used as payload
*/
default void clean(KafkaProducerRecord<K, V> record) {
}

/**
* Inject tracing info into underlying span from Kafka producer record.
*
* @param record Kafka producer record to extract tracing info
*/
void inject(KafkaProducerRecord<K, V> record);

/**
* Inject tracing info into underlying span from Vert.x routing context.
*
* @param routingContext Vert.x routing context to extract tracing info
*/
void inject(RoutingContext routingContext);

/**
* Finish underlying span.
*
* @param code response code
*/
void finish(int code);
}
Loading

0 comments on commit 703bff5

Please sign in to comment.