Skip to content

Commit 165ac33

Browse files
Add auto-configuration to Kafka Retry Topics
Closes gh-28450
1 parent 638b5a9 commit 165ac33

File tree

6 files changed

+311
-0
lines changed

6 files changed

+311
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,13 @@
3333
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3434
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3535
import org.springframework.kafka.core.KafkaAdmin;
36+
import org.springframework.kafka.core.KafkaOperations;
3637
import org.springframework.kafka.core.KafkaTemplate;
3738
import org.springframework.kafka.core.ProducerFactory;
39+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
40+
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3841
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
42+
import org.springframework.kafka.support.JavaUtils;
3943
import org.springframework.kafka.support.LoggingProducerListener;
4044
import org.springframework.kafka.support.ProducerListener;
4145
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -48,6 +52,7 @@
4852
* @author Stephane Nicoll
4953
* @author Eddú Meléndez
5054
* @author Nakul Mishra
55+
* @author Tomaz Fernandes
5156
* @since 1.5.0
5257
*/
5358
@Configuration(proxyBeanMethods = false)
@@ -113,6 +118,20 @@ public ProducerListener<Object, Object> kafkaProducerListener() {
113118
return new KafkaTransactionManager<>(producerFactory);
114119
}
115120

121+
@Bean
122+
@ConditionalOnProperty(name = "spring.kafka.retry-topic.enabled")
123+
@ConditionalOnMissingBean
124+
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
125+
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance();
126+
KafkaProperties.RetryTopic retryTopic = this.properties.getRetryTopic();
127+
JavaUtils.INSTANCE.acceptIfNotNull(retryTopic.getAttempts(), builder::maxAttempts).acceptIfNotNull(
128+
retryTopic.getBackOff(),
129+
(backOff) -> builder
130+
.customBackoff(KafkaAutoConfigurationUtils.createBackOffFrom(backOff.getDelayMillis(),
131+
backOff.getMaxDelayMillis(), backOff.getMultiplier(), backOff.isRandom())));
132+
return builder.create(kafkaOperations);
133+
}
134+
116135
@Bean
117136
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
118137
@ConditionalOnMissingBean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2012-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
20+
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
21+
import org.springframework.retry.backoff.FixedBackOffPolicy;
22+
import org.springframework.retry.backoff.SleepingBackOffPolicy;
23+
import org.springframework.retry.backoff.UniformRandomBackOffPolicy;
24+
25+
/**
26+
* Utilities class for Kafka auto-configuration.
27+
*
28+
* @author Tomaz Fernandes
29+
* @since 2.7.0
30+
*/
31+
public final class KafkaAutoConfigurationUtils {
32+
33+
private KafkaAutoConfigurationUtils() {
34+
35+
}
36+
37+
static SleepingBackOffPolicy<?> createBackOffFrom(Long min, Long max, Double multiplier, Boolean isRandom) {
38+
if (multiplier != null && multiplier > 0) {
39+
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
40+
if (isRandom != null && isRandom) {
41+
policy = new ExponentialRandomBackOffPolicy();
42+
}
43+
policy.setInitialInterval((min != null) ? min : ExponentialBackOffPolicy.DEFAULT_INITIAL_INTERVAL);
44+
policy.setMultiplier(multiplier);
45+
policy.setMaxInterval(
46+
(max != null && min != null && max > min) ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);
47+
return policy;
48+
}
49+
if (max != null && min != null && max > min) {
50+
UniformRandomBackOffPolicy policy = new UniformRandomBackOffPolicy();
51+
policy.setMinBackOffPeriod(min);
52+
policy.setMaxBackOffPeriod(max);
53+
return policy;
54+
}
55+
FixedBackOffPolicy policy = new FixedBackOffPolicy();
56+
if (min != null) {
57+
policy.setBackOffPeriod(min);
58+
}
59+
return policy;
60+
}
61+
62+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* @author Stephane Nicoll
5454
* @author Artem Bilan
5555
* @author Nakul Mishra
56+
* @author Tomaz Fernandes
5657
* @since 1.5.0
5758
*/
5859
@ConfigurationProperties(prefix = "spring.kafka")
@@ -93,6 +94,8 @@ public class KafkaProperties {
9394

9495
private final Security security = new Security();
9596

97+
private final RetryTopic retryTopic = new RetryTopic();
98+
9699
public List<String> getBootstrapServers() {
97100
return this.bootstrapServers;
98101
}
@@ -149,6 +152,10 @@ public Security getSecurity() {
149152
return this.security;
150153
}
151154

155+
public RetryTopic getRetryTopic() {
156+
return this.retryTopic;
157+
}
158+
152159
private Map<String, Object> buildCommonProperties() {
153160
Map<String, Object> properties = new HashMap<>();
154161
if (this.bootstrapServers != null) {
@@ -1314,6 +1321,82 @@ public void setOptions(Map<String, String> options) {
13141321

13151322
}
13161323

1324+
public static class RetryTopic {
1325+
1326+
private Integer attempts;
1327+
1328+
private BackOff backOff;
1329+
1330+
public Integer getAttempts() {
1331+
return this.attempts;
1332+
}
1333+
1334+
public void setAttempts(Integer attempts) {
1335+
this.attempts = attempts;
1336+
}
1337+
1338+
public BackOff getBackOff() {
1339+
return this.backOff;
1340+
}
1341+
1342+
public void setBackOff(BackOff backOff) {
1343+
this.backOff = backOff;
1344+
}
1345+
1346+
public static class BackOff {
1347+
1348+
private Duration delay;
1349+
1350+
private Double multiplier;
1351+
1352+
private Duration maxDelay;
1353+
1354+
private Boolean random;
1355+
1356+
public Duration getDelay() {
1357+
return this.delay;
1358+
}
1359+
1360+
public Long getDelayMillis() {
1361+
return (this.delay != null) ? this.delay.toMillis() : null;
1362+
}
1363+
1364+
public void setDelay(Duration delay) {
1365+
this.delay = delay;
1366+
}
1367+
1368+
public Double getMultiplier() {
1369+
return this.multiplier;
1370+
}
1371+
1372+
public void setMultiplier(Double multiplier) {
1373+
this.multiplier = multiplier;
1374+
}
1375+
1376+
public Duration getMaxDelay() {
1377+
return this.maxDelay;
1378+
}
1379+
1380+
public Long getMaxDelayMillis() {
1381+
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
1382+
}
1383+
1384+
public void setMaxDelay(Duration maxDelay) {
1385+
this.maxDelay = maxDelay;
1386+
}
1387+
1388+
public Boolean isRandom() {
1389+
return this.random;
1390+
}
1391+
1392+
public void setRandom(Boolean random) {
1393+
this.random = random;
1394+
}
1395+
1396+
}
1397+
1398+
}
1399+
13171400
public static class Security {
13181401

13191402
/**

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.boot.autoconfigure.kafka;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.concurrent.CountDownLatch;
2022
import java.util.concurrent.TimeUnit;
2123
import java.util.regex.Pattern;
@@ -53,12 +55,14 @@
5355
*
5456
* @author Gary Russell
5557
* @author Stephane Nicoll
58+
* @author Tomaz Fernandes
5659
*/
5760
@DisabledOnOs(OS.WINDOWS)
5861
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
5962
class KafkaAutoConfigurationIntegrationTests {
6063

6164
static final String TEST_TOPIC = "testTopic";
65+
static final String TEST_RETRY_TOPIC = "testRetryTopic";
6266

6367
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
6468

@@ -89,6 +93,27 @@ void testEndToEnd() throws Exception {
8993
producer.close();
9094
}
9195

96+
@SuppressWarnings({ "unchecked", "rawtypes" })
97+
@Test
98+
void testEndToEndWithRetryTopics() throws Exception {
99+
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
100+
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry-topic.enabled=true",
101+
"spring.kafka.retry-topic.attempts=4", "spring.kafka.retry-topic.back-off.delay=100ms",
102+
"spring.kafka.retry-topic.back-off.multiplier=2", "spring.kafka.retry-topic.back-off.max-delay=300ms",
103+
"spring.kafka.consumer.auto-offset-reset=earliest");
104+
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
105+
template.send(TEST_RETRY_TOPIC, "foo", "bar");
106+
RetryListener listener = this.context.getBean(RetryListener.class);
107+
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
108+
assertThat(listener.key).isEqualTo("foo");
109+
assertThat(listener.received).isEqualTo("bar");
110+
assertThat(listener.topics.size()).isEqualTo(4);
111+
assertThat(listener.topics.get(0)).isEqualTo("testRetryTopic");
112+
assertThat(listener.topics.get(1)).isEqualTo("testRetryTopic-retry-100");
113+
assertThat(listener.topics.get(2)).isEqualTo("testRetryTopic-retry-200");
114+
assertThat(listener.topics.get(3)).isEqualTo("testRetryTopic-retry-300");
115+
}
116+
92117
@Test
93118
void testStreams() {
94119
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
@@ -121,6 +146,11 @@ Listener listener() {
121146
return new Listener();
122147
}
123148

149+
@Bean
150+
RetryListener retryListener() {
151+
return new RetryListener();
152+
}
153+
124154
@Bean
125155
NewTopic adminCreated() {
126156
return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build();
@@ -157,4 +187,26 @@ void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
157187

158188
}
159189

190+
static class RetryListener {
191+
192+
private final CountDownLatch latch = new CountDownLatch(4);
193+
194+
private final List<String> topics = new ArrayList<>();
195+
196+
private volatile String received;
197+
198+
private volatile String key;
199+
200+
@KafkaListener(topics = TEST_RETRY_TOPIC)
201+
void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
202+
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
203+
this.received = foo;
204+
this.key = key;
205+
this.topics.add(topic);
206+
this.latch.countDown();
207+
throw new RuntimeException("Test exception");
208+
}
209+
210+
}
211+
160212
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import org.springframework.kafka.listener.ErrorHandler;
6666
import org.springframework.kafka.listener.RecordInterceptor;
6767
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
68+
import org.springframework.kafka.retrytopic.DestinationTopic;
69+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
6870
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
6971
import org.springframework.kafka.support.converter.BatchMessageConverter;
7072
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@@ -87,6 +89,7 @@
8789
* @author Stephane Nicoll
8890
* @author Eddú Meléndez
8991
* @author Nakul Mishra
92+
* @author Tomaz Fernandes
9093
*/
9194
class KafkaAutoConfigurationTests {
9295

@@ -317,6 +320,25 @@ void streamsWithCustomKafkaConfiguration() {
317320
});
318321
}
319322

323+
@Test
324+
void retryTopicConfiguration() {
325+
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
326+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry-topic.enabled=true",
327+
"spring.kafka.retry-topic.attempts=5", "spring.kafka.retry-topic.back-off.delay=100ms",
328+
"spring.kafka.retry-topic.back-off.multiplier=2", "spring.kafka.retry-topic.back-off.max-delay=300ms")
329+
.run((context) -> {
330+
RetryTopicConfiguration config = context.getBean(RetryTopicConfiguration.class);
331+
List<DestinationTopic.Properties> properties = config.getDestinationTopicProperties();
332+
assertThat(properties.size()).isEqualTo(6);
333+
assertThat(properties.get(0).delay()).isEqualTo(0);
334+
assertThat(properties.get(1).delay()).isEqualTo(100);
335+
assertThat(properties.get(2).delay()).isEqualTo(200);
336+
assertThat(properties.get(3).delay()).isEqualTo(300);
337+
assertThat(properties.get(4).delay()).isEqualTo(300);
338+
assertThat(properties.get(5).delay()).isEqualTo(0);
339+
});
340+
}
341+
320342
@SuppressWarnings("unchecked")
321343
@Test
322344
void streamsWithSeveralStreamsBuilderFactoryBeans() {

0 commit comments

Comments
 (0)