Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix off-by-one in RetryingChannel, use the configured number of retries #367

Merged
merged 3 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-367.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: Fix off-by-one in RetryingChannel, use the configured number of retries
links:
- https://github.com/palantir/dialogue/pull/367
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static Channel create(

LimitedChannel limited = new RoundRobinChannel(limitedChannels);
Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(config.taggedMetricRegistry()));
channel = new RetryingChannel(channel);
channel = new RetryingChannel(channel, config.maxNumRetries());
channel = new UserAgentChannel(channel, userAgent);
channel = new NeverThrowChannel(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.function.Consumer;
import org.checkerframework.checker.nullness.qual.Nullable;

final class DialogueFutures {
private DialogueFutures() {}

@CanIgnoreReturnValue
static <T> ListenableFuture<T> addDirectCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,29 @@

package com.palantir.dialogue.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.IntFunction;

/**
* Immediately retries calls to the underlying channel upon failure.
*/
final class RetryingChannel implements Channel {
private static final int DEFAULT_MAX_RETRIES = 4;
private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor();

private static final int UNAVAILABLE_503 = 503;
private static final int TOO_MANY_REQUESTS_429 = 429;

private final Channel delegate;
private final int maxRetries;

RetryingChannel(Channel delegate) {
this(delegate, DEFAULT_MAX_RETRIES);
}

@VisibleForTesting
RetryingChannel(Channel delegate, int maxRetries) {
this.delegate = delegate;
this.maxRetries = maxRetries;
Expand All @@ -59,23 +48,21 @@ final class RetryingChannel implements Channel {
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
SettableFuture<Response> future = SettableFuture.create();

Function<Integer, ListenableFuture<Response>> callSupplier = attempt -> {
IntFunction<ListenableFuture<Response>> callSupplier = attempt -> {
// TODO(dfox): include retry number in the request somehow
return delegate.execute(endpoint, request);
};
FutureCallback<Response> retryer = new RetryingCallback(callSupplier, future);
Futures.addCallback(callSupplier.apply(0), retryer, DIRECT_EXECUTOR);

DialogueFutures.addDirectCallback(callSupplier.apply(0), retryer);
return future;
}

private final class RetryingCallback implements FutureCallback<Response> {
private final AtomicInteger failures = new AtomicInteger(0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be an int given that retries are sequenced we can rely on the happens-before relationship.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to make that change as part of this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it to a an improvement on top of this PR, might be best if you'd like to review that in isolation :-]

private final Function<Integer, ListenableFuture<Response>> runnable;
private final IntFunction<ListenableFuture<Response>> runnable;
private final SettableFuture<Response> delegate;

private RetryingCallback(
Function<Integer, ListenableFuture<Response>> runnable, SettableFuture<Response> delegate) {
private RetryingCallback(IntFunction<ListenableFuture<Response>> runnable, SettableFuture<Response> delegate) {
this.runnable = runnable;
this.delegate = delegate;
}
Expand Down Expand Up @@ -105,8 +92,8 @@ public void onFailure(Throwable throwable) {

private void retryOrFail(Optional<Throwable> throwable) {
int attempt = failures.incrementAndGet();
if (attempt < maxRetries) {
Futures.addCallback(runnable.apply(attempt), this, DIRECT_EXECUTOR);
if (attempt <= maxRetries) {
DialogueFutures.addDirectCallback(runnable.apply(attempt), this);
} else {
if (throwable.isPresent()) {
delegate.setException(throwable.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -50,46 +49,56 @@ public class RetryingChannelTest {
private static final TestResponse EXPECTED_RESPONSE = new TestResponse();
private static final ListenableFuture<Response> SUCCESS = Futures.immediateFuture(EXPECTED_RESPONSE);
private static final ListenableFuture<Response> FAILED =
Futures.immediateFailedFuture(new IllegalArgumentException());
Futures.immediateFailedFuture(new IllegalArgumentException("FAILED"));
private static final TestEndpoint ENDPOINT = new TestEndpoint();
private static final Request REQUEST = Request.builder().build();

@Mock
private Channel channel;

private RetryingChannel retryer;

@Before
public void before() {
retryer = new RetryingChannel(channel, 3);
}

@Test
public void testNoFailures() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any())).thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void testRetriesUpToMaxRetries() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any())).thenReturn(FAILED).thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any()))
.thenReturn(FAILED)
.thenReturn(FAILED)
.thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
assertThatThrownBy(response::get)
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage("FAILED");
}

@Test
public void testRetriesMax() {
when(channel.execute(any(), any())).thenReturn(FAILED);

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get).hasCauseInstanceOf(IllegalArgumentException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -98,11 +107,12 @@ public void retries_429s() {
when(mockResponse.code()).thenReturn(429);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -111,11 +121,12 @@ public void retries_503s() {
when(mockResponse.code()).thenReturn(503);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
}

@Test
Expand All @@ -129,6 +140,7 @@ public void response_bodies_are_closed() throws Exception {
.thenReturn(Futures.immediateFuture(response2))
.thenReturn(Futures.immediateFuture(eventualSuccess));

Channel retryer = new RetryingChannel(channel, 3);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(200);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private static Channel concurrencyLimiter(Simulation sim, Supplier<List<Simulati
LimitedChannel limited1 = new RoundRobinChannel(limitedChannels1);
limited1 = instrumentClient(limited1, sim.taggedMetrics()); // just for debugging
Channel channel = new QueuedChannel(limited1, DispatcherMetrics.of(sim.taggedMetrics()));
return new RetryingChannel(channel);
return new RetryingChannel(channel, 4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */);
});
}

Expand All @@ -68,7 +68,7 @@ private static Channel roundRobin(Simulation sim, Supplier<List<SimulationServer
limited =
instrumentClient(limited, sim.taggedMetrics()); // will always be zero due to the noOpLimitedChannel
Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(sim.taggedMetrics()));
return new RetryingChannel(channel);
return new RetryingChannel(channel, 4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */);
});
}

Expand Down