Skip to content

Commit 708be91

Browse files
Address review suggestions
Add properties validation Add noBackOffPolicy for zero delay Improve javadocs
1 parent b698f85 commit 708be91

File tree

3 files changed

+39
-15
lines changed

3 files changed

+39
-15
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.springframework.kafka.support.ProducerListener;
4545
import org.springframework.kafka.support.converter.RecordMessageConverter;
4646
import org.springframework.kafka.transaction.KafkaTransactionManager;
47-
import org.springframework.retry.backoff.BackOffPolicy;
4847
import org.springframework.retry.backoff.BackOffPolicyBuilder;
4948
import org.springframework.retry.backoff.SleepingBackOffPolicy;
5049
import org.springframework.util.Assert;
@@ -65,6 +64,8 @@
6564
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
6665
public class KafkaAutoConfiguration {
6766

67+
private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property spring.kafka.retry.topic.%s should be greater than or equal to %s. Provided value was %s.";
68+
6869
private final KafkaProperties properties;
6970

7071
public KafkaAutoConfiguration(KafkaProperties properties) {
@@ -150,18 +151,32 @@ public KafkaAdmin kafkaAdmin() {
150151
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
151152
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
152153
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
153-
return RetryTopicConfigurationBuilder.newInstance().maxAttempts(retryTopic.getAttempts())
154-
.customBackoff(getBackOffPolicy(retryTopic)).useSingleTopicForFixedDelays()
155-
.suffixTopicsWithIndexValues().doNotAutoCreateRetryTopics().create(kafkaOperations);
154+
validateRetryTopicInput(retryTopic);
155+
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
156+
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
157+
.doNotAutoCreateRetryTopics();
158+
setBackOffPolicy(builder, retryTopic);
159+
return builder.create(kafkaOperations);
160+
}
161+
162+
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
163+
PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff);
164+
PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0)
165+
.toCall(() -> builder.customBackoff((SleepingBackOffPolicy<?>) BackOffPolicyBuilder.newBuilder()
166+
.delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis())
167+
.multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build()));
168+
}
169+
170+
private static void validateRetryTopicInput(Topic retryTopic) {
171+
assertProperty("attempts", retryTopic.getAttempts() >= 1, 1, retryTopic.getMaxDelayMillis());
172+
assertProperty("delay", retryTopic.getDelayMillis() >= 0, 0, retryTopic.getDelayMillis());
173+
assertProperty("multiplier", retryTopic.getMultiplier() >= 0, 0, retryTopic.getMultiplier());
174+
assertProperty("maxDelayMillis", retryTopic.getDelayMillis() >= 0, 0, retryTopic.getMaxDelayMillis());
156175
}
157176

158-
private SleepingBackOffPolicy<?> getBackOffPolicy(Topic retryTopic) {
159-
BackOffPolicy policy = BackOffPolicyBuilder.newBuilder().delay(retryTopic.getDelayMillis())
160-
.maxDelay(retryTopic.getMaxDelayMillis()).multiplier(retryTopic.getMultiplier())
161-
.random(retryTopic.isRandomBackOff()).build();
162-
Assert.isInstanceOf(SleepingBackOffPolicy.class, policy,
163-
() -> "BackOffPolicy must be an instance of SleepingBackOffPolicy. Provided: " + policy);
164-
return (SleepingBackOffPolicy<?>) policy;
177+
private static void assertProperty(String propertyName, boolean condition, Object minValue, Object providedValue) {
178+
Assert.isTrue(condition,
179+
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
165180
}
166181

167182
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,15 +1380,14 @@ public static class Topic {
13801380
private Double multiplier = 0.0;
13811381

13821382
/**
1383-
* The maximimum wait between retries. If less than the delay then the default
1383+
* The maximum wait between retries. If less than the delay then the default
13841384
* of 30 seconds is applied.
13851385
*/
13861386
private Duration maxDelay = Duration.ZERO;
13871387

13881388
/**
1389-
* In the exponential case (multiplier() > 0) set this to true to have the
1390-
* backoff delays randomized, so that the maximum delay is multiplier times
1391-
* the previous delay and the distribution is uniform between the two values.
1389+
* In the exponential case, set this to true to have the backoff delays
1390+
* randomized. This has no effect for other back off types.
13921391
*/
13931392
private Boolean randomBackOff = false;
13941393

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,16 @@ void retryTopicConfigurationWithFixedBackOff() {
359359
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L));
360360
}
361361

362+
@Test
363+
void retryTopicConfigurationWithNoBackOff() {
364+
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
365+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
366+
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")
367+
.run((context) -> assertThat(
368+
context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3)
369+
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L));
370+
}
371+
362372
@SuppressWarnings("unchecked")
363373
@Test
364374
void streamsWithSeveralStreamsBuilderFactoryBeans() {

0 commit comments

Comments
 (0)