
Commit 69a6ac3

garyrussell authored and artembilan committed
Add Streams HeaderEnricher
* Checkstyle
1 parent fe21442 commit 69a6ac3

6 files changed: 157 additions, 4 deletions
Lines changed: 113 additions & 0 deletions (new file: HeaderEnricher.java)

@@ -0,0 +1,113 @@
/*
 * Copyright 2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.streams;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

import org.springframework.expression.Expression;

/**
 * Manipulate the headers.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @since 2.3
 *
 */
public class HeaderEnricher<K, V> implements Transformer<K, V, KeyValue<K, V>> {

	private final Map<String, Expression> headerExpressions = new HashMap<>();

	private ProcessorContext processorContext;

	public HeaderEnricher(Map<String, Expression> headerExpressions) {
		this.headerExpressions.putAll(headerExpressions);
	}

	@Override
	public void init(ProcessorContext context) {
		this.processorContext = context;
	}

	@Override
	public KeyValue<K, V> transform(K key, V value) {
		Headers headers = this.processorContext.headers();
		Container<K, V> container = new Container<>(this.processorContext, key, value);
		this.headerExpressions.forEach((name, expression) -> {
			Object headerValue = expression.getValue(container);
			if (headerValue instanceof String) {
				headerValue = ((String) headerValue).getBytes(StandardCharsets.UTF_8);
			}
			else if (!(headerValue instanceof byte[])) {
				throw new IllegalStateException("Invalid header value type" + headerValue.getClass());
			}
			headers.add(new RecordHeader(name, (byte[]) headerValue));
		});
		return new KeyValue<>(key, value);
	}

	@Override
	public void close() {
		// NO-OP
	}

	/**
	 * Container object for SpEL evaluation.
	 *
	 * @param <K> the key type.
	 * @param <V> the value type.
	 *
	 */
	public static final class Container<K, V> {

		private final ProcessorContext context;

		private final K key;

		private final V value;

		private Container(ProcessorContext context, K key, V value) {
			this.context = context;
			this.key = key;
			this.value = value;
		}

		public ProcessorContext getContext() {
			return this.context;
		}

		public K getKey() {
			return this.key;
		}

		public V getValue() {
			return this.value;
		}

	}

}
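
For orientation, a minimal sketch of how this new transformer is wired into a topology, mirroring the KafkaStreamsTests change further down in this commit; the `builder` variable and the topic names are illustrative assumptions, not part of the commit:

	// Build the header map: header name -> SpEL expression, evaluated per record
	// against the Container root object (context, key, value).
	Map<String, Expression> headers = new HashMap<>();
	headers.put("foo", new LiteralExpression("bar")); // static String value, stored as UTF-8 bytes
	headers.put("spel", new SpelExpressionParser()
			.parseExpression("context.timestamp() + key + value")); // record metadata plus key and value
	HeaderEnricher<Integer, String> enricher = new HeaderEnricher<>(headers);

	// 'builder' and the topic names below are placeholders.
	KStream<Integer, String> stream = builder.stream("words");
	stream.transform(() -> enricher) // adds the headers; key and value pass through unchanged
			.to("words-with-headers");
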
Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@
  * limitations under the License.
  */

-package org.springframework.kafka.kstream;
+package org.springframework.kafka.streams;

 import static org.assertj.core.api.Assertions.assertThat;

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@
  * limitations under the License.
  */

-package org.springframework.kafka.kstream;
+package org.springframework.kafka.streams;

 import static org.assertj.core.api.Assertions.assertThat;

spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsTests.java renamed to spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java

Lines changed: 15 additions & 2 deletions

@@ -14,7 +14,7 @@
  * limitations under the License.
  */

-package org.springframework.kafka.kstream;
+package org.springframework.kafka.streams;

 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
@@ -49,6 +49,9 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.expression.Expression;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.EnableKafkaStreams;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -147,6 +150,9 @@ public void testKStreams() throws Exception {

 		assertThat(result.topic()).isEqualTo(streamingTopic2);
 		assertThat(result.value()).isEqualTo(payload.toUpperCase() + payload2.toUpperCase());
+		assertThat(result.headers().lastHeader("foo")).isNotNull();
+		assertThat(result.headers().lastHeader("foo").value()).isEqualTo("bar".getBytes());
+		assertThat(result.headers().lastHeader("spel")).isNotNull();

 		assertThat(stateLatch.await(10, TimeUnit.SECONDS)).isTrue();

@@ -214,6 +220,11 @@ public StreamsBuilderFactoryBeanCustomizer customizer() {
 	@Bean
 	public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
 		KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
+		Map<String, Expression> headers = new HashMap<>();
+		headers.put("foo", new LiteralExpression("bar"));
+		SpelExpressionParser parser = new SpelExpressionParser();
+		headers.put("spel", parser.parseExpression("context.timestamp() + key + value"));
+		HeaderEnricher<Integer, String> enricher = new HeaderEnricher<>(headers);
 		stream.mapValues((ValueMapper<String, String>) String::toUpperCase)
 				.mapValues(Foo::new)
 				.through(FOOS, Produced.with(Serdes.Integer(), new JsonSerde<Foo>() {
@@ -225,7 +236,9 @@ public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
 				.reduce((value1, value2) -> value1 + value2, Materialized.as("windowStore"))
 				.toStream()
 				.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
-				.filter((i, s) -> s.length() > 40).to(streamingTopic2);
+				.filter((i, s) -> s.length() > 40)
+				.transform(() -> enricher)
+				.to(streamingTopic2);

 		stream.print(Printed.toSysOut());
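
As a hedged companion to the assertions above: reading the enriched headers back on the consumer side looks roughly like this, where `result` is the `ConsumerRecord` polled from the output topic and the decode step mirrors the UTF-8 conversion the enricher applies to String-valued expressions:

	Header foo = result.headers().lastHeader("foo");
	// The LiteralExpression "bar" was stored as UTF-8 bytes by HeaderEnricher.
	String decoded = new String(foo.value(), StandardCharsets.UTF_8); // "bar"
	Header spel = result.headers().lastHeader("spel"); // timestamp + key + value, also UTF-8 bytes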

src/reference/asciidoc/streams.adoc

Lines changed: 22 additions & 0 deletions

@@ -189,6 +189,28 @@ There must only be one such bean, or one must be marked `@Primary`.
 By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
 Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.

+==== Header Enricher
+
+Version 2.3 added the `HeaderEnricher` implementation of `Transformer`.
+This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:
+
+* `context` - the `ProcessorContext`, allowing access to the current record metadata
+* `key` - the key of the current record
+* `value` - the value of the current record
+
+The expressions must return a `byte[]` or a `String` (which will be converted to `byte[]` using `UTF-8`).
+
+To use the enricher within a stream:
+
+====
+[source, java]
+----
+.transform(() -> enricher)
+----
+====
+
+The transformer does not change the `key` or `value`; it simply adds headers.
+
 ==== Kafka Streams Example

 The following example combines all the topics we have covered in this chapter:

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions

@@ -61,3 +61,8 @@ When a reply times out, the future is completed exceptionally with a `KafkaReply

 Extends the `ReplyingKafkaTemplate` by aggregating replies from multiple receivers.
 See <<aggregating-request-reply>> for more information.
+
+==== Kafka Streams
+
+The `HeaderEnricher` transformer has been provided, using SpEL to generate the header values.
+See <<streams-header-enricher>> for more information.
