GH-2116: Add blocking retries to RT #2124

Merged: 5 commits, Feb 24, 2022
75 changes: 75 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -430,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
----
====

[[retry-topic-combine-blocking]]
==== Combining blocking and non-blocking retries

Starting with version 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that would likely also trigger errors on the next records, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries, add the exceptions you want to retry that way through the `setBlockingRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean, as follows.
The default policy is `FixedBackOff`, with nine retries and no delay between them.
Optionally, you can provide your own back off policy.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
----
====

If you need to further tune the exception classification, you can set your own `Map` of classifications on the error handler through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method; for example:

====
[source, java]
----
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
----
====
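As an illustrative sketch (the exception types and default value here are hypothetical, not part of this change), such a map associates each exception type with `true` (retry via blocking) or `false` (do not retry via blocking):

====
[source, java]
----
Map<Class<? extends Throwable>, Boolean> myClassificationsMap = new HashMap<>();
myClassificationsMap.put(MyBlockingRetryException.class, true); // retry this type via blocking
myClassificationsMap.put(MyFatalException.class, false); // skip blocking retries for this type
boolean myDefaultValue = false; // classification for exception types not present in the map
----
====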

NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.

Here's an example with both configurations working together:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
}

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
return ddtr;
}

----
====

In this example:

* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT.
* `ShouldRetryViaBothException.class` would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts.
* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt failed.
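
As an illustrative sketch, here is a listener these classifications might apply to (the topic name and `attempts` value are hypothetical):

====
[source, java]
----
@RetryableTopic(attempts = "3")
@KafkaListener(topics = "my-topic")
public void listen(String message) {
    // may throw ShouldRetryOnlyBlockingException, ShouldRetryViaBothException,
    // or ShouldSkipBothRetriesException; each is handled per the classifications above
    process(message);
}
----
====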

IMPORTANT: Note that the blocking retries behavior works as an allowlist: you add the exceptions you do want to retry that way. By contrast, the non-blocking retries classification is geared towards fatal exceptions, so it works as a denylist: you add the exceptions you don't want to retry non-blocking, but want sent directly to the DLT instead.

IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.

==== Topic Naming

Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
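For example, assuming the default `-retry` and `-dlt` suffixes with the default delay-based naming, a `my-topic` topic configured with two retry attempts delayed 1000ms and 2000ms might produce the topics `my-topic-retry-1000`, `my-topic-retry-2000`, and `my-topic-dlt`.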
@@ -746,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
return retryTopicConfigurer;
}
----
====

[[change-kboe-logging-level]]
==== Changing KafkaBackOffException Logging Level
Expand Down
5 changes: 4 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -88,5 +88,8 @@ See <<retry-topic-lcf>> for more information.
There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
Refer to <<retry-topic-ex-classifier>> to see how to manage it.

You can now use blocking and non-blocking retries in conjunction.
See <<retry-topic-combine-blocking>> for more information.

The `KafkaBackOffException` thrown when using the retryable topics feature is now logged at DEBUG level.
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
-			getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
+			getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
}

@Override
Expand Down
@@ -22,6 +22,7 @@
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.util.Assert;
@@ -180,12 +181,14 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
* </ul>
* All others will be retried, unless {@link #defaultFalse()} has been called.
* @param exceptionType the exception type.
- * @return true if the removal was successful.
+ * @return the classification of the exception if removal was successful;
+ * null otherwise.
* @since 2.8.4
* @see #addNotRetryableExceptions(Class...)
* @see #setClassifications(Map, boolean)
*/
-public boolean removeClassification(Class<? extends Exception> exceptionType) {
+@Nullable
+public Boolean removeClassification(Class<? extends Exception> exceptionType) {
Review discussion:

Contributor: Why do we need this change?

Author: Thanks! This change is because, as is, if we try to remove an exception that is not there, we get an NPE from trying to convert null to a primitive boolean. With this change it will return null, and the user can deal with that as they wish.

I for one thought it would return false for not finding the exception in the map, but it turns out it returns the current value, or null if it's not found:

"Returns the value to which this map previously associated the key, or {@code null} if the map contained no mapping for the key."

It's not something extremely necessary; I can roll it back if you prefer. Or maybe it's worth adding some explanation to the javadocs.
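
A minimal illustration of the Map.remove behavior described above (hypothetical map, not part of the PR):

Map<Class<? extends Exception>, Boolean> classified = new HashMap<>();
classified.put(IllegalStateException.class, false);
Boolean previous = classified.remove(IllegalStateException.class); // Boolean.FALSE
Boolean absent = classified.remove(IllegalArgumentException.class); // null: no mapping for the key
// boolean b = classified.remove(IllegalArgumentException.class); // NPE: null unboxed to primitive boolean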

Contributor: Oops; thanks!

Contributor: We should change the javadoc to "@return the classification of the exception if the removal was successful; null otherwise."

I can do it during the merge if you like; but I have to run out for a short while.

Author: I can do that, no worries, thanks!

Author: Changed the javadoc and added two assertions to the LCFC new methods.
return this.classifier.getClassified().remove(exceptionType);
}

Expand Down
@@ -22,6 +22,7 @@
import java.util.function.BiPredicate;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
* @since 2.7
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
return getRecoveryStrategy(records, null, thrownException);
}

/**
* Return a {@link RecoveryStrategy} to call to determine whether the first record in the
* list should be skipped.
* @param records the records.
* @param recoveryConsumer the consumer.
* @param thrownException the exception.
* @return the {@link RecoveryStrategy}.
* @since 2.8.4
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
if (getClassifier().classify(thrownException)) {
return this.failureTracker::recovered;
}
else {
try {
-			this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
+			this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
}
catch (Exception ex) {
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
this.failures.remove();
}

-BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
+ConsumerAwareRecordRecoverer getRecoverer() {
return this.recoverer;
}

@@ -17,6 +17,7 @@
package org.springframework.kafka.retrytopic;

import java.time.Clock;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -42,7 +43,7 @@
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
-import org.springframework.util.backoff.FixedBackOff;
+import org.springframework.util.backoff.BackOff;

/**
*
@@ -81,6 +82,10 @@ public class ListenerContainerFactoryConfigurer {

private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;

private BackOff providedBlockingBackOff = null;

private Class<? extends Exception>[] blockingExceptionTypes = null;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
};

@@ -158,6 +163,42 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
}

/**
* Set a {@link BackOff} to be used with blocking retries.
* If the BackOff execution returns STOP, the record will be forwarded
* to the next retry topic or to the DLT, depending on how the non-blocking retries
* are configured.
* @param blockingBackOff the BackOff policy to be used by blocking retries.
* @since 2.8.4
* @see DefaultErrorHandler
*/
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
Assert.state(this.providedBlockingBackOff == null, () ->
"Blocking retries back off has already been set. Current: "
+ this.providedBlockingBackOff
+ " You provided: " + blockingBackOff);
this.providedBlockingBackOff = blockingBackOff;
}

/**
* Specify the exceptions to be retried via blocking.
* @param exceptionTypes the exceptions that should be retried.
* @since 2.8.4
* @see DefaultErrorHandler
*/
@SafeVarargs
@SuppressWarnings("varargs")
public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
Assert.notNull(exceptionTypes, "The exception types cannot be null");
Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
Assert.state(this.blockingExceptionTypes == null,
() -> "Blocking retryable exceptions have already been set."
+ "Current ones: " + Arrays.toString(this.blockingExceptionTypes)
+ " You provided: " + Arrays.toString(exceptionTypes));
this.blockingExceptionTypes = exceptionTypes;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
boolean isSetContainerProperties) {
@@ -193,14 +234,23 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC

protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
Configuration configuration) {
-		DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
-				new FixedBackOff(0, 0));
+		DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
errorHandler.defaultFalse();
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
if (this.blockingExceptionTypes != null) {
errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
}
this.errorHandlerCustomizer.accept(errorHandler);
return errorHandler;
}

protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return this.providedBlockingBackOff != null
? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff)
: new DefaultErrorHandler(deadLetterPublishingRecoverer);
Review discussion:

Contributor: This isn't right; the default fixed back off is (0, 9), we need (0, 0).

Why not just set the field to new FixedBackOff(0, 0)? We then don't need this test. Rename the field too.

Author: Well, considering all exceptions will return false when classified by default, it doesn't really matter what the back off policy is; we can just leave the default one, and then the user only has to add the retryable exceptions, which makes the default or provided back off kick in.

Anyway, we can set the no-op back off too, no worries. Either way, I think the other classes' modifications to pass the consumer to the DLPR are still needed, since we might go that route for exception classifications that return false.

Contributor: Oh; OK; makes sense to leave it.
}

protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
Configuration configuration, boolean isSetContainerProperties) {
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()
@@ -17,12 +17,14 @@
package org.springframework.kafka.retrytopic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

@@ -59,13 +61,18 @@
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Tomaz Fernandes
* @since 2.7
*/
@ExtendWith(MockitoExtension.class)
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
class ListenerContainerFactoryConfigurerTests {

@Mock
@@ -404,10 +411,63 @@ void shouldDecorateFactory() {
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
then(listener).should(times(1)).onMessage(data, ack, consumer);

then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldUseGivenBackOffAndExceptions() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
given(containerProperties.getMessageListener()).willReturn(listener);
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
BackOff backOffMock = mock(BackOff.class);
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
given(backOffMock.start()).willReturn(backOffExecutionMock);

ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);
configurer.setBlockingRetriesBackOff(backOffMock);
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);

// when
KafkaListenerContainerFactory<?> decoratedFactory =
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
decoratedFactory.createListenerContainer(endpoint);

// then
then(backOffMock).should().start();
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull();

}


@Test
void shouldThrowIfBackOffOrRetryablesAlreadySet() {
// given
BackOff backOff = new FixedBackOff();
ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);
configurer.setBlockingRetriesBackOff(backOff);
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);

// when / then
assertThatThrownBy(() -> configurer.setBlockingRetriesBackOff(backOff)).isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> configurer.setBlockingRetryableExceptions(ConversionException.class, DeserializationException.class))
.isInstanceOf(IllegalStateException.class);
}


@Test
void shouldCacheFactoryInstances() {
