From 0d3d716454d0e10e890272b72aa6ef4fac60e4e9 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 27 Oct 2022 15:30:26 -0400 Subject: [PATCH] GH-1382: Republish Recoverer Improvements Resolves https://github.com/spring-projects/spring-amqp/issues/1382 Add expressions; make private method protected. **cherry-pick to 2.4.x** --- .../retry/RepublishMessageRecoverer.java | 76 +++++++++++++++---- ...va => RepublishMessageRecovererTests.java} | 30 +++++++- src/reference/asciidoc/amqp.adoc | 5 +- 3 files changed, 94 insertions(+), 17 deletions(-) rename spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/{RepublishMessageRecovererTest.java => RepublishMessageRecovererTests.java} (81%) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java index 271b5c2d75..efe28d49f5 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,10 @@ import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.RabbitUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -68,9 +72,11 @@ public class RepublishMessageRecoverer implements MessageRecoverer { protected final AmqpTemplate errorTemplate; // NOSONAR - protected final String errorRoutingKey; // NOSONAR + protected final Expression errorRoutingKeyExpression; // NOSONAR - protected final String errorExchangeName; // NOSONAR + protected final Expression errorExchangeNameExpression; // NOSONAR + + protected final EvaluationContext evaluationContext = new StandardEvaluationContext(); private String errorRoutingKeyPrefix = "error."; @@ -80,19 +86,48 @@ public class RepublishMessageRecoverer implements MessageRecoverer { private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT; + /** + * Create an instance with the provided template. + * @param errorTemplate the template. + */ public RepublishMessageRecoverer(AmqpTemplate errorTemplate) { - this(errorTemplate, null, null); + this(errorTemplate, (String) null, (String) null); } + /** + * Create an instance with the provided properties. + * @param errorTemplate the template. + * @param errorExchange the exchange. + */ public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchange) { this(errorTemplate, errorExchange, null); } + /** + * Create an instance with the provided properties. If the exchange or routing key is null, + * the template's default will be used. + * @param errorTemplate the template. + * @param errorExchange the exchange. + * @param errorRoutingKey the routing key. + */ public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchange, String errorRoutingKey) { + this(errorTemplate, new LiteralExpression(errorExchange), new LiteralExpression(errorRoutingKey)); + } + + /** + * Create an instance with the provided properties. If the exchange or routing key + * evaluate to null, the template's default will be used. + * @param errorTemplate the template. + * @param errorExchange the exchange expression, evaluated against the message. + * @param errorRoutingKey the routing key, evaluated against the message. + */ + public RepublishMessageRecoverer(AmqpTemplate errorTemplate, @Nullable Expression errorExchange, + @Nullable Expression errorRoutingKey) { + Assert.notNull(errorTemplate, "'errorTemplate' cannot be null"); this.errorTemplate = errorTemplate; - this.errorExchangeName = errorExchange; - this.errorRoutingKey = errorRoutingKey; + this.errorExchangeNameExpression = errorExchange != null ? errorExchange : new LiteralExpression(null); + this.errorRoutingKeyExpression = errorRoutingKey != null ? errorRoutingKey : new LiteralExpression(null); if (!(this.errorTemplate instanceof RabbitTemplate)) { this.maxStackTraceLength = Integer.MAX_VALUE; } @@ -175,17 +210,17 @@ public void recover(Message message, Throwable cause) { messageProperties.setDeliveryMode(this.deliveryMode); } - if (null != this.errorExchangeName) { - String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey - : this.prefixedOriginalRoutingKey(message); - doSend(this.errorExchangeName, routingKey, message); + String exchangeName = this.errorExchangeNameExpression.getValue(this.evaluationContext, message, String.class); + String rk = this.errorRoutingKeyExpression.getValue(this.evaluationContext, message, String.class); + String routingKey = rk != null ? rk : this.prefixedOriginalRoutingKey(message); + if (null != exchangeName) { + doSend(exchangeName, routingKey, message); if (this.logger.isWarnEnabled()) { - this.logger.warn("Republishing failed message to exchange '" + this.errorExchangeName + this.logger.warn("Republishing failed message to exchange '" + exchangeName + "' with routing key " + routingKey); } } else { - final String routingKey = this.prefixedOriginalRoutingKey(message); doSend(null, routingKey, message); if (this.logger.isWarnEnabled()) { this.logger.warn("Republishing failed message to the template's default exchange with routing key " @@ -271,11 +306,24 @@ else if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStack return null; } - private String prefixedOriginalRoutingKey(Message message) { + /** + * The default behavior of this method is to append the received routing key to the + * {@link #setErrorRoutingKeyPrefix(String) routingKeyPrefix}. This is only invoked + * if the routing key is null. + * @param message the message. + * @return the routing key. + */ + protected String prefixedOriginalRoutingKey(Message message) { return this.errorRoutingKeyPrefix + message.getMessageProperties().getReceivedRoutingKey(); } - private String getStackTraceAsString(Throwable cause) { + /** + * Create a String representation of the stack trace. + * @param cause the throwable. + * @return the String. + * @since 2.4.8 + */ + protected String getStackTraceAsString(Throwable cause) { StringWriter stringWriter = new StringWriter(); PrintWriter printWriter = new PrintWriter(stringWriter, true); cause.printStackTrace(printWriter); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererTest.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererTests.java similarity index 81% rename from spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererTest.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererTests.java index 4f5e912a26..675c697426 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererTest.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; +import org.springframework.expression.spel.standard.SpelExpressionParser; /** * @author James Carr @@ -42,7 +43,7 @@ * @since 1.3 */ @ExtendWith(MockitoExtension.class) -public class RepublishMessageRecovererTest { +public class RepublishMessageRecovererTests { private final Message message = new Message("".getBytes(), new MessageProperties()); @@ -151,4 +152,29 @@ void setDeliveryModeIfNull() { assertThat(this.message.getMessageProperties().getDeliveryMode()).isEqualTo(MessageDeliveryMode.NON_PERSISTENT); } + @Test + void dynamicExRk() { + this.recoverer = new RepublishMessageRecoverer(this.amqpTemplate, + new SpelExpressionParser().parseExpression("messageProperties.headers.get('errorExchange')"), + new SpelExpressionParser().parseExpression("messageProperties.headers.get('errorRK')")); + this.message.getMessageProperties().setHeader("errorExchange", "ex"); + this.message.getMessageProperties().setHeader("errorRK", "rk"); + + this.recoverer.recover(this.message, this.cause); + + verify(this.amqpTemplate).send("ex", "rk", this.message); + } + + @Test + void dynamicRk() { + this.recoverer = new RepublishMessageRecoverer(this.amqpTemplate, null, + new SpelExpressionParser().parseExpression("messageProperties.headers.get('errorRK')")); + this.message.getMessageProperties().setHeader("errorExchange", "ex"); + this.message.getMessageProperties().setHeader("errorRK", "rk"); + + this.recoverer.recover(this.message, this.cause); + + verify(this.amqpTemplate).send("rk", this.message); + } + } diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index ff6edd75b6..c015048056 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -6742,7 +6742,7 @@ The default `MessageRecoverer` consumes the errant message and emits a `WARN` me Starting with version 1.3, a new `RepublishMessageRecoverer` is provided, to allow publishing of failed messages after retries are exhausted. -When a recoverer consumes the final exception, the message is ack'd and is not sent to the dead letter exchange, if any. +When a recoverer consumes the final exception, the message is ack'd and is not sent to the dead letter exchange by the broker, if configured. NOTE: When `RepublishMessageRecoverer` is used on the consumer side, the received message has `deliveryMode` in the `receivedDeliveryMode` message property. In this case the `deliveryMode` is `null`. @@ -6793,6 +6793,9 @@ Starting with versions 2.1.13, 2.2.3, the exception message is included in this * if the stack trace is small, the message will be truncated (plus `...`) to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus `...`). Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information. +The evaluation is performed after the headers are enhanced so information such as the exception type can be used in the expressions. + +Starting with version 2.4.8, the error exchange and routing key can be provided as SpEL expressions, with the `Message` being the root object for the evaluation. Starting with version 2.3.3, a new subclass `RepublishMessageRecovererWithConfirms` is provided; this supports both styles of publisher confirms and will wait for the confirmation before returning (or throw an exception if not confirmed or the message is returned).