Skip to content

Commit

Permalink
Add optional retry logic to topology recovery
Browse files Browse the repository at this point in the history
There's no topology recovery retry by default. The default
implementation is composable: not all have the recoverable entities have
to retry and the retry operations don't have to be only the
corresponding entity recovery, but also other operations, like
recovering the corresponding channel.

Fixes #387
  • Loading branch information
acogoluegnes committed Aug 7, 2018
1 parent 26a4a96 commit 34e33ea
Show file tree
Hide file tree
Showing 18 changed files with 1,160 additions and 155 deletions.
18 changes: 18 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

import java.io.IOException;
Expand Down Expand Up @@ -192,6 +193,13 @@ public class ConnectionFactory implements Cloneable {
*/
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

/**
* Retry handler for topology recovery.
* Default is no retry.
* @since 5.4.0
*/
private RetryHandler topologyRecoveryRetryHandler;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -1087,6 +1095,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setErrorOnWriteListener(errorOnWriteListener);
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
return result;
}

Expand Down Expand Up @@ -1454,4 +1463,13 @@ public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalExc
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
}

/**
* Set retry handler for topology recovery.
* Default is no retry.
* @param topologyRecoveryRetryHandler
* @since 5.4.0
*/
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

import java.util.Map;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class ConnectionParams {
private int workPoolTimeout = -1;
private TopologyRecoveryFilter topologyRecoveryFilter;
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
private RetryHandler topologyRecoveryRetryHandler;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -257,4 +259,11 @@ public Predicate<ShutdownSignalException> getConnectionRecoveryTriggeringConditi
return connectionRecoveryTriggeringCondition;
}

public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}

public RetryHandler getTopologyRecoveryRetryHandler() {
return topologyRecoveryRetryHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC

private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

private final RetryHandler retryHandler;

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
this(params, f, new ListAddressResolver(addrs));
}
Expand All @@ -115,6 +117,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
this.channels = new ConcurrentHashMap<>();
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
letAllPassFilter() : params.getTopologyRecoveryFilter();

this.retryHandler = params.getTopologyRecoveryRetryHandler();
}

private void setupErrorOnWriteListenerForPotentialRecovery() {
Expand All @@ -125,12 +129,9 @@ private void setupErrorOnWriteListenerForPotentialRecovery() {
// we should trigger the error handling and the recovery only once
if (errorOnWriteLock.tryLock()) {
try {
Thread recoveryThread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
AMQConnection c = (AMQConnection) connection;
c.handleIoError(exception);
}
Thread recoveryThread = threadFactory.newThread(() -> {
AMQConnection c = (AMQConnection) connection;
c.handleIoError(exception);
});
recoveryThread.setName("RabbitMQ Error On Write Thread");
recoveryThread.start();
Expand Down Expand Up @@ -630,6 +631,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
}
}

void recoverChannel(AutorecoveringChannel channel) throws IOException {
channel.automaticallyRecover(this, this.delegate);
}

private void notifyRecoveryListenersComplete() {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleRecovery(this);
Expand All @@ -651,16 +656,16 @@ private void recoverTopology(final ExecutorService executor) {
if (executor == null) {
// recover entities in serial on the main connection thread
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
recoverExchange(exchange);
recoverExchange(exchange, true);
}
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
recoverQueue(entry.getKey(), entry.getValue());
recoverQueue(entry.getKey(), entry.getValue(), true);
}
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
recoverBinding(b);
recoverBinding(b, true);
}
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
recoverConsumer(entry.getKey(), entry.getValue());
recoverConsumer(entry.getKey(), entry.getValue(), true);
}
} else {
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
Expand All @@ -680,11 +685,19 @@ private void recoverTopology(final ExecutorService executor) {
}
}

