diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
index cbbc4338fe..8c7b524917 100644
--- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java
+++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
@@ -29,6 +29,7 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
+import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import java.io.IOException;
@@ -180,6 +181,13 @@ public class ConnectionFactory implements Cloneable {
*/
private TopologyRecoveryFilter topologyRecoveryFilter;
+ /**
+ * Retry handler for topology recovery.
+ * Default is no retry.
+ * @since 4.8.0
+ */
+ private RetryHandler topologyRecoveryRetryHandler;
+
/** @return the default host to use for connections */
public String getHost() {
return host;
@@ -1055,6 +1063,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setWorkPoolTimeout(workPoolTimeout);
result.setErrorOnWriteListener(errorOnWriteListener);
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
+ result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
return result;
}
@@ -1396,4 +1405,14 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
this.topologyRecoveryFilter = topologyRecoveryFilter;
}
+
+ /**
+ * Set retry handler for topology recovery.
+ * Default is no retry.
+ * @param topologyRecoveryRetryHandler
+ * @since 4.8.0
+ */
+ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
+ this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
+ }
}
diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
index 093142b296..47cd582e8a 100644
--- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
+++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
@@ -19,6 +19,8 @@
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.SaslConfig;
+import com.rabbitmq.client.ShutdownSignalException;
+import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import java.util.Map;
@@ -48,6 +50,7 @@ public class ConnectionParams {
private ErrorOnWriteListener errorOnWriteListener;
private int workPoolTimeout = -1;
private TopologyRecoveryFilter topologyRecoveryFilter;
+ private RetryHandler topologyRecoveryRetryHandler;
private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
@@ -245,4 +248,12 @@ public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFil
public TopologyRecoveryFilter getTopologyRecoveryFilter() {
return topologyRecoveryFilter;
}
+
+ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
+ this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
+ }
+
+ public RetryHandler getTopologyRecoveryRetryHandler() {
+ return topologyRecoveryRetryHandler;
+ }
}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
index 28d64747d1..71dd0886dc 100644
--- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
@@ -90,6 +90,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
// be created after application code has initiated shutdown.
private final Object recoveryLock = new Object();
+ private final RetryHandler retryHandler;
+
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addrs) {
this(params, f, new ListAddressResolver(addrs));
}
@@ -109,6 +111,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
letAllPassFilter() : params.getTopologyRecoveryFilter();
+
+ this.retryHandler = params.getTopologyRecoveryRetryHandler();
}
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -633,6 +637,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
}
}
+ void recoverChannel(AutorecoveringChannel channel) throws IOException {
+ channel.automaticallyRecover(this, this.delegate);
+ }
+
private void notifyRecoveryListenersComplete() {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleRecovery(this);
@@ -654,16 +662,16 @@ private void recoverTopology(final ExecutorService executor) {
if (executor == null) {
// recover entities in serial on the main connection thread
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
- recoverExchange(exchange);
+ recoverExchange(exchange, true);
}
for (final Map.Entry entry : Utility.copy(recordedQueues).entrySet()) {
- recoverQueue(entry.getKey(), entry.getValue());
+ recoverQueue(entry.getKey(), entry.getValue(), true);
}
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
- recoverBinding(b);
+ recoverBinding(b, true);
}
for (final Map.Entry entry : Utility.copy(consumers).entrySet()) {
- recoverConsumer(entry.getKey(), entry.getValue());
+ recoverConsumer(entry.getKey(), entry.getValue(), true);
}
} else {
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
@@ -683,11 +691,22 @@ private void recoverTopology(final ExecutorService executor) {
}
}
- private void recoverExchange(final RecordedExchange x) {
+ private void recoverExchange(RecordedExchange x, boolean retry) {
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
try {
if (topologyRecoveryFilter.filterExchange(x)) {
- x.recover();
+ if (retry) {
+ final RecordedExchange entity = x;
+ x = (RecordedExchange) wrapRetryIfNecessary(x, new Callable() {
+ @Override
+ public Void call() throws Exception {
+ entity.recover();
+ return null;
+ }
+ }).getRecordedEntity();
+ } else {
+ x.recover();
+ }
LOGGER.debug("{} has recovered", x);
}
} catch (Exception cause) {
@@ -698,12 +717,23 @@ private void recoverExchange(final RecordedExchange x) {
}
}
- private void recoverQueue(final String oldName, final RecordedQueue q) {
+ void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
try {
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
- q.recover();
+ if (retry) {
+ final RecordedQueue entity = q;
+ q = (RecordedQueue) wrapRetryIfNecessary(q, new Callable() {
+ @Override
+ public Void call() throws Exception {
+ entity.recover();
+ return null;
+ }
+ }).getRecordedEntity();
+ } else {
+ q.recover();
+ }
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
@@ -734,10 +764,21 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
}
}
- private void recoverBinding(final RecordedBinding b) {
+ private void recoverBinding(RecordedBinding b, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterBinding(b)) {
- b.recover();
+ if (retry) {
+ final RecordedBinding entity = b;
+ b = (RecordedBinding) wrapRetryIfNecessary(b, new Callable() {
+ @Override
+ public Void call() throws Exception {
+ entity.recover();
+ return null;
+ }
+ }).getRecordedEntity();
+ } else {
+ b.recover();
+ }
LOGGER.debug("{} has recovered", b);
}
} catch (Exception cause) {
@@ -748,11 +789,25 @@ private void recoverBinding(final RecordedBinding b) {
}
}
- private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
+ private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
- String newTag = consumer.recover();
+ String newTag = null;
+ if (retry) {
+ final RecordedConsumer entity = consumer;
+ RetryResult retryResult = wrapRetryIfNecessary(consumer, new Callable() {
+ @Override
+ public String call() throws Exception {
+ return entity.recover();
+ }
+ });
+ consumer = (RecordedConsumer) retryResult.getRecordedEntity();
+ newTag = (String) retryResult.getResult();
+ } else {
+ newTag = consumer.recover();
+ }
+
// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
@@ -775,6 +830,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer)
}
}
+ private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable recoveryAction) throws Exception {
+ if (this.retryHandler == null) {
+ T result = recoveryAction.call();
+ return new RetryResult(entity, result);
+ } else {
+ try {
+ T result = recoveryAction.call();
+ return new RetryResult(entity, result);
+ } catch (Exception e) {
+ RetryContext retryContext = new RetryContext(entity, e, this);
+ RetryResult retryResult;
+ if (entity instanceof RecordedQueue) {
+ retryResult = this.retryHandler.retryQueueRecovery(retryContext);
+ } else if (entity instanceof RecordedExchange) {
+ retryResult = this.retryHandler.retryExchangeRecovery(retryContext);
+ } else if (entity instanceof RecordedBinding) {
+ retryResult = this.retryHandler.retryBindingRecovery(retryContext);
+ } else if (entity instanceof RecordedConsumer) {
+ retryResult = this.retryHandler.retryConsumerRecovery(retryContext);
+ } else {
+ throw new IllegalArgumentException("Unknown type of recorded entity: " + entity);
+ }
+ return retryResult;
+ }
+ }
+ }
+
private void propagateQueueNameChangeToBindings(String oldName, String newName) {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
if (b.getDestination().equals(oldName)) {
@@ -825,15 +907,15 @@ private List> groupEntitiesByChannel
public void run() {
for (final E entity : entityList) {
if (entity instanceof RecordedExchange) {
- recoverExchange((RecordedExchange)entity);
+ recoverExchange((RecordedExchange)entity, true);
} else if (entity instanceof RecordedQueue) {
final RecordedQueue q = (RecordedQueue) entity;
- recoverQueue(q.getName(), q);
+ recoverQueue(q.getName(), q, true);
} else if (entity instanceof RecordedBinding) {
- recoverBinding((RecordedBinding) entity);
+ recoverBinding((RecordedBinding) entity, true);
} else if (entity instanceof RecordedConsumer) {
final RecordedConsumer c = (RecordedConsumer) entity;
- recoverConsumer(c.getConsumerTag(), c);
+ recoverConsumer(c.getConsumerTag(), c, true);
}
}
}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java
new file mode 100644
index 0000000000..572bb6fb57
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java
@@ -0,0 +1,33 @@
+// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * Backoff policy for topology recovery retry attempts.
+ *
+ * @see DefaultRetryHandler
+ * @see TopologyRecoveryRetryHandlerBuilder
+ * @since 4.8.0
+ */
+public interface BackoffPolicy {
+
+ /**
+ * Wait depending on the current attempt number (1, 2, 3, etc)
+ * @param attemptNumber current attempt number
+ * @throws InterruptedException
+ */
+ void backoff(int attemptNumber) throws InterruptedException;
+}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java
new file mode 100644
index 0000000000..a2879fbfd4
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java
@@ -0,0 +1,175 @@
+// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * Composable topology recovery retry handler.
+ * This retry handler implementations let the user choose the condition
+ * to trigger retry and the retry operation for each type of recoverable
+ * entities. The number of attempts and the backoff policy (time to wait
+ * between retries) are also configurable.
+ *
+ * See also {@link TopologyRecoveryRetryHandlerBuilder} to easily create
+ * instances and {@link TopologyRecoveryRetryLogic} for ready-to-use
+ * conditions and operations.
+ *
+ * @see TopologyRecoveryRetryHandlerBuilder
+ * @see TopologyRecoveryRetryLogic
+ * @since 4.8.0
+ */
+public class DefaultRetryHandler implements RetryHandler {
+
+ private final RetryCondition super RecordedQueue> queueRecoveryRetryCondition;
+ private final RetryCondition super RecordedExchange> exchangeRecoveryRetryCondition;
+ private final RetryCondition super RecordedBinding> bindingRecoveryRetryCondition;
+ private final RetryCondition super RecordedConsumer> consumerRecoveryRetryCondition;
+
+ private final RetryOperation> queueRecoveryRetryOperation;
+ private final RetryOperation> exchangeRecoveryRetryOperation;
+ private final RetryOperation> bindingRecoveryRetryOperation;
+ private final RetryOperation> consumerRecoveryRetryOperation;
+
+ private final int retryAttempts;
+
+ private final BackoffPolicy backoffPolicy;
+
+ public DefaultRetryHandler(RetryCondition super RecordedQueue> queueRecoveryRetryCondition,
+ RetryCondition super RecordedExchange> exchangeRecoveryRetryCondition,
+ RetryCondition super RecordedBinding> bindingRecoveryRetryCondition,
+ RetryCondition super RecordedConsumer> consumerRecoveryRetryCondition,
+ RetryOperation> queueRecoveryRetryOperation,
+ RetryOperation> exchangeRecoveryRetryOperation,
+ RetryOperation> bindingRecoveryRetryOperation,
+ RetryOperation> consumerRecoveryRetryOperation, int retryAttempts, BackoffPolicy backoffPolicy) {
+ this.queueRecoveryRetryCondition = queueRecoveryRetryCondition;
+ this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition;
+ this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition;
+ this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition;
+ this.queueRecoveryRetryOperation = queueRecoveryRetryOperation;
+ this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation;
+ this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation;
+ this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation;
+ this.backoffPolicy = backoffPolicy;
+ if (retryAttempts <= 0) {
+ throw new IllegalArgumentException("Number of retry attempts must be greater than 0");
+ }
+ this.retryAttempts = retryAttempts;
+ }
+
+ @Override
+ public RetryResult retryQueueRecovery(RetryContext context) throws Exception {
+ return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context);
+ }
+
+ @Override
+ public RetryResult retryExchangeRecovery(RetryContext context) throws Exception {
+ return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context);
+ }
+
+ @Override
+ public RetryResult retryBindingRecovery(RetryContext context) throws Exception {
+ return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context);
+ }
+
+ @Override
+ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception {
+ return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context);
+ }
+
+ protected RetryResult doRetry(RetryCondition condition, RetryOperation> operation, T entity, RetryContext context)
+ throws Exception {
+ int attempts = 0;
+ Exception exception = context.exception();
+ while (attempts < retryAttempts) {
+ if (condition.test(entity, exception)) {
+ backoffPolicy.backoff(attempts + 1);
+ try {
+ Object result = operation.call(context);
+ return new RetryResult(
+ entity, result == null ? null : result.toString()
+ );
+ } catch (Exception e) {
+ exception = e;
+ attempts++;
+ continue;
+ }
+ } else {
+ throw exception;
+ }
+ }
+ throw context.exception();
+ }
+
+ public static abstract class RetryOperation {
+
+ public abstract T call(RetryContext context) throws Exception;
+
+ public RetryOperation andThen(final RetryOperation after) {
+ return new RetryOperation() {
+
+ @Override
+ public V call(RetryContext context) throws Exception {
+ RetryOperation.this.call(context);
+ return after.call(context);
+ }
+ };
+ }
+ }
+
+ public static abstract class RetryCondition {
+
+ public abstract boolean test(E entity, Exception ex);
+
+ public RetryCondition and(final RetryCondition super E> other) {
+ if (other == null) {
+ throw new IllegalArgumentException("Condition cannot be null");
+ }
+ return new RetryCondition() {
+
+ @Override
+ public boolean test(E entity, Exception ex) {
+ return RetryCondition.this.test(entity, ex) && other.test(entity, ex);
+ }
+ };
+ }
+
+ public RetryCondition negate(final RetryCondition super E> other) {
+ if (other == null) {
+ throw new IllegalArgumentException("Condition cannot be null");
+ }
+ return new RetryCondition() {
+
+ @Override
+ public boolean test(E entity, Exception ex) {
+ return !RetryCondition.this.test(entity, ex);
+ }
+ };
+ }
+
+ public RetryCondition or(final RetryCondition super E> other) {
+ if (other == null) {
+ throw new IllegalArgumentException("Condition cannot be null");
+ }
+ return new RetryCondition() {
+
+ @Override
+ public boolean test(E entity, Exception ex) {
+ return RetryCondition.this.test(entity, ex) || other.test(entity, ex);
+ }
+ };
+ }
+ }
+}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java
new file mode 100644
index 0000000000..25e5beb5fa
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java
@@ -0,0 +1,99 @@
+// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * The context of a topology recovery retry operation.
+ *
+ * @since 4.8.0
+ */
+public class RetryContext {
+
+ private final RecordedEntity entity;
+
+ private final Exception exception;
+
+ private final AutorecoveringConnection connection;
+
+ public RetryContext(RecordedEntity entity, Exception exception, AutorecoveringConnection connection) {
+ this.entity = entity;
+ this.exception = exception;
+ this.connection = connection;
+ }
+
+ /**
+ * The underlying connection.
+ *
+ * @return
+ */
+ public AutorecoveringConnection connection() {
+ return connection;
+ }
+
+ /**
+ * The exception that triggered the retry attempt.
+ *
+ * @return
+ */
+ public Exception exception() {
+ return exception;
+ }
+
+ /**
+ * The to-be-recovered entity.
+ *
+ * @return
+ */
+ public RecordedEntity entity() {
+ return entity;
+ }
+
+ /**
+ * The to-be-recovered entity as a queue.
+ *
+ * @return
+ */
+ public RecordedQueue queue() {
+ return (RecordedQueue) entity;
+ }
+
+ /**
+ * The to-be-recovered entity as an exchange.
+ *
+ * @return
+ */
+ public RecordedExchange exchange() {
+ return (RecordedExchange) entity;
+ }
+
+ /**
+ * The to-be-recovered entity as a binding.
+ *
+ * @return
+ */
+ public RecordedBinding binding() {
+ return (RecordedBinding) entity;
+ }
+
+ /**
+ * The to-be-recovered entity as a consumer.
+ *
+ * @return
+ */
+ public RecordedConsumer consumer() {
+ return (RecordedConsumer) entity;
+ }
+}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java
new file mode 100644
index 0000000000..5eca22465d
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java
@@ -0,0 +1,62 @@
+// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * Contract to retry failed operations during topology recovery.
+ * Not all operations have to be retried, it's a decision of the
+ * underlying implementation.
+ *
+ * @since 4.8.0
+ */
+public interface RetryHandler {
+
+ /**
+ * Retry a failed queue recovery operation.
+ *
+ * @param context the context of the retry
+ * @return the result of the retry attempt
+ * @throws Exception if the retry fails
+ */
+ RetryResult retryQueueRecovery(RetryContext context) throws Exception;
+
+ /**
+ * Retry a failed exchange recovery operation.
+ *
+ * @param context the context of the retry
+ * @return the result of the retry attempt
+ * @throws Exception if the retry fails
+ */
+ RetryResult retryExchangeRecovery(RetryContext context) throws Exception;
+
+ /**
+ * Retry a failed binding recovery operation.
+ *
+ * @param context the context of the retry
+ * @return the result of the retry attempt
+ * @throws Exception if the retry fails
+ */
+ RetryResult retryBindingRecovery(RetryContext context) throws Exception;
+
+ /**
+ * Retry a failed consumer recovery operation.
+ *
+ * @param context the context of the retry
+ * @return the result of the retry attempt
+ * @throws Exception if the retry fails
+ */
+ RetryResult retryConsumerRecovery(RetryContext context) throws Exception;
+}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java
new file mode 100644
index 0000000000..5b0beb2baa
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java
@@ -0,0 +1,57 @@
+// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * The retry of a retried topology recovery operation.
+ *
+ * @since 4.8.0
+ */
+public class RetryResult {
+
+ /**
+ * The entity to recover.
+ */
+ private final RecordedEntity recordedEntity;
+
+ /**
+ * The result of the recovery operation.
+ * E.g. a consumer tag when recovering a consumer.
+ */
+ private final Object result;
+
+ public RetryResult(RecordedEntity recordedEntity, Object result) {
+ this.recordedEntity = recordedEntity;
+ this.result = result;
+ }
+
+ /**
+ * The entity to recover.
+ *
+ * @return
+ */
+ public RecordedEntity getRecordedEntity() {
+ return recordedEntity;
+ }
+
+ /**
+ * The result of the recovery operation.
+ * E.g. a consumer tag when recovering a consumer.
+ */
+ public Object getResult() {
+ return result;
+ }
+}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java
new file mode 100644
index 0000000000..767178300a
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java
@@ -0,0 +1,166 @@
+// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl.recovery;
+
+/**
+ * Builder to ease creation of {@link DefaultRetryHandler} instances.
+ *
+ * Just override what you need. By default, retry conditions don't trigger retry,
+ * retry operations are no-op, the number of retry attempts is 1, and the backoff
+ * policy doesn't wait at all.
+ *
+ * @see DefaultRetryHandler
+ * @see TopologyRecoveryRetryLogic
+ * @since 4.8.0
+ */
+public class TopologyRecoveryRetryHandlerBuilder {
+
+ private DefaultRetryHandler.RetryCondition super RecordedQueue> queueRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() {
+
+ @Override
+ public boolean test(RecordedQueue entity, Exception ex) {
+ return false;
+ }
+ };
+ private DefaultRetryHandler.RetryCondition super RecordedExchange> exchangeRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() {
+
+ @Override
+ public boolean test(RecordedExchange entity, Exception ex) {
+ return false;
+ }
+ };
+ private DefaultRetryHandler.RetryCondition super RecordedBinding> bindingRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() {
+
+ @Override
+ public boolean test(RecordedBinding entity, Exception ex) {
+ return false;
+ }
+ };
+ private DefaultRetryHandler.RetryCondition super RecordedConsumer> consumerRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() {
+
+ @Override
+ public boolean test(RecordedConsumer entity, Exception ex) {
+ return false;
+ }
+ };
+
+ private DefaultRetryHandler.RetryOperation> queueRecoveryRetryOperation = new DefaultRetryHandler.RetryOperation