GH-2116: Add blocking retries to RT (spring-projects/spring-kafka)
Previously, we hardcoded a no-op back off in the DefaultErrorHandler used by the Retryable Topics feature.
This commit adds a setter that lets the user provide their own back off policy and configure blocking retries in conjunction with RT.
tomazfernandes committed Feb 20, 2022
1 parent f51ddd4 commit cc49f7e
Showing 3 changed files with 82 additions and 2 deletions.
32 changes: 32 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -274,6 +274,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa

NOTE: The default is to have no timeout set, which can also be achieved by providing -1 as the timeout value.

[[retry-topic-exception-classifier]]
===== Exception Classifier

You can specify which exceptions you want to retry on and which not to.
@@ -306,6 +307,37 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t

NOTE: The default behavior is retrying on all exceptions and not traversing causes.

[[retry-topic-combine-blocking]]
===== Combine blocking and non-blocking retries

Starting in version 2.8.3, you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely also trigger errors on the next records, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

You can configure the blocking retries as follows:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
            deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3));
    lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
            .addNotRetryableExceptions(MyFatalException.class));
    return lcfc;
}
----
====

NOTE: If you set a blocking retries back off, the default is to retry on all exceptions except those listed as fatal in <<default-eh>>.
You can add or remove exceptions from that classification with the `DefaultErrorHandler`'s `addNotRetryableExceptions` and `removeNotRetryableException` methods, applied through the `ListenerContainerFactoryConfigurer`'s error handler customizer.
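For example, to allow blocking retries for an exception that the `DefaultErrorHandler` treats as fatal by default, the customizer can remove it from the not-retryable classification.
The following is only a sketch, placed inside the `lcfc` bean method shown above; it assumes `ConversionException` is among the default fatal classifications, and `MyFatalException` is the illustrative exception from the earlier example:

====
[source, java]
----
lcfc.setErrorHandlerCustomizer(commonErrorHandler -> {
    DefaultErrorHandler handler = (DefaultErrorHandler) commonErrorHandler;
    // Allow blocking retries for an exception that is fatal by default
    handler.removeNotRetryableException(ConversionException.class);
    // Skip blocking retries for this exception
    handler.addNotRetryableExceptions(MyFatalException.class);
});
----
====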

NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
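As a sketch of such a combination (the `MyNonBlockingFatalException` name is illustrative), a `RetryTopicConfiguration` can classify an exception as fatal for the non-blocking retries, so that once any blocking retries are exhausted the record can be routed straight to the DLT rather than to the retry topics:

====
[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyNonBlockingFatalException.class)
            .create(template);
}
----
====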


===== Include and Exclude Topics

You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the `.includeTopic(String topic)`, `.includeTopics(Collection<String> topics)`, `.excludeTopic(String topic)` and `.excludeTopics(Collection<String> topics)` methods.
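For instance, here is a minimal sketch of a configuration that handles only a single topic (the topic and bean names are illustrative):

====
[source, java]
----
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopic("my-included-topic")
            .create(template);
}
----
====
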
spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java
@@ -43,6 +43,7 @@
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
@@ -82,6 +83,10 @@ public class ListenerContainerFactoryConfigurer {

private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;

private static final BackOff DEFAULT_BLOCKING_BACKOFF = new FixedBackOff(0, 0);

private BackOff blockingBackOff = DEFAULT_BLOCKING_BACKOFF;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
};

@@ -159,6 +164,19 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutBackOffValues(
return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList());
}

/**
* Set a {@link BackOff} to be used by blocking retries.
* You can also add or remove exceptions from the blocking retry classification
* using the error handler's
* {@link org.springframework.kafka.listener.ExceptionClassifier#addNotRetryableExceptions(Class[])}
* and
* {@link org.springframework.kafka.listener.ExceptionClassifier#removeNotRetryableException(Class)}
* methods, via {@link #setErrorHandlerCustomizer(Consumer)}.
* @param blockingBackOff the BackOff policy to be used
*/
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
this.blockingBackOff = blockingBackOff;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {

@@ -192,8 +210,7 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
}

private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
new FixedBackOff(0, 0));
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, this.blockingBackOff);
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
this.errorHandlerCustomizer.accept(errorHandler);
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java
@@ -23,6 +23,7 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

@@ -59,6 +60,8 @@
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

/**
* @author Tomaz Fernandes
@@ -408,6 +411,34 @@ void shouldDecorateFactory() {
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldUseGivenBackOff() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
given(containerProperties.getMessageListener()).willReturn(listener);
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
BackOff backOffMock = mock(BackOff.class);
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
given(backOffMock.start()).willReturn(backOffExecutionMock);

ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);

configurer.setBlockingRetriesBackOff(backOffMock);

// when
KafkaListenerContainerFactory<?> decoratedFactory =
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
decoratedFactory.createListenerContainer(endpoint);

// then
then(backOffMock).should().start();
}

@Test
void shouldCacheFactoryInstances() {

