Skip to content

Commit

Permalink
Implement missing retry configuration options (#440)
Browse files Browse the repository at this point in the history
Implement missing retry configuration options, RetryOnTimeout and RetryOnSocketException
carterkozak authored Feb 26, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 6787805 commit d3e07f4
Showing 5 changed files with 155 additions and 15 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-440.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: Implement missing RetryOnTimeout configuration option
links:
- https://github.com/palantir/dialogue/pull/440
Original file line number Diff line number Diff line change
@@ -36,6 +36,9 @@ private Channels() {}
public static Channel create(Collection<? extends Channel> channels, ClientConfiguration config) {
Preconditions.checkArgument(!channels.isEmpty(), "channels must not be empty");
Preconditions.checkArgument(config.userAgent().isPresent(), "config.userAgent() must be specified");
Preconditions.checkArgument(
config.retryOnSocketException() == ClientConfiguration.RetryOnSocketException.ENABLED,
"Retries on socket exceptions cannot be disabled without disabling retries entirely.");

DialogueClientMetrics clientMetrics =
DialogueClientMetrics.of(new VersionedTaggedMetricRegistry(config.taggedMetricRegistry()));
@@ -54,8 +57,12 @@ public static Channel create(Collection<? extends Channel> channels, ClientConfi
Channel channel = new LimitedChannelToChannelAdapter(limited);
channel = new TracedChannel(channel, "Dialogue-request-attempt");
if (config.maxNumRetries() > 0) {
channel =
new RetryingChannel(channel, config.maxNumRetries(), config.backoffSlotSize(), config.serverQoS());
channel = new RetryingChannel(
channel,
config.maxNumRetries(),
config.backoffSlotSize(),
config.serverQoS(),
config.retryOnTimeout());
}
channel = new UserAgentChannel(channel, config.userAgent().get());
channel = new DeprecationWarningChannel(channel, clientMetrics);
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.tracing.DetachedSpan;
import com.palantir.tracing.Tracers;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
@@ -67,12 +68,17 @@ final class RetryingChannel implements Channel {
private final Channel delegate;
private final int maxRetries;
private final ClientConfiguration.ServerQoS serverQoS;
private final ClientConfiguration.RetryOnTimeout retryOnTimeout;
private final Duration backoffSlotSize;
private final DoubleSupplier jitter;

RetryingChannel(
Channel delegate, int maxRetries, Duration backoffSlotSize, ClientConfiguration.ServerQoS serverQoS) {
this(delegate, maxRetries, backoffSlotSize, serverQoS, sharedScheduler.get(), () ->
Channel delegate,
int maxRetries,
Duration backoffSlotSize,
ClientConfiguration.ServerQoS serverQoS,
ClientConfiguration.RetryOnTimeout retryOnTimeout) {
this(delegate, maxRetries, backoffSlotSize, serverQoS, retryOnTimeout, sharedScheduler.get(), () ->
ThreadLocalRandom.current().nextDouble());
}

@@ -81,12 +87,14 @@ final class RetryingChannel implements Channel {
int maxRetries,
Duration backoffSlotSize,
ClientConfiguration.ServerQoS serverQoS,
ClientConfiguration.RetryOnTimeout retryOnTimeout,
ListeningScheduledExecutorService scheduler,
DoubleSupplier jitter) {
this.delegate = delegate;
this.maxRetries = maxRetries;
this.backoffSlotSize = backoffSlotSize;
this.serverQoS = serverQoS;
this.retryOnTimeout = retryOnTimeout;
this.scheduler = scheduler;
this.jitter = jitter;
}
@@ -171,11 +179,32 @@ ListenableFuture<Response> success(Response response) {

ListenableFuture<Response> failure(Throwable throwable) {
if (++failures <= maxRetries) {
return retry(throwable);
if (shouldAttemptToRetry(throwable)) {
return retry(throwable);
} else if (log.isDebugEnabled()) {
log.debug(
"Not attempting to retry failure",
SafeArg.of("serviceName", endpoint.serviceName()),
SafeArg.of("endpoint", endpoint.endpointName()),
throwable);
}
}
return Futures.immediateFailedFuture(throwable);
}

private boolean shouldAttemptToRetry(Throwable throwable) {
if (retryOnTimeout == ClientConfiguration.RetryOnTimeout.DISABLED) {
if (throwable instanceof SocketTimeoutException) {
// non-connect timeouts should not be retried
SocketTimeoutException socketTimeout = (SocketTimeoutException) throwable;
return socketTimeout.getMessage() != null
// String matches CJR RemotingOkHttpCall.shouldRetry
&& socketTimeout.getMessage().contains("connect timed out");
}
}
return true;
}

private void logRetry(long backoffNanoseconds, Throwable throwable) {
if (log.isInfoEnabled()) {
log.info(
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import com.palantir.dialogue.UrlBuilder;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -63,7 +64,12 @@ public class RetryingChannelTest {
public void testNoFailures() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any())).thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}
@@ -73,7 +79,12 @@ public void testRetriesUpToMaxRetries() throws ExecutionException, InterruptedEx
when(channel.execute(any(), any())).thenReturn(FAILED).thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
1,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
@@ -87,7 +98,12 @@ public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, Inter
.thenReturn(SUCCESS);

// One retry allows an initial request (not a retry) and a single retry.
Channel retryer = new RetryingChannel(channel, 1, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
1,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get)
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class)
@@ -98,7 +114,12 @@ public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, Inter
public void testRetriesMax() {
when(channel.execute(any(), any())).thenReturn(FAILED);

Channel retryer = new RetryingChannel(channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get).hasCauseInstanceOf(IllegalArgumentException.class);
verify(channel, times(4)).execute(ENDPOINT, REQUEST);
@@ -110,7 +131,12 @@ public void retries_429s() throws Exception {
when(mockResponse.code()).thenReturn(429);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get())
@@ -125,7 +151,12 @@ public void retries_503s() throws Exception {
when(mockResponse.code()).thenReturn(503);
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get())
@@ -141,7 +172,11 @@ public void retries_429s_when_requested() throws Exception {
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(
channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.PROPAGATE_429_and_503_TO_CALLER);
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.PROPAGATE_429_and_503_TO_CALLER,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get().code()).isEqualTo(429);
@@ -155,7 +190,11 @@ public void returns_503s_when_requested() throws Exception {
when(channel.execute(any(), any())).thenReturn(Futures.immediateFuture(mockResponse));

Channel retryer = new RetryingChannel(
channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.PROPAGATE_429_and_503_TO_CALLER);
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.PROPAGATE_429_and_503_TO_CALLER,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response).isDone();
assertThat(response.get().code()).isEqualTo(503);
@@ -173,7 +212,12 @@ public void response_bodies_are_closed() throws Exception {
.thenReturn(Futures.immediateFuture(response2))
.thenReturn(Futures.immediateFuture(eventualSuccess));

Channel retryer = new RetryingChannel(channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(200);

@@ -185,12 +229,66 @@ public void response_bodies_are_closed() throws Exception {
public void testPropagatesCancel() {
ListenableFuture<Response> delegateResult = SettableFuture.create();
when(channel.execute(any(), any())).thenReturn(delegateResult);
Channel retryer = new RetryingChannel(channel, 3, Duration.ZERO, ClientConfiguration.ServerQoS.AUTOMATIC_RETRY);
Channel retryer = new RetryingChannel(
channel,
3,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> retryingResult = retryer.execute(ENDPOINT, REQUEST);
assertThat(retryingResult.cancel(true)).isTrue();
assertThat(delegateResult).as("Failed to cancel the delegate future").isCancelled();
}

@Test
public void doesNotRetrySocketTimeout() {
when(channel.execute(any(), any()))
.thenReturn(Futures.immediateFailedFuture(new SocketTimeoutException()))
.thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(
channel,
1,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThatThrownBy(response::get).hasRootCauseExactlyInstanceOf(SocketTimeoutException.class);
}

@Test
public void retriesSocketTimeoutWhenRequested() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any()))
.thenReturn(Futures.immediateFailedFuture(new SocketTimeoutException()))
.thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(
channel,
1,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DANGEROUS_ENABLE_AT_RISK_OF_RETRY_STORMS);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void retriesSocketTimeout_connectionTimeout() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any()))
// Magic string allows us to retry on RetryOnTimeout.DISABLED
.thenReturn(Futures.immediateFailedFuture(new SocketTimeoutException("connect timed out")))
.thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(
channel,
1,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(ENDPOINT, REQUEST);
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

private static Response mockResponse(int status) {
Response response = mock(Response.class);
when(response.code()).thenReturn(status);
Original file line number Diff line number Diff line change
@@ -119,6 +119,7 @@ private static Channel retryingChannel(Simulation sim, LimitedChannel limited) {
4 /* ClientConfigurations.DEFAULT_MAX_NUM_RETRIES */,
Duration.ofMillis(250) /* ClientConfigurations.DEFAULT_BACKOFF_SLOT_SIZE */,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED,
sim.scheduler(),
new Random(8 /* Guaranteed lucky */)::nextDouble);
}

0 comments on commit d3e07f4

Please sign in to comment.