Fix deserialization of structured errors after retries are exhausted #630

Merged · 6 commits · Apr 9, 2020
changelog/@unreleased/pr-630.v2.yml (+6, -0)
@@ -0,0 +1,6 @@
type: fix
fix:
  description: Structured errors are now correctly deserialized when requests exhaust
    all available retries. (Previously they would always appear as an `UnknownRemoteException`.)
  links:
  - https://github.com/palantir/dialogue/pull/630

Review comment (Contributor), on the changelog description: For what it's worth, I think this was only a bug for 503 and 429 responses, where our infrastructure sends empty response bodies.
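For reference, a minimal caller-side sketch of what this fix changes, assuming a hypothetical Conjure-generated blocking client (`RecipeService` below is invented for illustration; `RemoteException` and `SerializableError` are the real `com.palantir.conjure.java.api.errors` types):

import com.palantir.conjure.java.api.errors.RemoteException;
import com.palantir.conjure.java.api.errors.SerializableError;

final class StructuredErrorExample {
    /** Hypothetical Conjure-generated blocking client. */
    interface RecipeService {
        void getRecipe(String name);
    }

    static void callAndInspect(RecipeService recipeService) {
        try {
            recipeService.getRecipe("baked-alaska");
        } catch (RemoteException e) {
            // After this PR, the structured error survives even when every
            // attempt returned a QoS status (429/503) and retries ran out;
            // previously this surfaced as an UnknownRemoteException.
            SerializableError error = e.getError();
            System.out.println(error.errorCode()); // e.g. FAILED_PRECONDITION
            System.out.println(error.errorName()); // e.g. Default:FailedPrecondition
            System.out.println(error.parameters()); // e.g. {numCalls=4}
        }
    }
}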
Verification test (package com.palantir.verification):
@@ -17,6 +17,7 @@
package com.palantir.verification;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
@@ -27,6 +28,7 @@
import com.palantir.conjure.java.api.config.service.ServiceConfiguration;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.conjure.java.api.config.ssl.SslConfiguration;
import com.palantir.conjure.java.api.errors.RemoteException;
import com.palantir.conjure.java.client.config.ClientConfiguration;
import com.palantir.conjure.java.client.config.ClientConfigurations;
import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime;
@@ -50,6 +52,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import org.junit.After;
import org.junit.Before;
@@ -112,6 +115,31 @@ public void conjure_generated_blocking_interface_with_optional_binary_return_typ
assertThat(maybeBinary.get()).hasSameContentAs(asInputStream("Hello, world"));
}

@Test
public void deserializes_a_conjure_error_after_exhausting_retries() {
AtomicInteger calls = new AtomicInteger(0);
undertowHandler = exchange -> {
exchange.setStatusCode(429);
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
exchange.getResponseSender()
.send("{"
+ "\"errorCode\":\"FAILED_PRECONDITION\","
+ "\"errorName\":\"Default:FailedPrecondition\","
+ "\"errorInstanceId\":\"43580df1-e019-473b-bb3d-be6d489f36e5\","
+ "\"parameters\":{\"numCalls\":\"" + calls.getAndIncrement() + "\"}"
+ "}\n");
};

assertThatThrownBy(sampleServiceBlocking()::voidToVoid)
.isInstanceOf(RemoteException.class)
.satisfies(throwable -> {
assertThat(((RemoteException) throwable).getError().parameters())
.containsEntry("numCalls", "4");
});

assertThat(calls).describedAs("one initial call + 4 retries").hasValue(5);
}

@Test
public void stream_3_gigabytes() throws IOException {
long oneMegabyte = 1000_000;
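The `numCalls` parameter is what makes the assertion above meaningful: the handler increments it on every request, and the test expects `"4"`, i.e. the body that actually gets deserialized is the one from the fifth and final attempt (one initial call plus four retries). For readers who would rather not hand-concatenate JSON, here is a hedged sketch that produces the same wire format with plain Jackson (the `ObjectMapper` is vanilla Jackson, not part of this PR):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class ConjureErrorBody {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Same wire format as the hand-concatenated string in the test above. */
    static String json(int numCalls) throws Exception {
        Map<String, Object> error = new LinkedHashMap<>();
        error.put("errorCode", "FAILED_PRECONDITION");
        error.put("errorName", "Default:FailedPrecondition");
        error.put("errorInstanceId", "43580df1-e019-473b-bb3d-be6d489f36e5");
        error.put("parameters", Collections.singletonMap("numCalls", Integer.toString(numCalls)));
        return MAPPER.writeValueAsString(error);
    }
}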
RetryingChannel.java:
@@ -47,12 +47,11 @@
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-/**
- * Immediately retries calls to the underlying channel upon failure.
- */
+/** Retries failed requests by scheduling them onto a ScheduledExecutorService after an exponential backoff. */
final class RetryingChannel implements Channel {

private static final Logger log = LoggerFactory.getLogger(RetryingChannel.class);
@@ -127,7 +126,7 @@ public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return delegate.execute(endpoint, request);
}

-private boolean isRetryable(Request request) {
+private static boolean isRetryable(Request request) {
Optional<RequestBody> maybeBody = request.body();
return !maybeBody.isPresent() || maybeBody.get().repeatable();
}
Expand Down Expand Up @@ -156,9 +155,9 @@ ListenableFuture<Response> execute() {
}

@SuppressWarnings("FutureReturnValueIgnored") // error-prone bug
-ListenableFuture<Response> retry(Throwable cause) {
+ListenableFuture<Response> scheduleRetry(@Nullable Throwable throwableToLog) {
long backoffNanoseconds = getBackoffNanoseconds();
-logRetry(backoffNanoseconds, cause);
+logRetry(backoffNanoseconds, throwableToLog);
if (backoffNanoseconds <= 0) {
return wrap(delegate.execute(endpoint, request));
}
@@ -181,22 +180,23 @@ private long getBackoffNanoseconds() {
return Math.round(backoffSlotSize.toNanos() * jitter.getAsDouble() * upperBound);
}

-ListenableFuture<Response> success(Response response) {
-// this condition should really match the BlacklistingChannel so that we don't hit the same host twice in
-// a row
+ListenableFuture<Response> handleHttpResponse(Response response) {
if (Responses.isQosStatus(response)) {
-response.close();
-Throwable failure =
-new SafeRuntimeException("Received retryable response", SafeArg.of("status", response.code()));
if (++failures <= maxRetries) {
-return retry(failure);
+response.close(); // nobody is going to read this response body
+Throwable throwableToLog = log.isInfoEnabled()
+? new SafeRuntimeException(
+"Received retryable response", SafeArg.of("status", response.code()))
+: null;
+return scheduleRetry(throwableToLog);
}
if (log.isDebugEnabled()) {
log.debug(
"Exhausted {} retries, returning a retryable response with status {}",
SafeArg.of("retries", maxRetries),
SafeArg.of("status", response.code()));
}
// not closing the final response body because ConjureBodySerde needs to read it to deserialize
return Futures.immediateFuture(response);
}

Expand All @@ -205,10 +205,10 @@ ListenableFuture<Response> success(Response response) {
return Futures.immediateFuture(response);
}

-ListenableFuture<Response> failure(Throwable throwable) {
+ListenableFuture<Response> handleThrowable(Throwable throwable) {
if (++failures <= maxRetries) {
if (shouldAttemptToRetry(throwable)) {
-return retry(throwable);
+return scheduleRetry(throwable);
} else if (log.isDebugEnabled()) {
log.debug(
"Not attempting to retry failure",
@@ -234,7 +234,7 @@ private boolean shouldAttemptToRetry(Throwable throwable) {
return true;
}

-private void logRetry(long backoffNanoseconds, Throwable throwable) {
+private void logRetry(long backoffNanoseconds, @Nullable Throwable throwable) {
if (log.isInfoEnabled()) {
log.info(
"Retrying call after failure",
@@ -251,9 +251,10 @@ private void logRetry(long backoffNanoseconds, Throwable throwable) {
private ListenableFuture<Response> wrap(ListenableFuture<Response> input) {
ListenableFuture<Response> result = input;
if (!shouldPropagateQos(serverQoS)) {
-result = Futures.transformAsync(result, this::success, MoreExecutors.directExecutor());
+result = Futures.transformAsync(result, this::handleHttpResponse, MoreExecutors.directExecutor());
}
-result = Futures.catchingAsync(result, Throwable.class, this::failure, MoreExecutors.directExecutor());
+result = Futures.catchingAsync(
+result, Throwable.class, this::handleThrowable, MoreExecutors.directExecutor());
return result;
}
}
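The new class comment describes scheduling retries after an exponential backoff, but only the final multiplication of `getBackoffNanoseconds()` is visible in this diff. A minimal sketch under stated assumptions: `upperBound` is taken to double with each failure (the usual full-jitter exponential scheme), and the 250ms slot size is an invented default, not a value from this PR.

import java.time.Duration;
import java.util.Random;
import java.util.function.DoubleSupplier;

final class BackoffSketch {
    private final Duration backoffSlotSize = Duration.ofMillis(250); // invented default
    private final DoubleSupplier jitter = new Random()::nextDouble; // uniform in [0, 1)
    private int failures = 0;

    long nextBackoffNanoseconds() {
        // Assumption: the elided upperBound is 2^failures; only the
        // multiplication below appears in the diff itself.
        int upperBound = 1 << ++failures;
        return Math.round(backoffSlotSize.toNanos() * jitter.getAsDouble() * upperBound);
    }
}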
RetryingChannelTest.java:
@@ -229,6 +229,35 @@ public void response_bodies_are_closed() throws Exception {
verify(response2, times(1)).close();
}

@Test
public void final_exhausted_failure_response_body_is_not_closed() throws Exception {
TestResponse response1 = new TestResponse().code(503);
TestResponse response2 = new TestResponse().code(503);
TestResponse response3 = new TestResponse().code(503);

when(channel.execute(any(), any()))
.thenReturn(Futures.immediateFuture(response1))
.thenReturn(Futures.immediateFuture(response2))
.thenReturn(Futures.immediateFuture(response3));

Channel retryer = new RetryingChannel(
channel,
"my-channel",
2,
Duration.ZERO,
ClientConfiguration.ServerQoS.AUTOMATIC_RETRY,
ClientConfiguration.RetryOnTimeout.DISABLED);
ListenableFuture<Response> response = retryer.execute(TestEndpoint.POST, REQUEST);
assertThat(response.get(1, TimeUnit.SECONDS).code()).isEqualTo(503);

assertThat(response1.isClosed()).isTrue();
assertThat(response2.isClosed()).isTrue();
assertThat(response3.isClosed())
.describedAs("The last response must be left open so we can read the body"
+ " and deserialize it into a structured error")
.isFalse();
}

@Test
public void testPropagatesCancel() {
ListenableFuture<Response> delegateResult = SettableFuture.create();
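To make the asserted invariant concrete: the last response must stay open because something downstream still has to read its body. A hedged sketch of that downstream step (names are illustrative, not Dialogue's actual internals; the PR attributes the real deserialization to `ConjureBodySerde`):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.palantir.conjure.java.api.errors.RemoteException;
import com.palantir.conjure.java.api.errors.SerializableError;
import com.palantir.dialogue.Response;
import java.io.IOException;
import java.io.InputStream;

final class ErrorDecodingSketch {
    // Jdk8Module because SerializableError carries Optional fields.
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new Jdk8Module());

    static RemoteException toRemoteException(Response response) throws IOException {
        // This read is exactly why RetryingChannel leaves the final response
        // open: a body closed by the retry loop could not be deserialized.
        try (InputStream body = response.body()) {
            SerializableError error = MAPPER.readValue(body, SerializableError.class);
            return new RemoteException(error, response.code());
        }
    }
}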