Skip to content

Commit

Permalink
Add support for call timeouts (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Fink authored Apr 29, 2019
1 parent 8ead44a commit 74bdb09
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,25 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

// CHECKSTYLE:ON

public abstract class AbstractSampleServiceClientTest {

abstract SampleService createBlockingClient(URL baseUrl, Duration timeout);
abstract AsyncSampleService createAsyncClient(URL baseUrl, Duration timeout);

private static final String PATH = "myPath";
private static final OffsetDateTime HEADER = OffsetDateTime.parse("2018-07-19T08:11:21+00:00");
private static final ImmutableList<ResourceIdentifier> QUERY = ImmutableList.of(
Expand All @@ -57,9 +61,6 @@ public abstract class AbstractSampleServiceClientTest {
private static final SampleObject RESPONSE = new SampleObject(84);
private static final String RESPONSE_STRING = "{\"intProperty\": 84}";

abstract SampleService createBlockingClient(URL baseUrl);
abstract AsyncSampleService createAsyncClient(URL baseUrl);

static final SslConfiguration SSL_CONFIG = SslConfiguration.of(
Paths.get("src/test/resources/trustStore.jks"),
Paths.get("src/test/resources/keyStore.jks"),
Expand Down Expand Up @@ -106,8 +107,8 @@ public abstract class AbstractSampleServiceClientTest {
@Before
public void before() {
server.useHttps(SslSocketFactories.createSslSocketFactory(SSL_CONFIG), false);
blockingClient = createBlockingClient(server.url("").url());
asyncClient = createAsyncClient(server.url("").url());
blockingClient = createBlockingClient(server.url("").url(), Duration.ofSeconds(1));
asyncClient = createAsyncClient(server.url("").url(), Duration.ofSeconds(1));
}

@Test
Expand Down Expand Up @@ -190,7 +191,7 @@ public void testAsync_voidToVoid_throwsWhenResponseBodyIsNonEmpty() {
.hasMessageContaining("Expected empty response body");
}

@Test
@Test(timeout = 2_000)
public void testBlocking_throwsOnConnectError() throws Exception {
server.shutdown();
assertThatThrownBy(() -> blockingClient.objectToObject(PATH, HEADER, QUERY, BODY))
Expand All @@ -199,19 +200,21 @@ public void testBlocking_throwsOnConnectError() throws Exception {
.hasMessageMatching(".*((Connection refused)|(Failed to connect)).*");
}

@Ignore("TODO(rfink): Figure out how to inject read/write timeouts")
@Test(timeout = 2_000)
public void testBlocking_throwsOnTimeout() {
@Test(timeout = 2_000) // see client construction: we set a 1s timeout
public void testBlocking_throwsOnTimeout() throws Exception {
server.enqueue(new MockResponse()
.setBody("\"response\"")
.addHeader(Headers.CONTENT_TYPE, "application/json")
.setBodyDelay(10, TimeUnit.SECONDS));
assertThatThrownBy(() -> blockingClient.objectToObject(PATH, HEADER, QUERY, BODY))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to deserialize response");
.hasCauseInstanceOf(TimeoutException.class)
.hasMessageContaining("Waited 1000 milliseconds");

// TODO(rfink): It seems that the OkHttp version of this tests leaves the connection open after the timeout.
}

@Test
@Test(timeout = 2_000)
public void testAsync_throwsOnConnectError() throws Exception {
server.shutdown();
assertThatThrownBy(() -> asyncClient.voidToVoid().get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
import com.palantir.dialogue.PathTemplate;
import com.palantir.dialogue.PlainSerDe;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.Serializer;
import com.palantir.dialogue.TypeMarker;
import com.palantir.dialogue.UrlBuilder;
import com.palantir.logsafe.Preconditions;
import com.palantir.ri.ResourceIdentifier;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Example of the implementation code conjure would generate for a simple SampleService.
public final class SampleServiceClient {
Expand Down Expand Up @@ -80,8 +80,13 @@ public HttpMethod httpMethod() {
}
};

/** Returns a new blocking {@link SampleService} implementation whose calls are executed on the given channel. */
public static SampleService blocking(Channel channel, ConjureRuntime runtime) {
/**
* Returns a new blocking {@link SampleService} implementation whose calls are executed on the given channel.
* The {@code callTimeout} parameter indicates the maximum end-to-end life time for the blocking methods in this
* service; an exception is thrown when this duration is exceeded.
*/
// TODO(rfink): Consider using a builder pattern to construct clients
public static SampleService blocking(Channel channel, ConjureRuntime runtime, Duration callTimeout) {
return new SampleService() {

private Serializer<SampleObject> sampleObjectToSampleObjectSerializer =
Expand All @@ -105,10 +110,14 @@ public SampleObject objectToObject(
.build();

Call call = channel.createCall(STRING_TO_STRING, request);
ListenableFuture<Response> response = Calls.toFuture(call);
ListenableFuture<SampleObject> response = Futures.transform(
Calls.toFuture(call),
r -> sampleObjectToSampleObjectDeserializer.deserialize(r),
MoreExecutors.directExecutor());
try {
// TODO(rfink): Figure out how to inject read/write timeouts
return sampleObjectToSampleObjectDeserializer.deserialize(response.get());
return response.get(callTimeout.toMillis(), TimeUnit.MILLISECONDS);
// TODO(rfink): Think about exception handling, in particular in the case of retries. Should this
// actually throw a TimeoutException?
} catch (Throwable t) {
throw Exceptions.unwrapExecutionException(t);
}
Expand All @@ -119,9 +128,12 @@ public void voidToVoid() {
Request request = Request.builder().build();

Call call = channel.createCall(VOID_TO_VOID, request);
ListenableFuture<Response> response = Calls.toFuture(call);
ListenableFuture<Void> deserializedResponse = Futures.transform(
Calls.toFuture(call),
r -> voidToVoidDeserializer.deserialize(r),
MoreExecutors.directExecutor());
try {
voidToVoidDeserializer.deserialize(response.get());
deserializedResponse.get(callTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable t) {
throw Exceptions.unwrapExecutionException(t);
}
Expand All @@ -133,6 +145,7 @@ public void voidToVoid() {
* Returns a new asynchronous {@link AsyncSampleService} implementation whose calls are executed on the given
* channel.
*/
// TODO(rfink): Consider using a builder pattern to construct clients
public static AsyncSampleService async(Channel channel, ConjureRuntime runtime) {
return new AsyncSampleService() {

Expand Down Expand Up @@ -162,14 +175,7 @@ public ListenableFuture<SampleObject> stringToString(
Call call = channel.createCall(STRING_TO_STRING, request);
return Futures.transform(
Calls.toFuture(call),
response -> {
try {
// TODO(rfink): The try/catch is a bit odd here.
return sampleObjectToSampleObjectDeserializer.deserialize(response);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize response", e);
}
},
response -> sampleObjectToSampleObjectDeserializer.deserialize(response),
MoreExecutors.directExecutor());
}

Expand All @@ -180,14 +186,7 @@ public ListenableFuture<Void> voidToVoid() {
Call call = channel.createCall(VOID_TO_VOID, request);
return Futures.transform(
Calls.toFuture(call),
response -> {
try {
voidToVoidDeserializer.deserialize(response);
return null;
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize response", e);
}
},
response -> voidToVoidDeserializer.deserialize(response),
MoreExecutors.directExecutor());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.dialogue;

import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime;
import com.palantir.conjure.java.dialogue.serde.DefaultErrorDecoder;
Expand All @@ -26,6 +25,7 @@
import java.net.URL;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;
import javax.net.ssl.SSLParameters;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
Expand All @@ -36,14 +36,14 @@ public final class HttpSampleServiceClientTest extends AbstractSampleServiceClie
private static final ConjureRuntime runtime = DefaultConjureRuntime.builder().build();

@Override
SampleService createBlockingClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
return SampleServiceClient.blocking(channel, runtime);
SampleService createBlockingClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.blocking(channel, runtime, timeout);
}

@Override
AsyncSampleService createAsyncClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
AsyncSampleService createAsyncClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.async(channel, runtime);
}

Expand All @@ -57,7 +57,7 @@ private HttpChannel createChannel(URL url, Duration timeout) {
.sslParameters(sslConfig)
.sslContext(SslSocketFactories.createSslContext(SSL_CONFIG))
.build(),
MoreExecutors.newDirectExecutorService(),
Executors.newSingleThreadExecutor(),
url,
DefaultErrorDecoder.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.dialogue;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime;
import com.palantir.conjure.java.dialogue.serde.DefaultErrorDecoder;
Expand All @@ -26,6 +25,7 @@
import com.palantir.dialogue.example.SampleServiceClient;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
Expand All @@ -39,14 +39,14 @@ public final class OkHttpSampleServiceClientTest extends AbstractSampleServiceCl
private static final ConjureRuntime runtime = DefaultConjureRuntime.builder().build();

@Override
SampleService createBlockingClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
return SampleServiceClient.blocking(channel, runtime);
SampleService createBlockingClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.blocking(channel, runtime, timeout);
}

@Override
AsyncSampleService createAsyncClient(URL baseUrl) {
Channel channel = createChannel(baseUrl, Duration.ofSeconds(1));
AsyncSampleService createAsyncClient(URL baseUrl, Duration timeout) {
Channel channel = createChannel(baseUrl, timeout);
return SampleServiceClient.async(channel, runtime);
}

Expand All @@ -55,10 +55,8 @@ private OkHttpChannel createChannel(URL url, Duration timeout) {
new OkHttpClient.Builder()
.protocols(ImmutableList.of(Protocol.HTTP_1_1))
// Execute calls on same thread so that async tests are deterministic.
.dispatcher(new Dispatcher(MoreExecutors.newDirectExecutorService()))
.dispatcher(new Dispatcher(Executors.newSingleThreadExecutor()))
.connectTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.readTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.writeTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.sslSocketFactory(
SslSocketFactories.createSslSocketFactory(SSL_CONFIG),
SslSocketFactories.createX509TrustManager(SSL_CONFIG))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeIoException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -68,8 +68,12 @@ public <T> Deserializer<T> deserializer(TypeMarker<T> token) {
@Override
public Deserializer<Void> emptyBodyDeserializer() {
return response -> {
if (response.body().read() != -1) {
throw new RuntimeException("Expected empty response body");
try {
if (response.body().read() != -1) {
throw new RuntimeException("Expected empty response body");
}
} catch (IOException e) {
throw new RuntimeException("Failed to read from response body", e);
}
return null;
};
Expand Down Expand Up @@ -171,15 +175,15 @@ private static final class EncodingDeserializerRegistry<T> implements Deserializ
}

@Override
public T deserialize(Response response) throws IOException {
public T deserialize(Response response) {
EncodingDeserializerContainer<T> container = getResponseDeserializer(response.contentType());
return container.deserializer.deserialize(response.body());
}

/** Returns the {@link EncodingDeserializerContainer} to use to deserialize the request body. */
@SuppressWarnings("ForLoopReplaceableByForEach")
// performance sensitive code avoids iterator allocation
EncodingDeserializerContainer<T> getResponseDeserializer(Optional<String> contentType) throws SafeIoException {
EncodingDeserializerContainer<T> getResponseDeserializer(Optional<String> contentType) {
if (!contentType.isPresent()) {
throw new SafeIllegalArgumentException("Response is missing Content-Type header");
}
Expand All @@ -189,7 +193,7 @@ EncodingDeserializerContainer<T> getResponseDeserializer(Optional<String> conten
return container;
}
}
throw new SafeIoException("Unsupported Content-Type", SafeArg.of("Content-Type", contentType));
throw new SafeRuntimeException("Unsupported Content-Type", SafeArg.of("Content-Type", contentType));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.palantir.conjure.java.dialogue.serde;

import com.palantir.dialogue.TypeMarker;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

Expand Down Expand Up @@ -62,7 +61,7 @@ interface Deserializer<T> {
* Format-related deserialization errors surface as {@link IllegalArgumentException}. Inputs and outputs
* must never be null.
*/
T deserialize(InputStream input) throws IOException;
T deserialize(InputStream input);
}

interface Serializer<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.palantir.dialogue.TypeMarker;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIoException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -68,11 +68,11 @@ public <T> Deserializer<T> deserializer(TypeMarker<T> type) {
Preconditions.checkArgument(value != null, "cannot deserialize a JSON null value");
return value;
} catch (MismatchedInputException e) {
throw new SafeIoException(
throw new SafeRuntimeException(
"Failed to deserialize response stream. Syntax error?",
e, SafeArg.of("type", type.getType()));
} catch (IOException e) {
throw new SafeIoException(
throw new SafeRuntimeException(
"Failed to deserialize response stream", e, SafeArg.of("type", type.getType()));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.palantir.dialogue.TypeMarker;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testUnsupportedRequestContentType() {
response.contentType = Optional.of("application/unknown");
BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(new StubEncoding("application/json")));
assertThatThrownBy(() -> serializers.deserializer(TYPE).deserialize(response))
.isInstanceOf(IOException.class)
.isInstanceOf(SafeRuntimeException.class)
.hasMessageContaining("Unsupported Content-Type");
}

Expand Down
Loading

0 comments on commit 74bdb09

Please sign in to comment.