2020import java .util .HashMap ;
2121import java .util .List ;
2222import java .util .Map ;
23- import java .util .Objects ;
2423import java .util .concurrent .atomic .AtomicInteger ;
2524
2625import com .rabbitmq .client .Channel ;
3635import org .springframework .amqp .rabbit .listener .api .ChannelAwareMessageListener ;
3736import org .springframework .amqp .rabbit .retry .MessageBatchRecoverer ;
3837import org .springframework .amqp .rabbit .retry .MessageRecoverer ;
38+ import org .springframework .amqp .rabbit .support .ListenerExecutionFailedException ;
3939import org .springframework .amqp .support .AmqpHeaders ;
4040import org .springframework .amqp .support .converter .MessageConversionException ;
4141import org .springframework .amqp .support .converter .MessageConverter ;
4242import org .springframework .amqp .support .converter .SimpleMessageConverter ;
4343import org .springframework .core .AttributeAccessor ;
44+ import org .springframework .core .retry .RetryException ;
45+ import org .springframework .core .retry .RetryOperations ;
46+ import org .springframework .core .retry .RetryTemplate ;
4447import org .springframework .integration .IntegrationMessageHeaderAccessor ;
4548import org .springframework .integration .StaticMessageHeaderAccessor ;
4649import org .springframework .integration .amqp .support .AmqpHeaderMapper ;
4750import org .springframework .integration .amqp .support .AmqpMessageHeaderErrorMessageStrategy ;
4851import org .springframework .integration .amqp .support .DefaultAmqpHeaderMapper ;
4952import org .springframework .integration .amqp .support .EndpointUtils ;
5053import org .springframework .integration .context .OrderlyShutdownCapable ;
54+ import org .springframework .integration .core .RecoveryCallback ;
5155import org .springframework .integration .endpoint .MessageProducerSupport ;
5256import org .springframework .integration .support .ErrorMessageUtils ;
5357import org .springframework .messaging .MessageChannel ;
54- import org .springframework .retry .RecoveryCallback ;
55- import org .springframework .retry .RetryOperations ;
56- import org .springframework .retry .support .RetrySynchronizationManager ;
57- import org .springframework .retry .support .RetryTemplate ;
5858import org .springframework .util .Assert ;
5959
6060/**
6161 * Adapter that receives Messages from an AMQP Queue, converts them into
62- * Spring Integration Messages, and sends the results to a Message Channel.
62+ * Spring Integration messages and sends the results to a Message Channel.
6363 *
6464 * @author Mark Fisher
6565 * @author Gary Russell
@@ -274,24 +274,24 @@ private void setupRecoveryCallbackIfAny() {
274274 "The 'messageRecoverer' must be an instance of MessageBatchRecoverer " +
275275 "when consumer configured for batch mode" );
276276 this .recoveryCallback =
277- context -> {
277+ ( context , cause ) -> {
278278 @ SuppressWarnings ("unchecked" )
279279 List <Message > messagesToRecover =
280280 (List <Message >) context .getAttribute (AmqpMessageHeaderErrorMessageStrategy .AMQP_RAW_MESSAGE );
281281 if (messagesToRecover != null ) {
282282 ((MessageBatchRecoverer ) messageRecovererToUse )
283- .recover (messagesToRecover , context . getLastThrowable () );
283+ .recover (messagesToRecover , cause );
284284 }
285285 return null ;
286286 };
287287 }
288288 else {
289289 this .recoveryCallback =
290- context -> {
290+ ( context , cause ) -> {
291291 Message messageToRecover =
292292 (Message ) context .getAttribute (AmqpMessageHeaderErrorMessageStrategy .AMQP_RAW_MESSAGE );
293293 if (messageToRecover != null ) {
294- messageRecovererToUse .recover (messageToRecover , context . getLastThrowable () );
294+ messageRecovererToUse .recover (messageToRecover , cause );
295295 }
296296 return null ;
297297 };
@@ -321,9 +321,9 @@ public int afterShutdown() {
321321 }
322322
323323 /**
324- * If there's a retry template, it will set the attributes holder via the listener. If
325- * there's no retry template, but there's an error channel, we create a new attributes
326- * holder here. If an attributes holder exists (by either method), we set the
324+ * If there's a retry template, it will set the attribute holder via the listener. If
325+ * there's no retry template, but there's an error channel, we create a new attribute
326+ * holder here. If an attribute holder exists (by either method), we set the
327327 * attributes for use by the
328328 * {@link org.springframework.integration.support.ErrorMessageStrategy}.
329329 * @param amqpMessage the AMQP message to use.
@@ -333,19 +333,15 @@ public int afterShutdown() {
333333 private void setAttributesIfNecessary (Object amqpMessage ,
334334 org .springframework .messaging .@ Nullable Message <?> message ) {
335335
336- boolean needHolder = getErrorChannel () != null && this .retryTemplate == null ;
337- boolean needAttributes = needHolder || this .retryTemplate != null ;
336+ boolean needHolder = getErrorChannel () != null || this .retryTemplate != null ;
338337 if (needHolder ) {
339- ATTRIBUTES_HOLDER .set (ErrorMessageUtils .getAttributeAccessor (null , null ));
340- }
341- if (needAttributes ) {
342- AttributeAccessor attributes = this .retryTemplate != null
343- ? RetrySynchronizationManager .getContext ()
344- : ATTRIBUTES_HOLDER .get ();
345- if (attributes != null ) {
346- attributes .setAttribute (ErrorMessageUtils .INPUT_MESSAGE_CONTEXT_KEY , message );
347- attributes .setAttribute (AmqpMessageHeaderErrorMessageStrategy .AMQP_RAW_MESSAGE , amqpMessage );
338+ AttributeAccessor attributes = ATTRIBUTES_HOLDER .get ();
339+ if (attributes == null ) {
340+ attributes = ErrorMessageUtils .getAttributeAccessor (null , null );
341+ ATTRIBUTES_HOLDER .set (attributes );
348342 }
343+ attributes .setAttribute (ErrorMessageUtils .INPUT_MESSAGE_CONTEXT_KEY , message );
344+ attributes .setAttribute (AmqpMessageHeaderErrorMessageStrategy .AMQP_RAW_MESSAGE , amqpMessage );
349345 }
350346 }
351347
@@ -385,16 +381,27 @@ public void onMessage(final Message message, @Nullable Channel channel) {
385381 else {
386382 final org .springframework .messaging .Message <Object > toSend =
387383 createMessageFromAmqp (message , channel );
388- this .retryOps .execute (
389- context -> {
390- AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor .getDeliveryAttempt (toSend );
391- if (deliveryAttempt != null ) {
392- deliveryAttempt .incrementAndGet ();
393- }
394- setAttributesIfNecessary (message , toSend );
395- sendMessage (toSend );
396- return null ;
397- }, this .recoverer );
384+ try {
385+ this .retryOps .execute (
386+ () -> {
387+ AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor .getDeliveryAttempt (toSend );
388+ if (deliveryAttempt != null ) {
389+ deliveryAttempt .incrementAndGet ();
390+ }
391+ setAttributesIfNecessary (message , toSend );
392+ sendMessage (toSend );
393+ return null ;
394+ });
395+ }
396+ catch (RetryException ex ) {
397+ if (this .recoverer != null ) {
398+ this .recoverer .recover (getErrorMessageAttributes (toSend ), ex .getCause ());
399+ }
400+ else {
401+ throw new ListenerExecutionFailedException (
402+ "Failed handling message after '" + ex .getRetryCount () + "' retries" , ex , message );
403+ }
404+ }
398405 }
399406 }
400407 catch (MessageConversionException e ) {
@@ -404,17 +411,14 @@ public void onMessage(final Message message, @Nullable Channel channel) {
404411 getMessagingTemplate ()
405412 .send (errorChannel ,
406413 buildErrorMessage (null ,
407-
408414 EndpointUtils .errorMessagePayload (message , channel , this .manualAcks , e )));
409415 }
410416 else {
411417 throw e ;
412418 }
413419 }
414420 finally {
415- if (this .retryOps == null ) {
416- ATTRIBUTES_HOLDER .remove ();
417- }
421+ ATTRIBUTES_HOLDER .remove ();
418422 }
419423 }
420424
@@ -453,7 +457,7 @@ protected Object convertPayload(Message message) {
453457
454458 protected org .springframework .messaging .Message <Object > createMessageFromPayload (Object payload ,
455459 @ Nullable Channel channel , Map <String , @ Nullable Object > headers , long deliveryTag ,
456- @ Nullable List <Map <String , Object >> listHeaders ) {
460+ @ Nullable List <Map <String , @ Nullable Object >> listHeaders ) {
457461
458462 if (this .manualAcks ) {
459463 headers .put (AmqpHeaders .DELIVERY_TAG , deliveryTag );
@@ -480,14 +484,14 @@ protected class BatchListener extends Listener implements ChannelAwareBatchMessa
480484 @ Override
481485 public void onMessageBatch (List <Message > messages , @ Nullable Channel channel ) {
482486 List <?> converted ;
483- List <Map <String , Object >> headers = null ;
487+ List <Map <String , @ Nullable Object >> headers = null ;
484488 if (this .batchModeMessages ) {
485489 converted = convertMessages (messages , channel );
486490 }
487491 else {
488492 converted = convertPayloads (messages , channel );
489493 if (BatchMode .EXTRACT_PAYLOADS_WITH_HEADERS .equals (AmqpInboundChannelAdapter .this .batchMode )) {
490- List <Map <String , Object >> listHeaders = new ArrayList <>();
494+ List <Map <String , @ Nullable Object >> listHeaders = new ArrayList <>();
491495 messages .forEach (msg -> listHeaders .add (AmqpInboundChannelAdapter .this .headerMapper
492496 .toHeadersFromRequest (msg .getMessageProperties ())));
493497 headers = listHeaders ;
@@ -503,31 +507,45 @@ public void onMessageBatch(List<Message> messages, @Nullable Channel channel) {
503507 sendMessage (message );
504508 }
505509 else {
506- this .retryOps .execute (
507- context -> {
508- AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor .getDeliveryAttempt (message );
509- if (deliveryAttempt != null ) {
510- deliveryAttempt .incrementAndGet ();
511- }
512- if (this .batchModeMessages ) {
513- @ SuppressWarnings ("unchecked" )
514- List <org .springframework .messaging .Message <?>> payloads =
515- (List <org .springframework .messaging .Message <?>>) message .getPayload ();
516- payloads .forEach (payload ->
517- Objects .requireNonNull (
518- StaticMessageHeaderAccessor .getDeliveryAttempt (payload ))
519- .incrementAndGet ());
520- }
521- setAttributesIfNecessary (messages , message );
522- sendMessage (message );
523- return null ;
524- }, this .recoverer );
510+ try {
511+ this .retryOps .execute (
512+ () -> {
513+ AtomicInteger deliveryAttempt =
514+ StaticMessageHeaderAccessor .getDeliveryAttempt (message );
515+ if (deliveryAttempt != null ) {
516+ deliveryAttempt .incrementAndGet ();
517+ }
518+ if (this .batchModeMessages ) {
519+ @ SuppressWarnings ("unchecked" )
520+ List <org .springframework .messaging .Message <?>> payloads =
521+ (List <org .springframework .messaging .Message <?>>) message .getPayload ();
522+ payloads .forEach (payload -> {
523+ AtomicInteger batchItemDeliveryAttempt =
524+ StaticMessageHeaderAccessor .getDeliveryAttempt (payload );
525+ if (batchItemDeliveryAttempt != null ) {
526+ batchItemDeliveryAttempt .incrementAndGet ();
527+ }
528+ });
529+ }
530+ setAttributesIfNecessary (messages , message );
531+ sendMessage (message );
532+ return null ;
533+ });
534+ }
535+ catch (RetryException ex ) {
536+ if (this .recoverer != null ) {
537+ this .recoverer .recover (getErrorMessageAttributes (null ), ex .getCause ());
538+ }
539+ else {
540+ throw new ListenerExecutionFailedException (
541+ "Failed handling messages after '" + ex .getRetryCount () + "' retries" , ex ,
542+ messages .toArray (new Message [0 ]));
543+ }
544+ }
525545 }
526546 }
527547 finally {
528- if (this .retryOps == null ) {
529- ATTRIBUTES_HOLDER .remove ();
530- }
548+ ATTRIBUTES_HOLDER .remove ();
531549 }
532550 }
533551 }
0 commit comments