diff --git a/changelog/@unreleased/pr-367.v2.yml b/changelog/@unreleased/pr-367.v2.yml new file mode 100644 index 000000000..79842cb91 --- /dev/null +++ b/changelog/@unreleased/pr-367.v2.yml @@ -0,0 +1,5 @@ +type: fix +fix: + description: Fix off-by-one in RetryingChannel, use the configured number of retries + links: + - https://github.com/palantir/dialogue/pull/367 diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java index 4011b5c58..f7be4fa34 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java @@ -55,7 +55,7 @@ public static Channel create( LimitedChannel limited = new RoundRobinChannel(limitedChannels); Channel channel = new QueuedChannel(limited, DispatcherMetrics.of(config.taggedMetricRegistry())); - channel = new RetryingChannel(channel); + channel = new RetryingChannel(channel, config.maxNumRetries()); channel = new UserAgentChannel(channel, userAgent); channel = new NeverThrowChannel(channel); diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueFutures.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueFutures.java index 9c8364633..04645767f 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueFutures.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueFutures.java @@ -20,12 +20,14 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.util.function.Consumer; import org.checkerframework.checker.nullness.qual.Nullable; final class DialogueFutures { private DialogueFutures() {} + @CanIgnoreReturnValue static ListenableFuture addDirectCallback(ListenableFuture future, FutureCallback callback) { Futures.addCallback(future, callback, MoreExecutors.directExecutor()); return future; diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java index a69595d36..233d0aa71 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/RetryingChannel.java @@ -16,28 +16,24 @@ package com.palantir.dialogue.core; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import com.palantir.dialogue.Channel; import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; +import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeRuntimeException; -import java.util.Optional; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Immediately retries calls to the underlying channel upon failure. */ final class RetryingChannel implements Channel { - private static final int DEFAULT_MAX_RETRIES = 4; - private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor(); + + private static final Logger log = LoggerFactory.getLogger(RetryingChannel.class); private static final int UNAVAILABLE_503 = 503; private static final int TOO_MANY_REQUESTS_429 = 429; @@ -45,11 +41,6 @@ final class RetryingChannel implements Channel { private final Channel delegate; private final int maxRetries; - RetryingChannel(Channel delegate) { - this(delegate, DEFAULT_MAX_RETRIES); - } - - @VisibleForTesting RetryingChannel(Channel delegate, int maxRetries) { this.delegate = delegate; this.maxRetries = maxRetries; @@ -57,63 +48,75 @@ final class RetryingChannel implements Channel { @Override public ListenableFuture execute(Endpoint endpoint, Request request) { - SettableFuture future = SettableFuture.create(); - - Function> callSupplier = attempt -> { - // TODO(dfox): include retry number in the request somehow - return delegate.execute(endpoint, request); - }; - FutureCallback retryer = new RetryingCallback(callSupplier, future); - Futures.addCallback(callSupplier.apply(0), retryer, DIRECT_EXECUTOR); - - return future; + return new RetryingCallback(delegate, endpoint, request, maxRetries).execute(); } - private final class RetryingCallback implements FutureCallback { - private final AtomicInteger failures = new AtomicInteger(0); - private final Function> runnable; - private final SettableFuture delegate; + private static final class RetryingCallback { + private final Channel delegate; + private final Endpoint endpoint; + private final Request request; + private final int maxRetries; + private int failures = 0; - private RetryingCallback( - Function> runnable, SettableFuture delegate) { - this.runnable = runnable; + private RetryingCallback(Channel delegate, Endpoint endpoint, Request request, int maxRetries) { this.delegate = delegate; + this.endpoint = endpoint; + this.request = request; + this.maxRetries = maxRetries; + } + + ListenableFuture execute() { + return wrap(delegate.execute(endpoint, request)); } - @Override - public void onSuccess(Response response) { + ListenableFuture success(Response response) { // this condition should really match the BlacklistingChannel so that we don't hit the same host twice in // a row + // TODO(ckozak): Respect ClientConfiguration.serverQos. if (response.code() == UNAVAILABLE_503 || response.code() == TOO_MANY_REQUESTS_429) { response.close(); - retryOrFail(Optional.empty()); - return; + Throwable failure = + new SafeRuntimeException("Received retryable response", SafeArg.of("status", response.code())); + if (++failures <= maxRetries) { + logRetry(failure); + return execute(); + } + // TODO(ckozak): It's out of scope for this class to map responses to exceptions. After retries + // are exhausted the open response should be returned. + return Futures.immediateFailedFuture(new SafeRuntimeException("Retries exhausted", failure)); } // TODO(dfox): if people are using 308, we probably need to support it too - boolean setSuccessfully = delegate.set(response); - if (!setSuccessfully) { - response.close(); - } + return Futures.immediateFuture(response); } - @Override - public void onFailure(Throwable throwable) { - retryOrFail(Optional.of(throwable)); + ListenableFuture failure(Throwable throwable) { + if (++failures <= maxRetries) { + logRetry(throwable); + return execute(); + } + return Futures.immediateFailedFuture(throwable); } - private void retryOrFail(Optional throwable) { - int attempt = failures.incrementAndGet(); - if (attempt < maxRetries) { - Futures.addCallback(runnable.apply(attempt), this, DIRECT_EXECUTOR); - } else { - if (throwable.isPresent()) { - delegate.setException(throwable.get()); - } else { - delegate.setException(new SafeRuntimeException("Retries exhausted")); - } + private void logRetry(Throwable throwable) { + if (log.isInfoEnabled()) { + log.info( + "Retrying call after failure", + SafeArg.of("failures", failures), + SafeArg.of("maxRetries", maxRetries), + SafeArg.of("serviceName", endpoint.serviceName()), + SafeArg.of("endpoint", endpoint.endpointName()), + throwable); } } + + private ListenableFuture wrap(ListenableFuture input) { + return Futures.catchingAsync( + Futures.transformAsync(input, this::success, MoreExecutors.directExecutor()), + Throwable.class, + this::failure, + MoreExecutors.directExecutor()); + } } } diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/RetryingChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/RetryingChannelTest.java index bb0631fb4..9ab270461 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/RetryingChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/RetryingChannelTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.palantir.dialogue.Channel; import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.HttpMethod; @@ -39,7 +40,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -50,46 +50,56 @@ public class RetryingChannelTest { private static final TestResponse EXPECTED_RESPONSE = new TestResponse(); private static final ListenableFuture SUCCESS = Futures.immediateFuture(EXPECTED_RESPONSE); private static final ListenableFuture FAILED = - Futures.immediateFailedFuture(new IllegalArgumentException()); + Futures.immediateFailedFuture(new IllegalArgumentException("FAILED")); private static final TestEndpoint ENDPOINT = new TestEndpoint(); private static final Request REQUEST = Request.builder().build(); @Mock private Channel channel; - private RetryingChannel retryer; - - @Before - public void before() { - retryer = new RetryingChannel(channel, 3); - } - @Test public void testNoFailures() throws ExecutionException, InterruptedException { when(channel.execute(any(), any())).thenReturn(SUCCESS); + Channel retryer = new RetryingChannel(channel, 3); ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE); } @Test public void testRetriesUpToMaxRetries() throws ExecutionException, InterruptedException { + when(channel.execute(any(), any())).thenReturn(FAILED).thenReturn(SUCCESS); + + // One retry allows an initial request (not a retry) and a single retry. + Channel retryer = new RetryingChannel(channel, 1); + ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); + assertThat(response).isDone(); + assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE); + } + + @Test + public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, InterruptedException { when(channel.execute(any(), any())) .thenReturn(FAILED) .thenReturn(FAILED) .thenReturn(SUCCESS); + // One retry allows an initial request (not a retry) and a single retry. + Channel retryer = new RetryingChannel(channel, 1); ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); - assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE); + assertThatThrownBy(response::get) + .hasRootCauseExactlyInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("FAILED"); } @Test public void testRetriesMax() { when(channel.execute(any(), any())).thenReturn(FAILED); + Channel retryer = new RetryingChannel(channel, 3); ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); assertThatThrownBy(response::get).hasCauseInstanceOf(IllegalArgumentException.class); - verify(channel, times(3)).execute(ENDPOINT, REQUEST); + verify(channel, times(4)).execute(ENDPOINT, REQUEST); } @Test @@ -98,11 +108,12 @@ public void retries_429s() { when(mockResponse.code()).thenReturn(429); when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse)); + Channel retryer = new RetryingChannel(channel, 3); ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); assertThatThrownBy(response::get) .hasMessageContaining("Retries exhausted") .hasCauseInstanceOf(RuntimeException.class); - verify(channel, times(3)).execute(ENDPOINT, REQUEST); + verify(channel, times(4)).execute(ENDPOINT, REQUEST); } @Test @@ -111,11 +122,12 @@ public void retries_503s() { when(mockResponse.code()).thenReturn(503); when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse)); + Channel retryer = new RetryingChannel(channel, 3); ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); assertThatThrownBy(response::get) .hasMessageContaining("Retries exhausted") .hasCauseInstanceOf(RuntimeException.class); - verify(channel, times(3)).execute(ENDPOINT, REQUEST); + verify(channel, times(4)).execute(ENDPOINT, REQUEST); } @Test @@ -129,6 +141,7 @@ public void response_bodies_are_closed() throws Exception { .thenReturn(Futures.immediateFuture(response2)) .thenReturn(Futures.immediateFuture(eventualSuccess)); + Channel retryer = new RetryingChannel(channel, 3); ListenableFuture response = retryer.execute(ENDPOINT, REQUEST); assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(200); @@ -136,6 +149,16 @@ public void response_bodies_are_closed() throws Exception { verify(response2, times(1)).close(); } + @Test + public void testPropagatesCancel() { + ListenableFuture delegateResult = SettableFuture.create(); + when(channel.execute(any(), any())).thenReturn(delegateResult); + Channel retryer = new RetryingChannel(channel, 3); + ListenableFuture retryingResult = retryer.execute(ENDPOINT, REQUEST); + assertThat(retryingResult.cancel(true)).isTrue(); + assertThat(delegateResult).as("Failed to cancel the delegate future").isCancelled(); + } + private static Response mockResponse(int status) { Response response = mock(Response.class); when(response.code()).thenReturn(status); diff --git a/simulation/src/test/java/com/palantir/dialogue/core/Strategy.java b/simulation/src/test/java/com/palantir/dialogue/core/Strategy.java index 42cd718bd..9e7702ab5 100644 --- a/simulation/src/test/java/com/palantir/dialogue/core/Strategy.java +++ b/simulation/src/test/java/com/palantir/dialogue/core/Strategy.java @@ -56,7 +56,7 @@ private static Channel concurrencyLimiter(Simulation sim, Supplier