Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for call timeouts #70

Merged
merged 7 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,23 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

// CHECKSTYLE:ON

public abstract class AbstractSampleServiceClientTest {

abstract SampleService createBlockingClient(URL baseUrl);
abstract AsyncSampleService createAsyncClient(URL baseUrl);
abstract SampleService createBlockingClient(URL baseUrl, Duration timeout);
abstract AsyncSampleService createAsyncClient(URL baseUrl, Duration timeout);

static final SslConfiguration SSL_CONFIG = SslConfiguration.of(
Paths.get("src/test/resources/trustStore.jks"),
Expand Down Expand Up @@ -93,8 +94,8 @@ public abstract class AbstractSampleServiceClientTest {
@Before
public void before() {
server.useHttps(SslSocketFactories.createSslSocketFactory(SSL_CONFIG), false);
blockingClient = createBlockingClient(server.url("").url());
asyncClient = createAsyncClient(server.url("").url());
blockingClient = createBlockingClient(server.url("").url(), Duration.ofSeconds(1));
asyncClient = createAsyncClient(server.url("").url(), Duration.ofSeconds(1));
}

@Test
Expand Down Expand Up @@ -173,7 +174,7 @@ public void testAsync_voidToVoid_throwsWhenResponseBodyIsNonEmpty() throws Excep
.hasMessageContaining("Expected empty response body");
}

@Test
@Test(timeout = 2_000)
public void testBlocking_throwsOnConnectError() throws Exception {
server.shutdown();
assertThatThrownBy(() -> blockingClient.stringToString("", "", ""))
Expand All @@ -182,19 +183,21 @@ public void testBlocking_throwsOnConnectError() throws Exception {
.hasMessageMatching(".*((Connection refused)|(Failed to connect)).*");
}

@Ignore("TODO(rfink): Figure out how to inject read/write timeouts")
@Test(timeout = 2_000)
@Test(timeout = 2_000) // see client construction: we set a 1s timeout
public void testBlocking_throwsOnTimeout() throws Exception {
server.enqueue(new MockResponse()
.setBody("\"response\"")
.addHeader(Headers.CONTENT_TYPE, "application/json")
.setBodyDelay(10, TimeUnit.SECONDS));
assertThatThrownBy(() -> blockingClient.stringToString("", "", ""))
assertThatThrownBy(() -> blockingClient.stringToString("foo", "bar", "baz"))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to deserialize response");
.hasCauseInstanceOf(TimeoutException.class)
.hasMessageContaining("Waited 1000 milliseconds");

// TODO(rfink): It seems that the OkHttp version of this tests leaves the connection open after the timeout.
}

@Test
@Test(timeout = 2_000)
public void testAsync_throwsOnConnectError() throws Exception {
server.shutdown();
assertThatThrownBy(() -> asyncClient.voidToVoid().get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import com.palantir.dialogue.HttpMethod;
import com.palantir.dialogue.PathTemplate;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.Serializer;
import com.palantir.dialogue.TypeMarker;
import com.palantir.dialogue.UrlBuilder;
import com.palantir.logsafe.Preconditions;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Example of the implementation code conjure would generate for a simple SampleService.
public final class SampleServiceClient {
Expand Down Expand Up @@ -77,7 +77,7 @@ public HttpMethod httpMethod() {
};

/** Returns a new blocking {@link SampleService} implementation whose calls are executed on the given channel. */
public static SampleService blocking(Channel channel, ConjureRuntime runtime) {
public static SampleService blocking(Channel channel, ConjureRuntime runtime, Duration readTimeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if it'd make more sense for this to be a builder-pattern thing or possibly for the third arg to be some kind of settings object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a TODO, think that'd be a separate PR.

return new SampleService() {

private Serializer<String> stringToStringSerializer =
Expand All @@ -98,10 +98,12 @@ public String stringToString(String objectId, String header, String body) {
.build();

Call call = channel.createCall(STRING_TO_STRING, request);
ListenableFuture<Response> response = Calls.toFuture(call);
ListenableFuture<String> response = Futures.transform(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on composing the timeouts as a Channel implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. it wouldn't be so straightforward, because then the async and the blocking client would need different types of channels. (the async client as it stands doesn't do timeouts as callers are in control.) I think I'd revisit that problem when exposing clients that interact with an Observer, because we'd then have to think about how to expose the timeout to those observers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(sorry, missed this comment before the last one I posted)

It seems like async clients should still enforce timeouts on the requests/time-to-response, in which case it'd be the same Channel impl?

Calls.toFuture(call),
r -> stringToStringDeserializer.deserialize(r),
MoreExecutors.directExecutor());
try {
// TODO(rfink): Figure out how to inject read/write timeouts
return stringToStringDeserializer.deserialize(response.get());
return response.get(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable t) {
throw Exceptions.unwrapExecutionException(t);
}
Expand All @@ -112,9 +114,12 @@ public void voidToVoid() {
Request request = Request.builder().build();

Call call = channel.createCall(VOID_TO_VOID, request);
ListenableFuture<Response> response = Calls.toFuture(call);
ListenableFuture<Void> deserializedResponse = Futures.transform(
Calls.toFuture(call),
r -> voidToVoidDeserializer.deserialize(r),
MoreExecutors.directExecutor());
try {
voidToVoidDeserializer.deserialize(response.get());
deserializedResponse.get(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable t) {
throw Exceptions.unwrapExecutionException(t);
}
Expand Down Expand Up @@ -149,14 +154,7 @@ public ListenableFuture<String> stringToString(String objectId, String header, S
Call call = channel.createCall(STRING_TO_STRING, request);
return Futures.transform(
Calls.toFuture(call),
response -> {
try {
// TODO(rfink): The try/catch is a bit odd here.
return stringToStringDeserializer.deserialize(response);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize response", e);
}
},
response -> stringToStringDeserializer.deserialize(response),
MoreExecutors.directExecutor());
}

Expand All @@ -167,14 +165,7 @@ public ListenableFuture<Void> voidToVoid() {
Call call = channel.createCall(VOID_TO_VOID, request);
return Futures.transform(
Calls.toFuture(call),
response -> {
try {
voidToVoidDeserializer.deserialize(response);
return null;
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize response", e);
}
},
response -> voidToVoidDeserializer.deserialize(response),
MoreExecutors.directExecutor());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.dialogue;

import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime;
import com.palantir.conjure.java.dialogue.serde.DefaultErrorDecoder;
Expand All @@ -26,6 +25,7 @@
import java.net.URL;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;
import javax.net.ssl.SSLParameters;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
Expand All @@ -36,14 +36,14 @@ public final class HttpSampleServiceClientTest extends AbstractSampleServiceClie
private static final ConjureRuntime runtime = DefaultConjureRuntime.builder().build();

@Override
SampleService createBlockingClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
return SampleServiceClient.blocking(channel, runtime);
SampleService createBlockingClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.blocking(channel, runtime, timeout);
}

@Override
AsyncSampleService createAsyncClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
AsyncSampleService createAsyncClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.async(channel, runtime);
}

Expand All @@ -57,7 +57,7 @@ private HttpChannel createChannel(URL url, Duration timeout) {
.sslParameters(sslConfig)
.sslContext(SslSocketFactories.createSslContext(SSL_CONFIG))
.build(),
MoreExecutors.newDirectExecutorService(),
Executors.newSingleThreadExecutor(),
url,
DefaultErrorDecoder.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.dialogue;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime;
import com.palantir.conjure.java.dialogue.serde.DefaultErrorDecoder;
Expand All @@ -26,6 +25,7 @@
import com.palantir.dialogue.example.SampleServiceClient;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
Expand All @@ -39,14 +39,14 @@ public final class OkHttpSampleServiceClientTest extends AbstractSampleServiceCl
private static final ConjureRuntime runtime = DefaultConjureRuntime.builder().build();

@Override
SampleService createBlockingClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
return SampleServiceClient.blocking(channel, runtime);
SampleService createBlockingClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.blocking(channel, runtime, timeout);
}

@Override
AsyncSampleService createAsyncClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
AsyncSampleService createAsyncClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.async(channel, runtime);
}

Expand All @@ -55,10 +55,8 @@ private OkHttpChannel createChannel(URL url, Duration timeout) {
new OkHttpClient.Builder()
.protocols(ImmutableList.of(Protocol.HTTP_1_1))
// Execute calls on same thread so that async tests are deterministic.
.dispatcher(new Dispatcher(MoreExecutors.newDirectExecutorService()))
.dispatcher(new Dispatcher(Executors.newSingleThreadExecutor()))
.connectTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.readTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.writeTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.sslSocketFactory(
SslSocketFactories.createSslSocketFactory(SSL_CONFIG),
SslSocketFactories.createX509TrustManager(SSL_CONFIG))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeIoException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -68,8 +68,12 @@ public <T> Deserializer<T> deserializer(TypeMarker<T> token) {
@Override
public Deserializer<Void> emptyBodyDeserializer() {
return response -> {
if (response.body().read() != -1) {
throw new RuntimeException("Expected empty response body");
try {
if (response.body().read() != -1) {
throw new RuntimeException("Expected empty response body");
}
} catch (IOException e) {
throw new RuntimeException("Failed to read from response body", e);
}
return null;
};
Expand Down Expand Up @@ -171,15 +175,15 @@ private static final class EncodingDeserializerRegistry<T> implements Deserializ
}

@Override
public T deserialize(Response response) throws IOException {
public T deserialize(Response response) {
EncodingDeserializerContainer<T> container = getResponseDeserializer(response.contentType());
return container.deserializer.deserialize(response.body());
}

/** Returns the {@link EncodingDeserializerContainer} to use to deserialize the request body. */
@SuppressWarnings("ForLoopReplaceableByForEach")
// performance sensitive code avoids iterator allocation
EncodingDeserializerContainer<T> getResponseDeserializer(Optional<String> contentType) throws SafeIoException {
EncodingDeserializerContainer<T> getResponseDeserializer(Optional<String> contentType) {
if (!contentType.isPresent()) {
throw new SafeIllegalArgumentException("Response is missing Content-Type header");
}
Expand All @@ -189,7 +193,7 @@ EncodingDeserializerContainer<T> getResponseDeserializer(Optional<String> conten
return container;
}
}
throw new SafeIoException("Unsupported Content-Type", SafeArg.of("Content-Type", contentType));
throw new SafeRuntimeException("Unsupported Content-Type", SafeArg.of("Content-Type", contentType));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.conjure.java.dialogue.serde;

import com.palantir.dialogue.TypeMarker;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

Expand Down Expand Up @@ -62,7 +61,7 @@ interface Deserializer<T> {
* Format-related deserialization errors surface as {@link IllegalArgumentException}. Inputs and outputs
* must never be null.
*/
T deserialize(InputStream input) throws IOException;
T deserialize(InputStream input);
}

interface Serializer<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.palantir.dialogue.TypeMarker;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIoException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -68,11 +68,11 @@ public <T> Deserializer<T> deserializer(TypeMarker<T> type) {
Preconditions.checkArgument(value != null, "cannot deserialize a JSON null value");
return value;
} catch (MismatchedInputException e) {
throw new SafeIoException(
throw new SafeRuntimeException(
"Failed to deserialize response stream. Syntax error?",
e, SafeArg.of("type", type.getType()));
} catch (IOException e) {
throw new SafeIoException(
throw new SafeRuntimeException(
"Failed to deserialize response stream", e, SafeArg.of("type", type.getType()));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.palantir.dialogue.TypeMarker;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testUnsupportedRequestContentType() {
response.contentType = Optional.of("application/unknown");
BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(new StubEncoding("application/json")));
assertThatThrownBy(() -> serializers.deserializer(TYPE).deserialize(response))
.isInstanceOf(IOException.class)
.isInstanceOf(SafeRuntimeException.class)
.hasMessageContaining("Unsupported Content-Type");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.palantir.dialogue.TypeMarker;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeNullPointerException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -43,7 +44,7 @@ public final class EncodingsTest {
@Test
public void json_deserialize_throwsDeserializationErrorsAsIllegalArgumentException() {
assertThatThrownBy(() -> deserialize(asStream("\"2018-08-bogus\""), new TypeMarker<OffsetDateTime>() {}))
.isInstanceOf(IOException.class)
.isInstanceOf(SafeRuntimeException.class)
.hasMessageContaining("Failed to deserialize");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package com.palantir.dialogue;

import java.io.IOException;

/** Reads objects from a response. */
public interface Deserializer<T> {

/** Deserializes the response body. */
T deserialize(Response response) throws IOException;
T deserialize(Response response);
}