Commit

Change DEH in LCFC to defaultFalse
With this change we no longer need a no-op back off.
Some minor adjustments were needed to maintain behavior when the logic reaches the DLPR (DeadLetterPublishingRecoverer).
tomazfernandes committed Feb 23, 2022
1 parent 7998944 commit 85cab3f
Showing 7 changed files with 575 additions and 7 deletions.
spring-kafka-docs/src/main/asciidoc/retrytopic.adoc (30 additions, 0 deletions)
@@ -360,6 +360,36 @@ You can add or remove exceptions using the `addNotRetryableException` and `remov
NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.


[[retry-topic-combine-blocking]]
===== Combine blocking and non-blocking retries

Starting with version 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you can have a set of exceptions that are likely to trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries, add the exceptions you want to retry through the `addRetryableExceptions` method, as follows.
The default policy is a `FixedBackOff` with nine retries and no delay between them.
Optionally, you can also set a different back off policy.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
		DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
		@Qualifier(RetryTopicInternalBeanNames
				.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
	ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(
			kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
	lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
			.addRetryableExceptions(MyBlockingRetryException.class));
	lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
	return lcfc;
}
----
====
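In the example above, `new FixedBackOff(500, 5)` yields five 500 ms back offs and then stops. A minimal re-implementation of those semantics (a sketch, not Spring's `org.springframework.util.backoff.FixedBackOff`, whose executions are obtained via `start()`):

```java
// Sketch of FixedBackOff(interval, maxAttempts) semantics:
// return the fixed interval until maxAttempts back offs are exhausted, then STOP (-1).
public class FixedBackOffSketch {

	public static final long STOP = -1L;

	private final long interval;

	private final long maxAttempts;

	private long attempts;

	public FixedBackOffSketch(long interval, long maxAttempts) {
		this.interval = interval;
		this.maxAttempts = maxAttempts;
	}

	// Mirrors BackOffExecution.nextBackOff() behavior.
	public long nextBackOff() {
		return this.attempts++ < this.maxAttempts ? this.interval : STOP;
	}

	public static void main(String[] args) {
		FixedBackOffSketch backOff = new FixedBackOffSketch(500, 5);
		for (int i = 0; i < 6; i++) {
			System.out.println(backOff.nextBackOff()); // 500 five times, then -1
		}
	}
}
```

A back off of `FixedBackOff(0, 9)` (no delay, nine retries) behaves the same way, which is why it serves as the default blocking policy.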

NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
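That decision flow can be sketched in plain Java (a simplified model; the class, constants, and destination names below are hypothetical illustrations, not framework API):

```java
import java.util.List;
import java.util.Set;

// Simplified model of how blocking and non-blocking retries combine:
// exceptions registered as blocking-retryable are retried in place first,
// then handed to the non-blocking retry topics, then to the DLT.
public class RetryFlowSketch {

	static final Set<String> BLOCKING_RETRIES = Set.of("DatabaseAccessException");

	static final int MAX_BLOCKING_ATTEMPTS = 3;

	// Returns the sequence of "destinations" a repeatedly failing record goes through.
	static List<String> destinationsFor(String exceptionName) {
		if (BLOCKING_RETRIES.contains(exceptionName)) {
			// Same record is retried in place before going to the retry topic.
			return List.of("blocking-retry x" + MAX_BLOCKING_ATTEMPTS, "retry-topic", "dlt");
		}
		// Otherwise the record goes straight to non-blocking retries.
		return List.of("retry-topic", "dlt");
	}

	public static void main(String[] args) {
		System.out.println(destinationsFor("DatabaseAccessException"));
		System.out.println(destinationsFor("IllegalStateException"));
	}
}
```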


===== Include and Exclude Topics

You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the `.includeTopic(String topic)`, `.includeTopics(Collection<String> topics)`, `.excludeTopic(String topic)` and `.excludeTopics(Collection<String> topics)` methods.
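The selection those methods configure amounts to a simple predicate, sketched below in plain Java (a hypothetical model of the include/exclude logic, not the framework's implementation):

```java
import java.util.Collection;
import java.util.List;

// Sketch: a configuration applies to a topic when the topic is not excluded
// and either no include list was set or the topic is on it.
public class TopicFilterSketch {

	private final Collection<String> includes;

	private final Collection<String> excludes;

	public TopicFilterSketch(Collection<String> includes, Collection<String> excludes) {
		this.includes = includes;
		this.excludes = excludes;
	}

	public boolean appliesTo(String topic) {
		if (this.excludes.contains(topic)) {
			return false;
		}
		return this.includes.isEmpty() || this.includes.contains(topic);
	}

	public static void main(String[] args) {
		TopicFilterSketch filter = new TopicFilterSketch(List.of("orders"), List.of("audit"));
		System.out.println(filter.appliesTo("orders"));   // true
		System.out.println(filter.appliesTo("audit"));    // false
		System.out.println(filter.appliesTo("payments")); // false: include list is set and does not contain it
	}
}
```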
@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
-				getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
+				getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
}

@Override
@@ -22,6 +22,7 @@
import java.util.function.BiPredicate;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
* @since 2.7
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
return getRecoveryStrategy(records, null, thrownException);
}

/**
* Return a {@link RecoveryStrategy} to call to determine whether the first record in the
* list should be skipped.
* @param records the records.
* @param recoveryConsumer the consumer.
* @param thrownException the exception.
* @return the {@link RecoveryStrategy}.
* @since 2.8.4
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
if (getClassifier().classify(thrownException)) {
return this.failureTracker::recovered;
}
else {
try {
-				this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
+				this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
}
catch (Exception ex) {
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
this.failures.remove();
}

-	BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
+	ConsumerAwareRecordRecoverer getRecoverer() {
return this.recoverer;
}
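The change from `BiConsumer<ConsumerRecord<?, ?>, Exception>` to `ConsumerAwareRecordRecoverer` works because the latter can still be used as a plain `BiConsumer`: the two-argument `accept` gets a default implementation that delegates to the consumer-aware three-argument variant. A self-contained sketch of that pattern (simplified types and hypothetical names; the real interfaces live in `org.springframework.kafka.listener`):

```java
import java.util.function.BiConsumer;

// Sketch of the interface hierarchy: the two-arg accept delegates by default
// to the three-arg, consumer-aware variant with a null consumer, so a
// consumer-aware recoverer fits wherever a BiConsumer recoverer was expected.
public class RecovererSketch {

	interface RecordRecoverer extends BiConsumer<String, Exception> {
	}

	interface ConsumerAwareRecordRecoverer extends RecordRecoverer {

		@Override
		default void accept(String record, Exception ex) {
			accept(record, null, ex); // no consumer available on this path
		}

		void accept(String record, Object consumer, Exception ex);
	}

	public static void main(String[] args) {
		StringBuilder log = new StringBuilder();
		ConsumerAwareRecordRecoverer recoverer =
				(record, consumer, ex) -> log.append(record).append(':').append(consumer);
		recoverer.accept("rec1", new RuntimeException("boom")); // delegates with null consumer
		System.out.println(log); // rec1:null
	}
}
```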

@@ -42,7 +42,7 @@
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
-import org.springframework.util.backoff.FixedBackOff;
+import org.springframework.util.backoff.BackOff;

/**
*
@@ -81,6 +81,8 @@ public class ListenerContainerFactoryConfigurer {

private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;

private BackOff providedBlockingBackOff = null;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
};

@@ -158,6 +160,20 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
}

/**
* Set a {@link BackOff} to be used with blocking retries.
* You can specify the exceptions to be retried using the method
* {@link org.springframework.kafka.listener.ExceptionClassifier#addRetryableExceptions(Class[])}.
* By default, no exceptions are retried via blocking.
* @param blockingBackOff the BackOff policy to be used by blocking retries.
* @since 2.8.4
* @see DefaultErrorHandler
*/
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
this.providedBlockingBackOff = blockingBackOff;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
boolean isSetContainerProperties) {
@@ -193,14 +209,20 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC

protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
Configuration configuration) {
-		DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
-				new FixedBackOff(0, 0));
+		DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
+		errorHandler.defaultFalse();
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
this.errorHandlerCustomizer.accept(errorHandler);
return errorHandler;
}

protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return this.providedBlockingBackOff != null
? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff)
: new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
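The key change is that the error handler's exception classifier now starts from `defaultFalse()`: by default, no exception triggers blocking retries unless it was explicitly registered via `addRetryableExceptions`, which is what lets the no-op `FixedBackOff(0, 0)` be dropped. A loose plain-Java sketch of that classification behavior (hypothetical class; Spring's real logic lives in `ExceptionClassifier`/`BinaryExceptionClassifier` and, unlike this sketch, can also traverse exception causes):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: with a default-false classifier, classify() returns true (retry via
// blocking) only for explicitly registered exception types; everything else
// goes straight to recovery (the DeadLetterPublishingRecoverer).
public class DefaultFalseClassifierSketch {

	private final Set<Class<? extends Exception>> retryable = new HashSet<>();

	@SafeVarargs
	public final void addRetryableExceptions(Class<? extends Exception>... types) {
		for (Class<? extends Exception> type : types) {
			this.retryable.add(type);
		}
	}

	public boolean classify(Exception ex) {
		// Default is false: unregistered exceptions are not blocking-retried.
		return this.retryable.stream().anyMatch(type -> type.isInstance(ex));
	}

	public static void main(String[] args) {
		DefaultFalseClassifierSketch classifier = new DefaultFalseClassifierSketch();
		classifier.addRetryableExceptions(IllegalStateException.class);
		System.out.println(classifier.classify(new IllegalStateException())); // true
		System.out.println(classifier.classify(new RuntimeException()));      // false
	}
}
```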

protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
Configuration configuration, boolean isSetContainerProperties) {
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()
