add support for kafka streams new processing api (#1367)
dermotmburke authored Apr 14, 2023
1 parent 2ef59ea commit c455de9
Showing 12 changed files with 541 additions and 22 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 The OpenZipkin Authors
* Copyright 2013-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
32 changes: 17 additions & 15 deletions instrumentation/kafka-streams/RATIONALE.md
@@ -8,24 +8,13 @@ Typically, there are at least two spans involved in traces produced by a Kafka S
When a Kafka Streams application with tracing enabled receives a record, the span created on receipt
injects its context into the headers of the record, and that context is propagated downstream through
the Stream topology. Because the span context is stored in the Record Headers,
the Producers in the middle (e.g. `builder.through(topic)`) or at the end of a Stream topology
the Producers in the middle (e.g. `builder.through(topic)` or `builder.repartition()`) or at the end of a Stream topology
will reference the initial span, and mark the end of a Stream Process.

If intermediate steps on the Stream topology require tracing, `TracingProcessorSupplier` and
`TracingTransformerSupplier` record execution into a new Span,
If intermediate steps on the Stream topology require tracing, `TracingV2ProcessorSupplier` and
`TracingV2FixedKeyProcessorSupplier` record execution into a new Span,
referencing the parent context stored on Headers, if available.
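
For instance, an intermediate step can be traced with the new API roughly as follows (a minimal sketch; the topic names and `UppercaseProcessor` are invented placeholders, and `kafkaStreamsTracing` is assumed to have been created via `KafkaStreamsTracing.create(tracing)`):

```java
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")                       // the "poll" span stores its context in the record headers
    .processValues(kafkaStreamsTracing.processValues(
        "uppercase",                                 // span name recorded for this intermediate step
        UppercaseProcessor::new))                    // user processor wrapped by TracingV2FixedKeyProcessorSupplier
    .to("output-topic");                             // the "send" span references the same trace via the propagated headers
```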

### Transformers and Partitioning

The behaviour of some operations wrapped into Kafka Streams Processor API types could change the underlying topology.

For example, the `filter` operation on the Kafka Streams DSL is stateless and doesn't impact partitioning;
but `kafkaStreamsTracing.filter()` returns a `Transformer` that, if followed by grouping or joining operations,
could lead to **unintentional partitioning**.

Be aware that any usage of `builder.transformer(...)` will cause re-partitioning when
grouping or joining downstream ([Kafka docs](https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration)).

### Why doesn't this trace all Kafka Streams operations?

When starting to design this instrumentation, “trace everything” was the first idea:
@@ -43,5 +32,18 @@ traces would be harder to understand, leading to requests to disable tracing. The
code involved to disable tracing may mean more code than vice versa!

Given the current scenario, `KafkaStreamsTracing` is equipped with a set of common DSL operations wrapped as
Processors/Transformers APIs that enable tracing when needed;
Processor APIs that enable tracing when needed;
in addition to the `poll` and `send` spans available out-of-the-box.
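
For reference, the out-of-the-box `poll` and `send` spans only require building the `KafkaStreams` instance through the tracing wrapper; a minimal sketch (the `tracing`, `topology`, and `streamsConfig` variables are assumed to exist in the surrounding application):

```java
// Sketch only: `tracing`, `topology`, and `streamsConfig` stand in for your own setup.
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
kafkaStreams.start(); // consumer "poll" and producer "send" spans are then recorded without further changes
```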

### Transformers and Partitioning (Kafka Streams < v3.4.0)

The behaviour of some operations wrapped into Kafka Streams Processor API types could change the underlying topology.

For example, the `filter` operation on the Kafka Streams DSL is stateless and doesn't impact partitioning;
but `kafkaStreamsTracing.filter()` returns a `Transformer` that, if followed by grouping or joining operations,
could lead to **unintentional partitioning**.

Be aware that any usage of `builder.transformer(...)` will cause re-partitioning when
grouping or joining downstream ([Kafka docs](https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration)).
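
To make this concrete, here is a hedged sketch of the pitfall (the topic name and predicate are invented for illustration):

```java
// Hypothetical topology: the traced filter is a Transformer, so Kafka Streams assumes the key may have changed.
builder.stream("orders")
    .transform(kafkaStreamsTracing.filter("trace-filter", (k, v) -> v != null))
    .groupByKey()   // grouping after the Transformer forces an unintentional repartition topic
    .count();

// The plain DSL filter is key-preserving, so the same grouping needs no repartition.
builder.stream("orders")
    .filter((k, v) -> v != null)
    .groupByKey()
    .count();
```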


26 changes: 26 additions & 0 deletions instrumentation/kafka-streams/README.md
@@ -18,6 +18,32 @@ import brave.kafka.streams.KafkaStreamsTracing;
kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
```

[KIP-820](https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API)
introduces new processor APIs to the Kafka Streams DSL.
The following sections show how to instrument applications with the latest and previous APIs.

## Kafka Streams >= v3.4.0

To trace a processor in your application, use the `TracingV2ProcessorSupplier` provided by the instrumentation API:

```java
builder.stream(inputTopic)
    .process(kafkaStreamsTracing.process(
        "process",
        customProcessor));
```

or the `TracingV2FixedKeyProcessorSupplier` provided by the instrumentation API:

```java
builder.stream(inputTopic)
    .processValues(kafkaStreamsTracing.processValues(
        "process",
        customProcessor));
```

## Kafka Streams < v3.4.0

To trace a processor in your application, use the `TracingProcessorSupplier` provided by the instrumentation API:

```java
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 The OpenZipkin Authors
* Copyright 2013-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -46,6 +46,7 @@
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessingContext;

/** Use this class to decorate Kafka Stream Topologies and enable Tracing. */
public final class KafkaStreamsTracing {
@@ -135,7 +136,10 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) {
* }</pre>
*
* @see TracingKafkaClientSupplier
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> ProcessorSupplier<K, V> processor(String spanName,
ProcessorSupplier<K, V> processorSupplier) {
return new TracingProcessorSupplier<>(this, spanName, processorSupplier);
@@ -151,7 +155,10 @@ public <K, V> ProcessorSupplier<K, V> processor(String spanName,
* .transform(kafkaStreamsTracing.transformer("my-transformer", myTransformerSupplier)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V, R> TransformerSupplier<K, V, R> transformer(String spanName,
TransformerSupplier<K, V, R> transformerSupplier) {
return new TracingTransformerSupplier<>(this, spanName, transformerSupplier);
@@ -167,7 +174,10 @@ public <K, V, R> TransformerSupplier<K, V, R> transformer(String spanName,
* .transformValues(kafkaStreamsTracing.valueTransformer("my-transformer", myTransformerSupplier)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <V, VR> ValueTransformerSupplier<V, VR> valueTransformer(String spanName,
ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
return new TracingValueTransformerSupplier<>(this, spanName, valueTransformerSupplier);
@@ -183,7 +193,10 @@ public <V, VR> ValueTransformerSupplier<V, VR> valueTransformer(String spanName,
* .transformValues(kafkaStreamsTracing.valueTransformerWithKey("my-transformer", myTransformerSupplier)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKey(
String spanName,
ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier) {
@@ -201,7 +214,10 @@ public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWith
* builder.stream(inputTopic)
* .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> ProcessorSupplier<K, V> foreach(String spanName, ForeachAction<K, V> action) {
return new TracingProcessorSupplier<>(this, spanName, () ->
new AbstractProcessor<K, V>() {
@@ -222,7 +238,10 @@ public <K, V> ProcessorSupplier<K, V> foreach(String spanName, ForeachAction<K,
* .transformValues(kafkaStreamsTracing.peek("myPeek", (k, v) -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> ValueTransformerWithKeySupplier<K, V, V> peek(String spanName,
ForeachAction<K, V> action) {
return new TracingValueTransformerWithKeySupplier<>(this, spanName, () ->
@@ -252,7 +271,10 @@ public <K, V> ValueTransformerWithKeySupplier<K, V, V> peek(String spanName,
* .transform(kafkaStreamsTracing.mark("end-complex-transformation")
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> ValueTransformerWithKeySupplier<K, V, V> mark(String spanName) {
return new TracingValueTransformerWithKeySupplier<>(this, spanName, () ->
new AbstractTracingValueTransformerWithKey<K, V, V>() {
@@ -273,7 +295,10 @@ public <K, V> ValueTransformerWithKeySupplier<K, V, V> mark(String spanName) {
* .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V, KR, VR> TransformerSupplier<K, V, KeyValue<KR, VR>> map(String spanName,
KeyValueMapper<K, V, KeyValue<KR, VR>> mapper) {
return new TracingTransformerSupplier<>(this, spanName, () ->
@@ -295,7 +320,10 @@ public <K, V, KR, VR> TransformerSupplier<K, V, KeyValue<KR, VR>> map(String spa
* .flatTransform(kafkaStreamsTracing.flatMap("myflatMap", (k, v) -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V, KR, VR> TransformerSupplier<K, V, Iterable<KeyValue<KR, VR>>> flatMap(
String spanName,
KeyValueMapper<K, V, Iterable<KeyValue<KR, VR>>> mapper) {
@@ -324,7 +352,10 @@ public <K, V, KR, VR> TransformerSupplier<K, V, Iterable<KeyValue<KR, VR>>> flat
* .transform(kafkaStreamsTracing.filter("myFilter", (k, v) -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> TransformerSupplier<K, V, KeyValue<K, V>> filter(String spanName,
Predicate<K, V> predicate) {
return new TracingFilterTransformerSupplier<>(this, spanName, predicate, false);
@@ -345,7 +376,10 @@ public <K, V> TransformerSupplier<K, V, KeyValue<K, V>> filter(String spanName,
* .transform(kafkaStreamsTracing.filterNot("myFilter", (k, v) -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> TransformerSupplier<K, V, KeyValue<K, V>> filterNot(String spanName,
Predicate<K, V> predicate) {
return new TracingFilterTransformerSupplier<>(this, spanName, predicate, true);
@@ -370,7 +404,10 @@ public <K, V> TransformerSupplier<K, V, KeyValue<K, V>> filterNot(String spanNam
* .filterNot((k, v) -> Objects.isNull(v))
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> ValueTransformerWithKeySupplier<K, V, V> markAsFiltered(String spanName,
Predicate<K, V> predicate) {
return new TracingFilterValueTransformerWithKeySupplier<>(this, spanName, predicate, false);
@@ -395,7 +432,10 @@ public <K, V> ValueTransformerWithKeySupplier<K, V, V> markAsFiltered(String spa
* .filterNot((k, v) -> Objects.isNull(v))
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V> ValueTransformerWithKeySupplier<K, V, V> markAsNotFiltered(String spanName,
Predicate<K, V> predicate) {
return new TracingFilterValueTransformerWithKeySupplier<>(this, spanName, predicate, true);
@@ -412,7 +452,10 @@ public <K, V> ValueTransformerWithKeySupplier<K, V, V> markAsNotFiltered(String
* .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> mapValues(String spanName,
ValueMapperWithKey<K, V, VR> mapper) {
return new TracingValueTransformerWithKeySupplier<>(this, spanName, () ->
@@ -434,7 +477,10 @@ public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> mapValues(String spa
* .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...)
* .to(outputTopic);
* }</pre>
* @deprecated Use {@link KafkaStreamsTracing#process(String, org.apache.kafka.streams.processor.api.ProcessorSupplier)} or
* {@link KafkaStreamsTracing#processValues(String, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier)} instead.
*/
@Deprecated
public <V, VR> ValueTransformerSupplier<V, VR> mapValues(String spanName,
ValueMapper<V, VR> mapper) {
return new TracingValueTransformerSupplier<>(this, spanName, () ->
@@ -445,11 +491,50 @@ public <V, VR> ValueTransformerSupplier<V, VR> mapValues(String spanName,
});
}

/**
* Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}
*
* <p>Simple example using Kafka Streams DSL:
* <pre>{@code
* StreamsBuilder builder = new StreamsBuilder();
* builder.stream(inputTopic)
* .process(kafkaStreamsTracing.process("my-processor", myProcessorSupplier));
* }</pre>
*
* @see TracingKafkaClientSupplier
*/
public <KIn, VIn, KOut, VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> process(String spanName,
org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
return new TracingV2ProcessorSupplier<>(this, spanName, processorSupplier);
}

/**
* Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier}
*
* <p>Simple example using Kafka Streams DSL:
* <pre>{@code
* StreamsBuilder builder = new StreamsBuilder();
* builder.stream(inputTopic)
* .processValues(kafkaStreamsTracing.processValues("my-processor", myFixedKeyProcessorSupplier));
* }</pre>
*
* @see TracingKafkaClientSupplier
*/
public <KIn, VIn, VOut> org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn, VIn, VOut> processValues(String spanName,
org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
return new TracingV2FixedKeyProcessorSupplier<>(this, spanName, processorSupplier);
}

static void addTags(ProcessorContext processorContext, SpanCustomizer result) {
result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processorContext.applicationId());
result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processorContext.taskId().toString());
}

static void addTags(org.apache.kafka.streams.processor.api.ProcessingContext processingContext, SpanCustomizer result) {
result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processingContext.applicationId());
result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processingContext.taskId().toString());
}

Span nextSpan(ProcessorContext context) {
TraceContextOrSamplingFlags extracted = extractor.extract(context.headers());
// Clear any propagation keys present in the headers
@@ -463,6 +548,19 @@ Span nextSpan(ProcessorContext context) {
return result;
}

Span nextSpan(ProcessingContext context, Headers headers) {
TraceContextOrSamplingFlags extracted = extractor.extract(headers);
// Clear any propagation keys present in the headers
if (!extracted.equals(emptyExtraction)) {
clearHeaders(headers);
}
Span result = tracer.nextSpan(extracted);
if (!result.isNoop()) {
addTags(context, result);
}
return result;
}

// We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3
// multi, or vice versa.
void clearHeaders(Headers headers) {