From cc49f7e3e4f6a5fa41fe443d094f63a274501906 Mon Sep 17 00:00:00 2001 From: tomazfernandes Date: Sun, 20 Feb 2022 13:41:48 -0300 Subject: [PATCH] GH-2116: Add blocking retries to RT Before we hardcoded a no-ops back off in the DefaultErrorHandler used in the Retryable Topics feature. Adds a setter to let the user provide their own back off policy and configure blocking retries in conjunction with RT. --- .../src/main/asciidoc/retrytopic.adoc | 32 +++++++++++++++++++ .../ListenerContainerFactoryConfigurer.java | 21 ++++++++++-- ...stenerContainerFactoryConfigurerTests.java | 31 ++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 65fda4ddab..0536e25887 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -274,6 +274,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate templa NOTE: The default is having no timeout set, which can also be achieved by providing -1 as the timout value. +[[retry-topic-exception-classifier]] ===== Exception Classifier You can specify which exceptions you want to retry on and which not to. @@ -306,6 +307,37 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate t NOTE: The default behavior is retrying on all exceptions and not traversing causes. +[[retry-topic-combine-blocking]] +===== Combine blocking and non-blocking retries + +Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction. +For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT. + +You can configure the blocking retries as follows: + +==== +[source, java] +---- +@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) +public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3)); + lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler) + .addNotRetryableExceptions(MyFatalException.class); + return lcfc; +} +---- +==== + +NOTE: If you set a blocking retry back off, the default is to retry on all exceptions except the fatal ones in <>. +You can add or remove exceptions using the `addNotRetryableException` and `removeNotRetryableException` methods in the `ListenerContainerFactoryConfigurer`. + +NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind. + + ===== Include and Exclude Topics You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection topics) .excludeTopic(String topic) and .excludeTopics(Collection topics) methods. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index a140af0266..e7744ebf0d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -43,6 +43,7 @@ import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.util.Assert; +import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; /** @@ -82,6 +83,10 @@ public class ListenerContainerFactoryConfigurer { private static final long LOWEST_BACKOFF_THRESHOLD = 1500L; + private static final BackOff DEFAULT_BLOCKING_BACKOFF = new FixedBackOff(0, 0); + + private BackOff blockingBackOff = DEFAULT_BLOCKING_BACKOFF; + private Consumer> containerCustomizer = container -> { }; @@ -159,6 +164,19 @@ public KafkaListenerContainerFactory decorateFactoryWithoutBackOffValues( return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList()); } + /** + * Set a {@link BackOff} to be used by blocking retries. + * You can add and remove exceptions to be retried this way using this class' + * superclass + * {@link org.springframework.kafka.listener.ExceptionClassifier#addNotRetryableExceptions(Class[])} + * and + * {@link org.springframework.kafka.listener.ExceptionClassifier#removeNotRetryableException(Class)}} + * @param blockingBackOff the BackOff policy to be used + */ + public void setBlockingRetriesBackOff(BackOff blockingBackOff) { + this.blockingBackOff = blockingBackOff; + } + private ConcurrentKafkaListenerContainerFactory doConfigure( ConcurrentKafkaListenerContainerFactory containerFactory, List backOffValues) { @@ -192,8 +210,7 @@ public void setErrorHandlerCustomizer(Consumer errorHandlerC } private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { - DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, - new FixedBackOff(0, 0)); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, this.blockingBackOff); errorHandler.setCommitRecovered(true); errorHandler.setLogLevel(KafkaException.Level.DEBUG); this.errorHandlerCustomizer.accept(errorHandler); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java index e5a6606dad..3df47db9f1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java @@ -23,6 +23,7 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -59,6 +60,8 @@ import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; /** * @author Tomaz Fernandes @@ -408,6 +411,34 @@ void shouldDecorateFactory() { then(this.configurerContainerCustomizer).should(times(1)).accept(container); } + @Test + void shouldUseGivenBackOff() { + + // given + given(container.getContainerProperties()).willReturn(containerProperties); + given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer); + given(containerProperties.getMessageListener()).willReturn(listener); + given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration); + willReturn(container).given(containerFactory).createListenerContainer(endpoint); + BackOff backOffMock = mock(BackOff.class); + BackOffExecution backOffExecutionMock = mock(BackOffExecution.class); + given(backOffMock.start()).willReturn(backOffExecutionMock); + + ListenerContainerFactoryConfigurer configurer = + new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, + deadLetterPublishingRecovererFactory, clock); + + configurer.setBlockingRetriesBackOff(backOffMock); + + // when + KafkaListenerContainerFactory decoratedFactory = + configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer()); + decoratedFactory.createListenerContainer(endpoint); + + // then + then(backOffMock).should().start(); + } + @Test void shouldCacheFactoryInstances() {