Skip to content

Commit

Permalink
GH-2800: create topics used for exception based DLT routing
Browse files Browse the repository at this point in the history
  • Loading branch information
breader124 committed Dec 10, 2023
1 parent 2306ac5 commit 36bf3e2
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.RetryTopicConstants;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.retry.annotation.Backoff;


// TODO
// 2. inject exception detection when sending to DLT (consider traversing causes)
// 3. route the message to the configured additional destination or to the default DLT

/**
*
* Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated
Expand Down Expand Up @@ -171,6 +177,8 @@
*/
String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX;

ExceptionBasedDltRouting additionalExceptionBasedDltRouting() default @ExceptionBasedDltRouting;

/**
* Whether the retry topics will be suffixed with the delay value for that topic or a
* simple index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.springframework.kafka.retrytopic;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiPredicate;

import org.springframework.kafka.core.KafkaOperations;
Expand Down Expand Up @@ -137,9 +139,10 @@ public static class Properties {

private final long timeout;

private final Set<Class<? extends Throwable>> usedForExceptions;

@Nullable
private final Boolean autoStartDltHandler;

/**
* Create an instance with the provided properties with the DLT container starting
* automatically (if the container factory is so configured).
Expand All @@ -160,7 +163,7 @@ public Properties(long delayMs, String suffix, Type type,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout) {

this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn,
timeout, null);
timeout, null, Collections.emptySet());
}

/**
Expand All @@ -173,7 +176,7 @@ public Properties(long delayMs, String suffix, Type type,
public Properties(Properties sourceProperties, String suffix, Type type) {
this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions,
sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn,
sourceProperties.timeout, null);
sourceProperties.timeout, null, Collections.emptySet());
}

/**
Expand All @@ -194,7 +197,8 @@ public Properties(long delayMs, String suffix, Type type,
int maxAttempts, int numPartitions,
DltStrategy dltStrategy,
KafkaOperations<?, ?> kafkaOperations,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) {
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler,
Set<Class<? extends Throwable>> usedForExceptions) {

this.delayMs = delayMs;
this.suffix = suffix;
Expand All @@ -206,6 +210,7 @@ public Properties(long delayMs, String suffix, Type type,
this.shouldRetryOn = shouldRetryOn;
this.timeout = timeout;
this.autoStartDltHandler = autoStartDltHandler;
this.usedForExceptions = usedForExceptions;
}

public boolean isDltTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

/**
*
* Creates a list of {@link DestinationTopic.Properties} based on the
Expand Down Expand Up @@ -63,6 +68,8 @@ public class DestinationTopicPropertiesFactory {

private final long timeout;

private final Map<String, Set<Class<? extends Throwable>>> exceptionBasedRouting;

@Nullable
private Boolean autoStartDltHandler;

Expand All @@ -86,7 +93,8 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
DltStrategy dltStrategy,
TopicSuffixingStrategy topicSuffixingStrategy,
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy,
long timeout) {
long timeout,
Map<String, Set<Class<? extends Throwable>>> exceptionBasedRouting) {

this.dltStrategy = dltStrategy;
this.kafkaOperations = kafkaOperations;
Expand All @@ -96,6 +104,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy;
this.timeout = timeout;
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
this.exceptionBasedRouting = exceptionBasedRouting;
this.backOffValues = backOffValues;
// Max Attempts include the initial try.
this.maxAttempts = this.backOffValues.size() + 1;
Expand All @@ -119,12 +128,18 @@ public List<DestinationTopic.Properties> createProperties() {
}

private List<DestinationTopic.Properties> createPropertiesForFixedDelaySingleTopic() {
return isNoDltStrategy()
? Arrays.asList(createMainTopicProperties(),
createRetryProperties(1, getShouldRetryOn()))
: Arrays.asList(createMainTopicProperties(),
createRetryProperties(1, getShouldRetryOn()),
createDltProperties());
DestinationTopic.Properties mainTopicProperties = createMainTopicProperties();
DestinationTopic.Properties retryTopicProperties = createRetryProperties(1, getShouldRetryOn());
if (isNoDltStrategy()) {
return Arrays.asList(mainTopicProperties, retryTopicProperties);
} else {
DestinationTopic.Properties dltProperties = createDltProperties();
List<DestinationTopic.Properties> customDltProperties = createCustomDltProperties();
return Stream.concat(
Stream.of(mainTopicProperties, retryTopicProperties, dltProperties),
customDltProperties.stream())
.toList();
}
}

private boolean isSingleTopicFixedDelay() {
Expand All @@ -136,14 +151,18 @@ private boolean isSingleTopicSameIntervalTopicReuseStrategy() {
}

private List<DestinationTopic.Properties> createPropertiesForDefaultTopicStrategy() {

int retryTopicsAmount = retryTopicsAmount();

return IntStream.rangeClosed(0, isNoDltStrategy()
? retryTopicsAmount
: retryTopicsAmount + 1)
.mapToObj(this::createTopicProperties)
.collect(Collectors.toList());
List<DestinationTopic.Properties> basicProperties = IntStream.rangeClosed(0, isNoDltStrategy()
? retryTopicsAmount
: retryTopicsAmount + 1)
.mapToObj(this::createTopicProperties)
.collect(Collectors.toList());
if (!isNoDltStrategy()) {
basicProperties.addAll(createCustomDltProperties());
}

return basicProperties;
}

int retryTopicsAmount() {
Expand Down Expand Up @@ -183,7 +202,15 @@ private DestinationTopic.Properties createMainTopicProperties() {
private DestinationTopic.Properties createDltProperties() {
return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(),
DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy,
this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler);
this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, Collections.emptySet());
}

private List<DestinationTopic.Properties> createCustomDltProperties() {
return exceptionBasedRouting.entrySet().stream()
.map(entry -> new DestinationTopic.Properties(0, entry.getKey() + "-" + this.destinationTopicSuffixes.getDltSuffix(),
DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy,
this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue()))
.toList();
}

private BiPredicate<Integer, Throwable> getShouldRetryOn() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.springframework.kafka.retrytopic;

public @interface ExceptionBasedDestinationDlt {
String customSuffix();
Class<? extends Throwable>[] exceptions();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.springframework.kafka.retrytopic;

public @interface ExceptionBasedDltRouting {
ExceptionBasedDestinationDlt[] routing() default {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.springframework.kafka.retrytopic;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.BinaryExceptionClassifierBuilder;
Expand Down Expand Up @@ -64,13 +67,16 @@ public class RetryTopicConfigurationBuilder {

private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation();


private ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory;

private String listenerContainerFactoryName;

@Nullable
private BinaryExceptionClassifierBuilder classifierBuilder;

private Map<String, Set<Class<? extends Throwable>>> exceptionBasedDltRouting = new HashMap<>();

private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR;

private long timeout = RetryTopicConstants.NOT_SET;
Expand Down Expand Up @@ -519,6 +525,11 @@ private BinaryExceptionClassifierBuilder classifierBuilder() {
return this.classifierBuilder;
}

public RetryTopicConfigurationBuilder exceptionBasedDltRouting(Map<String, Set<Class<? extends Throwable>>> exceptionBasedDltRouting) {
this.exceptionBasedDltRouting = exceptionBasedDltRouting;
return this;
}

/* ---------------- Configure KafkaListenerContainerFactory -------------- */
/**
* Configure the container factory to use.
Expand Down Expand Up @@ -564,7 +575,7 @@ public RetryTopicConfiguration create(KafkaOperations<?, ?> sendToTopicKafkaTemp
new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues,
buildClassifier(), this.topicCreationConfiguration.getNumPartitions(),
sendToTopicKafkaTemplate, this.dltStrategy,
this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout)
this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.exceptionBasedDltRouting)
.autoStartDltHandler(this.autoStartDltHandler)
.createProperties();
return new RetryTopicConfiguration(destinationTopicProperties,
Expand Down
Loading

0 comments on commit 36bf3e2

Please sign in to comment.