Skip to content

Commit

Permalink
Fix off-by-one in RetryingChannel, use the configured number of retri…
Browse files Browse the repository at this point in the history
…es (#367)

Fix off-by-one in RetryingChannel, use the configured number of retries
  • Loading branch information
carterkozak authored Feb 18, 2020
1 parent fcb5148 commit 093c647
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 68 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-367.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: Fix off-by-one in RetryingChannel, use the configured number of retries
links:
- https://github.com/palantir/dialogue/pull/367
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static Channel create(

LimitedChannel limited = new RoundRobinChannel(limitedChannels);
Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(config.taggedMetricRegistry()));
channel = new RetryingChannel(channel);
channel = new RetryingChannel(channel, config.maxNumRetries());
channel = new UserAgentChannel(channel, userAgent);
channel = new NeverThrowChannel(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.function.Consumer;
import org.checkerframework.checker.nullness.qual.Nullable;

final class DialogueFutures {
private DialogueFutures() {}

@CanIgnoreReturnValue
static <T> ListenableFuture<T> addDirectCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,104 +16,107 @@

package com.palantir.dialogue.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Immediately retries calls to the underlying channel upon failure.
*/
final class RetryingChannel implements Channel {
private static final int DEFAULT_MAX_RETRIES = 4;
private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor();

private static final Logger log = LoggerFactory.getLogger(RetryingChannel.class);

private static final int UNAVAILABLE_503 = 503;
private static final int TOO_MANY_REQUESTS_429 = 429;

private final Channel delegate;
private final int maxRetries;

RetryingChannel(Channel delegate) {
this(delegate, DEFAULT_MAX_RETRIES);
}

@VisibleForTesting
RetryingChannel(Channel delegate, int maxRetries) {
this.delegate = delegate;
this.maxRetries = maxRetries;
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
SettableFuture<Response> future = SettableFuture.create();

Function<Integer, ListenableFuture<Response>> callSupplier = attempt -> {
// TODO(dfox): include retry number in the request somehow
return delegate.execute(endpoint, request);
};
FutureCallback<Response> retryer = new RetryingCallback(callSupplier, future);
Futures.addCallback(callSupplier.apply(0), retryer, DIRECT_EXECUTOR);

return future;
return new RetryingCallback(delegate, endpoint, request, maxRetries).execute();
}

private final class RetryingCallback implements FutureCallback<Response> {
private final AtomicInteger failures = new AtomicInteger(0);
private final Function<Integer, ListenableFuture<Response>> runnable;
private final SettableFuture<Response> delegate;
private static final class RetryingCallback {
private final Channel delegate;
private final Endpoint endpoint;
private final Request request;
private final int maxRetries;
private int failures = 0;

private RetryingCallback(
Function<Integer, ListenableFuture<Response>> runnable, SettableFuture<Response> delegate) {
this.runnable = runnable;
private RetryingCallback(Channel delegate, Endpoint endpoint, Request request, int maxRetries) {
this.delegate = delegate;
this.endpoint = endpoint;
this.request = request;
this.maxRetries = maxRetries;
}

ListenableFuture<Response> execute() {
return wrap(delegate.execute(endpoint, request));
}

@Override
public void onSuccess(Response response) {
ListenableFuture<Response> success(Response response) {
// this condition should really match the BlacklistingChannel so that we don't hit the same host twice in
// a row
// TODO(ckozak): Respect ClientConfiguration.serverQos.
if (response.code() == UNAVAILABLE_503 || response.code() == TOO_MANY_REQUESTS_429) {
response.close();
retryOrFail(Optional.empty());
return;
Throwable failure =
new SafeRuntimeException("Received retryable response", SafeArg.of("status", response.code()));
if (++failures <= maxRetries) {
logRetry(failure);
return execute();
}
// TODO(ckozak): It's out of scope for this class to map responses to exceptions. After retries
// are exhausted the open response should be returned.
return Futures.immediateFailedFuture(new SafeRuntimeException("Retries exhausted", failure));
}

// TODO(dfox): if people are using 308, we probably need to support it too

boolean setSuccessfully = delegate.set(response);
if (!setSuccessfully) {
response.close();
}
return Futures.immediateFuture(response);
}

@Override
public void onFailure(Throwable throwable) {
retryOrFail(Optional.of(throwable));
ListenableFuture<Response> failure(Throwable throwable) {
if (++failures <= maxRetries) {
logRetry(throwable);
return execute();
}
return Futures.immediateFailedFuture(throwable);
}

private void retryOrFail(Optional<Throwable> throwable) {
int attempt = failures.incrementAndGet();
if (attempt < maxRetries) {
Futures.addCallback(runnable.apply(attempt), this, DIRECT_EXECUTOR);
} else {
if (throwable.isPresent()) {
delegate.setException(throwable.get());
} else {
delegate.setException(new SafeRuntimeException("Retries exhausted"));
}
private void logRetry(Throwable throwable) {
if (log.isInfoEnabled()) {
log.info(
"Retrying call after failure",
SafeArg.of("failures", failures),
SafeArg.of("maxRetries", maxRetries),
SafeArg.of("serviceName", endpoint.serviceName()),
SafeArg.of("endpoint", endpoint.endpointName()),
throwable);
}
}

private ListenableFuture<Response> wrap(ListenableFuture<Response> input) {
return Futures.catchingAsync(
Futures.transformAsync(input, this::success, MoreExecutors.directExecutor()),
Throwable.class,
this::failure,
MoreExecutors.directExecutor());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.HttpMethod;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -50,46 +50,56 @@ public class RetryingChannelTest {
private static final TestResponse EXPECTED_RESPONSE = new TestResponse();
private static final ListenableFuture<Response> SUCCESS = Futures.immediateFuture(EXPECTED_RESPONSE);
private static final ListenableFuture<Response> FAILED =
Futures.immediateFailedFuture(new IllegalArgumentException());
Futures.immediateFailedFuture(new IllegalArgumentException("FAILED"));
private static final TestEndpoint ENDPOINT = new TestEndpoint();
private static final Request REQUEST = Request.builder().build();

@Mock
private Channel channel;

private RetryingChannel retryer;

@Before
public void before() {
retryer = new RetryingChannel(channel, 3);
}

@Test
public void testNoFailures() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any())).thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void testRetriesUpToMaxRetries() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any())).thenReturn(FAILED).thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any()))
.thenReturn(FAILED)
.thenReturn(FAILED)
.thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
assertThatThrownBy(response::get)
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage("FAILED");
}

@Test
public void testRetriesMax() {
when(channel.execute(any(), any())).thenReturn(FAILED);

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get).hasCauseInstanceOf(IllegalArgumentException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -98,11 +108,12 @@ public void retries_429s() {
when(mockResponse.code()).thenReturn(429);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -111,11 +122,12 @@ public void retries_503s() {
when(mockResponse.code()).thenReturn(503);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -129,13 +141,24 @@ public void response_bodies_are_closed() throws Exception {
.thenReturn(Futures.immediateFuture(response2))
.thenReturn(Futures.immediateFuture(eventualSuccess));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(200);

verify(response1, times(1)).close();
verify(response2, times(1)).close();
}

@Test
public void testPropagatesCancel() {
ListenableFuture<Response> delegateResult = SettableFuture.create();
when(channel.execute(any(), any())).thenReturn(delegateResult);
Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> retryingResult = retryer.execute(ENDPOINT, REQUEST);
assertThat(retryingResult.cancel(true)).isTrue();
assertThat(delegateResult).as("Failed to cancel the delegate future").isCancelled();
}

private static Response mockResponse(int status) {
Response response = mock(Response.class);
when(response.code()).thenReturn(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private static Channel concurrencyLimiter(Simulation sim, Supplier<List<Simulati
LimitedChannel limited1 = new RoundRobinChannel(limitedChannels1);
limited1 = instrumentClient(limited1, sim.taggedMetrics()); // just for debugging
Channel channel = new QueuedChannel(limited1, DispatcherMetrics.of(sim.taggedMetrics()));
return new RetryingChannel(channel);
return new RetryingChannel(channel, 4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */);
});
}

Expand All @@ -68,7 +68,7 @@ private static Channel roundRobin(Simulation sim, Supplier<List<SimulationServer
limited =
instrumentClient(limited, sim.taggedMetrics()); // will always be zero due to the noOpLimitedChannel
Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(sim.taggedMetrics()));
return new RetryingChannel(channel);
return new RetryingChannel(channel, 4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */);
});
}

Expand Down

0 comments on commit 093c647

Please sign in to comment.