Skip to content

Commit

Permalink
Make retry condition more tolerant with wildcards
Browse files Browse the repository at this point in the history
References #387
  • Loading branch information
acogoluegnes committed Aug 10, 2018
1 parent 9176062 commit 9711406
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
*/
public class DefaultRetryHandler implements RetryHandler {

private final BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition;
private final BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition;
private final BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition;
private final BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition;
private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;

private final RetryOperation<?> queueRecoveryRetryOperation;
private final RetryOperation<?> exchangeRecoveryRetryOperation;
Expand All @@ -49,10 +49,10 @@ public class DefaultRetryHandler implements RetryHandler {

private final BackoffPolicy backoffPolicy;

public DefaultRetryHandler(BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition,
BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition,
BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition,
BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition,
public DefaultRetryHandler(BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition,
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition,
BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition,
BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition,
RetryOperation<?> queueRecoveryRetryOperation,
RetryOperation<?> exchangeRecoveryRetryOperation,
RetryOperation<?> bindingRecoveryRetryOperation,
Expand All @@ -72,27 +72,31 @@ public DefaultRetryHandler(BiPredicate<RecordedQueue, Exception> queueRecoveryRe
this.retryAttempts = retryAttempts;
}

@SuppressWarnings("unchecked")
@Override
public RetryResult retryQueueRecovery(RetryContext context) throws Exception {
return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context);
return doRetry((BiPredicate<RecordedEntity, Exception>) queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context);
}

@SuppressWarnings("unchecked")
@Override
public RetryResult retryExchangeRecovery(RetryContext context) throws Exception {
return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context);
return doRetry((BiPredicate<RecordedEntity, Exception>) exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context);
}

@SuppressWarnings("unchecked")
@Override
public RetryResult retryBindingRecovery(RetryContext context) throws Exception {
return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context);
return doRetry((BiPredicate<RecordedEntity, Exception>) bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context);
}

@SuppressWarnings("unchecked")
@Override
public RetryResult retryConsumerRecovery(RetryContext context) throws Exception {
return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context);
return doRetry((BiPredicate<RecordedEntity, Exception>) consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context);
}

protected <T extends RecordedEntity> RetryResult doRetry(BiPredicate<T, Exception> condition, RetryOperation<?> operation, T entity, RetryContext context)
protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition, RetryOperation<?> operation, RecordedEntity entity, RetryContext context)
throws Exception {
int attempts = 0;
Exception exception = context.exception();
Expand All @@ -107,7 +111,6 @@ protected <T extends RecordedEntity> RetryResult doRetry(BiPredicate<T, Exceptio
} catch (Exception e) {
exception = e;
attempts++;
continue;
}
} else {
throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
*/
public class TopologyRecoveryRetryHandlerBuilder {

private BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
private BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
private BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
private BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
private BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;

private DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null;
private DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null;
Expand All @@ -50,25 +50,25 @@ public static TopologyRecoveryRetryHandlerBuilder builder() {
}

public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryCondition(
BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition) {
BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition) {
this.queueRecoveryRetryCondition = queueRecoveryRetryCondition;
return this;
}

public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryCondition(
BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition) {
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition) {
this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition;
return this;
}

public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryCondition(
BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition) {
BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition) {
this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition;
return this;
}

public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryCondition(
BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition) {
BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition) {
this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ShutdownSignalException;

import java.util.function.BiPredicate;
import java.util.function.Predicate;

/**
Expand All @@ -31,9 +32,9 @@
*/
public abstract class TopologyRecoveryRetryLogic {

public static final Predicate<Exception> CHANNEL_CLOSED_NOT_FOUND = e -> {
if (e.getCause() instanceof ShutdownSignalException) {
ShutdownSignalException cause = (ShutdownSignalException) e.getCause();
public static final BiPredicate<RecordedEntity, Exception> CHANNEL_CLOSED_NOT_FOUND = (entity, ex) -> {
if (ex.getCause() instanceof ShutdownSignalException) {
ShutdownSignalException cause = (ShutdownSignalException) ex.getCause();
if (cause.getReason() instanceof AMQP.Channel.Close) {
return ((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void topologyRecoveryRetry() throws Exception {
protected ConnectionFactory newConnectionFactory() {
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
connectionFactory.setTopologyRecoveryRetryHandler(
builder().bindingRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e))
.consumerRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e))
builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)))
.build()
Expand Down

0 comments on commit 9711406

Please sign in to comment.