diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index a35a55f64d..0816abdde3 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RetryHandler; import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import java.io.IOException; @@ -192,6 +193,13 @@ public class ConnectionFactory implements Cloneable { */ private Predicate connectionRecoveryTriggeringCondition; + /** + * Retry handler for topology recovery. + * Default is no retry. + * @since 5.4.0 + */ + private RetryHandler topologyRecoveryRetryHandler; + /** @return the default host to use for connections */ public String getHost() { return host; @@ -1087,6 +1095,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setErrorOnWriteListener(errorOnWriteListener); result.setTopologyRecoveryFilter(topologyRecoveryFilter); result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition); + result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler); return result; } @@ -1454,4 +1463,13 @@ public void setConnectionRecoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition; + private RetryHandler topologyRecoveryRetryHandler; private ExceptionHandler exceptionHandler; private ThreadFactory threadFactory; @@ -257,4 +259,11 @@ public Predicate getConnectionRecoveryTriggeringConditi return connectionRecoveryTriggeringCondition; } + public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) { + this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler; + } + + public RetryHandler getTopologyRecoveryRetryHandler() { + return topologyRecoveryRetryHandler; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 9223cb11aa..c3e66e0d2f 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -95,6 +95,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC private final Predicate connectionRecoveryTriggeringCondition; + private final RetryHandler retryHandler; + public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addrs) { this(params, f, new ListAddressResolver(addrs)); } @@ -115,6 +117,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, this.channels = new ConcurrentHashMap<>(); this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ? letAllPassFilter() : params.getTopologyRecoveryFilter(); + + this.retryHandler = params.getTopologyRecoveryRetryHandler(); } private void setupErrorOnWriteListenerForPotentialRecovery() { @@ -125,12 +129,9 @@ private void setupErrorOnWriteListenerForPotentialRecovery() { // we should trigger the error handling and the recovery only once if (errorOnWriteLock.tryLock()) { try { - Thread recoveryThread = threadFactory.newThread(new Runnable() { - @Override - public void run() { - AMQConnection c = (AMQConnection) connection; - c.handleIoError(exception); - } + Thread recoveryThread = threadFactory.newThread(() -> { + AMQConnection c = (AMQConnection) connection; + c.handleIoError(exception); }); recoveryThread.setName("RabbitMQ Error On Write Thread"); recoveryThread.start(); @@ -630,6 +631,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) { } } + void recoverChannel(AutorecoveringChannel channel) throws IOException { + channel.automaticallyRecover(this, this.delegate); + } + private void notifyRecoveryListenersComplete() { for (RecoveryListener f : Utility.copy(this.recoveryListeners)) { f.handleRecovery(this); @@ -651,16 +656,16 @@ private void recoverTopology(final ExecutorService executor) { if (executor == null) { // recover entities in serial on the main connection thread for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) { - recoverExchange(exchange); + recoverExchange(exchange, true); } for (final Map.Entry entry : Utility.copy(recordedQueues).entrySet()) { - recoverQueue(entry.getKey(), entry.getValue()); + recoverQueue(entry.getKey(), entry.getValue(), true); } for (final RecordedBinding b : Utility.copy(recordedBindings)) { - recoverBinding(b); + recoverBinding(b, true); } for (final Map.Entry entry : Utility.copy(consumers).entrySet()) { - recoverConsumer(entry.getKey(), entry.getValue()); + recoverConsumer(entry.getKey(), entry.getValue(), true); } } else { // Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers @@ -680,11 +685,19 @@ private void recoverTopology(final ExecutorService executor) { } } - private void recoverExchange(final RecordedExchange x) { + private void recoverExchange(RecordedExchange x, boolean retry) { // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK. try { if (topologyRecoveryFilter.filterExchange(x)) { - x.recover(); + if (retry) { + final RecordedExchange entity = x; + x = (RecordedExchange) wrapRetryIfNecessary(x, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + x.recover(); + } LOGGER.debug("{} has recovered", x); } } catch (Exception cause) { @@ -695,12 +708,20 @@ private void recoverExchange(final RecordedExchange x) { } } - private void recoverQueue(final String oldName, final RecordedQueue q) { + void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { try { if (topologyRecoveryFilter.filterQueue(q)) { LOGGER.debug("Recovering {}", q); - q.recover(); + if (retry) { + final RecordedQueue entity = q; + q = (RecordedQueue) wrapRetryIfNecessary(q, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + q.recover(); + } String newName = q.getName(); if (!oldName.equals(newName)) { // make sure server-named queues are re-added with @@ -731,10 +752,18 @@ private void recoverQueue(final String oldName, final RecordedQueue q) { } } - private void recoverBinding(final RecordedBinding b) { + private void recoverBinding(RecordedBinding b, boolean retry) { try { if (this.topologyRecoveryFilter.filterBinding(b)) { - b.recover(); + if (retry) { + final RecordedBinding entity = b; + b = (RecordedBinding) wrapRetryIfNecessary(b, () -> { + entity.recover(); + return null; + }).getRecordedEntity(); + } else { + b.recover(); + } LOGGER.debug("{} has recovered", b); } } catch (Exception cause) { @@ -745,11 +774,20 @@ private void recoverBinding(final RecordedBinding b) { } } - private void recoverConsumer(final String tag, final RecordedConsumer consumer) { + private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) { try { if (this.topologyRecoveryFilter.filterConsumer(consumer)) { LOGGER.debug("Recovering {}", consumer); - String newTag = consumer.recover(); + String newTag = null; + if (retry) { + final RecordedConsumer entity = consumer; + RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover()); + consumer = (RecordedConsumer) retryResult.getRecordedEntity(); + newTag = (String) retryResult.getResult(); + } else { + newTag = consumer.recover(); + } + // make sure server-generated tags are re-added. MK. if(tag != null && !tag.equals(newTag)) { synchronized (this.consumers) { @@ -772,6 +810,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer) } } + private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable recoveryAction) throws Exception { + if (this.retryHandler == null) { + T result = recoveryAction.call(); + return new RetryResult(entity, result); + } else { + try { + T result = recoveryAction.call(); + return new RetryResult(entity, result); + } catch (Exception e) { + RetryContext retryContext = new RetryContext(entity, e, this); + RetryResult retryResult; + if (entity instanceof RecordedQueue) { + retryResult = this.retryHandler.retryQueueRecovery(retryContext); + } else if (entity instanceof RecordedExchange) { + retryResult = this.retryHandler.retryExchangeRecovery(retryContext); + } else if (entity instanceof RecordedBinding) { + retryResult = this.retryHandler.retryBindingRecovery(retryContext); + } else if (entity instanceof RecordedConsumer) { + retryResult = this.retryHandler.retryConsumerRecovery(retryContext); + } else { + throw new IllegalArgumentException("Unknown type of recorded entity: " + entity); + } + return retryResult; + } + } + } + private void propagateQueueNameChangeToBindings(String oldName, String newName) { for (RecordedBinding b : Utility.copy(this.recordedBindings)) { if (b.getDestination().equals(oldName)) { @@ -820,15 +885,15 @@ private List> groupEntitiesByChannel callables.add(Executors.callable(() -> { for (final E entity : entityList) { if (entity instanceof RecordedExchange) { - recoverExchange((RecordedExchange)entity); + recoverExchange((RecordedExchange)entity, true); } else if (entity instanceof RecordedQueue) { final RecordedQueue q = (RecordedQueue) entity; - recoverQueue(q.getName(), q); + recoverQueue(q.getName(), q, true); } else if (entity instanceof RecordedBinding) { - recoverBinding((RecordedBinding) entity); + recoverBinding((RecordedBinding) entity, true); } else if (entity instanceof RecordedConsumer) { final RecordedConsumer c = (RecordedConsumer) entity; - recoverConsumer(c.getConsumerTag(), c); + recoverConsumer(c.getConsumerTag(), c, true); } } })); diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java new file mode 100644 index 0000000000..a05c2a8a3c --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java @@ -0,0 +1,34 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Backoff policy for topology recovery retry attempts. + * + * @see DefaultRetryHandler + * @see TopologyRecoveryRetryHandlerBuilder + * @since 5.4.0 + */ +@FunctionalInterface +public interface BackoffPolicy { + + /** + * Wait depending on the current attempt number (1, 2, 3, etc) + * @param attemptNumber current attempt number + * @throws InterruptedException + */ + void backoff(int attemptNumber) throws InterruptedException; +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java new file mode 100644 index 0000000000..8c9b1df3fd --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java @@ -0,0 +1,131 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import java.util.Objects; +import java.util.function.BiPredicate; + +/** + * Composable topology recovery retry handler. + * This retry handler implementations let the user choose the condition + * to trigger retry and the retry operation for each type of recoverable + * entities. The number of attempts and the backoff policy (time to wait + * between retries) are also configurable. + *

+ * See also {@link TopologyRecoveryRetryHandlerBuilder} to easily create + * instances and {@link TopologyRecoveryRetryLogic} for ready-to-use + * conditions and operations. + * + * @see TopologyRecoveryRetryHandlerBuilder + * @see TopologyRecoveryRetryLogic + * @since 5.4.0 + */ +public class DefaultRetryHandler implements RetryHandler { + + private final BiPredicate queueRecoveryRetryCondition; + private final BiPredicate exchangeRecoveryRetryCondition; + private final BiPredicate bindingRecoveryRetryCondition; + private final BiPredicate consumerRecoveryRetryCondition; + + private final RetryOperation queueRecoveryRetryOperation; + private final RetryOperation exchangeRecoveryRetryOperation; + private final RetryOperation bindingRecoveryRetryOperation; + private final RetryOperation consumerRecoveryRetryOperation; + + private final int retryAttempts; + + private final BackoffPolicy backoffPolicy; + + public DefaultRetryHandler(BiPredicate queueRecoveryRetryCondition, + BiPredicate exchangeRecoveryRetryCondition, + BiPredicate bindingRecoveryRetryCondition, + BiPredicate consumerRecoveryRetryCondition, + RetryOperation queueRecoveryRetryOperation, + RetryOperation exchangeRecoveryRetryOperation, + RetryOperation bindingRecoveryRetryOperation, + RetryOperation consumerRecoveryRetryOperation, int retryAttempts, BackoffPolicy backoffPolicy) { + this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; + this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; + this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; + this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; + this.queueRecoveryRetryOperation = queueRecoveryRetryOperation; + this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation; + this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation; + this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation; + this.backoffPolicy = backoffPolicy; + if (retryAttempts <= 0) { + throw new IllegalArgumentException("Number of retry attempts must be greater than 0"); + } + this.retryAttempts = retryAttempts; + } + + @Override + public RetryResult retryQueueRecovery(RetryContext context) throws Exception { + return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context); + } + + @Override + public RetryResult retryExchangeRecovery(RetryContext context) throws Exception { + return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context); + } + + @Override + public RetryResult retryBindingRecovery(RetryContext context) throws Exception { + return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context); + } + + @Override + public RetryResult retryConsumerRecovery(RetryContext context) throws Exception { + return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context); + } + + protected RetryResult doRetry(BiPredicate condition, RetryOperation operation, T entity, RetryContext context) + throws Exception { + int attempts = 0; + Exception exception = context.exception(); + while (attempts < retryAttempts) { + if (condition.test(entity, exception)) { + backoffPolicy.backoff(attempts + 1); + try { + Object result = operation.call(context); + return new RetryResult( + entity, result == null ? null : result.toString() + ); + } catch (Exception e) { + exception = e; + attempts++; + continue; + } + } else { + throw exception; + } + } + throw context.exception(); + } + + public interface RetryOperation { + + T call(RetryContext context) throws Exception; + + default RetryOperation andThen(RetryOperation after) { + Objects.requireNonNull(after); + return (context) -> { + call(context); + return after.call(context); + }; + } + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java new file mode 100644 index 0000000000..a9bdc05e5f --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java @@ -0,0 +1,99 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * The context of a topology recovery retry operation. + * + * @since 5.4.0 + */ +public class RetryContext { + + private final RecordedEntity entity; + + private final Exception exception; + + private final AutorecoveringConnection connection; + + public RetryContext(RecordedEntity entity, Exception exception, AutorecoveringConnection connection) { + this.entity = entity; + this.exception = exception; + this.connection = connection; + } + + /** + * The underlying connection. + * + * @return + */ + public AutorecoveringConnection connection() { + return connection; + } + + /** + * The exception that triggered the retry attempt. + * + * @return + */ + public Exception exception() { + return exception; + } + + /** + * The to-be-recovered entity. + * + * @return + */ + public RecordedEntity entity() { + return entity; + } + + /** + * The to-be-recovered entity as a queue. + * + * @return + */ + public RecordedQueue queue() { + return (RecordedQueue) entity; + } + + /** + * The to-be-recovered entity as an exchange. + * + * @return + */ + public RecordedExchange exchange() { + return (RecordedExchange) entity; + } + + /** + * The to-be-recovered entity as a binding. + * + * @return + */ + public RecordedBinding binding() { + return (RecordedBinding) entity; + } + + /** + * The to-be-recovered entity as a consumer. + * + * @return + */ + public RecordedConsumer consumer() { + return (RecordedConsumer) entity; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java new file mode 100644 index 0000000000..5ed7f823f0 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java @@ -0,0 +1,62 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Contract to retry failed operations during topology recovery. + * Not all operations have to be retried, it's a decision of the + * underlying implementation. + * + * @since 5.4.0 + */ +public interface RetryHandler { + + /** + * Retry a failed queue recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryQueueRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed exchange recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryExchangeRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed binding recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryBindingRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed consumer recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryConsumerRecovery(RetryContext context) throws Exception; +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java new file mode 100644 index 0000000000..c4797c39bf --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java @@ -0,0 +1,57 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * The retry of a retried topology recovery operation. + * + * @since 5.4.0 + */ +public class RetryResult { + + /** + * The entity to recover. + */ + private final RecordedEntity recordedEntity; + + /** + * The result of the recovery operation. + * E.g. a consumer tag when recovering a consumer. + */ + private final Object result; + + public RetryResult(RecordedEntity recordedEntity, Object result) { + this.recordedEntity = recordedEntity; + this.result = result; + } + + /** + * The entity to recover. + * + * @return + */ + public RecordedEntity getRecordedEntity() { + return recordedEntity; + } + + /** + * The result of the recovery operation. + * E.g. a consumer tag when recovering a consumer. + */ + public Object getResult() { + return result; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java new file mode 100644 index 0000000000..07141b43e5 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java @@ -0,0 +1,115 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import java.util.function.BiPredicate; + +/** + * Builder to ease creation of {@link DefaultRetryHandler} instances. + *

+ * Just override what you need. By default, retry conditions don't trigger retry, + * retry operations are no-op, the number of retry attempts is 1, and the backoff + * policy doesn't wait at all. + * + * @see DefaultRetryHandler + * @see TopologyRecoveryRetryLogic + * @since 5.4.0 + */ +public class TopologyRecoveryRetryHandlerBuilder { + + private BiPredicate queueRecoveryRetryCondition = (q, e) -> false; + private BiPredicate exchangeRecoveryRetryCondition = (ex, e) -> false; + private BiPredicate bindingRecoveryRetryCondition = (b, e) -> false; + private BiPredicate consumerRecoveryRetryCondition = (c, e) -> false; + + private DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation = context -> null; + private DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation = context -> null; + private DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation = context -> null; + private DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation = context -> null; + + private int retryAttempts = 1; + + private BackoffPolicy backoffPolicy = nbAttempts -> { + }; + + public static TopologyRecoveryRetryHandlerBuilder builder() { + return new TopologyRecoveryRetryHandlerBuilder(); + } + + public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryCondition( + BiPredicate queueRecoveryRetryCondition) { + this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryCondition( + BiPredicate exchangeRecoveryRetryCondition) { + this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryCondition( + BiPredicate bindingRecoveryRetryCondition) { + this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryCondition( + BiPredicate consumerRecoveryRetryCondition) { + this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryOperation(DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation) { + this.queueRecoveryRetryOperation = queueRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryOperation(DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation) { + this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryOperation(DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation) { + this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryOperation(DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation) { + this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder backoffPolicy(BackoffPolicy backoffPolicy) { + this.backoffPolicy = backoffPolicy; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder retryAttempts(int retryAttempts) { + this.retryAttempts = retryAttempts; + return this; + } + + public RetryHandler build() { + return new DefaultRetryHandler( + queueRecoveryRetryCondition, exchangeRecoveryRetryCondition, + bindingRecoveryRetryCondition, consumerRecoveryRetryCondition, + queueRecoveryRetryOperation, exchangeRecoveryRetryOperation, + bindingRecoveryRetryOperation, consumerRecoveryRetryOperation, + retryAttempts, + backoffPolicy); + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java new file mode 100644 index 0000000000..8e6e16a790 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -0,0 +1,85 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.ShutdownSignalException; + +import java.util.function.Predicate; + +/** + * Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}. + * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}. + * + * @see DefaultRetryHandler + * @see RetryHandler + * @see TopologyRecoveryRetryHandlerBuilder + * @since 5.4.0 + */ +public abstract class TopologyRecoveryRetryLogic { + + public static final Predicate CHANNEL_CLOSED_NOT_FOUND = e -> { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + return ((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404; + } + } + return false; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CHANNEL = context -> { + if (!context.entity().getChannel().isOpen()) { + context.connection().recoverChannel(context.entity().getChannel()); + } + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING_QUEUE = context -> { + if (context.entity() instanceof RecordedQueueBinding) { + RecordedBinding binding = context.binding(); + AutorecoveringConnection connection = context.connection(); + RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination()); + if (recordedQueue != null) { + connection.recoverQueue( + recordedQueue.getName(), recordedQueue, false + ); + } + } + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING = context -> { + context.binding().recover(); + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER_QUEUE = context -> { + if (context.entity() instanceof RecordedConsumer) { + RecordedConsumer consumer = context.consumer(); + AutorecoveringConnection connection = context.connection(); + RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue()); + if (recordedQueue != null) { + connection.recoverQueue( + recordedQueue.getName(), recordedQueue, false + ); + } + } + return null; + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER = context -> context.consumer().recover(); +} diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index 3747c71be9..9c2994b671 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -62,7 +62,8 @@ StrictExceptionHandlerTest.class, NoAutoRecoveryWhenTcpWindowIsFullTest.class, JsonRpcTest.class, - AddressTest.class + AddressTest.class, + DefaultRetryHandlerTest.class }) public class ClientTests { diff --git a/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java b/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java new file mode 100644 index 0000000000..cc105304a7 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java @@ -0,0 +1,257 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.impl.recovery.BackoffPolicy; +import com.rabbitmq.client.impl.recovery.DefaultRetryHandler; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedExchange; +import com.rabbitmq.client.impl.recovery.RecordedQueue; +import com.rabbitmq.client.impl.recovery.RetryContext; +import com.rabbitmq.client.impl.recovery.RetryHandler; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.verification.VerificationMode; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiPredicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.intThat; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +/** + * + */ +public class DefaultRetryHandlerTest { + + RetryHandler handler; + + @Mock + BiPredicate queueRecoveryRetryCondition; + @Mock + BiPredicate exchangeRecoveryRetryCondition; + @Mock + BiPredicate bindingRecoveryRetryCondition; + @Mock + BiPredicate consumerRecoveryRetryCondition; + + @Mock + DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation; + + @Mock + BackoffPolicy backoffPolicy; + + @Before + public void init() { + initMocks(this); + } + + @Test + public void shouldNotRetryWhenConditionReturnsFalse() throws Exception { + conditionsReturn(false); + handler = handler(); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryQueueRecovery(retryContext()) + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryExchangeRecovery(retryContext()) + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryBindingRecovery(retryContext()) + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + () -> handler.retryConsumerRecovery(retryContext()) + ); + verifyConditionsInvocation(times(1)); + verifyOperationsInvocation(never()); + verify(backoffPolicy, never()).backoff(anyInt()); + } + + @Test + public void shouldReturnOperationResultInRetryResultWhenRetrying() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("queue"); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("exchange"); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("binding"); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("consumer"); + handler = handler(); + assertEquals( + "queue", + handler.retryQueueRecovery(retryContext()).getResult() + ); + assertEquals( + "exchange", + handler.retryExchangeRecovery(retryContext()).getResult() + ); + assertEquals( + "binding", + handler.retryBindingRecovery(retryContext()).getResult() + ); + assertEquals( + "consumer", + handler.retryConsumerRecovery(retryContext()).getResult() + ); + verifyConditionsInvocation(times(1)); + verifyOperationsInvocation(times(1)); + verify(backoffPolicy, times(1 * 4)).backoff(1); + } + + @Test + public void shouldRetryWhenOperationFailsAndConditionIsTrue() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("queue"); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("exchange"); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("binding"); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("consumer"); + handler = handler(2); + assertEquals( + "queue", + handler.retryQueueRecovery(retryContext()).getResult() + ); + assertEquals( + "exchange", + handler.retryExchangeRecovery(retryContext()).getResult() + ); + assertEquals( + "binding", + handler.retryBindingRecovery(retryContext()).getResult() + ); + assertEquals( + "consumer", + handler.retryConsumerRecovery(retryContext()).getResult() + ); + verifyConditionsInvocation(times(2)); + verifyOperationsInvocation(times(2)); + checkBackoffSequence(1, 2, 1, 2, 1, 2, 1, 2); + } + + @Test + public void shouldThrowExceptionWhenRetryAttemptsIsExceeded() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + handler = handler(3); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryQueueRecovery(retryContext()) + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryExchangeRecovery(retryContext()) + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryBindingRecovery(retryContext()) + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + () -> handler.retryConsumerRecovery(retryContext()) + ); + verifyConditionsInvocation(times(3)); + verifyOperationsInvocation(times(3)); + checkBackoffSequence(1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); + } + + private void assertExceptionIsThrown(String message, Callable action) { + try { + action.call(); + fail(message); + } catch (Exception e) { + } + } + + private void conditionsReturn(boolean shouldRetry) { + when(queueRecoveryRetryCondition.test(nullable(RecordedQueue.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(exchangeRecoveryRetryCondition.test(nullable(RecordedExchange.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(bindingRecoveryRetryCondition.test(nullable(RecordedBinding.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(consumerRecoveryRetryCondition.test(nullable(RecordedConsumer.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + } + + private void verifyConditionsInvocation(VerificationMode mode) { + verify(queueRecoveryRetryCondition, mode).test(nullable(RecordedQueue.class), any(Exception.class)); + verify(exchangeRecoveryRetryCondition, mode).test(nullable(RecordedExchange.class), any(Exception.class)); + verify(bindingRecoveryRetryCondition, mode).test(nullable(RecordedBinding.class), any(Exception.class)); + verify(consumerRecoveryRetryCondition, mode).test(nullable(RecordedConsumer.class), any(Exception.class)); + } + + private void verifyOperationsInvocation(VerificationMode mode) throws Exception { + verify(queueRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(exchangeRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(bindingRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(consumerRecoveryRetryOperation, mode).call(any(RetryContext.class)); + } + + private RetryHandler handler() { + return handler(1); + } + + private RetryHandler handler(int retryAttempts) { + return new DefaultRetryHandler( + queueRecoveryRetryCondition, exchangeRecoveryRetryCondition, + bindingRecoveryRetryCondition, consumerRecoveryRetryCondition, + queueRecoveryRetryOperation, exchangeRecoveryRetryOperation, + bindingRecoveryRetryOperation, consumerRecoveryRetryOperation, + retryAttempts, + backoffPolicy); + } + + private RetryContext retryContext() { + return new RetryContext(null, new Exception(), null); + } + + private void checkBackoffSequence(int... sequence) throws InterruptedException { + AtomicInteger count = new AtomicInteger(0); + verify(backoffPolicy, times(sequence.length)) + // for some reason Mockito calls the matchers twice as many times as the target method + .backoff(intThat(i -> i == sequence[count.getAndIncrement() % sequence.length])); + } +} diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index 644f3a1707..c44e8b26a4 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -15,11 +15,28 @@ package com.rabbitmq.client.test; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoverableConnection; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.NetworkConnection; +import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.tools.Host; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertTrue; public class TestUtils { @@ -27,7 +44,7 @@ public class TestUtils { public static ConnectionFactory connectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); - if(USE_NIO) { + if (USE_NIO) { connectionFactory.useNio(); } else { connectionFactory.useBlockingIo(); @@ -36,7 +53,7 @@ public static ConnectionFactory connectionFactory() { } public static void close(Connection connection) { - if(connection != null) { + if (connection != null) { try { connection.close(); } catch (IOException e) { @@ -66,12 +83,108 @@ public static boolean isVersion37orLater(Connection connection) { LoggerFactory.getLogger(TestUtils.class).warn("Unable to parse broker version {}", currentVersion, e); throw e; } + } + + public static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) + throws IOException, TimeoutException, InterruptedException { + Channel ch = c.createChannel(); + try { + ch.confirmSelect(); + final CountDownLatch latch = new CountDownLatch(1); + ch.basicConsume(queue, true, new DefaultConsumer(ch) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + latch.countDown(); + } + }); + ch.basicPublish(exchange, routingKey, null, "".getBytes()); + ch.waitForConfirmsOrDie(5000); + return latch.await(5, TimeUnit.SECONDS); + } finally { + if (ch != null && ch.isOpen()) { + ch.close(); + } + } + } + + public static boolean resourceExists(Callable callback) throws Exception { + Channel declarePassiveChannel = null; + try { + declarePassiveChannel = callback.call(); + return true; + } catch (IOException e) { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { + return false; + } else { + throw e; + } + } + return false; + } else { + throw e; + } + } finally { + if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { + declarePassiveChannel.close(); + } + } + } + + public static boolean queueExists(final String queue, final Connection connection) throws Exception { + return resourceExists(() -> { + Channel channel = connection.createChannel(); + channel.queueDeclarePassive(queue); + return channel; + }); + } + + public static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { + return resourceExists(() -> { + Channel channel = connection.createChannel(); + channel.exchangeDeclarePassive(exchange); + return channel; + }); + } + + public static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeConnection((NetworkConnection) connection); + wait(latch); + } + + public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeAllConnections(); + wait(latch); + } + + public static CountDownLatch prepareForRecovery(Connection conn) { + final CountDownLatch latch = new CountDownLatch(1); + ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { + + @Override + public void handleRecovery(Recoverable recoverable) { + latch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // No-op + } + }); + return latch; + } + + private static void wait(CountDownLatch latch) throws InterruptedException { + assertTrue(latch.await(90, TimeUnit.SECONDS)); } /** * http://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java - * */ static int versionCompare(String str1, String str2) { String[] vals1 = str1.split("\\."); @@ -90,5 +203,4 @@ static int versionCompare(String str1, String str2) { // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" return Integer.signum(vals1.length - vals2.length); } - } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index a89bb16995..04cf8b04d2 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.rabbitmq.client.test.TestUtils.prepareForRecovery; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.*; @@ -62,7 +63,7 @@ public class ConnectionRecovery extends BrokerTestCase { try { assertTrue(c.isOpen()); assertEquals(connectionName, c.getClientProvidedName()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); assertEquals(connectionName, c.getClientProvidedName()); } finally { @@ -82,7 +83,7 @@ public class ConnectionRecovery extends BrokerTestCase { RecoverableConnection c = newRecoveringConnection(addresses); try { assertTrue(c.isOpen()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); } finally { c.abort(); @@ -98,7 +99,7 @@ public class ConnectionRecovery extends BrokerTestCase { RecoverableConnection c = newRecoveringConnection(addresses); try { assertTrue(c.isOpen()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); } finally { c.abort(); @@ -157,7 +158,7 @@ public String getPassword() { assertThat(usernameRequested.get(), is(1)); assertThat(passwordRequested.get(), is(1)); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); // username is requested in AMQConnection#toString, so it can be accessed at any time assertThat(usernameRequested.get(), greaterThanOrEqualTo(2)); @@ -804,7 +805,7 @@ public void handleDelivery(String consumerTag, Connection testConnection = connectionFactory.newConnection(); try { assertTrue(testConnection.isOpen()); - closeAndWaitForRecovery((RecoverableConnection) testConnection); + TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection); assertTrue(testConnection.isOpen()); } finally { connection.close(); @@ -851,7 +852,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie } } // now do recovery - closeAndWaitForRecovery(testConnection); + TestUtils.closeAndWaitForRecovery(testConnection); // verify channels & topology recovered by publishing a message to each for (int i=0; i < channelCount; i++) { @@ -935,21 +936,6 @@ private static void expectExchangeRecovery(Channel ch, String x) throws IOExcept ch.exchangeDeclarePassive(x); } - private static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection)conn).addRecoveryListener(new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); - return latch; - } - private static CountDownLatch prepareForShutdown(Connection conn) { final CountDownLatch latch = new CountDownLatch(1); conn.addShutdownListener(new ShutdownListener() { @@ -962,13 +948,7 @@ public void shutdownCompleted(ShutdownSignalException cause) { } private void closeAndWaitForRecovery() throws IOException, InterruptedException { - closeAndWaitForRecovery((AutorecoveringConnection)this.connection); - } - - private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); - Host.closeConnection((NetworkConnection) connection); - wait(latch); + TestUtils.closeAndWaitForRecovery((AutorecoveringConnection)this.connection); } private void restartPrimaryAndWaitForRecovery() throws IOException, InterruptedException { diff --git a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java index aeadb303dd..fb48f29585 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java @@ -78,7 +78,8 @@ Nack.class, ExceptionMessages.class, Metrics.class, - TopologyRecoveryFiltering.class + TopologyRecoveryFiltering.class, + TopologyRecoveryRetry.class }) public class FunctionalTests { diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java index 480075258f..3eb9687eea 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java @@ -21,12 +21,7 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoverableConnection; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.impl.NetworkConnection; -import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; import com.rabbitmq.client.impl.recovery.RecordedBinding; import com.rabbitmq.client.impl.recovery.RecordedConsumer; import com.rabbitmq.client.impl.recovery.RecordedExchange; @@ -34,16 +29,18 @@ import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import com.rabbitmq.client.test.BrokerTestCase; import com.rabbitmq.client.test.TestUtils; -import com.rabbitmq.tools.Host; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static com.rabbitmq.client.test.TestUtils.closeAndWaitForRecovery; +import static com.rabbitmq.client.test.TestUtils.exchangeExists; +import static com.rabbitmq.client.test.TestUtils.queueExists; +import static com.rabbitmq.client.test.TestUtils.sendAndConsumeMessage; import static org.awaitility.Awaitility.waitAtMost; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertFalse; @@ -62,98 +59,6 @@ public class TopologyRecoveryFiltering extends BrokerTestCase { }; Connection c; - private static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) - throws IOException, TimeoutException, InterruptedException { - Channel ch = c.createChannel(); - try { - ch.confirmSelect(); - final CountDownLatch latch = new CountDownLatch(1); - ch.basicConsume(queue, true, new DefaultConsumer(ch) { - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - latch.countDown(); - } - }); - ch.basicPublish(exchange, routingKey, null, "".getBytes()); - ch.waitForConfirmsOrDie(5000); - return latch.await(5, TimeUnit.SECONDS); - } finally { - if (ch != null && ch.isOpen()) { - ch.close(); - } - } - } - - private static boolean resourceExists(Callable callback) throws Exception { - Channel declarePassiveChannel = null; - try { - declarePassiveChannel = callback.call(); - return true; - } catch (IOException e) { - if (e.getCause() instanceof ShutdownSignalException) { - ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); - if (cause.getReason() instanceof AMQP.Channel.Close) { - if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { - return false; - } else { - throw e; - } - } - return false; - } else { - throw e; - } - } finally { - if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { - declarePassiveChannel.close(); - } - } - } - - private static boolean queueExists(final String queue, final Connection connection) throws Exception { - return resourceExists(() -> { - Channel channel = connection.createChannel(); - channel.queueDeclarePassive(queue); - return channel; - }); - } - - private static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { - return resourceExists(() -> { - Channel channel = connection.createChannel(); - channel.exchangeDeclarePassive(exchange); - return channel; - }); - } - - private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); - Host.closeConnection((NetworkConnection) connection); - wait(latch); - } - - private static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { - - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } - - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); - return latch; - } - - private static void wait(CountDownLatch latch) throws InterruptedException { - assertTrue(latch.await(20, TimeUnit.SECONDS)); - } - @Override protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java new file mode 100644 index 0000000000..4f8ab6054a --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java @@ -0,0 +1,70 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test.functional; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.test.BrokerTestCase; +import com.rabbitmq.client.test.TestUtils; +import org.junit.Test; + +import java.util.HashMap; + +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.CHANNEL_CLOSED_NOT_FOUND; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING_QUEUE; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CHANNEL; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER_QUEUE; +import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class TopologyRecoveryRetry extends BrokerTestCase { + + @Test + public void topologyRecoveryRetry() throws Exception { + int nbQueues = 2000; + String prefix = "topology-recovery-retry-" + System.currentTimeMillis(); + for (int i = 0; i < nbQueues; i++) { + String queue = prefix + i; + channel.queueDeclare(queue, false, false, true, new HashMap<>()); + channel.queueBind(queue, "amq.direct", queue); + channel.basicConsume(queue, true, new DefaultConsumer(channel)); + } + + closeAllConnectionsAndWaitForRecovery(this.connection); + + assertTrue(channel.isOpen()); + } + + @Override + protected ConnectionFactory newConnectionFactory() { + ConnectionFactory connectionFactory = TestUtils.connectionFactory(); + connectionFactory.setTopologyRecoveryRetryHandler( + builder().bindingRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e)) + .consumerRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e)) + .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)) + .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER))) + .build() + ); + connectionFactory.setNetworkRecoveryInterval(1000); + return connectionFactory; + } +} diff --git a/src/test/java/com/rabbitmq/tools/Host.java b/src/test/java/com/rabbitmq/tools/Host.java index 5924e5931d..c919d78621 100644 --- a/src/test/java/com/rabbitmq/tools/Host.java +++ b/src/test/java/com/rabbitmq/tools/Host.java @@ -169,6 +169,10 @@ public static void closeConnection(String pid) throws IOException { rabbitmqctl("close_connection '" + pid + "' 'Closed via rabbitmqctl'"); } + public static void closeAllConnections() throws IOException { + rabbitmqctl("close_all_connections 'Closed via rabbitmqctl'"); + } + public static void closeConnection(NetworkConnection c) throws IOException { Host.ConnectionInfo ci = findConnectionInfoFor(Host.listConnections(), c); closeConnection(ci.getPid());