Skip to content

Commit 9e86163

Browse files
garyrussellartembilan
authored andcommitted
GH-981: Add StreamsBuilderFactoryBeanCustomizer
Resolves #981
1 parent 814ce54 commit 9e86163

File tree

5 files changed

+83
-5
lines changed

5 files changed

+83
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2018 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import org.springframework.context.annotation.Configuration;
2424
import org.springframework.kafka.config.KafkaStreamsConfiguration;
2525
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
26+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
2627

2728
/**
2829
* {@code @Configuration} class that registers a {@link StreamsBuilderFactoryBean}
@@ -55,11 +56,17 @@ public class KafkaStreamsDefaultConfiguration {
5556
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
5657
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
5758
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME)
58-
ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider) {
59+
ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider,
60+
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
5961

6062
KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable();
6163
if (streamsConfig != null) {
62-
return new StreamsBuilderFactoryBean(streamsConfig);
64+
StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
65+
StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider.getIfUnique();
66+
if (customizer != null) {
67+
customizer.configure(fb);
68+
}
69+
return fb;
6370
}
6471
else {
6572
throw new UnsatisfiedDependencyException(KafkaStreamsDefaultConfiguration.class.getName(),
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
/**
20+
* A customizer for the {@link StreamsBuilderFactoryBean} that is implicitly created by
21+
* {@link org.springframework.kafka.annotation.EnableKafkaStreams}. If exactly one
22+
* implementation of this interface is found in the application context (or one is marked
23+
* as {@link org.springframework.context.annotation.Primary}, it will be invoked after the
24+
* factory bean has been created and before it is started.
25+
*
26+
* @author Gary Russell
27+
* @since 2.3
28+
*
29+
*/
30+
@FunctionalInterface
31+
public interface StreamsBuilderFactoryBeanCustomizer {
32+
33+
/**
34+
* Configure the factory bean.
35+
* @param factoryBean the factory bean.
36+
*/
37+
void configure(StreamsBuilderFactoryBean factoryBean);
38+
39+
}

spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.UUID;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.apache.kafka.clients.consumer.ConsumerConfig;
3031
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -56,6 +57,7 @@
5657
import org.springframework.kafka.config.KafkaListenerContainerFactory;
5758
import org.springframework.kafka.config.KafkaStreamsConfiguration;
5859
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
60+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
5961
import org.springframework.kafka.core.ConsumerFactory;
6062
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
6163
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -113,6 +115,9 @@ public class KafkaStreamsTests {
113115
@Value("${streaming.topic.two}")
114116
private String streamingTopic2;
115117

118+
@Autowired
119+
private AtomicBoolean stateChangeCalled;
120+
116121
@Test
117122
public void testKStreams() throws Exception {
118123
assertThat(this.embeddedKafka.getKafkaServer(0).config().autoCreateTopicsEnable()).isFalse();
@@ -150,6 +155,7 @@ public void testKStreams() throws Exception {
150155
StreamThread[] threads = KafkaTestUtils.getPropertyValue(kafkaStreams, "threads", StreamThread[].class);
151156
assertThat(threads).isNotEmpty();
152157
assertThat(threads[0].getUncaughtExceptionHandler()).isSameAs(exceptionHandler);
158+
assertThat(this.stateChangeCalled.get()).isTrue();
153159
}
154160

155161
@Configuration
@@ -193,6 +199,18 @@ public KafkaStreamsConfiguration kStreamsConfigs() {
193199
return new KafkaStreamsConfiguration(props);
194200
}
195201

202+
@Bean
203+
public AtomicBoolean stateChangeCalled() {
204+
return new AtomicBoolean();
205+
}
206+
207+
@Bean
208+
public StreamsBuilderFactoryBeanCustomizer customizer() {
209+
return fb -> fb.setStateListener((newState, oldState) -> {
210+
stateChangeCalled().set(true);
211+
});
212+
}
213+
196214
@Bean
197215
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
198216
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);

src/reference/asciidoc/streams.adoc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,12 @@ See the Apache Kafka https://kafka.apache.org/0102/documentation/#streamsconfigs
179179

180180
IMPORTANT: Starting with version 2.2, the stream configuration is now provided as a `KafkaStreamsConfiguration` object, rather than as a `StreamsConfig`.
181181

182-
To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the `@EnableKafkaStreams` annotation, which you should placed on a `@Configuration` class.
182+
To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the `@EnableKafkaStreams` annotation, which you should place on a `@Configuration` class.
183183
All you need is to declare a `KafkaStreamsConfiguration` bean named `defaultKafkaStreamsConfig`.
184-
A `StreamsBuilder` bean, named `defaultKafkaStreamsBuilder`, is automatically declared in the application context.
184+
A `StreamsBuilderFactoryBean` bean, named `defaultKafkaStreamsBuilder`, is automatically declared in the application context.
185185
You can declare and use any additional `StreamsBuilderFactoryBean` beans as well.
186+
Starting with version 2.3, you can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanCustomizer`.
187+
There must only be one such bean, or one must be marked `@Primary`.
186188

187189
By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
188190
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
@@ -209,6 +211,13 @@ public static class KafkaStreamsConfig {
209211
return new KafkaStreamsConfiguration(props);
210212
}
211213
214+
@Bean
215+
public StreamsBuilderFactoryBeanCustomizer customizer() {
216+
return fb -> fb.setStateListener((newState, oldState) -> {
217+
System.out.println("State transition from " + oldState + " to " + newState);
218+
});
219+
}
220+
212221
@Bean
213222
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
214223
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,8 @@ It now sets it to false automatically unless specifically set in the consumer fa
2020

2121
A new class `TopicBuilder` is provided for more convenient creation of `NewTopic` `@Bean` s for automatic topic provisioning.
2222
See <<configuring-topics>> for more information.
23+
24+
==== Kafka Streams Changes
25+
26+
You can now perform additional configuration of the `StreamsBuilderFactoryBean` created by `@EnableKafkaStreams`.
27+
See <<streams-config, Streams Configuration>> for more information.

0 commit comments

Comments
 (0)