Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix off-by-one in RetryingChannel, use the configured number of retries #367

Merged
merged 3 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-367.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: Fix off-by-one in RetryingChannel, use the configured number of retries
links:
- https://github.com/palantir/dialogue/pull/367
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static Channel create(

LimitedChannel limited = new RoundRobinChannel(limitedChannels);
Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(config.taggedMetricRegistry()));
channel = new RetryingChannel(channel);
channel = new RetryingChannel(channel, config.maxNumRetries());
channel = new UserAgentChannel(channel, userAgent);
channel = new NeverThrowChannel(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.function.Consumer;
import org.checkerframework.checker.nullness.qual.Nullable;

final class DialogueFutures {
private DialogueFutures() {}

@CanIgnoreReturnValue
static <T> ListenableFuture<T> addDirectCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,104 +16,107 @@

package com.palantir.dialogue.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Immediately retries calls to the underlying channel upon failure.
*/
final class RetryingChannel implements Channel {
private static final int DEFAULT_MAX_RETRIES = 4;
private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor();

private static final Logger log = LoggerFactory.getLogger(RetryingChannel.class);

private static final int UNAVAILABLE_503 = 503;
private static final int TOO_MANY_REQUESTS_429 = 429;

private final Channel delegate;
private final int maxRetries;

RetryingChannel(Channel delegate) {
this(delegate, DEFAULT_MAX_RETRIES);
}

@VisibleForTesting
RetryingChannel(Channel delegate, int maxRetries) {
this.delegate = delegate;
this.maxRetries = maxRetries;
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
SettableFuture<Response> future = SettableFuture.create();

Function<Integer, ListenableFuture<Response>> callSupplier = attempt -> {
// TODO(dfox): include retry number in the request somehow
return delegate.execute(endpoint, request);
};
FutureCallback<Response> retryer = new RetryingCallback(callSupplier, future);
Futures.addCallback(callSupplier.apply(0), retryer, DIRECT_EXECUTOR);

return future;
return new RetryingCallback(delegate, endpoint, request, maxRetries).execute();
}

private final class RetryingCallback implements FutureCallback<Response> {
private final AtomicInteger failures = new AtomicInteger(0);
private final Function<Integer, ListenableFuture<Response>> runnable;
private final SettableFuture<Response> delegate;
private static final class RetryingCallback {
private final Channel delegate;
private final Endpoint endpoint;
private final Request request;
private final int maxRetries;
private int failures = 0;

private RetryingCallback(
Function<Integer, ListenableFuture<Response>> runnable, SettableFuture<Response> delegate) {
this.runnable = runnable;
/**
 * Captures everything needed to issue, and re-issue, a single logical call.
 *
 * @param delegate channel every attempt is sent to
 * @param endpoint endpoint passed to the delegate on each attempt
 * @param request request passed to the delegate on each attempt
 * @param maxRetries upper bound compared against the running failure count
 */
private RetryingCallback(Channel delegate, Endpoint endpoint, Request request, int maxRetries) {
    this.maxRetries = maxRetries;
    this.request = request;
    this.endpoint = endpoint;
    this.delegate = delegate;
}

/**
 * Sends the request to the delegate channel once and routes the resulting future through
 * {@code wrap}, which attaches the success and failure handlers.
 *
 * @return the wrapped future for this attempt
 */
ListenableFuture<Response> execute() {
    ListenableFuture<Response> attempt = delegate.execute(endpoint, request);
    return wrap(attempt);
}

@Override
public void onSuccess(Response response) {
ListenableFuture<Response> success(Response response) {
// this condition should really match the BlacklistingChannel so that we don't hit the same host twice in
// a row
// TODO(ckozak): Respect ClientConfiguration.serverQos.
if (response.code() == UNAVAILABLE_503 || response.code() == TOO_MANY_REQUESTS_429) {
response.close();
retryOrFail(Optional.empty());
return;
Throwable failure =
new SafeRuntimeException("Received retryable response", SafeArg.of("status", response.code()));
if (++failures <= maxRetries) {
logRetry(failure);
return execute();
}
// TODO(ckozak): It's out of scope for this class to map responses to exceptions. After retries
// are exhausted the open response should be returned.
return Futures.immediateFailedFuture(new SafeRuntimeException("Retries exhausted", failure));
}

// TODO(dfox): if people are using 308, we probably need to support it too

boolean setSuccessfully = delegate.set(response);
if (!setSuccessfully) {
response.close();
}
return Futures.immediateFuture(response);
}

@Override
public void onFailure(Throwable throwable) {
retryOrFail(Optional.of(throwable));
/**
 * Handles a failed attempt: counts the failure, then either retries or gives up.
 *
 * @param throwable the failure produced by the most recent attempt
 * @return the future of a fresh attempt while the failure count is within {@code maxRetries},
 *     otherwise an immediately failed future carrying {@code throwable} unchanged
 */
ListenableFuture<Response> failure(Throwable throwable) {
    failures += 1;
    if (failures > maxRetries) {
        // Retry budget is spent: surface the most recent failure as-is.
        return Futures.immediateFailedFuture(throwable);
    }
    logRetry(throwable);
    return execute();
}

private void retryOrFail(Optional<Throwable> throwable) {
int attempt = failures.incrementAndGet();
if (attempt < maxRetries) {
Futures.addCallback(runnable.apply(attempt), this, DIRECT_EXECUTOR);
} else {
if (throwable.isPresent()) {
delegate.setException(throwable.get());
} else {
delegate.setException(new SafeRuntimeException("Retries exhausted"));
}
/**
 * Emits an INFO log line describing the retry that is about to happen, including the current
 * failure count, the configured maximum, and the service/endpoint names.
 *
 * @param throwable the failure that triggered the retry; passed as the final logging argument
 */
private void logRetry(Throwable throwable) {
    if (!log.isInfoEnabled()) {
        // Skip building the log arguments when INFO is disabled.
        return;
    }
    log.info(
            "Retrying call after failure",
            SafeArg.of("failures", failures),
            SafeArg.of("maxRetries", maxRetries),
            SafeArg.of("serviceName", endpoint.serviceName()),
            SafeArg.of("endpoint", endpoint.endpointName()),
            throwable);
}

/**
 * Attaches the retry handlers to the future of a single attempt.
 *
 * <p>A completed response is passed to {@code success}; any {@link Throwable} raised by the
 * input or by the success stage is passed to {@code failure}. Both stages use the direct
 * executor.
 *
 * @param input future returned by one call to the delegate channel
 * @return a future that reflects the outcome after the handlers have run
 */
private ListenableFuture<Response> wrap(ListenableFuture<Response> input) {
    ListenableFuture<Response> afterSuccessHandling =
            Futures.transformAsync(input, response -> success(response), MoreExecutors.directExecutor());
    return Futures.catchingAsync(
            afterSuccessHandling,
            Throwable.class,
            thrown -> failure(thrown),
            MoreExecutors.directExecutor());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.HttpMethod;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -50,46 +50,56 @@ public class RetryingChannelTest {
private static final TestResponse EXPECTED_RESPONSE = new TestResponse();
private static final ListenableFuture<Response> SUCCESS = Futures.immediateFuture(EXPECTED_RESPONSE);
private static final ListenableFuture<Response> FAILED =
Futures.immediateFailedFuture(new IllegalArgumentException());
Futures.immediateFailedFuture(new IllegalArgumentException("FAILED"));
private static final TestEndpoint ENDPOINT = new TestEndpoint();
private static final Request REQUEST = Request.builder().build();

@Mock
private Channel channel;

private RetryingChannel retryer;

@Before
public void before() {
retryer = new RetryingChannel(channel, 3);
}

/** A delegate that succeeds on the first attempt yields that response unchanged. */
@Test
public void testNoFailures() throws ExecutionException, InterruptedException {
    when(channel.execute(any(), any())).thenReturn(SUCCESS);

    ListenableFuture<Response> result = new RetryingChannel(channel, 3).execute(ENDPOINT, REQUEST);

    assertThat(result.get()).isEqualTo(EXPECTED_RESPONSE);
}

/** With one retry configured, a single failure followed by a success still succeeds. */
@Test
public void testRetriesUpToMaxRetries() throws ExecutionException, InterruptedException {
    // First attempt fails, the second one succeeds.
    when(channel.execute(any(), any())).thenReturn(FAILED).thenReturn(SUCCESS);

    // maxRetries = 1 permits the initial request (not a retry) plus exactly one retry.
    ListenableFuture<Response> result = new RetryingChannel(channel, 1).execute(ENDPOINT, REQUEST);

    assertThat(result).isDone();
    assertThat(result.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any()))
.thenReturn(FAILED)
.thenReturn(FAILED)
.thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
assertThatThrownBy(response::get)
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage("FAILED");
}

@Test
public void testRetriesMax() {
when(channel.execute(any(), any())).thenReturn(FAILED);

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get).hasCauseInstanceOf(IllegalArgumentException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -98,11 +108,12 @@ public void retries_429s() {
when(mockResponse.code()).thenReturn(429);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -111,11 +122,12 @@ public void retries_503s() {
when(mockResponse.code()).thenReturn(503);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -129,13 +141,24 @@ public void response_bodies_are_closed() throws Exception {
.thenReturn(Futures.immediateFuture(response2))
.thenReturn(Futures.immediateFuture(eventualSuccess));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(200);

verify(response1, times(1)).close();
verify(response2, times(1)).close();
}

/** Cancelling the future returned by the retrying channel must cancel the delegate's future. */
@Test
public void testPropagatesCancel() {
    // A SettableFuture that is never completed stands in for an in-flight request.
    ListenableFuture<Response> pending = SettableFuture.create();
    when(channel.execute(any(), any())).thenReturn(pending);

    ListenableFuture<Response> outer = new RetryingChannel(channel, 3).execute(ENDPOINT, REQUEST);

    assertThat(outer.cancel(true)).isTrue();
    assertThat(pending).as("Failed to cancel the delegate future").isCancelled();
}

private static Response mockResponse(int status) {
Response response = mock(Response.class);
when(response.code()).thenReturn(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private static Channel concurrencyLimiter(Simulation sim, Supplier<List<Simulati
LimitedChannel limited1 = new RoundRobinChannel(limitedChannels1);
limited1 = instrumentClient(limited1, sim.taggedMetrics()); // just for debugging
Channel channel = new QueuedChannel(limited1, DispatcherMetrics.of(sim.taggedMetrics()));
return new RetryingChannel(channel);
return new RetryingChannel(channel, 4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */);
});
}

Expand All @@ -68,7 +68,7 @@ private static Channel roundRobin(Simulation sim, Supplier<List<SimulationServer
limited =
instrumentClient(limited, sim.taggedMetrics()); // will always be zero due to the noOpLimitedChannel
Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(sim.taggedMetrics()));
return new RetryingChannel(channel);
return new RetryingChannel(channel, 4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */);
});
}

Expand Down