diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index d937b202e2..d78d1fe820 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -22,18 +22,13 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; import org.springframework.retry.annotation.Backoff; - -// TODO -// 2. inject exception detection when sending to DLT (consider traversing causes) -// 3. route the message to the configured additional destination or to the default DLT - /** * * Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated @@ -45,6 +40,7 @@ * @author Gary Russell * @author Fabio da Silva Jr. * @author João Lima + * @author Adrian Chlebosz * @since 2.7 * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer @@ -177,7 +173,12 @@ */ String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX; - ExceptionBasedDltRouting additionalExceptionBasedDltRouting() default @ExceptionBasedDltRouting; + /** + * The DLT routing allowing to redirect the message to the custom DLT based on the + * exception thrown during the processing. + * @return the exception based DLT routing + */ + ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting; /** * Whether the retry topics will be suffixed with the delay value for that topic or a diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 5b716bf719..f19e8b7c6f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanInitializationException; @@ -34,6 +33,8 @@ import org.springframework.core.annotation.AnnotationUtils; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt; +import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; @@ -58,6 +59,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -148,6 +150,9 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) + .timeoutAfter(timeout) + .dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting())) + .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); .timeoutAfter(timeout); Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); @@ -207,6 +212,11 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @N return policy; } + private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) { + return Arrays.stream(routingSpec.routingRules()) + .collect(Collectors.toMap(ExceptionBasedDestinationDlt::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); + } + private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) { Class declaringClass = listenerMethod.getDeclaringClass(); return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass)) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 30af0d3718..b155c48304 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -49,6 +49,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Yvette Quinby + * @author Adrian Chlebosz * @since 2.7 * */ @@ -174,11 +175,26 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio } boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream() - .anyMatch(excType -> excType.isInstance(e)); + .anyMatch(excType -> isDirectExcOrCause(e, excType)); boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty(); return isDltIntendedForCurrentExc || isGenericPurposeDlt; } + private static boolean isDirectExcOrCause(Exception e, Class excType) { + Throwable toMatch = e; + + boolean isMatched = excType.isInstance(toMatch); + while (!isMatched) { + toMatch = toMatch.getCause(); + if (toMatch == null) { + return false; + } + isMatched = excType.isInstance(toMatch); + } + + return isMatched; + } + @Override public DestinationTopic getNextDestinationTopicFor(String mainListenerId, String topic) { return getDestinationHolderFor(mainListenerId, topic).getNextDestination(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 880abc6e5a..9a2aee0bf6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -30,6 +30,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -195,6 +196,7 @@ public Properties(Properties sourceProperties, String suffix, Type type) { * @param shouldRetryOn the exception classifications. * @param timeout the timeout. * @param autoStartDltHandler whether or not to start the DLT handler. + * @param usedForExceptions the exceptions which destination is intended for * @since 2.8 */ public Properties(long delayMs, String suffix, Type type, @@ -202,7 +204,7 @@ public Properties(long delayMs, String suffix, Type type, DltStrategy dltStrategy, KafkaOperations kafkaOperations, BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, - Set> usedForExceptions) { + Set> usedForExceptions) { this.delayMs = delayMs; this.suffix = suffix; @@ -248,6 +250,10 @@ public Boolean autoStartDltHandler() { return this.autoStartDltHandler; } + public Set> usedForExceptions() { + return this.usedForExceptions; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index cc89f61888..61160fa237 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -26,6 +26,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ public interface DestinationTopicContainer { @@ -65,9 +66,10 @@ public interface DestinationTopicContainer { * DLT for the given topic, or null if none is found. * @param mainListenerId the listener id. * @param topicName the topic name for which to look the DLT for + * @param exc the exception which is being handled * @return The {@link DestinationTopic} instance corresponding to the DLT. */ @Nullable - DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e); + DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index e8f3cb1ff5..58cd6786cb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -17,10 +17,14 @@ package org.springframework.kafka.retrytopic; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.BiPredicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.kafka.core.KafkaOperations; @@ -28,11 +32,6 @@ import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.stream.Stream; - /** * * Creates a list of {@link DestinationTopic.Properties} based on the @@ -41,6 +40,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author João Lima + * @author Adrian Chlebosz * @since 2.7 * */ @@ -68,7 +68,7 @@ public class DestinationTopicPropertiesFactory { private final long timeout; - private final Map>> exceptionBasedRouting; + private final Map>> dltRoutingRules; @Nullable private Boolean autoStartDltHandler; @@ -85,6 +85,7 @@ public class DestinationTopicPropertiesFactory { * @param topicSuffixingStrategy the topic suffixing strategy. * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. * @param timeout the timeout. + * @param dltRoutingRules the specification of which DLT should be used for the particular exception type * @since 3.0.12 */ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, @@ -94,7 +95,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff TopicSuffixingStrategy topicSuffixingStrategy, SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, long timeout, - Map>> exceptionBasedRouting) { + Map>> dltRoutingRules) { this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; @@ -104,7 +105,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); - this.exceptionBasedRouting = exceptionBasedRouting; + this.dltRoutingRules = dltRoutingRules; this.backOffValues = backOffValues; // Max Attempts include the initial try. this.maxAttempts = this.backOffValues.size() + 1; @@ -132,7 +133,8 @@ private List createPropertiesForFixedDelaySingleTop DestinationTopic.Properties retryTopicProperties = createRetryProperties(1, getShouldRetryOn()); if (isNoDltStrategy()) { return Arrays.asList(mainTopicProperties, retryTopicProperties); - } else { + } + else { DestinationTopic.Properties dltProperties = createDltProperties(); List customDltProperties = createCustomDltProperties(); return Stream.concat( @@ -159,7 +161,10 @@ private List createPropertiesForDefaultTopicStrateg .mapToObj(this::createTopicProperties) .collect(Collectors.toList()); if (!isNoDltStrategy()) { - basicProperties.addAll(createCustomDltProperties()); + List customDltProperties = createCustomDltProperties(); + // add all custom DLTs properties before the generic purpose one to make sure that they are considered + // before the generic purpose one is considered and chosen + basicProperties.addAll(basicProperties.size() - 1, customDltProperties); } return basicProperties; @@ -206,8 +211,8 @@ private DestinationTopic.Properties createDltProperties() { } private List createCustomDltProperties() { - return exceptionBasedRouting.entrySet().stream() - .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + "-" + this.destinationTopicSuffixes.getDltSuffix(), + return this.dltRoutingRules.entrySet().stream() + .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue())) .toList(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java index 4ed6d46911..061191a4d2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java @@ -1,6 +1,48 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.kafka.retrytopic; +/** + * Annotation allowing to specify additional DLT which will be chosen when message + * processing caused the configured exception to be thrown. + * + * @author Adrian Chlebosz + * @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting + * @since 3.1.1 + */ public @interface ExceptionBasedDestinationDlt { - String customSuffix(); - Class[] exceptions(); + + /** + * Suffix extension used when constructing the name for the new DLT. It is placed + * before the main suffix configured through the + * ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the + * final name is the product of these two. + * + * @return the configured suffix extension + */ + String suffix(); + + /** + * When message processing throws one of the exceptions configured here, then + * it should be eventually redirected to the DLT with name containing the extension + * configured through {@link #suffix()}. The causes of the thrown exception will be + * traversed to match with any of configured ones. + * + * @return configured exceptions + */ + Class[] exceptions(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java index e58cf71479..f9b3ba9ace 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java @@ -1,5 +1,36 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.kafka.retrytopic; +/** + * Annotation allowing to specify the custom DLT routing rules steered by exceptions + * which might be thrown during the processing. + * + * @author Adrian Chlebosz + * @see org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt + * @since 3.1.1 + */ public @interface ExceptionBasedDltRouting { - ExceptionBasedDestinationDlt[] routing() default {}; + + /** + * Specific rules expressing to which custom DLT the message should be redirected + * when the specified exception has been thrown during its processing. + * + * @return configured routing + */ + ExceptionBasedDestinationDlt[] routingRules() default {}; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 36bb131c7e..4adb565f52 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -44,6 +44,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -78,7 +79,7 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; - private Map>> exceptionBasedDltRouting = new HashMap<>(); + private Map>> dltRoutingRules = new HashMap<>(); private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; @@ -528,8 +529,17 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { return this.classifierBuilder; } - public RetryTopicConfigurationBuilder exceptionBasedDltRouting(Map>> exceptionBasedDltRouting) { - this.exceptionBasedDltRouting = exceptionBasedDltRouting; + /** + * Configure to set DLT routing rules causing the message to be redirected to the custom + * DLT when the configured exception has been thrown during message processing. + * The cause of the originally thrown exception will be traversed in order to find the + * match with the configured exceptions. + * @param dltRoutingRules specification of custom DLT name extensions and exceptions which should be matched for them + * @return the builder + * @since 3.1.1 + */ + public RetryTopicConfigurationBuilder dltRoutingRules(Map>> dltRoutingRules) { + this.dltRoutingRules = dltRoutingRules; return this; } @@ -578,7 +588,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.dltStrategy, - this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.exceptionBasedDltRouting) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.dltRoutingRules) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java index c741687824..8efea11c1a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java @@ -35,6 +35,7 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index e938fc31fe..5fd891532f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -45,6 +45,7 @@ * @author Tomaz Fernandes * @author Yvette Quinby * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -146,13 +147,14 @@ void shouldResolveDltDestinationForFatalDefaultException() { @Test void shouldResolveDeserializationDltDestinationForDeserializationException() { DeserializationException exc = new DeserializationException("", new byte[] {}, false, new IllegalStateException()); + TimestampedException timestampedExc = new TimestampedException(exc); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), - 1, exc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); + 1, timestampedExc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", deserializationExcDltDestinationTopic.getDestinationName(), - 1, exc, originalTimestamp)).isEqualTo(dltDestinationTopic); + 1, timestampedExc, originalTimestamp)).isEqualTo(dltDestinationTopic); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 1b9a2052c2..855cacdd50 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -18,7 +18,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -27,6 +30,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; + import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; import org.springframework.kafka.core.KafkaOperations; @@ -35,13 +39,9 @@ import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - - /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -219,8 +219,8 @@ void shouldCreateDltPropertiesForCustomExceptionBasedRouting() { // then assertThat(propertiesList.size()).isSameAs(3); - assertDltTopic(propertiesList.get(1)); - assertDltTopic(propertiesList.get(2), desExcDltSuffix + "-" + this.dltSuffix); + assertDltTopic(propertiesList.get(1), desExcDltSuffix + this.dltSuffix); + assertDltTopic(propertiesList.get(2)); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 7e900d9353..d90786900b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -31,6 +31,7 @@ /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ public class DestinationTopicTests { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java index 5da6ba70f6..990fe99e34 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,11 +30,13 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.util.ReflectionTestUtils; /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -187,4 +191,20 @@ void shouldSetNotAutoCreateTopics() { RetryTopicConfiguration.TopicCreation config = configuration.forKafkaTopicAutoCreation(); assertThat(config.shouldCreateTopics()).isFalse(); } + + @Test + void shouldSetDltRoutingRules() { + // setup + RetryTopicConfigurationBuilder builder = new RetryTopicConfigurationBuilder(); + + //when + RetryTopicConfiguration configuration = builder + .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class))) + .create(kafkaOperations); + + // then + DestinationTopic.Properties desExcDltProps = configuration.getDestinationTopicProperties().get(3); + assertThat(desExcDltProps.suffix()).isEqualTo("-deserialization-dlt"); + assertThat(desExcDltProps.usedForExceptions()).containsExactly(DeserializationException.class); + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 5daa04fcd2..8508b7dcd5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.retry.annotation.Backoff; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ReflectionUtils; @@ -50,6 +51,7 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @SuppressWarnings("deprecation") @@ -98,6 +100,15 @@ class RetryableTopicAnnotationProcessorTests { private final Object bean = createBean(); + // Retry with custom DLT routing + private final Method listenWithCustomDltRouting = ReflectionUtils + .findMethod(RetryableTopicAnnotationFactoryWithCustomDltRouting.class, listenerMethodName); + + private final RetryableTopic annotationWithCustomDltRouting = AnnotationUtils.findAnnotation( + listenWithCustomDltRouting, RetryableTopic.class); + + private final Object beanWithCustomDltRouting = createBean(); + private Object createBean() { try { return RetryableTopicAnnotationFactory.class.getDeclaredConstructor().newInstance(); @@ -107,17 +118,18 @@ private Object createBean() { } } + @Test void shouldGetDltHandlerMethod() { // setup given(beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) - .willReturn(kafkaOperationsFromDefaultName); + .willReturn(kafkaOperationsFromDefaultName); RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); // then EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod(); @@ -125,7 +137,7 @@ void shouldGetDltHandlerMethod() { assertThat(method.getName()).isEqualTo("handleDlt"); assertThat(new DestinationTopic("", - configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); + configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); } @Test @@ -313,6 +325,27 @@ void shouldCreateFixedBackoff() { } + @Test + void shouldCreateExceptionBasedDltRoutingSpec() { + // setup + given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) + .willReturn(kafkaOperationsFromDefaultName); + RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); + + // given + RetryTopicConfiguration configuration = processor + .processAnnotation( + topics, listenWithCustomDltRouting, annotationWithCustomDltRouting, beanWithCustomDltRouting); + + // then + List destinationTopicProperties = configuration.getDestinationTopicProperties(); + + assertThat(destinationTopicProperties).hasSize(3); + assertThat(destinationTopicProperties.get(0).suffix()).isEmpty(); + assertThat(destinationTopicProperties.get(1).suffix()).isEqualTo("-deserialization-dlt"); + assertThat(destinationTopicProperties.get(2).suffix()).isEqualTo("-dlt"); + } + static class RetryableTopicAnnotationFactory { @KafkaListener @@ -336,4 +369,19 @@ void handleDlt() { // NoOps } } + + static class RetryableTopicAnnotationFactoryWithCustomDltRouting { + @KafkaListener + @RetryableTopic( + attempts = "1", + exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = { + @ExceptionBasedDestinationDlt( + suffix = "-deserialization", exceptions = {DeserializationException.class} + ) + }) + ) + void listenWithRetry() { + // NoOps + } + } }