Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RetryingChannel retries 429/503s #350

Merged
merged 9 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-350.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: RetryingChannel retries 500/503s
links:
- https://github.com/palantir/dialogue/pull/350
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.function.Function;

/**
* Retries calls to the underlying channel upon failure.
* Immediately retries calls to the underlying channel upon failure.
*/
final class RetryingChannel implements Channel {
private static final int DEFAULT_MAX_RETRIES = 4;
Expand All @@ -54,34 +57,69 @@ final class RetryingChannel implements Channel {
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
SettableFuture<Response> future = SettableFuture.create();

Supplier<ListenableFuture<Response>> callSupplier = () -> delegate.execute(endpoint, request);
FutureCallback<Response> retryer = new RetryingCallback<>(callSupplier, future);
Futures.addCallback(callSupplier.get(), retryer, DIRECT_EXECUTOR);
Function<Integer, ListenableFuture<Response>> callSupplier = attempt -> {
// TODO(dfox): include retry number in the request somehow
return delegate.execute(endpoint, request);
};
FutureCallback<Response> retryer = new RetryingCallback(callSupplier, future);
Futures.addCallback(callSupplier.apply(0), retryer, DIRECT_EXECUTOR);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Futures.addCallback(callSupplier.apply(0), retryer, DIRECT_EXECUTOR);
return DialogueFutures.addDirectCallback(callSupplier.apply(0), retryer);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's OK with you I think I'd actually prefer to just stick with the vanilla guava - I find it kinda reassuring that there's no magic going on under the hood

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just seems like extra boilerplate 🤷‍♂ feels off to have a utility exactly for this and then not use it


return future;
}

private final class RetryingCallback<T> implements FutureCallback<T> {
private final class RetryingCallback implements FutureCallback<Response> {
private final AtomicInteger failures = new AtomicInteger(0);
private final Supplier<ListenableFuture<T>> runnable;
private final SettableFuture<T> delegate;
private final Function<Integer, ListenableFuture<Response>> runnable;
private final SettableFuture<Response> delegate;

private RetryingCallback(Supplier<ListenableFuture<T>> runnable, SettableFuture<T> delegate) {
private RetryingCallback(
Function<Integer, ListenableFuture<Response>> runnable, SettableFuture<Response> delegate) {
this.runnable = runnable;
this.delegate = delegate;
}

@Override
public void onSuccess(T result) {
delegate.set(result);
public void onSuccess(Response response) {
// this condition should really match the BlacklistingChannel so that we don't hit the same host twice in
// a row
if (response.code() == 503 || response.code() == 500) {
Copy link
Contributor Author

@iamdanfox iamdanfox Feb 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking a bit more carefully here, I think we should possibly just match c-j-r's old behaviour to minimize disruption on the rollout:

  • 1xx and 2xx are considered successful
  • QoShandler catches 308, 429, 503 which are eligible for retry.
  • everything else goes straight into the error pipe.

We can debate the 'retry 500s' thing separately.

closeBody(response);
retryOrFail(Optional.empty());
return;
}

boolean setSuccessfully = delegate.set(response);
if (!setSuccessfully) {
closeBody(response);
}
}

@Override
public void onFailure(Throwable throwable) {
if (failures.incrementAndGet() < maxRetries) {
Futures.addCallback(runnable.get(), this, DIRECT_EXECUTOR);
retryOrFail(Optional.of(throwable));
}

private void retryOrFail(Optional<Throwable> throwable) {
int attempt = failures.incrementAndGet();
if (attempt < maxRetries) {
Futures.addCallback(runnable.apply(attempt), this, DIRECT_EXECUTOR);
} else {
delegate.setException(throwable);
if (throwable.isPresent()) {
delegate.setException(throwable.get());
} else {
delegate.setException(new SafeRuntimeException("Retries exhausted"));
}
}
}

private void closeBody(Response response) {
if (response == null || response.body() == null) {
return;
}
try {
response.body().close();
} catch (IOException e) {
delegate.setException(new SafeRuntimeException("Failed to close response body", e));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -37,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -90,6 +92,57 @@ public void testRetriesMax() {
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
}

@Test
public void retries_500s() {
Response mockResponse = mock(Response.class);
when(mockResponse.code()).thenReturn(500);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
}

@Test
public void retries_503s() {
Response mockResponse = mock(Response.class);
when(mockResponse.code()).thenReturn(503);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasMessageContaining("Retries exhausted")
.hasCauseInstanceOf(RuntimeException.class);
verify(channel, times(3)).execute(ENDPOINT, REQUEST);
}

@Test
public void response_bodies_are_closed() throws Exception {
Response response1 = mockResponse(500);
Response response2 = mockResponse(500);
Response eventualSuccess = mockResponse(200);

when(channel.execute(any(), any()))
.thenReturn(Futures.immediateFuture(response1))
.thenReturn(Futures.immediateFuture(response2))
.thenReturn(Futures.immediateFuture(eventualSuccess));

ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(200);

verify(response1.body(), times(1)).close();
verify(response2.body(), times(1)).close();
}

private static Response mockResponse(int status) {
Response response = mock(Response.class);
when(response.body()).thenReturn(mock(InputStream.class));
when(response.code()).thenReturn(status);
return response;
}

private static final class TestResponse implements Response {
@Override
public InputStream body() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,21 +470,23 @@ public static void afterClass() throws IOException {
})
.collect(Collectors.joining("", "```\n", "```\n"));

String rows = files.stream()
String images = files.stream()
.filter(p -> p.toString().endsWith("png"))
.map(p -> {
String githubUrl = "https://raw.githubusercontent.com/palantir/dialogue/"
+ "develop/simulation/src/test/resources/"
String githubLfsUrl = "https://media.githubusercontent.com/media/palantir/dialogue/develop/"
+ "simulation/src/test/resources/"
+ p.getFileName();
return String.format(
"<tr>"
"%n## %s%n"
+ "<table><tr><th>develop</th><th>current</th></tr>%n"
+ "<tr>"
+ "<td><image width=400 src=\"%s\" /></td>"
+ "<td>%s<br /><image width=400 src=\"%s\" /></td>"
+ "</tr>%n",
githubUrl, p.getFileName(), p.getFileName());
+ "<td><image width=400 src=\"%s\" /></td>"
+ "</tr>"
+ "</table>%n%n",
p.getFileName(), githubLfsUrl, p.getFileName());
})
.collect(Collectors.joining());
String images = "<table><tr><th>develop</th><th>current</th></tr>" + rows + "</table>";

String report = String.format(
"# Report%n<!-- Run SimulationTest to regenerate this report. -->%n%s%n%n%s%n", txtSection, images);
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=50.0% client_mean=PT0.6S server_cpu=PT2M received=200/200 codes={200=100, 500=100}
success=59.0% client_mean=PT1.446S server_cpu=PT4M49.2S received=200/200 codes={200=118, Retries exhausted=82}
4 changes: 2 additions & 2 deletions simulation/src/test/resources/all_nodes_500[ROUND_ROBIN].png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=50.0% client_mean=PT0.6S server_cpu=PT2M received=200/200 codes={200=100, 500=100}
success=59.0% client_mean=PT1.446S server_cpu=PT4M49.2S received=200/200 codes={200=118, Retries exhausted=82}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=76.7% client_mean=PT0.055281733S server_cpu=PT3M27.306499709S received=3750/3750 codes={200=2875, 500=875}
success=100.0% client_mean=PT0.078971555S server_cpu=PT4M56.143333328S received=3750/3750 codes={200=3750}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=76.7% client_mean=PT0.055281733S server_cpu=PT3M27.306499709S received=3750/3750 codes={200=2875, 500=875}
success=100.0% client_mean=PT0.078971555S server_cpu=PT4M56.143333328S received=3750/3750 codes={200=3750}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=58.3% client_mean=PT0.7228S server_cpu=PT4M49.12S received=400/400 codes={200=233, 500=167}
success=95.0% client_mean=PT1.5825125S server_cpu=PT9M1.09S received=400/400 codes={200=380, Retries exhausted=20}
4 changes: 2 additions & 2 deletions simulation/src/test/resources/live_reloading[ROUND_ROBIN].png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=58.3% client_mean=PT0.7228S server_cpu=PT4M49.12S received=400/400 codes={200=233, 500=167}
success=95.0% client_mean=PT1.373525S server_cpu=PT9M9.41S received=400/400 codes={200=380, Retries exhausted=20}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=67.6% client_mean=PT0.6S server_cpu=PT5M6S received=510/510 codes={200=345, 500=165}
success=96.7% client_mean=PT0.945882352S server_cpu=PT8M2.4S received=510/510 codes={200=493, Retries exhausted=17}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
success=67.6% client_mean=PT0.6S server_cpu=PT5M6S received=510/510 codes={200=345, 500=165}
success=96.7% client_mean=PT0.945882352S server_cpu=PT8M2.4S received=510/510 codes={200=493, Retries exhausted=17}
Loading