diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java index 01bb673fb..fe484da80 100644 --- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java +++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 The OpenZipkin Authors + * Copyright 2013-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/instrumentation/kafka-streams/RATIONALE.md b/instrumentation/kafka-streams/RATIONALE.md index c5c73377a..30b51db59 100644 --- a/instrumentation/kafka-streams/RATIONALE.md +++ b/instrumentation/kafka-streams/RATIONALE.md @@ -8,24 +8,13 @@ Typically, there are at least two spans involved in traces produces by a Kafka S By receiving records in a Kafka Streams application with Tracing enabled, the span created, once a record is received, will inject the span context on the headers of the Record, and it will get propagated downstream onto the Stream topology. The span context is stored in the Record Headers, -the Producers at the middle (e.g. `builder.through(topic)`) or at the end of a Stream topology +the Producers at the middle (e.g. `builder.through(topic)` or `builder.repartition()`) or at the end of a Stream topology will reference the initial span, and mark the end of a Stream Process. -If intermediate steps on the Stream topology require tracing, `TracingProcessorSupplier` and -`TracingTransformerSupplier` record execution into a new Span, +If intermediate steps on the Stream topology require tracing, `TracingV2ProcessorSupplier` and +`TracingV2FixedKeyProcessorSupplier` record execution into a new Span, referencing the parent context stored on Headers, if available. -### Transformers and Partitioning - -The behaviour of some operations wrapped into Kafka Streams Processor API types could change the underlying topology. - -For example, `filter` operation on the Kafka Streams DSL is stateless and doesn't impact partitioning; -but `kafkaStreamsTracing.filter()` returns a `Transformer` that if grouping or joining operations -follows, it could lead to **unintentional partitioning**. - -Be aware operations that any usage of `builder.transformer(...)` will cause re-partitioning when -grouping or joining downstream ([Kafka docs](https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration)). - ### Why doesn't this trace all Kafka Streams operations? When starting to design this instrumentation, “trace everything” was the first idea: @@ -43,5 +32,18 @@ traces would be harder to understand, leading to requests to disable tracing. Th code involved to disable tracing may mean more code than visa versa! Given the current scenario, `KafkaStreamsTracing` is equipped with a set of common DSL operation wrapped as -Processors/Transformers APIs that enable tracing when needed; +Processor APIs that enable tracing when needed; apart from `poll` and `send` spans available out-of-the-box. + +### Transformers and Partitioning (Kafka Streams < v3.4.0) + +The behaviour of some operations wrapped into Kafka Streams Processor API types could change the underlying topology. 
+ +For example, `filter` operation on the Kafka Streams DSL is stateless and doesn't impact partitioning; +but `kafkaStreamsTracing.filter()` returns a `Transformer` that if grouping or joining operations +follows, it could lead to **unintentional partitioning**. + +Be aware operations that any usage of `builder.transformer(...)` will cause re-partitioning when +grouping or joining downstream ([Kafka docs](https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration)). + + diff --git a/instrumentation/kafka-streams/README.md b/instrumentation/kafka-streams/README.md index f9b5fe2c0..9fc36dffd 100644 --- a/instrumentation/kafka-streams/README.md +++ b/instrumentation/kafka-streams/README.md @@ -18,6 +18,32 @@ import brave.kafka.streams.KafkaStreamsTracing; kafkaStreamsTracing = KafkaStreamsTracing.create(tracing); ``` +[KIP-820](https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API) +introduces new processor APIs to the Kafka Streams DSL. +The following sections show how to instrument applications with the latest and previous APIs. + +## Kafka Streams >= v3.4.0 + +To trace a processor in your application use the `TracingV2ProcessorSupplier`, provided by instrumentation API: + +```java +builder.stream(inputTopic) + .process(kafkaStreamsTracing.process( + "process", + customProcessor)); +``` + +or the `TracingV2FixedKeyProcessorSupplier`, provided by instrumentation API: + +```java +builder.stream(inputTopic) + .processValues(kafkaStreamsTracing.processValues( + "process", + customProcessor)); +``` + +## Kafka Streams < v3.4.0 + To trace a processor in your application use `TracingProcessorSupplier`, provided by instrumentation API: ```java diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java index 0697b2685..6e14987d0 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 The OpenZipkin Authors + * Copyright 2013-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -46,6 +46,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ProcessingContext; /** Use this class to decorate Kafka Stream Topologies and enable Tracing. */ public final class KafkaStreamsTracing { @@ -135,7 +136,10 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) { * } * * @see TracingKafkaClientSupplier + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. 
*/ + @Deprecated public ProcessorSupplier processor(String spanName, ProcessorSupplier processorSupplier) { return new TracingProcessorSupplier<>(this, spanName, processorSupplier); @@ -151,7 +155,10 @@ public ProcessorSupplier processor(String spanName, * .transform(kafkaStreamsTracing.transformer("my-transformer", myTransformerSupplier) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public TransformerSupplier transformer(String spanName, TransformerSupplier transformerSupplier) { return new TracingTransformerSupplier<>(this, spanName, transformerSupplier); @@ -167,7 +174,10 @@ public TransformerSupplier transformer(String spanName, * .transformValues(kafkaStreamsTracing.valueTransformer("my-transformer", myTransformerSupplier) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ValueTransformerSupplier valueTransformer(String spanName, ValueTransformerSupplier valueTransformerSupplier) { return new TracingValueTransformerSupplier<>(this, spanName, valueTransformerSupplier); @@ -183,7 +193,10 @@ public ValueTransformerSupplier valueTransformer(String spanName, * .transformValues(kafkaStreamsTracing.valueTransformerWithKey("my-transformer", myTransformerSupplier) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ValueTransformerWithKeySupplier valueTransformerWithKey( String spanName, ValueTransformerWithKeySupplier valueTransformerWithKeySupplier) { @@ -201,7 +214,10 @@ public ValueTransformerWithKeySupplier valueTransformerWith * builder.stream(inputTopic) * .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ProcessorSupplier foreach(String spanName, ForeachAction action) { return new TracingProcessorSupplier<>(this, spanName, () -> new AbstractProcessor() { @@ -222,7 +238,10 @@ public ProcessorSupplier foreach(String spanName, ForeachAction ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. 
*/ + @Deprecated public ValueTransformerWithKeySupplier peek(String spanName, ForeachAction action) { return new TracingValueTransformerWithKeySupplier<>(this, spanName, () -> @@ -252,7 +271,10 @@ public ValueTransformerWithKeySupplier peek(String spanName, * .transform(kafkaStreamsTracing.mark("end-complex-transformation") * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ValueTransformerWithKeySupplier mark(String spanName) { return new TracingValueTransformerWithKeySupplier<>(this, spanName, () -> new AbstractTracingValueTransformerWithKey() { @@ -273,7 +295,10 @@ public ValueTransformerWithKeySupplier mark(String spanName) { * .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public TransformerSupplier> map(String spanName, KeyValueMapper> mapper) { return new TracingTransformerSupplier<>(this, spanName, () -> @@ -295,7 +320,10 @@ public TransformerSupplier> map(String spa * .flatTransform(kafkaStreamsTracing.flatMap("myflatMap", (k, v) -> ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public TransformerSupplier>> flatMap( String spanName, KeyValueMapper>> mapper) { @@ -324,7 +352,10 @@ public TransformerSupplier>> flat * .transform(kafkaStreamsTracing.filter("myFilter", (k, v) -> ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public TransformerSupplier> filter(String spanName, Predicate predicate) { return new TracingFilterTransformerSupplier<>(this, spanName, predicate, false); @@ -345,7 +376,10 @@ public TransformerSupplier> filter(String spanName, * .transform(kafkaStreamsTracing.filterNot("myFilter", (k, v) -> ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public TransformerSupplier> filterNot(String spanName, Predicate predicate) { return new TracingFilterTransformerSupplier<>(this, spanName, predicate, true); @@ -370,7 +404,10 @@ public TransformerSupplier> filterNot(String spanNam * .filterNot((k, v) -> Objects.isNull(v)) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. 
*/ + @Deprecated public ValueTransformerWithKeySupplier markAsFiltered(String spanName, Predicate predicate) { return new TracingFilterValueTransformerWithKeySupplier<>(this, spanName, predicate, false); @@ -395,7 +432,10 @@ public ValueTransformerWithKeySupplier markAsFiltered(String spa * .filterNot((k, v) -> Objects.isNull(v)) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ValueTransformerWithKeySupplier markAsNotFiltered(String spanName, Predicate predicate) { return new TracingFilterValueTransformerWithKeySupplier<>(this, spanName, predicate, true); @@ -412,7 +452,10 @@ public ValueTransformerWithKeySupplier markAsNotFiltered(String * .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ValueTransformerWithKeySupplier mapValues(String spanName, ValueMapperWithKey mapper) { return new TracingValueTransformerWithKeySupplier<>(this, spanName, () -> @@ -434,7 +477,10 @@ public ValueTransformerWithKeySupplier mapValues(String spa * .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...) * .to(outputTopic); * } + * @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or + * {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead. */ + @Deprecated public ValueTransformerSupplier mapValues(String spanName, ValueMapper mapper) { return new TracingValueTransformerSupplier<>(this, spanName, () -> @@ -445,11 +491,50 @@ public ValueTransformerSupplier mapValues(String spanName, }); } + /** + * Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.ProcessorSupplier} + * + *

<p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .process(kafkaStreamsTracing.process("my-processor", myProcessorSupplier));
+   * }</pre>
+ * + * @see TracingKafkaClientSupplier + */ + public org.apache.kafka.streams.processor.api.ProcessorSupplier process(String spanName, + org.apache.kafka.streams.processor.api.ProcessorSupplier processorSupplier) { + return new TracingV2ProcessorSupplier<>(this, spanName, processorSupplier); + } + + /** + * Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier} + * + *

<p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .processValues(kafkaStreamsTracing.processValues("my-processor", myFixedKeyProcessorSupplier));
+   * }</pre>
+ * + * @see TracingKafkaClientSupplier + */ + public org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier processValues(String spanName, + org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier processorSupplier) { + return new TracingV2FixedKeyProcessorSupplier<>(this, spanName, processorSupplier); + } + static void addTags(ProcessorContext processorContext, SpanCustomizer result) { result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processorContext.applicationId()); result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processorContext.taskId().toString()); } + static void addTags(org.apache.kafka.streams.processor.api.ProcessingContext processingContext, SpanCustomizer result) { + result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processingContext.applicationId()); + result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processingContext.taskId().toString()); + } + Span nextSpan(ProcessorContext context) { TraceContextOrSamplingFlags extracted = extractor.extract(context.headers()); // Clear any propagation keys present in the headers @@ -463,6 +548,19 @@ Span nextSpan(ProcessorContext context) { return result; } + Span nextSpan(ProcessingContext context, Headers headers) { + TraceContextOrSamplingFlags extracted = extractor.extract(headers); + // Clear any propagation keys present in the headers + if (!extracted.equals(emptyExtraction)) { + clearHeaders(headers); + } + Span result = tracer.nextSpan(extracted); + if (!result.isNoop()) { + addTags(context, result); + } + return result; + } + // We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3 // multi, or visa versa. void clearHeaders(Headers headers) { diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessor.java new file mode 100644 index 000000000..2e49f250d --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessor.java @@ -0,0 +1,79 @@ +/* + * Copyright 2013-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import brave.Span; +import brave.Tracer; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; + +import static brave.internal.Throwables.propagateIfFatal; + +/* + * Note. 
the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes + * and those that implement the new kafka streams API introduced in version 3.4.0 + */ +class TracingV2FixedKeyProcessor implements FixedKeyProcessor { + final KafkaStreamsTracing kafkaStreamsTracing; + final Tracer tracer; + final String spanName; + final FixedKeyProcessor delegateProcessor; + + FixedKeyProcessorContext processorContext; + + TracingV2FixedKeyProcessor(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, FixedKeyProcessor delegateProcessor) { + this.kafkaStreamsTracing = kafkaStreamsTracing; + this.tracer = kafkaStreamsTracing.tracer; + this.spanName = spanName; + this.delegateProcessor = delegateProcessor; + } + + @Override + public void init(FixedKeyProcessorContext context) { + this.processorContext = context; + delegateProcessor.init(processorContext); + } + + @Override + public void process(FixedKeyRecord record) { + Span span = kafkaStreamsTracing.nextSpan(processorContext, record.headers()); + if (!span.isNoop()) { + span.name(spanName); + span.start(); + } + + Tracer.SpanInScope ws = tracer.withSpanInScope(span); + Throwable error = null; + try { + delegateProcessor.process(record); + } catch (Throwable e) { + error = e; + propagateIfFatal(e); + throw e; + } finally { + // Inject this span so that the next stage uses it as a parent + kafkaStreamsTracing.injector.inject(span.context(), record.headers()); + if (error != null) span.error(error); + span.finish(); + ws.close(); + } + } + + @Override + public void close() { + delegateProcessor.close(); + } +} diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessorSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessorSupplier.java new file mode 100644 index 000000000..d2cbe7001 --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessorSupplier.java @@ -0,0 +1,40 @@ +/* + * Copyright 2013-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; + +/* + * Note. 
the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes + * and those that implement the new kafka streams API introduced in version 3.4.0 + */ +class TracingV2FixedKeyProcessorSupplier implements FixedKeyProcessorSupplier { + final KafkaStreamsTracing kafkaStreamsTracing; + final String spanName; + final FixedKeyProcessorSupplier delegateProcessorSupplier; + + TracingV2FixedKeyProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, + FixedKeyProcessorSupplier processorSupplier) { + this.kafkaStreamsTracing = kafkaStreamsTracing; + this.spanName = spanName; + this.delegateProcessorSupplier = processorSupplier; + } + + /** This wraps process method to enable tracing. */ + @Override public FixedKeyProcessor get() { + return new TracingV2FixedKeyProcessor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get()); + } +} diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2Processor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2Processor.java new file mode 100644 index 000000000..683da2853 --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2Processor.java @@ -0,0 +1,79 @@ +/* + * Copyright 2013-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import brave.Span; +import brave.Tracer; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +import static brave.internal.Throwables.propagateIfFatal; + +/* + * Note. 
the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes + * and those that implement the new kafka streams API introduced in version 3.4.0 + */ +class TracingV2Processor implements Processor { + final KafkaStreamsTracing kafkaStreamsTracing; + final Tracer tracer; + final String spanName; + final Processor delegateProcessor; + + ProcessorContext processorContext; + + TracingV2Processor(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, Processor delegateProcessor) { + this.kafkaStreamsTracing = kafkaStreamsTracing; + this.tracer = kafkaStreamsTracing.tracer; + this.spanName = spanName; + this.delegateProcessor = delegateProcessor; + } + + @Override + public void init(ProcessorContext context) { + this.processorContext = context; + delegateProcessor.init(processorContext); + } + + @Override + public void process(Record record) { + Span span = kafkaStreamsTracing.nextSpan(processorContext, record.headers()); + if (!span.isNoop()) { + span.name(spanName); + span.start(); + } + + Tracer.SpanInScope ws = tracer.withSpanInScope(span); + Throwable error = null; + try { + delegateProcessor.process(record); + } catch (Throwable e) { + error = e; + propagateIfFatal(e); + throw e; + } finally { + // Inject this span so that the next stage uses it as a parent + kafkaStreamsTracing.injector.inject(span.context(), record.headers()); + if (error != null) span.error(error); + span.finish(); + ws.close(); + } + } + + @Override + public void close() { + delegateProcessor.close(); + } +} diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2ProcessorSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2ProcessorSupplier.java new file mode 100644 index 000000000..bc96efd7a --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2ProcessorSupplier.java @@ -0,0 +1,40 @@ +/* + * Copyright 2013-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; + +/* + * Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes + * and those that implement the new kafka streams API introduced in version 3.4.0 + */ +class TracingV2ProcessorSupplier implements ProcessorSupplier { + final KafkaStreamsTracing kafkaStreamsTracing; + final String spanName; + final ProcessorSupplier delegateProcessorSupplier; + + TracingV2ProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, + ProcessorSupplier processorSupplier) { + this.kafkaStreamsTracing = kafkaStreamsTracing; + this.spanName = spanName; + this.delegateProcessorSupplier = processorSupplier; + } + + /** This wraps process method to enable tracing. 
*/ + @Override public Processor get() { + return new TracingV2Processor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get()); + } +} diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java index 08217faf8..d50ff9aa8 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 The OpenZipkin Authors + * Copyright 2013-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -240,6 +240,78 @@ public void process(String key, String value) { streams.cleanUp(); } + @Test + public void should_create_spans_from_stream_with_tracing_v2_processor() { + org.apache.kafka.streams.processor.api.ProcessorSupplier processorSupplier = + kafkaStreamsTracing.process( + "forward-1", () -> + record -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + String inputTopic = testName.getMethodName() + "-input"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .process(processorSupplier); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)); + + waitForStreamToRun(streams); + + MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + + MutableSpan spanProcessor = testSpanHandler.takeLocalSpan(); + assertChildOf(spanProcessor, spanInput); + + streams.close(); + streams.cleanUp(); + } + + @Test + public void should_create_spans_from_stream_with_tracing_v2_fixed_key_processor() { + org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier processorSupplier = + kafkaStreamsTracing.processValues( + "forward-1", () -> + record -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + String inputTopic = testName.getMethodName() + "-input"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .processValues(processorSupplier); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)); + + waitForStreamToRun(streams); + + MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + + MutableSpan spanProcessor = testSpanHandler.takeLocalSpan(); + assertChildOf(spanProcessor, spanInput); + + streams.close(); + streams.cleanUp(); + } + @Test public void should_create_spans_from_stream_with_tracing_filter_predicate_true() { String inputTopic = testName.getMethodName() + "-input"; diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java index 41e1af49e..bdb2715c9 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java +++ 
b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 The OpenZipkin Authors + * Copyright 2013-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,6 +21,7 @@ import brave.propagation.TraceContext; import brave.test.TestSpanHandler; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; @@ -33,6 +34,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Record; import static brave.test.ITRemote.BAGGAGE_FIELD; import static brave.test.ITRemote.BAGGAGE_FIELD_KEY; @@ -71,6 +73,15 @@ class KafkaStreamsTest { return processorContext; }; + Supplier> processorV2ContextSupplier = + () -> + { + org.apache.kafka.streams.processor.api.ProcessorContext processorContext = mock(org.apache.kafka.streams.processor.api.ProcessorContext.class); + when(processorContext.applicationId()).thenReturn(TEST_APPLICATION_ID); + when(processorContext.taskId()).thenReturn(new TaskId(0, 0)); + return processorContext; + }; + ProcessorSupplier fakeProcessorSupplier = kafkaStreamsTracing.processor( "forward-1", () -> @@ -81,6 +92,21 @@ public void process(String key, String value) { } }); + org.apache.kafka.streams.processor.api.ProcessorSupplier fakeV2ProcessorSupplier = + kafkaStreamsTracing.process( + "forward-1", () -> + new org.apache.kafka.streams.processor.api.Processor() { + org.apache.kafka.streams.processor.api.ProcessorContext context; + @Override + public void init(org.apache.kafka.streams.processor.api.ProcessorContext context) { + this.context = context; + } + @Override + public void process(Record record) { + context.forward(record); + } + }); + TransformerSupplier> fakeTransformerSupplier = kafkaStreamsTracing.transformer( "transformer-1", () -> diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java index 48a05080c..a804b96f6 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 The OpenZipkin Authors + * Copyright 2013-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. 
You may obtain a copy of the License at @@ -25,7 +25,9 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.junit.Test; +import java.util.Date; import static brave.test.ITRemote.BAGGAGE_FIELD; import static brave.test.ITRemote.BAGGAGE_FIELD_KEY; @@ -46,12 +48,31 @@ public void nextSpan_uses_current_context() { .isEqualTo(parent.spanIdString()); } + @Test + public void nextSpanWithHeaders_uses_current_context() { + org.apache.kafka.streams.processor.api.ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); + Span child; + try (Scope ws = tracing.currentTraceContext().newScope(parent)) { + child = kafkaStreamsTracing.nextSpan(fakeProcessorContext, new RecordHeaders()); + } + child.finish(); + + assertThat(child.context().parentIdString()) + .isEqualTo(parent.spanIdString()); + } + @Test public void nextSpan_should_create_span_if_no_headers() { ProcessorContext fakeProcessorContext = processorContextSupplier.apply(new RecordHeaders()); assertThat(kafkaStreamsTracing.nextSpan(fakeProcessorContext)).isNotNull(); } + @Test + public void nextSpanWithHeaders_should_create_span_if_no_headers() { + org.apache.kafka.streams.processor.api.ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); + assertThat(kafkaStreamsTracing.nextSpan(fakeProcessorContext, new RecordHeaders())).isNotNull(); + } + @Test public void nextSpan_should_tag_app_id_and_task_id() { ProcessorContext fakeProcessorContext = processorContextSupplier.apply(new RecordHeaders()); @@ -63,6 +84,17 @@ public void nextSpan_should_tag_app_id_and_task_id() { entry("kafka.streams.task.id", TEST_TASK_ID)); } + @Test + public void nextSpanWithHeaders_should_tag_app_id_and_task_id() { + org.apache.kafka.streams.processor.api.ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); + kafkaStreamsTracing.nextSpan(fakeProcessorContext, new RecordHeaders()).start().finish(); + + assertThat(spans.get(0).tags()) + .containsOnly( + entry("kafka.streams.application.id", TEST_APPLICATION_ID), + entry("kafka.streams.task.id", TEST_TASK_ID)); + } + @Test public void processorSupplier_should_tag_app_id_and_task_id() { Processor processor = fakeProcessorSupplier.get(); @@ -75,6 +107,18 @@ public void processorSupplier_should_tag_app_id_and_task_id() { entry("kafka.streams.task.id", TEST_TASK_ID)); } + @Test + public void newProcessorSupplier_should_tag_app_id_and_task_id() { + org.apache.kafka.streams.processor.api.Processor processor = fakeV2ProcessorSupplier.get(); + processor.init(processorV2ContextSupplier.get()); + processor.process(new Record<>(TEST_KEY, TEST_VALUE, new Date().getTime())); + + assertThat(spans.get(0).tags()) + .containsOnly( + entry("kafka.streams.application.id", TEST_APPLICATION_ID), + entry("kafka.streams.task.id", TEST_TASK_ID)); + } + @Test public void processorSupplier_should_add_baggage_field() { ProcessorSupplier processorSupplier = @@ -92,6 +136,19 @@ public void process(String key, String value) { processor.process(TEST_KEY, TEST_VALUE); } + @Test + public void newProcessorSupplier_should_add_baggage_field() { + org.apache.kafka.streams.processor.api.ProcessorSupplier processorSupplier = + kafkaStreamsTracing.process( + "forward-1", () -> + (org.apache.kafka.streams.processor.api.Processor) record -> + assertThat(BAGGAGE_FIELD.getValue(currentTraceContext.get())).isEqualTo("user1")); 
+ Headers headers = new RecordHeaders().add(BAGGAGE_FIELD_KEY, "user1".getBytes()); + org.apache.kafka.streams.processor.api.Processor processor = processorSupplier.get(); + processor.init(processorV2ContextSupplier.get()); + processor.process(new Record<>(TEST_KEY, TEST_VALUE, new Date().getTime(), headers)); + } + @Test public void transformSupplier_should_tag_app_id_and_task_id() { Transformer> processor = fakeTransformerSupplier.get(); diff --git a/pom.xml b/pom.xml index dd328a6a7..1e41f5b51 100755 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,7 @@ 11.0.11 5.0.0 - 3.2.3 + 3.4.0 5.16.0 2.26.0 2.3.2 @@ -126,7 +126,7 @@ 3.6.28 2.33 - 4.2.3 + 4.2.4 1.15.1 ${skipTests}