From 799894429f674cb41b7d1dba4ac7334b51528c20 Mon Sep 17 00:00:00 2001 From: tomazfernandes Date: Sun, 20 Feb 2022 13:41:48 -0300 Subject: [PATCH 1/5] GH-2116: Add blocking retries to RT Before we hardcoded a no-ops back off in the DefaultErrorHandler used in the Retryable Topics feature. Adds a setter to let the user provide their own back off policy and configure blocking retries in conjunction with RT. --- .../src/main/asciidoc/retrytopic.adoc | 31 +++++++++++++++++++ ...stenerContainerFactoryConfigurerTests.java | 31 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 031f06c700..e4627216b5 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -329,6 +329,37 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`. +[[retry-topic-combine-blocking]] +===== Combine blocking and non-blocking retries + +Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction. +For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. + +You can configure the blocking retries as follows: + +==== +[source, java] +---- +@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) +public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3)); + lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler) + .addNotRetryableExceptions(MyFatalException.class); + return lcfc; +} +---- +==== + +NOTE: If you set a blocking retry back off, the default is to retry on all exceptions except the fatal ones in <>. +You can add or remove exceptions using the `addNotRetryableException` and `removeNotRetryableException` methods in the `ListenerContainerFactoryConfigurer`. + +NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. + + ===== Include and Exclude Topics You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection topics) .excludeTopic(String topic) and .excludeTopics(Collection topics) methods. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java index e5a6606dad..3df47db9f1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java @@ -23,6 +23,7 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -59,6 +60,8 @@ import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; /** * @author Tomaz Fernandes @@ -408,6 +411,34 @@ void shouldDecorateFactory() { then(this.configurerContainerCustomizer).should(times(1)).accept(container); } + @Test + void shouldUseGivenBackOff() { + + // given + given(container.getContainerProperties()).willReturn(containerProperties); + given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer); + given(containerProperties.getMessageListener()).willReturn(listener); + given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration); + willReturn(container).given(containerFactory).createListenerContainer(endpoint); + BackOff backOffMock = mock(BackOff.class); + BackOffExecution backOffExecutionMock = mock(BackOffExecution.class); + given(backOffMock.start()).willReturn(backOffExecutionMock); + + ListenerContainerFactoryConfigurer configurer = + new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, + deadLetterPublishingRecovererFactory, clock); + + configurer.setBlockingRetriesBackOff(backOffMock); + + // when + KafkaListenerContainerFactory decoratedFactory = + configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer()); + decoratedFactory.createListenerContainer(endpoint); + + // then + then(backOffMock).should().start(); + } + @Test void shouldCacheFactoryInstances() { From 85cab3f2d34e5b92b789a5e657d8178cd77b17e1 Mon Sep 17 00:00:00 2001 From: tomazfernandes Date: Wed, 23 Feb 2022 14:32:54 -0300 Subject: [PATCH 2/5] Change DHE in LCFC to defaultFalse With this we no longer need a no ops back off. Some minor adjustments were needed to maintain behavior when the logic gets to DLPR. --- .../src/main/asciidoc/retrytopic.adoc | 30 ++ .../kafka/listener/DefaultErrorHandler.java | 2 +- .../kafka/listener/FailedRecordProcessor.java | 17 +- .../kafka/listener/FailedRecordTracker.java | 4 +- .../ListenerContainerFactoryConfigurer.java | 28 +- ...TopicExceptionRoutingIntegrationTests.java | 500 ++++++++++++++++++ .../src/test/resources/log4j2-test.xml | 1 + 7 files changed, 575 insertions(+), 7 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index e4627216b5..10551080d0 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -360,6 +360,36 @@ You can add or remove exceptions using the `addNotRetryableException` and `remov NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. +[[retry-topic-combine-blocking]] +===== Combine blocking and non-blocking retries + +Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction. +For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. + +To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows. +The default policy is FixedBackOff, with ten retries and no delay between them. +Optionally, you can also set a different back off policy. + +==== +[source, java] +---- +@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) +public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler) + .addRetryableExceptions(MyBlockingRetryException.class); + lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional + return lcfc; +} +---- +==== + +NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. + + ===== Include and Exclude Topics You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection topics) .excludeTopic(String topic) and .excludeTopics(Collection topics) methods. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java index ba8e3a6224..02e4dd7285 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java @@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List Consumer consumer, MessageListenerContainer container) { SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR - getRecoveryStrategy(records, thrownException), this.logger, getLogLevel()); + getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel()); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index 276e1c649b..4f3db37ab7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -22,6 +22,7 @@ import java.util.function.BiPredicate; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.core.log.LogAccessor; @@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) { * @since 2.7 */ protected RecoveryStrategy getRecoveryStrategy(List> records, Exception thrownException) { + return getRecoveryStrategy(records, null, thrownException); + } + + /** + * Return a {@link RecoveryStrategy} to call to determine whether the first record in the + * list should be skipped. + * @param records the records. + * @param recoveryConsumer the consumer. + * @param thrownException the exception. + * @return the {@link RecoveryStrategy}. + * @since 2.8.4 + */ + protected RecoveryStrategy getRecoveryStrategy(List> records, + @Nullable Consumer recoveryConsumer, Exception thrownException) { if (getClassifier().classify(thrownException)) { return this.failureTracker::recovered; } else { try { - this.failureTracker.getRecoverer().accept(records.get(0), thrownException); + this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException); this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException)); } catch (Exception ex) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index 4ef6e55488..0aaa60c2d5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -240,7 +240,7 @@ void clearThreadState() { this.failures.remove(); } - BiConsumer, Exception> getRecoverer() { + ConsumerAwareRecordRecoverer getRecoverer() { return this.recoverer; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index 495b0bed12..8902c9a887 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -42,7 +42,7 @@ import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.util.Assert; -import org.springframework.util.backoff.FixedBackOff; +import org.springframework.util.backoff.BackOff; /** * @@ -81,6 +81,8 @@ public class ListenerContainerFactoryConfigurer { private static final long LOWEST_BACKOFF_THRESHOLD = 1500L; + private BackOff providedBlockingBackOff = null; + private Consumer> containerCustomizer = container -> { }; @@ -158,6 +160,20 @@ public KafkaListenerContainerFactory decorateFactoryWithoutSettingContainerPr return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false); } + /** + * Set a {@link BackOff} to be used with blocking retries. + * You can specify the exceptions to be retried using the method + * {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])} + * By default, no exceptions are retried via blocking. + * @param blockingBackOff the BackOff policy to be used by blocking retries. + * @since 2.8.4 + * @see DefaultErrorHandler + */ + public void setBlockingRetriesBackOff(BackOff blockingBackOff) { + Assert.notNull(blockingBackOff, "The provided BackOff cannot be null"); + this.providedBlockingBackOff = blockingBackOff; + } + private ConcurrentKafkaListenerContainerFactory doConfigure( ConcurrentKafkaListenerContainerFactory containerFactory, Configuration configuration, boolean isSetContainerProperties) { @@ -193,14 +209,20 @@ public void setErrorHandlerCustomizer(Consumer errorHandlerC protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer, Configuration configuration) { - DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, - new FixedBackOff(0, 0)); + DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer); + errorHandler.defaultFalse(); errorHandler.setCommitRecovered(true); errorHandler.setLogLevel(KafkaException.Level.DEBUG); this.errorHandlerCustomizer.accept(errorHandler); return errorHandler; } + protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { + return this.providedBlockingBackOff != null + ? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff) + : new DefaultErrorHandler(deadLetterPublishingRecoverer); + } + protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer container, Configuration configuration, boolean isSetContainerProperties) { AcknowledgingConsumerAwareMessageListener listener = checkAndCast(container.getContainerProperties() diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java new file mode 100644 index 0000000000..7e890f4b11 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -0,0 +1,500 @@ +/* + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.KafkaConsumerBackoffManager; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.retry.annotation.Backoff; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; + + +/** + * @author Tomaz Fernandes + * @since 2.8.4 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +public class RetryTopicExceptionRoutingIntegrationTests { + + private static final Logger logger = LoggerFactory.getLogger(RetryTopicExceptionRoutingIntegrationTests.class); + + public final static String BLOCKING_AND_TOPIC_RETRY = "blocking-and-topic-retry"; + public final static String ONLY_RETRY_VIA_BLOCKING = "only-retry-blocking-topic"; + public final static String ONLY_RETRY_VIA_TOPIC = "only-retry-topic"; + public final static String USER_FATAL_EXCEPTION_TOPIC = "user-fatal-topic"; + public final static String FRAMEWORK_FATAL_EXCEPTION_TOPIC = "framework-fatal-topic"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Test + void shouldRetryViaBlockingAndTopics() { + logger.debug("Sending message to topic " + BLOCKING_AND_TOPIC_RETRY); + kafkaTemplate.send(BLOCKING_AND_TOPIC_RETRY, "Test message to " + BLOCKING_AND_TOPIC_RETRY); + assertThat(awaitLatch(latchContainer.blockingAndTopicsLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.dltProcessorLatch)).isTrue(); + } + + @Test + void shouldRetryOnlyViaBlocking() { + logger.debug("Sending message to topic " + ONLY_RETRY_VIA_BLOCKING); + kafkaTemplate.send(ONLY_RETRY_VIA_BLOCKING, "Test message to "); + assertThat(awaitLatch(latchContainer.onlyRetryViaBlockingLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.annotatedDltOnlyBlockingLatch)).isTrue(); + } + + @Test + void shouldRetryOnlyViaTopic() { + logger.debug("Sending message to topic " + ONLY_RETRY_VIA_TOPIC); + kafkaTemplate.send(ONLY_RETRY_VIA_TOPIC, "Test message to " + ONLY_RETRY_VIA_TOPIC); + assertThat(awaitLatch(latchContainer.onlyRetryViaTopicLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.dltProcessorWithErrorLatch)).isTrue(); + } + + @Test + public void shouldGoStraightToDltIfUserProvidedFatal() { + logger.debug("Sending message to topic " + USER_FATAL_EXCEPTION_TOPIC); + kafkaTemplate.send(USER_FATAL_EXCEPTION_TOPIC, "Test message to " + USER_FATAL_EXCEPTION_TOPIC); + assertThat(awaitLatch(latchContainer.fatalUserLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.annotatedDltUserFatalLatch)).isTrue(); + } + + @Test + public void shouldGoStraightToDltIfFrameworkProvidedFatal() { + logger.debug("Sending message to topic " + FRAMEWORK_FATAL_EXCEPTION_TOPIC); + kafkaTemplate.send(FRAMEWORK_FATAL_EXCEPTION_TOPIC, "Testing topic with annotation 1"); + assertThat(awaitLatch(latchContainer.fatalFrameworkLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.annotatedDltFrameworkFatalLatch)).isTrue(); + } + + private static void countdownIfCorrectInvocations(AtomicInteger invocations, int expected, CountDownLatch latch) { + int actual = invocations.get(); + if (actual == expected) { + latch.countDown(); + } + else { + logger.error("Wrong number of Listener invocations: expected {} actual {}", expected, actual); + } + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(30, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + static class BlockingAndTopicRetriesListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaListener(id = "firstTopicId", topics = BLOCKING_AND_TOPIC_RETRY) + public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Message {} received in topic {}", message, receivedTopic); + container.blockingAndTopicsLatch.countDown(); + container.blockingAndTopicsListenerInvocations.incrementAndGet(); + throw new ShouldRetryViaBothException("Woooops... in topic " + receivedTopic); + } + } + + static class DltProcessor { + + @Autowired + CountDownLatchContainer container; + + public void processDltMessage(Object message) { + countdownIfCorrectInvocations(container.blockingAndTopicsListenerInvocations, 12, + container.dltProcessorLatch); + } + } + + static class OnlyRetryViaTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaListener(topics = ONLY_RETRY_VIA_TOPIC) + public void listenAgain(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Message {} received in topic {} ", message, receivedTopic); + container.onlyRetryViaTopicLatch.countDown(); + container.onlyRetryViaTopicListenerInvocations.incrementAndGet(); + throw new ShouldRetryOnlyByTopicException("Another woooops... " + receivedTopic); + } + } + + static class DltProcessorWithError { + + @Autowired + CountDownLatchContainer container; + + public void processDltMessage(Object message) { + countdownIfCorrectInvocations(container.onlyRetryViaTopicListenerInvocations, + 3, container.dltProcessorWithErrorLatch); + throw new RuntimeException("Dlt Error!"); + } + } + + static class OnlyRetryBlockingListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(exclude = ShouldRetryOnlyBlockingException.class, traversingCauses = "true", + backoff = @Backoff(50), kafkaTemplate = "kafkaTemplate") + @KafkaListener(topics = ONLY_RETRY_VIA_BLOCKING) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.onlyRetryViaBlockingLatch.countDown(); + container.onlyRetryViaBlockingListenerInvocations.incrementAndGet(); + logger.debug("Message {} received in topic {} ", message, receivedTopic); + throw new ShouldRetryOnlyBlockingException("User provided fatal exception!" + receivedTopic); + } + + @DltHandler + public void annotatedDltMethod(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Received message in Dlt method " + receivedTopic); + countdownIfCorrectInvocations(container.onlyRetryViaBlockingListenerInvocations, 4, + container.annotatedDltOnlyBlockingLatch); + } + } + + static class UserFatalTopicListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(backoff = @Backoff(50), kafkaTemplate = "kafkaTemplate") + @KafkaListener(topics = USER_FATAL_EXCEPTION_TOPIC) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.fatalUserLatch.countDown(); + container.userFatalListenerInvocations.incrementAndGet(); + logger.debug("Message {} received in topic {} ", message, receivedTopic); + throw new ShouldSkipBothRetriesException("User provided fatal exception!" + receivedTopic); + } + + @DltHandler + public void annotatedDltMethod(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Received message in Dlt method " + receivedTopic); + countdownIfCorrectInvocations(container.userFatalListenerInvocations, 1, + container.annotatedDltUserFatalLatch); + } + } + + static class FrameworkFatalTopicListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(50)) + @KafkaListener(topics = FRAMEWORK_FATAL_EXCEPTION_TOPIC) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.fatalFrameworkLatch.countDown(); + container.fatalFrameworkListenerInvocations.incrementAndGet(); + logger.debug("Message {} received in second annotated topic {} ", message, receivedTopic); + throw new ConversionException("Woooops... in topic " + receivedTopic, new RuntimeException("Test RTE")); + } + + @DltHandler + public void annotatedDltMethod(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + logger.debug("Received message in annotated Dlt method!"); + countdownIfCorrectInvocations(container.fatalFrameworkListenerInvocations, 1, + container.annotatedDltFrameworkFatalLatch); + throw new ConversionException("Woooops... in topic " + receivedTopic, new RuntimeException("Test RTE")); + } + } + + static class CountDownLatchContainer { + + CountDownLatch blockingAndTopicsLatch = new CountDownLatch(12); + CountDownLatch onlyRetryViaBlockingLatch = new CountDownLatch(4); + CountDownLatch onlyRetryViaTopicLatch = new CountDownLatch(3); + CountDownLatch fatalUserLatch = new CountDownLatch(1); + CountDownLatch fatalFrameworkLatch = new CountDownLatch(1); + CountDownLatch annotatedDltOnlyBlockingLatch = new CountDownLatch(1); + CountDownLatch annotatedDltUserFatalLatch = new CountDownLatch(1); + CountDownLatch annotatedDltFrameworkFatalLatch = new CountDownLatch(1); + CountDownLatch dltProcessorLatch = new CountDownLatch(1); + CountDownLatch dltProcessorWithErrorLatch = new CountDownLatch(1); + + AtomicInteger blockingAndTopicsListenerInvocations = new AtomicInteger(); + AtomicInteger onlyRetryViaTopicListenerInvocations = new AtomicInteger(); + AtomicInteger onlyRetryViaBlockingListenerInvocations = new AtomicInteger(); + AtomicInteger userFatalListenerInvocations = new AtomicInteger(); + AtomicInteger fatalFrameworkListenerInvocations = new AtomicInteger(); + + } + + @SuppressWarnings("serial") + public static class ShouldRetryOnlyByTopicException extends RuntimeException { + public ShouldRetryOnlyByTopicException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + public static class ShouldSkipBothRetriesException extends RuntimeException { + public ShouldSkipBothRetriesException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + public static class ShouldRetryOnlyBlockingException extends RuntimeException { + public ShouldRetryOnlyBlockingException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + public static class ShouldRetryViaBothException extends RuntimeException { + public ShouldRetryViaBothException(String msg) { + super(msg); + } + } + + @Configuration + static class RetryTopicConfigurations { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + public RetryTopicConfiguration blockingAndTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .includeTopic(BLOCKING_AND_TOPIC_RETRY) + .dltHandlerMethod("dltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + public RetryTopicConfiguration onlyTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .includeTopic(ONLY_RETRY_VIA_TOPIC) + .useSingleTopicForFixedDelays() + .doNotRetryOnDltFailure() + .dltHandlerMethod("dltProcessorWithError", DLT_METHOD_NAME) + .create(template); + } + + @Bean + public BlockingAndTopicRetriesListener blockingAndTopicRetriesListener() { + return new BlockingAndTopicRetriesListener(); + } + + @Bean + public OnlyRetryViaTopicListener onlyRetryViaTopicListener() { + return new OnlyRetryViaTopicListener(); + } + + @Bean + public UserFatalTopicListener userFatalTopicListener() { + return new UserFatalTopicListener(); + } + + @Bean + public OnlyRetryBlockingListener onlyRetryBlockingListener() { + return new OnlyRetryBlockingListener(); + } + + @Bean + public FrameworkFatalTopicListener frameworkFatalTopicListener() { + return new FrameworkFatalTopicListener(); + } + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + DltProcessor dltProcessor() { + return new DltProcessor(); + } + + @Bean + DltProcessorWithError dltProcessorWithError() { + return new DltProcessorWithError(); + } + + @Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) + public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + + lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3)); + lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh) + .addRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)); + return lcfc; + } + + @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME) + public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext); + ddtr.addNotRetryableExceptions(ShouldSkipBothRetriesException.class); + return ddtr; + } + + } + + @Configuration + public static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + configProps.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + configProps.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + public static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); + return new KafkaAdmin(configs); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + props.put( + ConsumerConfig.GROUP_ID_CONFIG, + "groupId"); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory retryTopicListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> container.getContainerProperties().setIdlePartitionEventInterval(100L)); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + return factory; + } + + } +} diff --git a/spring-kafka/src/test/resources/log4j2-test.xml b/spring-kafka/src/test/resources/log4j2-test.xml index 9f33afab77..01973cc185 100644 --- a/spring-kafka/src/test/resources/log4j2-test.xml +++ b/spring-kafka/src/test/resources/log4j2-test.xml @@ -8,6 +8,7 @@ + From a7e069b18364139fd8a57cf23c96bff1b4c9182d Mon Sep 17 00:00:00 2001 From: tomazfernandes Date: Wed, 23 Feb 2022 14:35:57 -0300 Subject: [PATCH 3/5] Change DHE in LCFC to defaultFalse With this we no longer need a no ops back off. Some minor adjustments were needed to maintain behavior when the logic gets to DLPR. --- .../src/main/asciidoc/retrytopic.adoc | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 10551080d0..8a0a415e93 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -328,42 +328,10 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`. - -[[retry-topic-combine-blocking]] -===== Combine blocking and non-blocking retries - -Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction. -For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. - -You can configure the blocking retries as follows: - -==== -[source, java] ----- -@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) -public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, - DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, - @Qualifier(RetryTopicInternalBeanNames - .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { - ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); - lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3)); - lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler) - .addNotRetryableExceptions(MyFatalException.class); - return lcfc; -} ----- -==== - -NOTE: If you set a blocking retry back off, the default is to retry on all exceptions except the fatal ones in <>. -You can add or remove exceptions using the `addNotRetryableException` and `removeNotRetryableException` methods in the `ListenerContainerFactoryConfigurer`. - -NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. - - [[retry-topic-combine-blocking]] ===== Combine blocking and non-blocking retries -Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction. +Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction. For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows. From 29ced81241f5ba686c92aaeff24e9664fc72d2f7 Mon Sep 17 00:00:00 2001 From: tomazfernandes Date: Wed, 23 Feb 2022 20:06:42 -0300 Subject: [PATCH 4/5] Improve API and docs Now retryable exceptions can be set directly in the lcfc class. Improved the docs on how to combine blocking and non-blocking behaviors. Added what's new entry for this feature. --- .../src/main/asciidoc/retrytopic.adoc | 104 +++++++++++++----- .../src/main/asciidoc/whats-new.adoc | 5 +- .../kafka/listener/ExceptionClassifier.java | 4 +- .../ListenerContainerFactoryConfigurer.java | 25 ++++- ...stenerContainerFactoryConfigurerTests.java | 14 ++- ...TopicExceptionRoutingIntegrationTests.java | 4 +- 6 files changed, 116 insertions(+), 40 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 8a0a415e93..9b3e06c29c 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -328,35 +328,6 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`. -[[retry-topic-combine-blocking]] -===== Combine blocking and non-blocking retries - -Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction. -For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. - -To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows. -The default policy is FixedBackOff, with ten retries and no delay between them. -Optionally, you can also set a different back off policy. - -==== -[source, java] ----- -@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) -public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, - DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, - @Qualifier(RetryTopicInternalBeanNames - .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { - ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); - lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler) - .addRetryableExceptions(MyBlockingRetryException.class); - lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional - return lcfc; -} ----- -==== - -NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. - ===== Include and Exclude Topics @@ -459,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) ---- ==== +[[retry-topic-combine-blocking]] +==== Combining blocking and non-blocking retries + +Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction. +For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. + +To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean as follows. +The default policy is `FixedBackOff`, with nine retries and no delay between them. +Optionally, you can provide your own back off policy. + +==== +[source, java] +---- +@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) +public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class); + lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional + return lcfc; +} +---- +==== + +If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as: + +==== +[source, java] +---- +lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue)); +---- +==== + +NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. + +Here's an example with both configurations working together: + +==== +[source, java] +---- +@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) +public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class); + return lcfc; +} + +@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME) +public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext); + ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class); + return ddtr; +} + +---- +==== + +In this example: + +* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT. +* `ShouldRetryViaBothException.class` would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts. +* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt failed. + +IMPORTANT: Note that the blocking retries behavior is allowlist - you add the exceptions you do want to retry that way; while the non-blocking retries classification is geared towards FATAL exceptions and as such is denylist - you add the exceptions you don't want to do non-blocking retries, but to send directly to the DLT instead. + +IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration. + ==== Topic Naming Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic. @@ -775,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti return retryTopicConfigurer; } ---- +==== [[change-kboe-logging-level]] ==== Changing KafkaBackOffException Logging Level diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index cc5efc7b79..988d72e6ba 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -88,5 +88,8 @@ See <> for more information. There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT. Refer to <> to see how to manage it. +You can now use blocking and non-blocking retries in conjunction. +See <> for more information. + The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. -See <> if you need to change the logging level back to WARN or set it to any other level. +See <> if you need to change the logging level back to WARN or set it to any other level. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java index 4b5626794a..281bc7fc6a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java @@ -22,6 +22,7 @@ import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.lang.Nullable; import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException; import org.springframework.util.Assert; @@ -185,7 +186,8 @@ public boolean removeNotRetryableException(Class exceptionT * @see #addNotRetryableExceptions(Class...) * @see #setClassifications(Map, boolean) */ - public boolean removeClassification(Class exceptionType) { + @Nullable + public Boolean removeClassification(Class exceptionType) { return this.classifier.getClassified().remove(exceptionType); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index 8902c9a887..ba3816d52a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -83,6 +83,8 @@ public class ListenerContainerFactoryConfigurer { private BackOff providedBlockingBackOff = null; + private Class[] blockingExceptionTypes = null; + private Consumer> containerCustomizer = container -> { }; @@ -162,9 +164,9 @@ public KafkaListenerContainerFactory decorateFactoryWithoutSettingContainerPr /** * Set a {@link BackOff} to be used with blocking retries. - * You can specify the exceptions to be retried using the method - * {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])} - * By default, no exceptions are retried via blocking. + * If the BackOff execution returns STOP, the record will be forwarded + * to the next retry topic or to the DLT, depending on how the non-blocking retries + * are configured. * @param blockingBackOff the BackOff policy to be used by blocking retries. * @since 2.8.4 * @see DefaultErrorHandler @@ -174,6 +176,20 @@ public void setBlockingRetriesBackOff(BackOff blockingBackOff) { this.providedBlockingBackOff = blockingBackOff; } + /** + * Specify the exceptions to be retried via blocking. + * @param exceptionTypes the exceptions that should be retried. + * @since 2.8.4 + * @see DefaultErrorHandler + */ + @SafeVarargs + @SuppressWarnings("varargs") + public final void setBlockingRetryableExceptions(Class... exceptionTypes) { + Assert.notNull(exceptionTypes, "The exception types cannot be null"); + Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements"); + this.blockingExceptionTypes = exceptionTypes; + } + private ConcurrentKafkaListenerContainerFactory doConfigure( ConcurrentKafkaListenerContainerFactory containerFactory, Configuration configuration, boolean isSetContainerProperties) { @@ -213,6 +229,9 @@ protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer de errorHandler.defaultFalse(); errorHandler.setCommitRecovered(true); errorHandler.setLogLevel(KafkaException.Level.DEBUG); + if (this.blockingExceptionTypes != null) { + errorHandler.addRetryableExceptions(this.blockingExceptionTypes); + } this.errorHandlerCustomizer.accept(errorHandler); return errorHandler; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java index 3df47db9f1..750844c339 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java @@ -60,6 +60,7 @@ import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.converter.ConversionException; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.BackOffExecution; @@ -407,12 +408,11 @@ void shouldDecorateFactory() { .createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer)); assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId); then(listener).should(times(1)).onMessage(data, ack, consumer); - then(this.configurerContainerCustomizer).should(times(1)).accept(container); } @Test - void shouldUseGivenBackOff() { + void shouldUseGivenBackOffAndExceptions() { // given given(container.getContainerProperties()).willReturn(containerProperties); @@ -427,8 +427,8 @@ void shouldUseGivenBackOff() { ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); - configurer.setBlockingRetriesBackOff(backOffMock); + configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class); // when KafkaListenerContainerFactory decoratedFactory = @@ -437,6 +437,14 @@ void shouldUseGivenBackOff() { // then then(backOffMock).should().start(); + then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture()); + CommonErrorHandler errorHandler = errorHandlerCaptor.getValue(); + assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue(); + DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler; + assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue(); + assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue(); + assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull(); + } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index 7e890f4b11..0c190b71b9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -52,7 +52,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.ConversionException; @@ -391,8 +390,7 @@ public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafka ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3)); - lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh) - .addRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)); + lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class); return lcfc; } From ce9656b27fd64cbfb71dcedacb2d68146b3a74aa Mon Sep 17 00:00:00 2001 From: Tomaz Fernandes Date: Thu, 24 Feb 2022 12:45:06 -0300 Subject: [PATCH 5/5] Improve ExceptionClassifier JavaDoc Also add assertions to the LCFC new methods to warn the user if they already set the blocking configurations. --- .../kafka/listener/ExceptionClassifier.java | 3 ++- .../ListenerContainerFactoryConfigurer.java | 9 ++++++++ ...stenerContainerFactoryConfigurerTests.java | 23 ++++++++++++++++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java index 281bc7fc6a..723b4949bf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java @@ -181,7 +181,8 @@ public boolean removeNotRetryableException(Class exceptionT * * All others will be retried, unless {@link #defaultFalse()} has been called. * @param exceptionType the exception type. - * @return true if the removal was successful. + * @return the classification of the exception if removal was successful; + * null otherwise. * @since 2.8.4 * @see #addNotRetryableExceptions(Class...) * @see #setClassifications(Map, boolean) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index ba3816d52a..b1793b2492 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -17,6 +17,7 @@ package org.springframework.kafka.retrytopic; import java.time.Clock; +import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -173,6 +174,10 @@ public KafkaListenerContainerFactory decorateFactoryWithoutSettingContainerPr */ public void setBlockingRetriesBackOff(BackOff blockingBackOff) { Assert.notNull(blockingBackOff, "The provided BackOff cannot be null"); + Assert.state(this.providedBlockingBackOff == null, () -> + "Blocking retries back off has already been set. Current: " + + this.providedBlockingBackOff + + " You provided: " + blockingBackOff); this.providedBlockingBackOff = blockingBackOff; } @@ -187,6 +192,10 @@ public void setBlockingRetriesBackOff(BackOff blockingBackOff) { public final void setBlockingRetryableExceptions(Class... exceptionTypes) { Assert.notNull(exceptionTypes, "The exception types cannot be null"); Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements"); + Assert.state(this.blockingExceptionTypes == null, + () -> "Blocking retryable exceptions have already been set." + + "Current ones: " + Arrays.toString(this.blockingExceptionTypes) + + " You provided: " + Arrays.toString(exceptionTypes)); this.blockingExceptionTypes = exceptionTypes; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java index 750844c339..48f24c19a2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.retrytopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -61,15 +62,17 @@ import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.BackOffExecution; +import org.springframework.util.backoff.FixedBackOff; /** * @author Tomaz Fernandes * @since 2.7 */ @ExtendWith(MockitoExtension.class) -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) class ListenerContainerFactoryConfigurerTests { @Mock @@ -447,6 +450,24 @@ void shouldUseGivenBackOffAndExceptions() { } + + @Test + void shouldThrowIfBackOffOrRetryablesAlreadySet() { + // given + BackOff backOff = new FixedBackOff(); + ListenerContainerFactoryConfigurer configurer = + new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, + deadLetterPublishingRecovererFactory, clock); + configurer.setBlockingRetriesBackOff(backOff); + configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class); + + // when / then + assertThatThrownBy(() -> configurer.setBlockingRetriesBackOff(backOff)).isInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> configurer.setBlockingRetryableExceptions(ConversionException.class, DeserializationException.class)) + .isInstanceOf(IllegalStateException.class); + } + + @Test void shouldCacheFactoryInstances() {