Skip to content

Commit

Permalink
GH-2800: wire configuration from annotation and traverse causes
Browse files Browse the repository at this point in the history
Closes #2800
  • Loading branch information
breader124 committed Dec 10, 2023
1 parent 95c5e5d commit c194f24
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,13 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
import org.springframework.kafka.retrytopic.RetryTopicConstants;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.retry.annotation.Backoff;


// TODO
// 2. inject exception detection when sending to DLT (consider traversing causes)
// 3. route the message to the configured additional destination or to the default DLT

/**
*
* Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated
Expand All @@ -45,6 +40,7 @@
* @author Gary Russell
* @author Fabio da Silva Jr.
* @author João Lima
* @author Adrian Chlebosz
* @since 2.7
*
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
Expand Down Expand Up @@ -177,7 +173,12 @@
*/
String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX;

ExceptionBasedDltRouting additionalExceptionBasedDltRouting() default @ExceptionBasedDltRouting;
/**
* The DLT routing allowing to redirect the message to the custom DLT based on the
* exception thrown during the processing.
* @return the exception based DLT routing
*/
ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting;

/**
* Whether the retry topics will be suffixed with the delay value for that topic or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
Expand All @@ -32,6 +35,8 @@
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt;
import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
Expand All @@ -55,6 +60,7 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -148,6 +154,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout)
.dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting()))
.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
}

Expand Down Expand Up @@ -191,6 +198,11 @@ private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, Be
return policy;
}

private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) {
return Arrays.stream(routingSpec.routingRules())
.collect(Collectors.toMap(ExceptionBasedDestinationDlt::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
}

private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
Class<?> declaringClass = listenerMethod.getDeclaringClass();
return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Yvette Quinby
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -174,11 +175,26 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio
}

boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream()
.anyMatch(excType -> excType.isInstance(e));
.anyMatch(excType -> isDirectExcOrCause(e, excType));
boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty();
return isDltIntendedForCurrentExc || isGenericPurposeDlt;
}

private static boolean isDirectExcOrCause(Exception e, Class<? extends Throwable> excType) {
Throwable toMatch = e;

boolean isMatched = excType.isInstance(toMatch);
while (!isMatched) {
toMatch = toMatch.getCause();
if (toMatch == null) {
return false;
}
isMatched = excType.isInstance(toMatch);
}

return isMatched;
}

@Override
public DestinationTopic getNextDestinationTopicFor(String mainListenerId, String topic) {
return getDestinationHolderFor(mainListenerId, topic).getNextDestination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -195,14 +196,15 @@ public Properties(Properties sourceProperties, String suffix, Type type) {
* @param shouldRetryOn the exception classifications.
* @param timeout the timeout.
* @param autoStartDltHandler whether or not to start the DLT handler.
* @param usedForExceptions the exceptions which destination is intended for
* @since 2.8
*/
public Properties(long delayMs, String suffix, Type type,
int maxAttempts, int numPartitions,
DltStrategy dltStrategy,
KafkaOperations<?, ?> kafkaOperations,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler,
Set<Class<? extends Throwable>> usedForExceptions) {
Set<Class<? extends Throwable>> usedForExceptions) {

this.delayMs = delayMs;
this.suffix = suffix;
Expand Down Expand Up @@ -248,6 +250,10 @@ public Boolean autoStartDltHandler() {
return this.autoStartDltHandler;
}

public Set<Class<? extends Throwable>> usedForExceptions() {
return this.usedForExceptions;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.7
*/
public interface DestinationTopicContainer {
Expand Down Expand Up @@ -65,9 +66,10 @@ public interface DestinationTopicContainer {
* DLT for the given topic, or null if none is found.
* @param mainListenerId the listener id.
* @param topicName the topic name for which to look the DLT for
* @param exc the exception which is being handled
* @return The {@link DestinationTopic} instance corresponding to the DLT.
*/
@Nullable
DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e);
DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,21 @@
package org.springframework.kafka.retrytopic;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.retrytopic.DestinationTopic.Type;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

/**
*
* Creates a list of {@link DestinationTopic.Properties} based on the
Expand All @@ -41,6 +40,7 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author João Lima
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -68,7 +68,7 @@ public class DestinationTopicPropertiesFactory {

private final long timeout;

private final Map<String, Set<Class<? extends Throwable>>> exceptionBasedRouting;
private final Map<String, Set<Class<? extends Throwable>>> dltRoutingRules;

@Nullable
private Boolean autoStartDltHandler;
Expand All @@ -85,6 +85,7 @@ public class DestinationTopicPropertiesFactory {
* @param topicSuffixingStrategy the topic suffixing strategy.
* @param sameIntervalTopicReuseStrategy the same interval reuse strategy.
* @param timeout the timeout.
* @param dltRoutingRules the specification of which DLT should be used for the particular exception type
* @since 3.0.12
*/
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
Expand All @@ -94,7 +95,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
TopicSuffixingStrategy topicSuffixingStrategy,
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy,
long timeout,
Map<String, Set<Class<? extends Throwable>>> exceptionBasedRouting) {
Map<String, Set<Class<? extends Throwable>>> dltRoutingRules) {

this.dltStrategy = dltStrategy;
this.kafkaOperations = kafkaOperations;
Expand All @@ -104,7 +105,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy;
this.timeout = timeout;
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
this.exceptionBasedRouting = exceptionBasedRouting;
this.dltRoutingRules = dltRoutingRules;
this.backOffValues = backOffValues;
// Max Attempts include the initial try.
this.maxAttempts = this.backOffValues.size() + 1;
Expand Down Expand Up @@ -132,7 +133,8 @@ private List<DestinationTopic.Properties> createPropertiesForFixedDelaySingleTop
DestinationTopic.Properties retryTopicProperties = createRetryProperties(1, getShouldRetryOn());
if (isNoDltStrategy()) {
return Arrays.asList(mainTopicProperties, retryTopicProperties);
} else {
}
else {
DestinationTopic.Properties dltProperties = createDltProperties();
List<DestinationTopic.Properties> customDltProperties = createCustomDltProperties();
return Stream.concat(
Expand All @@ -159,7 +161,10 @@ private List<DestinationTopic.Properties> createPropertiesForDefaultTopicStrateg
.mapToObj(this::createTopicProperties)
.collect(Collectors.toList());
if (!isNoDltStrategy()) {
basicProperties.addAll(createCustomDltProperties());
List<DestinationTopic.Properties> customDltProperties = createCustomDltProperties();
// add all custom DLTs properties before the generic purpose one to make sure that they are considered
// before the generic purpose one is considered and chosen
basicProperties.addAll(basicProperties.size() - 1, customDltProperties);
}

return basicProperties;
Expand Down Expand Up @@ -206,8 +211,8 @@ private DestinationTopic.Properties createDltProperties() {
}

private List<DestinationTopic.Properties> createCustomDltProperties() {
return exceptionBasedRouting.entrySet().stream()
.map(entry -> new DestinationTopic.Properties(0, entry.getKey() + "-" + this.destinationTopicSuffixes.getDltSuffix(),
return this.dltRoutingRules.entrySet().stream()
.map(entry -> new DestinationTopic.Properties(0, entry.getKey() + this.destinationTopicSuffixes.getDltSuffix(),
DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy,
this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue()))
.toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,48 @@
/*
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.retrytopic;

/**
* Annotation allowing to specify additional DLT which will be chosen when message
* processing caused the configured exception to be thrown.
*
* @author Adrian Chlebosz
* @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting
* @since 3.1.1
*/
public @interface ExceptionBasedDestinationDlt {
String customSuffix();
Class<? extends Throwable>[] exceptions();

/**
* Suffix extension used when constructing the name for the new DLT. It is placed
* before the main suffix configured through the
* ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the
* final name is the product of these two.
*
* @return the configured suffix extension
*/
String suffix();

/**
* When message processing throws one of the exceptions configured here, then
* it should be eventually redirected to the DLT with name containing the extension
* configured through {@link #suffix()}. The causes of the thrown exception will be
* traversed to match with any of configured ones.
*
* @return configured exceptions
*/
Class<? extends Throwable>[] exceptions();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.retrytopic;

/**
* Annotation allowing to specify the custom DLT routing rules steered by exceptions
* which might be thrown during the processing.
*
* @author Adrian Chlebosz
* @see org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt
* @since 3.1.1
*/
public @interface ExceptionBasedDltRouting {
ExceptionBasedDestinationDlt[] routing() default {};

/**
* Specific rules expressing to which custom DLT the message should be redirected
* when the specified exception has been thrown during its processing.
*
* @return configured routing
*/
ExceptionBasedDestinationDlt[] routingRules() default {};
}
Loading

0 comments on commit c194f24

Please sign in to comment.