Skip to content

Commit fe24063

Browse files
committed
review fix
1 parent 5986590 commit fe24063

File tree

7 files changed

+85
-85
lines changed

7 files changed

+85
-85
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Version 2.9 changed the mechanism to bootstrap infrastructure beans; see xref:re
77
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
88
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping.
99

10-
Since 3.2 Non-Blocking Retries support xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class].
10+
Since 3.2, Spring for Apache Kafka supports non-blocking retries with xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class].
1111

1212
IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].
1313

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public void processMessage(MyPojo message) {
2828
}
2929
----
3030

31-
Since 3.2, `@RetryableTopic` support @KafkaListener on a class would be:
31+
Since 3.2, `@RetryableTopic` support for @KafkaListener on a class would be:
3232
[source,java]
3333
----
3434
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,8 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
379379
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
380380
return (!listenerMethods.isEmpty() ? listenerMethods : null);
381381
});
382-
final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
383-
final boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
382+
boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
383+
boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
384384
if (!hasMethodLevelListeners && !hasClassLevelListeners) {
385385
this.nonAnnotatedClasses.add(bean.getClass());
386386
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
@@ -401,7 +401,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
401401
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
402402
(ReflectionUtils.MethodFilter) method ->
403403
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
404-
final List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
404+
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
405405
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
406406
}
407407
}
@@ -458,7 +458,7 @@ private synchronized void processMultiMethodListeners(Collection<KafkaListener>
458458
Method checked = checkProxy(method, bean);
459459
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
460460
if (annotation != null && annotation.isDefault()) {
461-
final Method toAssert = defaultMethod;
461+
Method toAssert = defaultMethod;
462462
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
463463
+ toAssert.toString() + " and " + method);
464464
defaultMethod = checked;
@@ -745,7 +745,7 @@ private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener k
745745
private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint<?, ?> endpoint,
746746
KafkaListener kafkaListener) {
747747

748-
final String containerPostProcessor = kafkaListener.containerPostProcessor();
748+
String containerPostProcessor = kafkaListener.containerPostProcessor();
749749
if (StringUtils.hasText(containerPostProcessor)) {
750750
endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor,
751751
ContainerPostProcessor.class));

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@
165165
* }</code>
166166
* }</code>
167167
*</pre>
168-
* <p> Since 3.2 , {@link org.springframework.kafka.annotation.RetryableTopic} annotation support
168+
* <p> Since 3.2, {@link org.springframework.kafka.annotation.RetryableTopic} annotation supports
169169
* {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as:
170170
* <pre>
171171
* <code>@RetryableTopic(attempts = 3,

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicClassLevelIntegrationTests.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
ExistingRetryTopicClassLevelIntegrationTests.MAIN_TOPIC_WITH_PARTITION_INFO,
7878
ExistingRetryTopicClassLevelIntegrationTests.RETRY_TOPIC_WITH_PARTITION_INFO}, partitions = 4)
7979
@TestPropertySource(properties = "two.attempts=2")
80-
public class ExistingRetryTopicClassLevelIntegrationTests {
80+
class ExistingRetryTopicClassLevelIntegrationTests {
8181

8282
public final static String MAIN_TOPIC_WITH_NO_PARTITION_INFO = "main-topic-1";
8383

@@ -216,12 +216,12 @@ static class CountByPartitionContainer {
216216
static class RetryTopicConfigurations {
217217

218218
@Bean
219-
public MainTopicListenerWithPartition mainTopicListenerWithPartition() {
219+
MainTopicListenerWithPartition mainTopicListenerWithPartition() {
220220
return new MainTopicListenerWithPartition();
221221
}
222222

223223
@Bean
224-
public MainTopicListenerWithoutPartition mainTopicListenerWithoutPartition() {
224+
MainTopicListenerWithoutPartition mainTopicListenerWithoutPartition() {
225225
return new MainTopicListenerWithoutPartition();
226226
}
227227

@@ -242,20 +242,20 @@ CountByPartitionContainer countByPartitionContainerWithPartition() {
242242
}
243243

244244
@Configuration
245-
public static class RuntimeConfig {
245+
static class RuntimeConfig {
246246

247247
@Bean(name = "internalBackOffClock")
248-
public Clock clock() {
248+
Clock clock() {
249249
return Clock.systemUTC();
250250
}
251251

252252
@Bean
253-
public TaskExecutor taskExecutor() {
253+
TaskExecutor taskExecutor() {
254254
return new ThreadPoolTaskExecutor();
255255
}
256256

257257
@Bean(destroyMethod = "destroy")
258-
public TaskExecutorManager taskExecutorManager(ThreadPoolTaskExecutor taskExecutor) {
258+
TaskExecutorManager taskExecutorManager(ThreadPoolTaskExecutor taskExecutor) {
259259
return new TaskExecutorManager(taskExecutor);
260260
}
261261
}
@@ -273,13 +273,13 @@ void destroy() {
273273
}
274274

275275
@Configuration
276-
public static class KafkaProducerConfig {
276+
static class KafkaProducerConfig {
277277

278278
@Autowired
279279
EmbeddedKafkaBroker broker;
280280

281281
@Bean
282-
public ProducerFactory<String, String> producerFactory() {
282+
ProducerFactory<String, String> producerFactory() {
283283
Map<String, Object> configProps = new HashMap<>();
284284
configProps.put(
285285
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -294,27 +294,27 @@ public ProducerFactory<String, String> producerFactory() {
294294
}
295295

296296
@Bean
297-
public KafkaTemplate<String, String> kafkaTemplate() {
297+
KafkaTemplate<String, String> kafkaTemplate() {
298298
return new KafkaTemplate<>(producerFactory());
299299
}
300300
}
301301

302302
@EnableKafka
303303
@Configuration
304-
public static class KafkaConsumerConfig extends RetryTopicConfigurationSupport {
304+
static class KafkaConsumerConfig extends RetryTopicConfigurationSupport {
305305

306306
@Autowired
307307
EmbeddedKafkaBroker broker;
308308

309309
@Bean
310-
public KafkaAdmin kafkaAdmin() {
310+
KafkaAdmin kafkaAdmin() {
311311
Map<String, Object> configs = new HashMap<>();
312312
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
313313
return new KafkaAdmin(configs);
314314
}
315315

316316
@Bean
317-
public ConsumerFactory<String, String> consumerFactory() {
317+
ConsumerFactory<String, String> consumerFactory() {
318318
Map<String, Object> props = new HashMap<>();
319319
props.put(
320320
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -336,7 +336,7 @@ public ConsumerFactory<String, String> consumerFactory() {
336336
}
337337

338338
@Bean
339-
public ConcurrentKafkaListenerContainerFactory<String, String> retryTopicListenerContainerFactory(
339+
ConcurrentKafkaListenerContainerFactory<String, String> retryTopicListenerContainerFactory(
340340
ConsumerFactory<String, String> consumerFactory) {
341341

342342
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
@@ -352,7 +352,7 @@ public ConcurrentKafkaListenerContainerFactory<String, String> retryTopicListene
352352
}
353353

354354
@Bean
355-
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
355+
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
356356
ConsumerFactory<String, String> consumerFactory) {
357357

358358
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelExceptionRoutingIntegrationTests.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@
7272
@SpringJUnitConfig
7373
@DirtiesContext
7474
@EmbeddedKafka
75-
public class RetryTopicClassLevelExceptionRoutingIntegrationTests {
75+
class RetryTopicClassLevelExceptionRoutingIntegrationTests {
7676

77-
public final static String BLOCKING_AND_TOPIC_RETRY = "blocking-and-topic-retry";
78-
public final static String ONLY_RETRY_VIA_BLOCKING = "only-retry-blocking-topic";
79-
public final static String ONLY_RETRY_VIA_TOPIC = "only-retry-topic";
80-
public final static String USER_FATAL_EXCEPTION_TOPIC = "user-fatal-topic";
81-
public final static String FRAMEWORK_FATAL_EXCEPTION_TOPIC = "framework-fatal-topic";
77+
final static String BLOCKING_AND_TOPIC_RETRY = "blocking-and-topic-retry";
78+
final static String ONLY_RETRY_VIA_BLOCKING = "only-retry-blocking-topic";
79+
final static String ONLY_RETRY_VIA_TOPIC = "only-retry-topic";
80+
final static String USER_FATAL_EXCEPTION_TOPIC = "user-fatal-topic";
81+
final static String FRAMEWORK_FATAL_EXCEPTION_TOPIC = "framework-fatal-topic";
8282

8383
@Autowired
8484
private KafkaTemplate<String, String> kafkaTemplate;
@@ -108,14 +108,14 @@ void shouldRetryOnlyViaTopic() {
108108
}
109109

110110
@Test
111-
public void shouldGoStraightToDltIfUserProvidedFatal() {
111+
void shouldGoStraightToDltIfUserProvidedFatal() {
112112
kafkaTemplate.send(USER_FATAL_EXCEPTION_TOPIC, "Test message to " + USER_FATAL_EXCEPTION_TOPIC);
113113
assertThat(awaitLatch(latchContainer.fatalUserLatch)).isTrue();
114114
assertThat(awaitLatch(latchContainer.annotatedDltUserFatalLatch)).isTrue();
115115
}
116116

117117
@Test
118-
public void shouldGoStraightToDltIfFrameworkProvidedFatal() {
118+
void shouldGoStraightToDltIfFrameworkProvidedFatal() {
119119
kafkaTemplate.send(FRAMEWORK_FATAL_EXCEPTION_TOPIC, "Testing topic with annotation 1");
120120
assertThat(awaitLatch(latchContainer.fatalFrameworkLatch)).isTrue();
121121
assertThat(awaitLatch(latchContainer.annotatedDltFrameworkFatalLatch)).isTrue();
@@ -275,26 +275,26 @@ static class CountDownLatchContainer {
275275

276276
}
277277

278-
public static class ShouldRetryOnlyByTopicException extends RuntimeException {
279-
public ShouldRetryOnlyByTopicException(String msg) {
278+
static class ShouldRetryOnlyByTopicException extends RuntimeException {
279+
ShouldRetryOnlyByTopicException(String msg) {
280280
super(msg);
281281
}
282282
}
283283

284-
public static class ShouldSkipBothRetriesException extends RuntimeException {
285-
public ShouldSkipBothRetriesException(String msg) {
284+
static class ShouldSkipBothRetriesException extends RuntimeException {
285+
ShouldSkipBothRetriesException(String msg) {
286286
super(msg);
287287
}
288288
}
289289

290-
public static class ShouldRetryOnlyBlockingException extends RuntimeException {
291-
public ShouldRetryOnlyBlockingException(String msg) {
290+
static class ShouldRetryOnlyBlockingException extends RuntimeException {
291+
ShouldRetryOnlyBlockingException(String msg) {
292292
super(msg);
293293
}
294294
}
295295

296-
public static class ShouldRetryViaBothException extends RuntimeException {
297-
public ShouldRetryViaBothException(String msg) {
296+
static class ShouldRetryViaBothException extends RuntimeException {
297+
ShouldRetryViaBothException(String msg) {
298298
super(msg);
299299
}
300300
}
@@ -305,7 +305,7 @@ static class RetryTopicConfigurations {
305305
private static final String DLT_METHOD_NAME = "processDltMessage";
306306

307307
@Bean
308-
public RetryTopicConfiguration blockingAndTopic(KafkaTemplate<String, String> template) {
308+
RetryTopicConfiguration blockingAndTopic(KafkaTemplate<String, String> template) {
309309
return RetryTopicConfigurationBuilder
310310
.newInstance()
311311
.fixedBackOff(50)
@@ -315,7 +315,7 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate<String, String> te
315315
}
316316

317317
@Bean
318-
public RetryTopicConfiguration onlyTopic(KafkaTemplate<String, String> template) {
318+
RetryTopicConfiguration onlyTopic(KafkaTemplate<String, String> template) {
319319
return RetryTopicConfigurationBuilder
320320
.newInstance()
321321
.fixedBackOff(50)
@@ -327,27 +327,27 @@ public RetryTopicConfiguration onlyTopic(KafkaTemplate<String, String> template)
327327
}
328328

329329
@Bean
330-
public BlockingAndTopicRetriesListener blockingAndTopicRetriesListener() {
330+
BlockingAndTopicRetriesListener blockingAndTopicRetriesListener() {
331331
return new BlockingAndTopicRetriesListener();
332332
}
333333

334334
@Bean
335-
public OnlyRetryViaTopicListener onlyRetryViaTopicListener() {
335+
OnlyRetryViaTopicListener onlyRetryViaTopicListener() {
336336
return new OnlyRetryViaTopicListener();
337337
}
338338

339339
@Bean
340-
public UserFatalTopicListener userFatalTopicListener() {
340+
UserFatalTopicListener userFatalTopicListener() {
341341
return new UserFatalTopicListener();
342342
}
343343

344344
@Bean
345-
public OnlyRetryBlockingListener onlyRetryBlockingListener() {
345+
OnlyRetryBlockingListener onlyRetryBlockingListener() {
346346
return new OnlyRetryBlockingListener();
347347
}
348348

349349
@Bean
350-
public FrameworkFatalTopicListener frameworkFatalTopicListener() {
350+
FrameworkFatalTopicListener frameworkFatalTopicListener() {
351351
return new FrameworkFatalTopicListener();
352352
}
353353

@@ -374,7 +374,7 @@ TaskScheduler sched() {
374374
}
375375

376376
@Configuration
377-
public static class RoutingTestsConfigurationSupport extends RetryTopicConfigurationSupport {
377+
static class RoutingTestsConfigurationSupport extends RetryTopicConfigurationSupport {
378378

379379
@Override
380380
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
@@ -390,13 +390,13 @@ protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>>
390390
}
391391

392392
@Configuration
393-
public static class KafkaProducerConfig {
393+
static class KafkaProducerConfig {
394394

395395
@Autowired
396396
EmbeddedKafkaBroker broker;
397397

398398
@Bean
399-
public ProducerFactory<String, String> producerFactory() {
399+
ProducerFactory<String, String> producerFactory() {
400400
Map<String, Object> configProps = new HashMap<>();
401401
configProps.put(
402402
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -411,27 +411,27 @@ public ProducerFactory<String, String> producerFactory() {
411411
}
412412

413413
@Bean
414-
public KafkaTemplate<String, String> kafkaTemplate() {
414+
KafkaTemplate<String, String> kafkaTemplate() {
415415
return new KafkaTemplate<>(producerFactory());
416416
}
417417
}
418418

419419
@EnableKafka
420420
@Configuration
421-
public static class KafkaConsumerConfig {
421+
static class KafkaConsumerConfig {
422422

423423
@Autowired
424424
EmbeddedKafkaBroker broker;
425425

426426
@Bean
427-
public KafkaAdmin kafkaAdmin() {
427+
KafkaAdmin kafkaAdmin() {
428428
Map<String, Object> configs = new HashMap<>();
429429
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
430430
return new KafkaAdmin(configs);
431431
}
432432

433433
@Bean
434-
public ConsumerFactory<String, String> consumerFactory() {
434+
ConsumerFactory<String, String> consumerFactory() {
435435
Map<String, Object> props = new HashMap<>();
436436
props.put(
437437
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -453,7 +453,7 @@ public ConsumerFactory<String, String> consumerFactory() {
453453
}
454454

455455
@Bean
456-
public ConcurrentKafkaListenerContainerFactory<String, String> retryTopicListenerContainerFactory(
456+
ConcurrentKafkaListenerContainerFactory<String, String> retryTopicListenerContainerFactory(
457457
ConsumerFactory<String, String> consumerFactory) {
458458

459459
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
@@ -469,7 +469,7 @@ public ConcurrentKafkaListenerContainerFactory<String, String> retryTopicListene
469469
}
470470

471471
@Bean
472-
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
472+
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
473473
ConsumerFactory<String, String> consumerFactory) {
474474

475475
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

0 commit comments

Comments
 (0)