Skip to content

Commit 73439a5

Browse files
Move validation to Topic in KafkaProperties
Add validation for randomBackOff with policy not exponential Add test case
1 parent e8a59b0 commit 73439a5

File tree

3 files changed

+49
-29
lines changed

3 files changed

+49
-29
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20-
import java.math.BigDecimal;
2120

2221
import org.springframework.beans.factory.ObjectProvider;
2322
import org.springframework.boot.autoconfigure.AutoConfiguration;
@@ -47,7 +46,6 @@
4746
import org.springframework.kafka.transaction.KafkaTransactionManager;
4847
import org.springframework.retry.backoff.BackOffPolicyBuilder;
4948
import org.springframework.retry.backoff.SleepingBackOffPolicy;
50-
import org.springframework.util.Assert;
5149

5250
/**
5351
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -65,8 +63,6 @@
6563
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
6664
public class KafkaAutoConfiguration {
6765

68-
private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property spring.kafka.retry.topic.%s should be greater than or equal to %s. Provided value was %s.";
69-
7066
private final KafkaProperties properties;
7167

7268
public KafkaAutoConfiguration(KafkaProperties properties) {
@@ -152,7 +148,6 @@ public KafkaAdmin kafkaAdmin() {
152148
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
153149
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
154150
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
155-
validateRetryTopicInput(retryTopic);
156151
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
157152
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
158153
.doNotAutoCreateRetryTopics();
@@ -168,17 +163,4 @@ private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Top
168163
.multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build()));
169164
}
170165

171-
private static void validateRetryTopicInput(Topic retryTopic) {
172-
assertProperty("attempts", retryTopic.getAttempts(), 1);
173-
assertProperty("delay", retryTopic.getDelayMillis(), 0);
174-
assertProperty("multiplier", retryTopic.getMultiplier(), 0);
175-
assertProperty("maxDelayMillis", retryTopic.getMaxDelayMillis(), 0);
176-
}
177-
178-
private static void assertProperty(String propertyName, Number providedValue, int minValue) {
179-
Assert.notNull(providedValue, () -> "spring.kafka.retry.topic." + propertyName + " cannot be null.");
180-
Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0,
181-
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
182-
}
183-
184166
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20+
import java.math.BigDecimal;
2021
import java.time.Duration;
2122
import java.time.temporal.ChronoUnit;
2223
import java.util.ArrayList;
@@ -41,6 +42,7 @@
4142
import org.springframework.core.io.Resource;
4243
import org.springframework.kafka.listener.ContainerProperties.AckMode;
4344
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
45+
import org.springframework.util.Assert;
4446
import org.springframework.util.CollectionUtils;
4547
import org.springframework.util.unit.DataSize;
4648

@@ -1344,7 +1346,7 @@ public static class Retry {
13441346
private Topic topic = new Topic();
13451347

13461348
public Topic getTopic() {
1347-
return this.topic;
1349+
return this.topic.validate();
13481350
}
13491351

13501352
public void setTopic(Topic topic) {
@@ -1356,6 +1358,11 @@ public void setTopic(Topic topic) {
13561358
*/
13571359
public static class Topic {
13581360

1361+
private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic.";
1362+
1363+
private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX
1364+
+ "%s should be greater than or equal to %s. Provided value was %s.";
1365+
13591366
/**
13601367
* Whether to enable topic-based retries auto-configuration.
13611368
*/
@@ -1387,7 +1394,7 @@ public static class Topic {
13871394

13881395
/**
13891396
* In the exponential case, set this to true to have the backoff delays
1390-
* randomized. This has no effect for other back off types.
1397+
* randomized.
13911398
*/
13921399
private Boolean randomBackOff = false;
13931400

@@ -1447,6 +1454,23 @@ public void setRandomBackOff(Boolean randomBackOff) {
14471454
this.randomBackOff = randomBackOff;
14481455
}
14491456

1457+
private Topic validate() {
1458+
validateProperty("attempts", this.attempts, 1);
1459+
validateProperty("delay", this.getDelayMillis(), 0);
1460+
validateProperty("multiplier", this.multiplier, 0);
1461+
validateProperty("maxDelay", this.getMaxDelayMillis(), 0);
1462+
Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(),
1463+
"Property " + RETRY_TOPIC_PROPERTIES_PREFIX
1464+
+ "randomBackOff should not be true with non-exponential back offs.");
1465+
return this;
1466+
}
1467+
1468+
private static void validateProperty(String propertyName, Number providedValue, int minValue) {
1469+
Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null.");
1470+
Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0,
1471+
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
1472+
}
1473+
14501474
}
14511475

14521476
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -371,9 +371,10 @@ void retryTopicConfigurationWithNoBackOff() {
371371

372372
@Test
373373
void retryTopicConfigurationWithNegativeDelay() {
374-
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
375-
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
376-
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=-1")
374+
this.contextRunner
375+
.withPropertyValues("spring.application.name=my-test-app",
376+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
377+
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.delay=-1")
377378
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
378379
.isInstanceOf(IllegalArgumentException.class).message()
379380
.isEqualTo("Property spring.kafka.retry.topic.delay"
@@ -382,9 +383,10 @@ void retryTopicConfigurationWithNegativeDelay() {
382383

383384
@Test
384385
void retryTopicConfigurationWithNegativeMultiplier() {
385-
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
386-
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
387-
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.multiplier=-1")
386+
this.contextRunner
387+
.withPropertyValues("spring.application.name=my-test-app",
388+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
389+
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.multiplier=-1")
388390
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
389391
.isInstanceOf(IllegalArgumentException.class).message()
390392
.isEqualTo("Property spring.kafka.retry.topic.multiplier"
@@ -393,9 +395,10 @@ void retryTopicConfigurationWithNegativeMultiplier() {
393395

394396
@Test
395397
void retryTopicConfigurationWithNegativeMaxDelay() {
396-
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
397-
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
398-
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.maxDelay=-1")
398+
this.contextRunner
399+
.withPropertyValues("spring.application.name=my-test-app",
400+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
401+
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.maxDelay=-1")
399402
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
400403
.isInstanceOf(IllegalArgumentException.class).message()
401404
.isEqualTo("Property spring.kafka.retry.topic.maxDelay"
@@ -414,6 +417,17 @@ void retryTopicConfigurationWithZeroAttempts() {
414417
+ " should be greater than or equal to 1. Provided value was 0."));
415418
}
416419

420+
@Test
421+
void retryTopicConfigurationWithZeroMultiplierAndRandomBackOff() {
422+
this.contextRunner
423+
.withPropertyValues("spring.application.name=my-test-app",
424+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
425+
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.randomBackOff=true")
426+
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
427+
.isInstanceOf(IllegalArgumentException.class).message().isEqualTo(
428+
"Property spring.kafka.retry.topic.randomBackOff should not be true with non-exponential back offs."));
429+
}
430+
417431
@SuppressWarnings("unchecked")
418432
@Test
419433
void streamsWithSeveralStreamsBuilderFactoryBeans() {

0 commit comments

Comments
 (0)