diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
index 8a0a415e93..9b3e06c29c 100644
--- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
+++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -328,35 +328,6 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat
 NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
 
-[[retry-topic-combine-blocking]]
-===== Combine blocking and non-blocking retries
-
-Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
-For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
-
-To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows.
-The default policy is FixedBackOff, with ten retries and no delay between them.
-Optionally, you can also set a different back off policy.
-
-====
-[source, java]
-----
-@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
-public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
-        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
-        @Qualifier(RetryTopicInternalBeanNames
-                .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
-    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
-    lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
-            .addRetryableExceptions(MyBlockingRetryException.class);
-    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
-    return lcfc;
-}
-----
-====
-
-NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
-
 ===== Include and Exclude Topics
 
@@ -459,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
 ----
 ====
 
+[[retry-topic-combine-blocking]]
+==== Combining blocking and non-blocking retries
+
+Starting in 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
+For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
+
+To configure blocking retries, add the exceptions you want to retry that way through the `setBlockingRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean, as follows.
+The default policy is `FixedBackOff`, with nine retries and no delay between them.
+Optionally, you can provide your own back off policy.
+
+====
+[source, java]
+----
+@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
+public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
+        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
+        @Qualifier(RetryTopicInternalBeanNames
+                .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
+    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
+    lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
+    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
+    return lcfc;
+}
+----
+====
+
+If you need to further tune the exception classification, you can set your own `Map` of classifications on the `DefaultErrorHandler` through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, for example:
+
+====
+[source, java]
+----
+lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
+----
+====
+
+NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
+
+Here's an example with both configurations working together:
+
+====
+[source, java]
+----
+@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
+public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
+        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
+        @Qualifier(RetryTopicInternalBeanNames
+                .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
+    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
+    lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
+    return lcfc;
+}
+
+@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
+public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
+        @Qualifier(RetryTopicInternalBeanNames
+                .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
+    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
+    ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
+    return ddtr;
+}
+
+----
+====
+
+In this example (a hypothetical listener illustrating these behaviors is sketched after the note below):
+
+* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT.
+* `ShouldRetryViaBothException.class` would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts.
+* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt failed.
+
+IMPORTANT: Note that the blocking retries behavior is an allowlist - you add the exceptions you do want to retry that way; the non-blocking retries classification, on the other hand, is geared towards fatal exceptions and as such is a denylist - you add the exceptions you don't want to retry non-blockingly, but to send directly to the DLT instead.
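+
+For illustration only, here is a minimal, hypothetical listener showing where such exceptions might be thrown from - the topic name, the `@RetryableTopic` settings, and the assumption that the sample exception classes above are `RuntimeException` subclasses are all arbitrary choices for this sketch, not part of the framework:
+
+====
+[source, java]
+----
+@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000))
+@KafkaListener(topics = "my-orders")
+public void listen(String message) {
+    // Each exception type is routed as described in the list above:
+    // ShouldRetryOnlyBlockingException -> blocking retries only, then the DLT
+    // ShouldRetryViaBothException -> blocking retries, then the retry topics
+    // ShouldSkipBothRetriesException -> no retries, straight to the DLT
+    throw new ShouldRetryViaBothException("Simulating a failure that is retried both ways");
+}
+----
+====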
+
+IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.
+
 ==== Topic Naming
 
 Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
@@ -775,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
     return retryTopicConfigurer;
 }
 ----
+====
 
 [[change-kboe-logging-level]]
 ==== Changing KafkaBackOffException Logging Level
diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc
index cc5efc7b79..988d72e6ba 100644
--- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc
+++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -88,5 +88,8 @@ See <> for more information.
 There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
 Refer to <> to see how to manage it.
 
+You can now use blocking and non-blocking retries in conjunction.
+See <<retry-topic-combine-blocking>> for more information.
+
 The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level.
-See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
+See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
index 4b5626794a..281bc7fc6a 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
@@ -22,6 +22,7 @@
 import org.springframework.classify.BinaryExceptionClassifier;
 import org.springframework.kafka.support.converter.ConversionException;
 import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.lang.Nullable;
 import org.springframework.messaging.converter.MessageConversionException;
 import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
 import org.springframework.util.Assert;
@@ -185,7 +186,8 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
      * @see #addNotRetryableExceptions(Class...)
      * @see #setClassifications(Map, boolean)
      */
-    public boolean removeClassification(Class<? extends Exception> exceptionType) {
+    @Nullable
+    public Boolean removeClassification(Class<? extends Exception> exceptionType) {
         return this.classifier.getClassified().remove(exceptionType);
     }
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
index 8902c9a887..ba3816d52a 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
@@ -83,6 +83,8 @@ public class ListenerContainerFactoryConfigurer {
 
     private BackOff providedBlockingBackOff = null;
 
+    private Class<? extends Exception>[] blockingExceptionTypes = null;
+
     private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> { };
 
@@ -162,9 +164,9 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
 
     /**
     * Set a {@link BackOff} to be used with blocking retries.
-    * You can specify the exceptions to be retried using the method
-    * {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])}
-    * By default, no exceptions are retried via blocking.
+    * If the BackOff execution returns STOP, the record will be forwarded
+    * to the next retry topic or to the DLT, depending on how the non-blocking retries
+    * are configured.
     * @param blockingBackOff the BackOff policy to be used by blocking retries.
     * @since 2.8.4
     * @see DefaultErrorHandler
@@ -174,6 +176,20 @@ public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
         this.providedBlockingBackOff = blockingBackOff;
     }
 
+    /**
+    * Specify the exceptions to be retried via blocking.
+    * @param exceptionTypes the exceptions that should be retried.
+    * @since 2.8.4
+    * @see DefaultErrorHandler
+    */
+    @SafeVarargs
+    @SuppressWarnings("varargs")
+    public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
+        Assert.notNull(exceptionTypes, "The exception types cannot be null");
+        Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
+        this.blockingExceptionTypes = exceptionTypes;
+    }
+
     private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
             ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
             boolean isSetContainerProperties) {
@@ -213,6 +229,9 @@ protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer de
         errorHandler.defaultFalse();
         errorHandler.setCommitRecovered(true);
         errorHandler.setLogLevel(KafkaException.Level.DEBUG);
+        if (this.blockingExceptionTypes != null) {
+            errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
+        }
         this.errorHandlerCustomizer.accept(errorHandler);
         return errorHandler;
     }
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java
index 3df47db9f1..750844c339 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java
@@ -60,6 +60,7 @@
 import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
 import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
 import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.converter.ConversionException;
 import org.springframework.util.backoff.BackOff;
 import org.springframework.util.backoff.BackOffExecution;
 
@@ -407,12 +408,11 @@ void shouldDecorateFactory() {
                 .createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
         assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
         then(listener).should(times(1)).onMessage(data, ack, consumer);
-
         then(this.configurerContainerCustomizer).should(times(1)).accept(container);
     }
 
     @Test
-    void shouldUseGivenBackOff() {
+    void shouldUseGivenBackOffAndExceptions() {
 
         // given
         given(container.getContainerProperties()).willReturn(containerProperties);
@@ -427,8 +427,8 @@ void shouldUseGivenBackOff() {
 
         ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
                 deadLetterPublishingRecovererFactory, clock);
-
         configurer.setBlockingRetriesBackOff(backOffMock);
+        configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
 
         // when
         KafkaListenerContainerFactory<?> decoratedFactory =
@@ -437,6 +437,14 @@
 
         // then
         then(backOffMock).should().start();
+        then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
+        CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
+        assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
+        DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
+        assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
+        assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
+        assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull();
+
     }
 
     @Test
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java
index 7e890f4b11..0c190b71b9 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java
@@ -52,7 +52,6 @@
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.ContainerProperties;
-import org.springframework.kafka.listener.DefaultErrorHandler;
 import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.converter.ConversionException;
@@ -391,8 +390,7 @@ public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafka
             ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
                     deadLetterPublishingRecovererFactory, clock);
             lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3));
-            lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh)
-                    .addRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class));
+            lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
             return lcfc;
         }