Skip to content

Commit

Permalink
RetryingChannel retries IOExceptions rather than all Throwables (#644)
Browse files Browse the repository at this point in the history
RetryingChannel retries IOExceptions rather than all Throwables
  • Loading branch information
carterkozak authored Apr 14, 2020
1 parent a15fc57 commit 50da0c6
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 17 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-644.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: RetryingChannel retries IOExceptions rather than all Throwables
links:
- https://github.com/palantir/dialogue/pull/644
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -138,10 +139,24 @@ public void stream_3_gigabytes() throws IOException {
System.out.printf("%d MB took %d millis%n", megabytes, sw.elapsed(TimeUnit.MILLISECONDS));
}

private void set204Response() {
@Test
public void testClosedConnectionIsRetried() {
AtomicInteger requests = new AtomicInteger();
undertowHandler = exchange -> {
exchange.setStatusCode(204);
if (requests.getAndIncrement() == 0) {
exchange.getConnection().close();
} else {
exchange.setStatusCode(204);
}
};
AliasOfOptional myAlias = sampleServiceBlocking().getMyAlias();
Optional<String> maybeString = myAlias.get();
assertThat(maybeString).isNotPresent();
assertThat(requests).hasValue(2);
}

private void set204Response() {
undertowHandler = exchange -> exchange.setStatusCode(204);
}

private void setBinaryGzipResponse(String stringToCompress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ private DialogueChannel(
ClientConfiguration clientConfiguration,
ChannelFactory channelFactory,
Random random,
Supplier<ScheduledExecutorService> scheduler) {
Supplier<ScheduledExecutorService> scheduler,
int maxQueueSize) {
this.channelName = channelName;
this.clientConfiguration = clientConfiguration;
this.channelFactory = channelFactory;
clientMetrics = DialogueClientMetrics.of(clientConfiguration.taggedMetricRegistry());
this.random = random;
this.queuedChannel =
new QueuedChannel(new SupplierChannel(nodeSelectionStrategy::get), channelName, clientMetrics);
this.queuedChannel = new QueuedChannel(
new SupplierChannel(nodeSelectionStrategy::get), channelName, clientMetrics, maxQueueSize);
updateUris(clientConfiguration.uris());
this.delegate = wrap(queuedChannel, channelName, clientConfiguration, scheduler, random, clientMetrics);
}
Expand Down Expand Up @@ -258,6 +259,8 @@ public static final class Builder {
@Nullable
private ChannelFactory channelFactory;

private int maxQueueSize = 100_000;

public Builder channelName(String value) {
this.channelName = value;
return this;
Expand Down Expand Up @@ -285,6 +288,13 @@ Builder scheduler(ScheduledExecutorService value) {
return this;
}

@VisibleForTesting
Builder maxQueueSize(int value) {
Preconditions.checkArgument(value > 0, "maxQueueSize must be positive");
this.maxQueueSize = value;
return this;
}

@CheckReturnValue
public DialogueChannel build() {
ClientConfiguration conf = Preconditions.checkNotNull(config, "clientConfiguration is required");
Expand All @@ -295,7 +305,7 @@ public DialogueChannel build() {
.from(conf)
.taggedMetricRegistry(new VersionedTaggedMetricRegistry(conf.taggedMetricRegistry()))
.build();
return new DialogueChannel(name, cleanedConf, factory, random, scheduler);
return new DialogueChannel(name, cleanedConf, factory, random, scheduler, maxQueueSize);
}

private void preconditions(ClientConfiguration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ final class QueuedChannel implements Channel {
private final Timer queuedTime;
private final Supplier<ListenableFuture<Response>> limitedResultSupplier;

QueuedChannel(LimitedChannel channel, String channelName, DialogueClientMetrics metrics) {
this(channel, channelName, metrics, 100_000);
}

@VisibleForTesting
QueuedChannel(LimitedChannel delegate, String channelName, DialogueClientMetrics metrics, int maxQueueSize) {
this.delegate = new NeverThrowLimitedChannel(delegate);
this.channelName = channelName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.palantir.tritium.metrics.MetricRegistries;
import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -264,7 +265,9 @@ private boolean shouldAttemptToRetry(Throwable throwable) {
&& socketTimeout.getMessage().contains("connect timed out");
}
}
return true;
// Only retry IOExceptions. Other failures, particularly RuntimeException and Error are not
// meant to be recovered from.
return throwable instanceof IOException;
}

private void logRetry(long backoffNanoseconds, @Nullable Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.TestEndpoint;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.tracing.TestTracing;
import java.nio.file.Paths;
import java.util.Random;
Expand Down Expand Up @@ -166,6 +167,33 @@ void live_reloading_an_extra_uri_allows_queued_requests_to_make_progress() {
.execute(any(), any());
}

@Test
void test_queue_rejection_is_not_retried() {
when(delegate.execute(any(), any())).thenReturn(SettableFuture.create());
channel = DialogueChannel.builder()
.channelName("my-channel")
.clientConfiguration(stubConfig)
.channelFactory(uri -> delegate)
.random(new Random(123456L))
.maxQueueSize(1)
.build();
// Saturate the concurrency limiter
int initialConcurrencyLimit = 20;
for (int i = 0; i < initialConcurrencyLimit; i++) {
ListenableFuture<Response> running = channel.execute(endpoint, request);
assertThat(running).isNotDone();
}
// Queue a request
ListenableFuture<Response> queued = channel.execute(endpoint, request);
assertThat(queued).isNotDone();
// Next request should be rejected.
ListenableFuture<Response> rejected = channel.execute(endpoint, request);
assertThat(rejected).isDone();
assertThatThrownBy(rejected::get)
.hasRootCauseExactlyInstanceOf(SafeRuntimeException.class)
.hasMessageContaining("queue is full");
}

@Test
@TestTracing(snapshot = true)
public void traces_on_retries() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public class QueuedChannelTest {

@BeforeEach
public void before() {
queuedChannel =
new QueuedChannel(delegate, "my-channel", DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()));
queuedChannel = new QueuedChannel(
delegate, "my-channel", DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), 100_000);
futureResponse = SettableFuture.create();
maybeResponse = Optional.of(futureResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.palantir.dialogue.Response;
import com.palantir.dialogue.TestEndpoint;
import com.palantir.dialogue.TestResponse;
import com.palantir.logsafe.exceptions.SafeIoException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.time.Duration;
Expand All @@ -49,7 +51,7 @@ public class RetryingChannelTest {
private static final TestResponse EXPECTED_RESPONSE = new TestResponse();
private static final ListenableFuture<Response> SUCCESS = Futures.immediateFuture(EXPECTED_RESPONSE);
private static final ListenableFuture<Response> FAILED =
Futures.immediateFailedFuture(new IllegalArgumentException("FAILED"));
Futures.immediateFailedFuture(new SafeIoException("FAILED"));
private static final Request REQUEST = Request.builder().build();

@Mock
Expand Down Expand Up @@ -104,7 +106,7 @@ public void testRetriesUpToMaxRetriesAndFails() throws ExecutionException, Inter
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(TestEndpoint.POST, REQUEST);
assertThatThrownBy(response::get)
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class)
.hasRootCauseExactlyInstanceOf(SafeIoException.class)
.hasRootCauseMessage("FAILED");
}

Expand All @@ -120,7 +122,7 @@ public void testRetriesMax() {
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(TestEndpoint.POST, REQUEST);
assertThatThrownBy(response::get).hasCauseInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(response::get).hasCauseInstanceOf(SafeIoException.class);
verify(channel, times(4)).execute(TestEndpoint.POST, REQUEST);
}

Expand Down Expand Up @@ -344,6 +346,25 @@ public void retriesSocketTimeoutWhenRequested() throws ExecutionException, Inter
assertThat(response.get()).isEqualTo(EXPECTED_RESPONSE);
}

@Test
public void doesNotRetryRuntimeException() {
when(channel.execute(any(), any()))
.thenReturn(Futures.immediateFailedFuture(new SafeRuntimeException("bug")))
.thenReturn(SUCCESS);

Channel retryer = new RetryingChannel(
channel,
"my-channel",
1,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(TestEndpoint.POST, REQUEST);
assertThatThrownBy(response::get)
.hasRootCauseExactlyInstanceOf(SafeRuntimeException.class)
.hasRootCauseMessage("bug");
}

@Test
public void retriesSocketTimeout_connectionTimeout() throws ExecutionException, InterruptedException {
when(channel.execute(any(), any()))
Expand Down

0 comments on commit 50da0c6

Please sign in to comment.