
Commit

Improve API and docs
Now retryable exceptions can be set directly in the lcfc class.
Improved the docs on how to combine blocking and non-blocking behaviors.
Added what's new entry for this feature.
tomazfernandes committed Feb 23, 2022
1 parent a7e069b commit de984b5
Showing 6 changed files with 116 additions and 40 deletions.
104 changes: 75 additions & 29 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
@@ -328,35 +328,6 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat

NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.

[[retry-topic-combine-blocking]]
===== Combine blocking and non-blocking retries

Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method as follows.
The default policy is FixedBackOff, with ten retries and no delay between them.
Optionally, you can also set a different back off policy.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
.addRetryableExceptions(MyBlockingRetryException.class));
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
----
====

NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.


===== Include and Exclude Topics

@@ -459,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
----
====

[[retry-topic-combine-blocking]]
==== Combining blocking and non-blocking retries

Starting with version 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, some exceptions, such as a `DatabaseAccessException`, are likely to affect subsequent records as well, so you may want to retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries, add the exceptions you want to retry via the `setBlockingRetryableExceptions` method on the `ListenerContainerFactoryConfigurer` bean, as follows.
The default policy is `FixedBackOff`, with nine retries and no delay between them.
Optionally, you can provide your own back off policy.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
----
====

If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, for example:

====
[source, java]
----
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
----
====
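For reference, the map passed to `setClassifications` associates exception types with a boolean: `true` means the blocking error handler retries that exception, `false` means it does not, and unmapped types fall back to the default value. The following framework-free sketch mirrors that lookup (the exception types and names here are hypothetical, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

public class ClassificationsExample {

    // Hypothetical application exceptions, for illustration only.
    public static class MyBlockingRetryException extends RuntimeException { }
    public static class MyFatalException extends RuntimeException { }

    // true = retried by the blocking error handler; false = not retried.
    static final Map<Class<? extends Throwable>, Boolean> MY_CLASSIFICATIONS = new HashMap<>();

    // Exceptions absent from the map fall back to this default.
    static final boolean MY_DEFAULT_VALUE = false;

    static {
        MY_CLASSIFICATIONS.put(MyBlockingRetryException.class, true);
        MY_CLASSIFICATIONS.put(MyFatalException.class, false);
    }

    // Mirrors the lookup a binary exception classifier performs.
    public static boolean shouldRetryBlocking(Class<? extends Throwable> exceptionType) {
        return MY_CLASSIFICATIONS.getOrDefault(exceptionType, MY_DEFAULT_VALUE);
    }
}
```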

NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.

Here's an example with both configurations working together:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
}

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
return ddtr;
}
----
====

In this example:

* `ShouldRetryOnlyBlockingException.class` would be retried only via blocking retries and, if all of them fail, would go straight to the DLT.
* `ShouldRetryViaBothException.class` would be retried via blocking retries first and, if all of those fail, would be forwarded to the next retry topic for another set of attempts.
* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt fails.

IMPORTANT: The blocking retries classification behaves as an allowlist: you add the exceptions you want to retry that way. The non-blocking retries classification, by contrast, is geared towards fatal exceptions and behaves as a denylist: you add the exceptions you do not want to retry non-blockingly but want sent directly to the DLT instead.
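The two lists can be pictured as a small decision table. This plain-Java sketch (no framework types; the exception names are the hypothetical ones from the combined example above) models which kind of retry each exception receives:

```java
import java.util.Set;

public class RetryRoutingSketch {

    // Hypothetical exceptions from the combined example above.
    public static class ShouldRetryOnlyBlockingException extends RuntimeException { }
    public static class ShouldRetryViaBothException extends RuntimeException { }
    public static class ShouldSkipBothRetriesException extends RuntimeException { }

    // Allowlist: exceptions configured for blocking retries.
    static final Set<Class<?>> BLOCKING_RETRYABLE = Set.of(
            ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);

    // Denylist: exceptions classified as fatal for non-blocking retries.
    static final Set<Class<?>> NON_BLOCKING_FATAL = Set.of(
            ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);

    public static boolean retriesBlocking(Class<?> exceptionType) {
        return BLOCKING_RETRYABLE.contains(exceptionType);
    }

    public static boolean retriesNonBlocking(Class<?> exceptionType) {
        return !NON_BLOCKING_FATAL.contains(exceptionType);
    }
}
```

An exception in both sets gets blocking retries only; one in neither set gets non-blocking retries only; one only in the fatal set skips all retries.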

IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.

==== Topic Naming

Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
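As an illustration, assuming the default suffixes (`-retry` and `-dlt`) and the index-based suffixing strategy, the naming scheme can be sketched as simple string concatenation (a sketch of the convention, not framework code; actual names depend on the configured suffixes and strategy):

```java
public class TopicNamingSketch {

    // Assumed default suffixes for the retryable topics feature.
    static final String RETRY_SUFFIX = "-retry";
    static final String DLT_SUFFIX = "-dlt";

    // Index-based naming: main topic + retry suffix + "-" + attempt index.
    public static String retryTopic(String mainTopic, int index) {
        return mainTopic + RETRY_SUFFIX + "-" + index;
    }

    public static String dltTopic(String mainTopic) {
        return mainTopic + DLT_SUFFIX;
    }
}
```

With the delay-based strategy, the delay value (in milliseconds) would take the place of the index.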
@@ -775,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
return retryTopicConfigurer;
}
----
====

[[change-kboe-logging-level]]
==== Changing KafkaBackOffException Logging Level
5 changes: 4 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -88,5 +88,8 @@ See <<retry-topic-lcf>> for more information.
There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
Refer to <<retry-topic-ex-classifier>> to see how to manage it.

You can now use blocking and non-blocking retries in conjunction.
See <<retry-topic-combine-blocking>> for more information.

The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level.
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
@@ -22,6 +22,7 @@
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.util.Assert;
@@ -185,7 +186,8 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
* @see #addNotRetryableExceptions(Class...)
* @see #setClassifications(Map, boolean)
*/
public boolean removeClassification(Class<? extends Exception> exceptionType) {
@Nullable
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
return this.classifier.getClassified().remove(exceptionType);
}

@@ -95,6 +95,8 @@ public class ListenerContainerFactoryConfigurer {

private final Clock clock;

private Class<? extends Exception>[] blockingExceptionTypes = null;

public ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
@@ -162,9 +164,9 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr

/**
* Set a {@link BackOff} to be used with blocking retries.
* You can specify the exceptions to be retried using the method
* {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])}
* By default, no exceptions are retried via blocking.
* If the BackOff execution returns STOP, the record will be forwarded
* to the next retry topic or to the DLT, depending on how the non-blocking retries
* are configured.
* @param blockingBackOff the BackOff policy to be used by blocking retries.
* @since 2.8.4
* @see DefaultErrorHandler
@@ -174,6 +176,20 @@ public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
this.providedBlockingBackOff = blockingBackOff;
}

/**
* Specify the exceptions to be retried via blocking.
* @param exceptionTypes the exceptions that should be retried.
* @since 2.8.4
* @see DefaultErrorHandler
*/
@SafeVarargs
@SuppressWarnings("varargs")
public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
Assert.notNull(exceptionTypes, "The exception types cannot be null");
Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
this.blockingExceptionTypes = exceptionTypes;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
boolean isSetContainerProperties) {
@@ -213,6 +229,9 @@ protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer de
errorHandler.defaultFalse();
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
if (this.blockingExceptionTypes != null) {
errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
}
this.errorHandlerCustomizer.accept(errorHandler);
return errorHandler;
}
@@ -60,6 +60,7 @@
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

@@ -407,12 +408,11 @@ void shouldDecorateFactory() {
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
then(listener).should(times(1)).onMessage(data, ack, consumer);

then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldUseGivenBackOff() {
void shouldUseGivenBackOffAndExceptions() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
@@ -427,8 +427,8 @@ void shouldUseGivenBackOff() {
ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);

configurer.setBlockingRetriesBackOff(backOffMock);
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);

// when
KafkaListenerContainerFactory<?> decoratedFactory =
@@ -437,6 +437,14 @@

// then
then(backOffMock).should().start();
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull();

}

@Test
@@ -52,7 +52,6 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.ConversionException;
@@ -391,8 +390,7 @@ public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafka
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);

lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3));
lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh)
.addRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class));
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
}

