diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 522256b436..30af0d3718 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -97,8 +97,8 @@ public DestinationTopic resolveDestinationTopic(String mainListenerId, String to : destinationTopicHolder.getSourceDestination().shouldRetryOn(attempt, maybeUnwrapException(e)) && isNotFatalException(e) && !isPastTimout(originalTimestamp, destinationTopicHolder) - ? resolveRetryDestination(destinationTopicHolder) - : getDltOrNoOpsDestination(mainListenerId, topic); + ? resolveRetryDestination(mainListenerId, destinationTopicHolder, e) + : getDltOrNoOpsDestination(mainListenerId, topic, e); } private Boolean isNotFatalException(Exception e) { @@ -128,10 +128,20 @@ && isNotFatalException(e) } @SuppressWarnings("deprecation") - private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { - return (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) - ? destinationTopicHolder.getSourceDestination() - : destinationTopicHolder.getNextDestination(); + private DestinationTopic resolveRetryDestination(String mainListenerId, DestinationTopicHolder destinationTopicHolder, Exception e) { + if (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) { + return destinationTopicHolder.getSourceDestination(); + } + + if (isAlreadyDltDestination(destinationTopicHolder)) { + return getDltOrNoOpsDestination(mainListenerId, destinationTopicHolder.getSourceDestination().getDestinationName(), e); + } + + return destinationTopicHolder.getNextDestination(); + } + + private static boolean isAlreadyDltDestination(DestinationTopicHolder destinationTopicHolder) { + return destinationTopicHolder.getNextDestination().isDltTopic(); } @Override @@ -144,18 +154,29 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String @Nullable @Override - public DestinationTopic getDltFor(String mainListenerId, String topicName) { - DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName); + public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) { + DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName, e); return destination.isNoOpsTopic() ? null : destination; } - private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic) { + private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic, Exception e) { DestinationTopic destination = getNextDestinationTopicFor(mainListenerId, topic); - return destination.isDltTopic() || destination.isNoOpsTopic() - ? destination - : getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName()); + return isMatchingDltTopic(destination, e) || destination.isNoOpsTopic() ? + destination : + getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName(), e); + } + + private static boolean isMatchingDltTopic(DestinationTopic destination, Exception e) { + if (!destination.isDltTopic()) { + return false; + } + + boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream() + .anyMatch(excType -> excType.isInstance(e)); + boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty(); + return isDltIntendedForCurrentExc || isGenericPurposeDlt; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 711da8195d..880abc6e5a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -90,6 +90,10 @@ public boolean shouldRetryOn(Integer attempt, Throwable e) { return this.properties.shouldRetryOn.test(attempt, e); } + public Set> usedForExceptions() { + return Collections.unmodifiableSet(this.properties.usedForExceptions); + } + @Override public String toString() { return "DestinationTopic{" + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index 25dc22705b..cc89f61888 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,6 @@ public interface DestinationTopicContainer { * @return The {@link DestinationTopic} instance corresponding to the DLT. */ @Nullable - DestinationTopic getDltFor(String mainListenerId, String topicName); + DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java index 8f2943d501..c741687824 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,11 +75,12 @@ void shouldRegisterTopicDestinations() { // then assertThat(context.destinationsByTopicMap.containsKey(FIRST_TOPIC)).isTrue(); List destinationTopicsForFirstTopic = context.destinationsByTopicMap.get(FIRST_TOPIC); - assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(4); + assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(5); assertThat(destinationTopicsForFirstTopic.get(0)).isEqualTo(mainDestinationTopic); assertThat(destinationTopicsForFirstTopic.get(1)).isEqualTo(firstRetryDestinationTopic); assertThat(destinationTopicsForFirstTopic.get(2)).isEqualTo(secondRetryDestinationTopic); - assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(dltDestinationTopic); + assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(deserializationExcDltDestinationTopic); + assertThat(destinationTopicsForFirstTopic.get(4)).isEqualTo(dltDestinationTopic); assertThat(context.destinationsByTopicMap.containsKey(SECOND_TOPIC)).isTrue(); List destinationTopicsForSecondTopic = context.destinationsByTopicMap.get(SECOND_TOPIC); @@ -143,7 +144,7 @@ void shouldCreateDestinationMapWhenProcessDestinations() { .flatMap(list -> list.stream()) .collect(Collectors.toList()); - assertThat(destinationList.size()).isEqualTo(11); + assertThat(destinationList.size()).isEqualTo(12); assertThat(destinationList.contains(mainDestinationTopic)).isTrue(); assertThat(destinationList.contains(firstRetryDestinationTopic)).isTrue(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index ea1fa3334b..e938fc31fe 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import org.springframework.kafka.listener.TimestampedException; import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.support.serializer.DeserializationException; /** * @author Tomaz Fernandes @@ -78,16 +79,16 @@ public void setup() { void shouldResolveRetryDestination() { assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", mainDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", firstRetryDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", dltDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", mainDestinationTopic2.getDestinationName(), 1, @@ -142,6 +143,18 @@ void shouldResolveDltDestinationForFatalDefaultException() { .isEqualTo(dltDestinationTopic); } + @Test + void shouldResolveDeserializationDltDestinationForDeserializationException() { + DeserializationException exc = new DeserializationException("", new byte[] {}, false, new IllegalStateException()); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), + 1, exc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", deserializationExcDltDestinationTopic.getDestinationName(), + 1, exc, originalTimestamp)).isEqualTo(dltDestinationTopic); + } + @Test void shouldResolveNoOpsForFatalDefaultExceptionInDlt() { assertThat(defaultDestinationTopicContainer @@ -207,7 +220,7 @@ void shouldGetNextDestinationTopic() { @Test void shouldGetDlt() { assertThat(defaultDestinationTopicContainer - .getDltFor("id", mainDestinationTopic.getDestinationName())) + .getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException())) .isEqualTo(dltDestinationTopic); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 4a1574fd10..7e900d9353 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.classify.BinaryExceptionClassifier; @@ -26,6 +27,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.DeserializationException; /** * @author Tomaz Fernandes @@ -72,12 +74,16 @@ public class DestinationTopicTests { new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); + protected DestinationTopic.Properties deserializationDltTopicProps = + new DestinationTopic.Properties(0, "-deserialization" + dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Set.of(DeserializationException.class)); + protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Collections.emptySet()); protected List allProps = Arrays - .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); + .asList(mainTopicProps, firstRetryProps, secondRetryProps, deserializationDltTopicProps, dltTopicProps); protected DestinationTopic.Properties mainTopicProps2 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, @@ -148,10 +154,12 @@ public class DestinationTopicTests { protected PropsHolder secondRetryDestinationHolder = new PropsHolder(FIRST_TOPIC, secondRetryProps); + protected PropsHolder deserializationDltDestinationHolder = new PropsHolder(FIRST_TOPIC, deserializationDltTopicProps); + protected PropsHolder dltDestinationHolder = new PropsHolder(FIRST_TOPIC, dltTopicProps); protected List allFirstDestinationsHolders = Arrays - .asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, dltDestinationHolder); + .asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, deserializationDltDestinationHolder, dltDestinationHolder); protected final static String SECOND_TOPIC = "secondTopic"; @@ -206,12 +214,15 @@ public class DestinationTopicTests { protected DestinationTopic dltDestinationTopic = new DestinationTopic(FIRST_TOPIC + dltTopicProps.suffix(), dltTopicProps); + protected DestinationTopic deserializationExcDltDestinationTopic = + new DestinationTopic(FIRST_TOPIC + "-deserialization" + dltTopicProps.suffix(), deserializationDltTopicProps); + protected DestinationTopic noOpsDestinationTopic = new DestinationTopic(dltDestinationTopic.getDestinationName() + "-noOps", new DestinationTopic.Properties(dltTopicProps, "-noOps", DestinationTopic.Type.NO_OPS)); protected List allFirstDestinationsTopics = Arrays - .asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, dltDestinationTopic); + .asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, deserializationExcDltDestinationTopic, dltDestinationTopic); protected DestinationTopic mainDestinationTopic2 = new DestinationTopic(SECOND_TOPIC + mainTopicProps2.suffix(), mainTopicProps2);