private void recoverExchange(final RecordedExchange x) {
private void recoverExchange(RecordedExchange x, boolean retry) {
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
try {
if (topologyRecoveryFilter.filterExchange(x)) {
x.recover();
if (retry) {
final RecordedExchange entity = x;
x = (RecordedExchange) wrapRetryIfNecessary(x, () -> {
entity.recover();
return null;
}).getRecordedEntity();
} else {
x.recover();
}
LOGGER.debug("{} has recovered", x);
}
} catch (Exception cause) {
Expand All @@ -695,12 +708,20 @@ private void recoverExchange(final RecordedExchange x) {
}
}

private void recoverQueue(final String oldName, final RecordedQueue q) {

void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
try {
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
q.recover();
if (retry) {
final RecordedQueue entity = q;
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
entity.recover();
return null;
}).getRecordedEntity();
} else {
q.recover();
}
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
Expand Down Expand Up @@ -731,10 +752,18 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
}
}

private void recoverBinding(final RecordedBinding b) {
private void recoverBinding(RecordedBinding b, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterBinding(b)) {
b.recover();
if (retry) {
final RecordedBinding entity = b;
b = (RecordedBinding) wrapRetryIfNecessary(b, () -> {
entity.recover();
return null;
}).getRecordedEntity();
} else {
b.recover();
}
LOGGER.debug("{} has recovered", b);
}
} catch (Exception cause) {
Expand All @@ -745,11 +774,20 @@ private void recoverBinding(final RecordedBinding b) {
}
}

private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
String newTag = consumer.recover();
String newTag = null;
if (retry) {
final RecordedConsumer entity = consumer;
RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover());
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
newTag = (String) retryResult.getResult();
} else {
newTag = consumer.recover();
}

// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
Expand All @@ -772,6 +810,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer)
}
}

private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
if (this.retryHandler == null) {
T result = recoveryAction.call();
return new RetryResult(entity, result);
} else {
try {
T result = recoveryAction.call();
return new RetryResult(entity, result);
} catch (Exception e) {
RetryContext retryContext = new RetryContext(entity, e, this);
RetryResult retryResult;
if (entity instanceof RecordedQueue) {
retryResult = this.retryHandler.retryQueueRecovery(retryContext);
} else if (entity instanceof RecordedExchange) {
retryResult = this.retryHandler.retryExchangeRecovery(retryContext);
} else if (entity instanceof RecordedBinding) {
retryResult = this.retryHandler.retryBindingRecovery(retryContext);
} else if (entity instanceof RecordedConsumer) {
retryResult = this.retryHandler.retryConsumerRecovery(retryContext);
} else {
throw new IllegalArgumentException("Unknown type of recorded entity: " + entity);
}
return retryResult;
}
}
}

private void propagateQueueNameChangeToBindings(String oldName, String newName) {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
if (b.getDestination().equals(oldName)) {
Expand Down Expand Up @@ -820,15 +885,15 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
callables.add(Executors.callable(() -> {
for (final E entity : entityList) {
if (entity instanceof RecordedExchange) {
recoverExchange((RecordedExchange)entity);
recoverExchange((RecordedExchange)entity, true);
} else if (entity instanceof RecordedQueue) {
final RecordedQueue q = (RecordedQueue) entity;
recoverQueue(q.getName(), q);
recoverQueue(q.getName(), q, true);
} else if (entity instanceof RecordedBinding) {
recoverBinding((RecordedBinding) entity);
recoverBinding((RecordedBinding) entity, true);
} else if (entity instanceof RecordedConsumer) {
final RecordedConsumer c = (RecordedConsumer) entity;
recoverConsumer(c.getConsumerTag(), c);
recoverConsumer(c.getConsumerTag(), c, true);
}
}
}));
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl.recovery;

/**
* Backoff policy for topology recovery retry attempts.
*
* @see DefaultRetryHandler
* @see TopologyRecoveryRetryHandlerBuilder
* @since 5.4.0
*/
@FunctionalInterface
public interface BackoffPolicy {

/**
* Wait depending on the current attempt number (1, 2, 3, etc)
* @param attemptNumber current attempt number
* @throws InterruptedException
*/
void backoff(int attemptNumber) throws InterruptedException;
}
Loading

0 comments on commit 34e33ea

Please sign in to comment.