diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java index 8c9b1df3fd..350eb8fa8f 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java @@ -35,10 +35,10 @@ */ public class DefaultRetryHandler implements RetryHandler { - private final BiPredicate queueRecoveryRetryCondition; - private final BiPredicate exchangeRecoveryRetryCondition; - private final BiPredicate bindingRecoveryRetryCondition; - private final BiPredicate consumerRecoveryRetryCondition; + private final BiPredicate queueRecoveryRetryCondition; + private final BiPredicate exchangeRecoveryRetryCondition; + private final BiPredicate bindingRecoveryRetryCondition; + private final BiPredicate consumerRecoveryRetryCondition; private final RetryOperation queueRecoveryRetryOperation; private final RetryOperation exchangeRecoveryRetryOperation; @@ -49,10 +49,10 @@ public class DefaultRetryHandler implements RetryHandler { private final BackoffPolicy backoffPolicy; - public DefaultRetryHandler(BiPredicate queueRecoveryRetryCondition, - BiPredicate exchangeRecoveryRetryCondition, - BiPredicate bindingRecoveryRetryCondition, - BiPredicate consumerRecoveryRetryCondition, + public DefaultRetryHandler(BiPredicate queueRecoveryRetryCondition, + BiPredicate exchangeRecoveryRetryCondition, + BiPredicate bindingRecoveryRetryCondition, + BiPredicate consumerRecoveryRetryCondition, RetryOperation queueRecoveryRetryOperation, RetryOperation exchangeRecoveryRetryOperation, RetryOperation bindingRecoveryRetryOperation, @@ -72,27 +72,31 @@ public DefaultRetryHandler(BiPredicate queueRecoveryRe this.retryAttempts = retryAttempts; } + @SuppressWarnings("unchecked") @Override public RetryResult retryQueueRecovery(RetryContext context) throws Exception { - return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context); + return doRetry((BiPredicate) queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context); } + @SuppressWarnings("unchecked") @Override public RetryResult retryExchangeRecovery(RetryContext context) throws Exception { - return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context); + return doRetry((BiPredicate) exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context); } + @SuppressWarnings("unchecked") @Override public RetryResult retryBindingRecovery(RetryContext context) throws Exception { - return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context); + return doRetry((BiPredicate) bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context); } + @SuppressWarnings("unchecked") @Override public RetryResult retryConsumerRecovery(RetryContext context) throws Exception { - return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context); + return doRetry((BiPredicate) consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context); } - protected RetryResult doRetry(BiPredicate condition, RetryOperation operation, T entity, RetryContext context) + protected RetryResult doRetry(BiPredicate condition, RetryOperation operation, RecordedEntity entity, RetryContext context) throws Exception { int attempts = 0; Exception exception = context.exception(); @@ -107,7 +111,6 @@ protected RetryResult doRetry(BiPredicate queueRecoveryRetryCondition = (q, e) -> false; - private BiPredicate exchangeRecoveryRetryCondition = (ex, e) -> false; - private BiPredicate bindingRecoveryRetryCondition = (b, e) -> false; - private BiPredicate consumerRecoveryRetryCondition = (c, e) -> false; + private BiPredicate queueRecoveryRetryCondition = (q, e) -> false; + private BiPredicate exchangeRecoveryRetryCondition = (ex, e) -> false; + private BiPredicate bindingRecoveryRetryCondition = (b, e) -> false; + private BiPredicate consumerRecoveryRetryCondition = (c, e) -> false; private DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation = context -> null; private DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation = context -> null; @@ -50,25 +50,25 @@ public static TopologyRecoveryRetryHandlerBuilder builder() { } public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryCondition( - BiPredicate queueRecoveryRetryCondition) { + BiPredicate queueRecoveryRetryCondition) { this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; return this; } public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryCondition( - BiPredicate exchangeRecoveryRetryCondition) { + BiPredicate exchangeRecoveryRetryCondition) { this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; return this; } public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryCondition( - BiPredicate bindingRecoveryRetryCondition) { + BiPredicate bindingRecoveryRetryCondition) { this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; return this; } public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryCondition( - BiPredicate consumerRecoveryRetryCondition) { + BiPredicate consumerRecoveryRetryCondition) { this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; return this; } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java index 8e6e16a790..582475b702 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -18,6 +18,7 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.ShutdownSignalException; +import java.util.function.BiPredicate; import java.util.function.Predicate; /** @@ -31,9 +32,9 @@ */ public abstract class TopologyRecoveryRetryLogic { - public static final Predicate CHANNEL_CLOSED_NOT_FOUND = e -> { - if (e.getCause() instanceof ShutdownSignalException) { - ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + public static final BiPredicate CHANNEL_CLOSED_NOT_FOUND = (entity, ex) -> { + if (ex.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) ex.getCause(); if (cause.getReason() instanceof AMQP.Channel.Close) { return ((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404; } diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java index 4f8ab6054a..49819ce8d3 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java @@ -58,8 +58,8 @@ public void topologyRecoveryRetry() throws Exception { protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); connectionFactory.setTopologyRecoveryRetryHandler( - builder().bindingRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e)) - .consumerRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e)) + builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) + .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)) .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER))) .build()