diff --git a/changelog/@unreleased/pr-570.v2.yml b/changelog/@unreleased/pr-570.v2.yml new file mode 100644 index 000000000..229a767ab --- /dev/null +++ b/changelog/@unreleased/pr-570.v2.yml @@ -0,0 +1,7 @@ +type: fix +fix: + description: Conjure endpoints returning `binary` or `optional` now correctly + return unclosed InputStreams. Users must be careful to close these InputStreams, + otherwise resources will be leaked. + links: + - https://github.com/palantir/dialogue/pull/570 diff --git a/dialogue-client-test-lib/src/main/java/com/palantir/dialogue/CloseRecordingInputStream.java b/dialogue-client-test-lib/src/main/java/com/palantir/dialogue/CloseRecordingInputStream.java new file mode 100644 index 000000000..7bb896380 --- /dev/null +++ b/dialogue-client-test-lib/src/main/java/com/palantir/dialogue/CloseRecordingInputStream.java @@ -0,0 +1,92 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue; + +import com.palantir.logsafe.exceptions.SafeRuntimeException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import org.assertj.core.api.Assertions; + +/** A test-only inputstream which can only be closed once. */ +public final class CloseRecordingInputStream extends InputStream { + + private final InputStream delegate; + private Optional closeCalled = Optional.empty(); + + public CloseRecordingInputStream(InputStream delegate) { + this.delegate = delegate; + } + + public boolean isClosed() { + return closeCalled.isPresent(); + } + + public void assertNotClosed() { + if (closeCalled.isPresent()) { + Assertions.fail("Expected CloseRecordingInputStream to be open but was closed", closeCalled.get()); + } + } + + @Override + public int read() throws IOException { + assertNotClosed(); + return delegate.read(); + } + + @Override + public int read(byte[] bytes) throws IOException { + assertNotClosed(); + return delegate.read(bytes); + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + assertNotClosed(); + return delegate.read(bytes, off, len); + } + + @Override + public int available() throws IOException { + assertNotClosed(); + return delegate.available(); + } + + @Override + public void reset() throws IOException { + assertNotClosed(); + delegate.reset(); + } + + @Override + public void mark(int readlimit) { + assertNotClosed(); + delegate.mark(readlimit); + } + + @Override + public long skip(long num) throws IOException { + assertNotClosed(); + return delegate.skip(num); + } + + @Override + public void close() throws IOException { + closeCalled = Optional.of(new SafeRuntimeException("close was called here")); + delegate.close(); + } +} diff --git a/dialogue-client-test-lib/src/main/java/com/palantir/dialogue/TestResponse.java b/dialogue-client-test-lib/src/main/java/com/palantir/dialogue/TestResponse.java new file mode 100644 index 000000000..a1bf03677 --- /dev/null +++ b/dialogue-client-test-lib/src/main/java/com/palantir/dialogue/TestResponse.java @@ -0,0 +1,84 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue; + +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.errorprone.annotations.CheckReturnValue; +import com.palantir.logsafe.exceptions.SafeRuntimeException; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Optional; +import javax.ws.rs.core.HttpHeaders; + +public final class TestResponse implements Response { + + private final CloseRecordingInputStream inputStream = + new CloseRecordingInputStream(new ByteArrayInputStream(new byte[] {})); + + private Optional closeCalled = Optional.empty(); + private int code = 0; + private ListMultimap headers = ImmutableListMultimap.of(); + + @Override + public CloseRecordingInputStream body() { + return inputStream; + } + + @Override + public int code() { + return code; + } + + @CheckReturnValue + public TestResponse code(int value) { + this.code = value; + return this; + } + + @Override + public ListMultimap headers() { + return headers; + } + + @Override + public void close() { + checkNotClosed(); + try { + closeCalled = Optional.of(new SafeRuntimeException("Close called here")); + inputStream.close(); + } catch (IOException e) { + throw new SafeRuntimeException("Failed to close", e); + } + } + + public boolean isClosed() { + return closeCalled.isPresent(); + } + + private void checkNotClosed() { + if (closeCalled.isPresent()) { + throw new SafeRuntimeException("Please don't close twice", closeCalled.get()); + } + } + + @CheckReturnValue + public TestResponse contentType(String contentType) { + this.headers = ImmutableListMultimap.of(HttpHeaders.CONTENT_TYPE, contentType); + return this; + } +} diff --git a/dialogue-client-verifier/build.gradle b/dialogue-client-verifier/build.gradle index 80f04527e..8866846ee 100644 --- a/dialogue-client-verifier/build.gradle +++ b/dialogue-client-verifier/build.gradle @@ -30,11 +30,13 @@ dependencies { testImplementation project('verification-server-api') testImplementation project(':dialogue-apache-hc4-client') testImplementation project(':dialogue-serde') + testImplementation project(':dialogue-example:dialogue-example-dialogue') testImplementation 'junit:junit' testImplementation 'org.assertj:assertj-core' testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation 'org.apache.commons:commons-lang3' - testCompile 'com.palantir.conjure.java.runtime:keystores' + testImplementation 'com.palantir.conjure.java.runtime:keystores' + testImplementation 'io.undertow:undertow-core' testRuntimeOnly 'org.apache.logging.log4j:log4j-slf4j-impl' testRuntimeOnly 'org.apache.logging.log4j:log4j-core' diff --git a/dialogue-client-verifier/src/test/java/com/palantir/verification/BinaryReturnTypeTest.java b/dialogue-client-verifier/src/test/java/com/palantir/verification/BinaryReturnTypeTest.java new file mode 100644 index 000000000..33dd4af31 --- /dev/null +++ b/dialogue-client-verifier/src/test/java/com/palantir/verification/BinaryReturnTypeTest.java @@ -0,0 +1,227 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.verification; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.conjure.java.api.config.service.ServiceConfiguration; +import com.palantir.conjure.java.api.config.service.UserAgent; +import com.palantir.conjure.java.api.config.ssl.SslConfiguration; +import com.palantir.conjure.java.client.config.ClientConfiguration; +import com.palantir.conjure.java.client.config.ClientConfigurations; +import com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime; +import com.palantir.dialogue.example.SampleServiceAsync; +import com.palantir.dialogue.example.SampleServiceBlocking; +import com.palantir.dialogue.hc4.ApacheHttpClientChannels; +import com.palantir.logsafe.Preconditions; +import io.undertow.Undertow; +import io.undertow.server.HttpHandler; +import io.undertow.server.handlers.BlockingHandler; +import io.undertow.util.Headers; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPOutputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class BinaryReturnTypeTest { + private static final UserAgent USER_AGENT = UserAgent.of(UserAgent.Agent.of("BinaryReturnTypeTest", "0.0.0")); + private static final SslConfiguration SSL_CONFIG = SslConfiguration.of( + Paths.get("../dialogue-core/src/test/resources/trustStore.jks"), + Paths.get("../dialogue-core/src/test/resources/keyStore.jks"), + "keystore"); + + private Undertow undertow; + private HttpHandler undertowHandler; + + @Before + public void before() { + undertow = Undertow.builder() + .addHttpListener( + 0, "localhost", new BlockingHandler(exchange -> undertowHandler.handleRequest(exchange))) + .build(); + undertow.start(); + } + + @Test + public void conjure_generated_async_interface_with_optional_binary_return_type_and_gzip() { + setBinaryGzipResponse("Hello, world"); + SampleServiceAsync client = sampleServiceAsync(); + + ListenableFuture> future = client.getOptionalBinary(); + Optional maybeBinary = Futures.getUnchecked(future); + + assertThat(maybeBinary).isPresent(); + assertThat(maybeBinary.get()).hasSameContentAs(asInputStream("Hello, world")); + } + + @Test + public void conjure_generated_blocking_interface_with_optional_binary_return_type_and_gzip() { + setBinaryGzipResponse("Hello, world"); + + Optional maybeBinary = sampleServiceBlocking().getOptionalBinary(); + + assertThat(maybeBinary).isPresent(); + assertThat(maybeBinary.get()).hasSameContentAs(asInputStream("Hello, world")); + } + + @Test + public void stream_3_gigabytes() throws IOException { + long oneMegabyte = 1000_000; + int megabytes = 3000; + long limit = megabytes * oneMegabyte; + assertThat(limit).isGreaterThan(Integer.MAX_VALUE); + + byte[] sample = new byte[8192]; + Arrays.fill(sample, (byte) 'A'); + + undertowHandler = exchange -> { + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream"); + InputStream bigInputStream = repeat(sample, limit); + ByteStreams.copy(bigInputStream, exchange.getOutputStream()); + }; + + Stopwatch sw = Stopwatch.createStarted(); + + InputStream maybeBinary = sampleServiceBlocking().getOptionalBinary().get(); + assertThat(ByteStreams.exhaust(maybeBinary)) + .describedAs("Should receive exactly the number of bytes we sent!") + .isEqualTo(limit); + + System.out.printf("%d MB took %d millis%n", megabytes, sw.elapsed(TimeUnit.MILLISECONDS)); + } + + private void setBinaryGzipResponse(String stringToCompress) { + undertowHandler = exchange -> { + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream"); + Preconditions.checkArgument(exchange.getRequestHeaders().contains(Headers.ACCEPT_ENCODING)); + Preconditions.checkArgument(exchange.getRequestHeaders() + .getFirst(Headers.ACCEPT_ENCODING) + .contains("gzip")); + exchange.getResponseHeaders().put(Headers.CONTENT_ENCODING, "gzip"); + exchange.getOutputStream().write(gzipCompress(stringToCompress)); + }; + } + + private SampleServiceBlocking sampleServiceBlocking() { + return SampleServiceBlocking.of( + ApacheHttpClientChannels.create(clientConf(getUri(undertow))), + DefaultConjureRuntime.builder().build()); + } + + private SampleServiceAsync sampleServiceAsync() { + return SampleServiceAsync.of( + ApacheHttpClientChannels.create(clientConf(getUri(undertow))), + DefaultConjureRuntime.builder().build()); + } + + @After + public void after() { + undertow.stop(); + } + + private static ByteArrayInputStream asInputStream(String string) { + return new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)); + } + + private static ClientConfiguration clientConf(String uri) { + return ClientConfiguration.builder() + .from(ClientConfigurations.of(ServiceConfiguration.builder() + .addUris(uri) + .security(SSL_CONFIG) + .readTimeout(Duration.ofSeconds(1)) + .writeTimeout(Duration.ofSeconds(1)) + .connectTimeout(Duration.ofSeconds(1)) + .build())) + .userAgent(USER_AGENT) + .build(); + } + + private static byte[] gzipCompress(String stringToCompress) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutput = new GZIPOutputStream(baos)) { + gzipOutput.write(stringToCompress.getBytes(StandardCharsets.UTF_8)); + gzipOutput.finish(); + return baos.toByteArray(); + } + } + + private static String getUri(Undertow undertow) { + Undertow.ListenerInfo listenerInfo = Iterables.getOnlyElement(undertow.getListenerInfo()); + return String.format("%s:/%s", listenerInfo.getProtcol(), listenerInfo.getAddress()); + } + + /** Produces a big inputStream by repeating a smaller bytearray sample until limit is reached. */ + private static InputStream repeat(byte[] sample, long limit) { + return new InputStream() { + private long position = 0; + + @Override + public int read() { + if (position < limit) { + return sample[(int) (position++ % sample.length)]; + } else { + return -1; + } + } + + // this optimized version isn't really necessary, I just wanted to see how fast we could make + // the test go + @Override + public int read(byte[] outputArray, int off, int len) { + long remainingInStream = limit - position; + if (remainingInStream <= 0) { + return -1; + } + + int numBytesToWrite = (int) Math.min(len, remainingInStream); + int bytesWritten = 0; + while (bytesWritten < numBytesToWrite) { + int sampleIndex = (int) position % sample.length; + int outputIndex = off + bytesWritten; + int chunkSize = Math.min(sample.length - sampleIndex, numBytesToWrite - bytesWritten); + + System.arraycopy(sample, sampleIndex, outputArray, outputIndex, chunkSize); + position += chunkSize; + bytesWritten += chunkSize; + } + + return bytesWritten; + } + + @Override + public int available() { + return Ints.saturatedCast(limit - position); + } + }; + } +} diff --git a/dialogue-example/src/main/conjure/example.yml b/dialogue-example/src/main/conjure/example.yml index 04024d43d..0ac715f42 100644 --- a/dialogue-example/src/main/conjure/example.yml +++ b/dialogue-example/src/main/conjure/example.yml @@ -12,8 +12,10 @@ services: package: com.palantir.dialogue.example default-auth: none endpoints: + voidToVoid: http: GET /voidToVoid + objectToObject: http: POST /objectToObject/objects/{path} args: @@ -28,3 +30,7 @@ services: body: SampleObject returns: SampleObject + getOptionalBinary: + http: GET /getOptionalBinary + returns: optional + diff --git a/dialogue-serde/build.gradle b/dialogue-serde/build.gradle index 0a9a153e0..1b8aca44e 100644 --- a/dialogue-serde/build.gradle +++ b/dialogue-serde/build.gradle @@ -8,6 +8,7 @@ dependencies { compile 'com.palantir.safe-logging:preconditions' compile 'org.slf4j:slf4j-api' + testCompile project(':dialogue-client-test-lib') testCompile 'junit:junit' testCompile 'org.assertj:assertj-core' testCompile 'org.immutables:value::annotations' diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java index ad66f5e44..f3bb20936 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java @@ -67,11 +67,12 @@ public String toString() { return "BinaryEncoding{" + CONTENT_TYPE + '}'; } - enum OptionalInputStreamDeserializer implements Deserializer { + enum OptionalInputStreamDeserializer implements Deserializer> { INSTANCE; @Override - public Object deserialize(InputStream input) { + public Optional deserialize(InputStream input) { + // intentionally not closing this, otherwise users wouldn't be able to get any data out of it! return Optional.of(input); } @@ -81,11 +82,12 @@ public String toString() { } } - enum InputStreamDeserializer implements Deserializer { + enum InputStreamDeserializer implements Deserializer { INSTANCE; @Override - public Object deserialize(InputStream input) { + public InputStream deserialize(InputStream input) { + // intentionally not closing this, otherwise users wouldn't be able to get any data out of it! return input; } diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java index 06ab40717..64c4f4e16 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java @@ -190,6 +190,7 @@ private static final class EncodingSerializerContainer { private static final class EncodingDeserializerRegistry implements Deserializer { + private static final Logger log = LoggerFactory.getLogger(EncodingDeserializerRegistry.class); private final ImmutableList> encodings; private final ErrorDecoder errorDecoder; private final TypeMarker token; @@ -210,6 +211,7 @@ private static final class EncodingDeserializerRegistry implements Deserializ @Override public T deserialize(Response response) { + boolean closeResponse = true; try { if (errorDecoder.isError(response)) { throw errorDecoder.decode(response); @@ -227,10 +229,15 @@ public T deserialize(Response response) { "Response is missing Content-Type header", SafeArg.of("received", response.headers().keySet())); } - EncodingDeserializerContainer container = getResponseDeserializer(contentType.get()); - return container.deserializer.deserialize(response.body()); + Encoding.Deserializer deserializer = getResponseDeserializer(contentType.get()); + T deserialized = deserializer.deserialize(response.body()); + // deserializer has taken on responsibility for closing the response body + closeResponse = false; + return deserialized; } finally { - response.close(); + if (closeResponse) { + response.close(); + } } } @@ -242,20 +249,35 @@ public Optional accepts() { /** Returns the {@link EncodingDeserializerContainer} to use to deserialize the request body. */ @SuppressWarnings("ForLoopReplaceableByForEach") // performance sensitive code avoids iterator allocation - EncodingDeserializerContainer getResponseDeserializer(String contentType) { + Encoding.Deserializer getResponseDeserializer(String contentType) { for (int i = 0; i < encodings.size(); i++) { EncodingDeserializerContainer container = encodings.get(i); if (container.encoding.supportsContentType(contentType)) { - return container; + return container.deserializer; } } - throw new SafeRuntimeException( - "Unsupported Content-Type", - SafeArg.of("received", contentType), - SafeArg.of("supportedEncodings", encodings)); + return throwingDeserializer(contentType); + } + + private Encoding.Deserializer throwingDeserializer(String contentType) { + return new Encoding.Deserializer() { + @Override + public T deserialize(InputStream input) { + try { + input.close(); + } catch (RuntimeException | IOException e) { + log.warn("Failed to close InputStream", e); + } + throw new SafeRuntimeException( + "Unsupported Content-Type", + SafeArg.of("received", contentType), + SafeArg.of("supportedEncodings", encodings)); + } + }; } } + /** Effectively just a pair. */ private static final class EncodingDeserializerContainer { private final Encoding encoding; diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/DefaultConjureRuntime.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/DefaultConjureRuntime.java index afdc837fd..24b20bf78 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/DefaultConjureRuntime.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/DefaultConjureRuntime.java @@ -30,17 +30,17 @@ */ public final class DefaultConjureRuntime implements ConjureRuntime { + static final ImmutableList DEFAULT_ENCODINGS = ImmutableList.of( + WeightedEncoding.of(Encodings.json(), 1), + WeightedEncoding.of(Encodings.smile(), .9), + WeightedEncoding.of(Encodings.cbor(), .7)); + private final BodySerDe bodySerDe; private DefaultConjureRuntime(Builder builder) { this.bodySerDe = new ConjureBodySerDe( // TODO(rfink): The default thing here is a little odd - builder.encodings.isEmpty() - ? ImmutableList.of( - WeightedEncoding.of(Encodings.json(), 1), - WeightedEncoding.of(Encodings.smile(), .9), - WeightedEncoding.of(Encodings.cbor(), .7)) - : builder.encodings); + builder.encodings.isEmpty() ? DEFAULT_ENCODINGS : builder.encodings); } public static Builder builder() { diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java index 3ac9e0b4e..b4e4fc695 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java @@ -57,8 +57,10 @@ interface Deserializer { /** * Reads a serialized type-{@link T} object representation from the given input stream and returns the - * corresponding object. Implementations should read the entire input stream, but must not close it. - * Format-related deserialization errors surface as {@link IllegalArgumentException}. Inputs and outputs + * corresponding object. Implementations should read the entire input stream and must close it (unless they + * return the raw InputStream, e.g. for a binary response type). + * + *

Format-related deserialization errors surface as {@link IllegalArgumentException}. Inputs and outputs * must never be null. */ T deserialize(InputStream input); diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java index dbe764c6c..bd0816c42 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java @@ -27,6 +27,7 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeRuntimeException; import java.io.IOException; +import java.io.InputStream; import javax.annotation.Nullable; // TODO(rfink): Consider async Jackson, see @@ -61,8 +62,8 @@ public final Serializer serializer(TypeMarker type) { public final Deserializer deserializer(TypeMarker type) { ObjectReader reader = mapper.readerFor(mapper.constructType(type.getType())); return input -> { - try { - T value = reader.readValue(input); + try (InputStream inputStream = input) { + T value = reader.readValue(inputStream); // Bad input should result in a 4XX response status, throw IAE rather than NPE. Preconditions.checkArgument(value != null, "cannot deserialize a JSON null value"); return value; diff --git a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/BinaryEncodingTest.java b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/BinaryEncodingTest.java new file mode 100644 index 000000000..4bd82c2c6 --- /dev/null +++ b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/BinaryEncodingTest.java @@ -0,0 +1,76 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.conjure.java.dialogue.serde; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import com.palantir.dialogue.BodySerDe; +import com.palantir.dialogue.CloseRecordingInputStream; +import com.palantir.dialogue.TestResponse; +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import org.junit.Test; + +public class BinaryEncodingTest { + + @Test + public void testBinary() throws IOException { + TestResponse response = new TestResponse().code(200).contentType("application/octet-stream"); + BodySerDe serializers = new ConjureBodySerDe( + ImmutableList.of(WeightedEncoding.of(new ConjureBodySerDeTest.StubEncoding("application/json")))); + InputStream deserialized = serializers.inputStreamDeserializer().deserialize(response); + assertThat(deserialized.available()).isEqualTo(0); + CloseRecordingInputStream rawInputStream = response.body(); + rawInputStream.assertNotClosed(); + assertThat(response.isClosed()) + .describedAs("response is unclosed initially") + .isFalse(); + + deserialized.close(); + assertThat(response.isClosed()) + .describedAs( + "Response#close was never called, but no big deal because the body is the only resource worth" + + " closing") + .isFalse(); + } + + @Test + public void testBinary_optional_present() throws IOException { + TestResponse response = new TestResponse().code(200).contentType("application/octet-stream"); + BodySerDe serializers = new ConjureBodySerDe( + ImmutableList.of(WeightedEncoding.of(new ConjureBodySerDeTest.StubEncoding("application/json")))); + Optional maybe = + serializers.optionalInputStreamDeserializer().deserialize(response); + assertThat(maybe).isPresent(); + InputStream deserialized = maybe.get(); + assertThat(deserialized.available()).isEqualTo(0); + CloseRecordingInputStream rawInputStream = response.body(); + rawInputStream.assertNotClosed(); + assertThat(response.isClosed()) + .describedAs("response is unclosed initially") + .isFalse(); + + deserialized.close(); + assertThat(response.isClosed()) + .describedAs( + "Response#close was never called, but no big deal because the body is the only resource worth" + + " closing") + .isFalse(); + } +} diff --git a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java index f08e68ba1..19da472a0 100644 --- a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java +++ b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java @@ -22,24 +22,20 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; -import com.google.common.collect.ListMultimap; import com.palantir.conjure.java.api.errors.ErrorType; import com.palantir.conjure.java.api.errors.RemoteException; import com.palantir.conjure.java.api.errors.SerializableError; import com.palantir.conjure.java.api.errors.ServiceException; import com.palantir.dialogue.BodySerDe; import com.palantir.dialogue.RequestBody; -import com.palantir.dialogue.Response; +import com.palantir.dialogue.TestResponse; import com.palantir.dialogue.TypeMarker; import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; import com.palantir.logsafe.exceptions.SafeRuntimeException; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; -import javax.ws.rs.core.HttpHeaders; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -59,8 +55,7 @@ public void testRequestContentType() throws IOException { Encoding json = new StubEncoding("application/json"); Encoding plain = new StubEncoding("text/plain"); - TestResponse response = new TestResponse(); - response.contentType("text/plain"); + TestResponse response = new TestResponse().contentType("text/plain"); BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(json), WeightedEncoding.of(plain))); String value = serializers.deserializer(TYPE).deserialize(response); @@ -69,8 +64,7 @@ public void testRequestContentType() throws IOException { @Test public void testRequestOptionalEmpty() { - TestResponse response = new TestResponse(); - response.code = 204; + TestResponse response = new TestResponse().code(204); BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(new StubEncoding("application/json")))); Optional value = serializers.deserializer(OPTIONAL_TYPE).deserialize(response); @@ -89,8 +83,7 @@ public void testRequestNoContentType() { @Test public void testUnsupportedRequestContentType() { - TestResponse response = new TestResponse(); - response.contentType("application/unknown"); + TestResponse response = new TestResponse().contentType("application/unknown"); BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(new StubEncoding("application/json")))); assertThatThrownBy(() -> serializers.deserializer(TYPE).deserialize(response)) @@ -136,12 +129,10 @@ public void testResponseNoContentType() throws IOException { } @Test - public void testResponseUnknownContentType() throws IOException { + public void testRequestUnknownContentType() throws IOException { Encoding json = new StubEncoding("application/json"); Encoding plain = new StubEncoding("text/plain"); - TestResponse response = new TestResponse(); - response.contentType("application/unknown"); BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(json), WeightedEncoding.of(plain))); RequestBody body = serializers.serializer(TYPE).serialize("test"); @@ -150,8 +141,7 @@ public void testResponseUnknownContentType() throws IOException { @Test public void testErrorsDecoded() { - TestResponse response = new TestResponse(); - response.code = 400; + TestResponse response = new TestResponse().code(400); ServiceException serviceException = new ServiceException(ErrorType.INVALID_ARGUMENT); SerializableError serialized = SerializableError.forException(serviceException); @@ -163,37 +153,66 @@ public void testErrorsDecoded() { assertThatExceptionOfType(RemoteException.class) .isThrownBy(() -> serializers.deserializer(TYPE).deserialize(response)); - } - @Test - public void testBinary() { - TestResponse response = new TestResponse(); - response.code = 200; - response.contentType("application/octet-stream"); - BodySerDe serializers = - new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(new StubEncoding("application/json")))); - assertThat(serializers.inputStreamDeserializer().deserialize(response)).hasContent(""); + assertThat(response.isClosed()).describedAs("response should be closed").isTrue(); + assertThat(response.body().isClosed()) + .describedAs("inputstream should be closed") + .isTrue(); } @Test - public void testBinary_optional_present() { - TestResponse response = new TestResponse(); - response.code = 200; - response.contentType("application/octet-stream"); + public void testBinary_optional_empty() { + TestResponse response = new TestResponse().code(204); BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(new StubEncoding("application/json")))); assertThat(serializers.optionalInputStreamDeserializer().deserialize(response)) - .hasValueSatisfying(stream -> assertThat(stream).hasContent("")); + .isEmpty(); + assertThat(response.body().isClosed()) + .describedAs("inputstream should be closed") + .isTrue(); + assertThat(response.isClosed()).describedAs("response should be closed").isTrue(); } @Test - public void testBinary_optional_empty() { - TestResponse response = new TestResponse(); - response.code = 204; - BodySerDe serializers = - new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(new StubEncoding("application/json")))); - assertThat(serializers.optionalInputStreamDeserializer().deserialize(response)) - .isEmpty(); + public void if_deserialize_throws_response_is_still_closed() { + TestResponse response = new TestResponse().code(200).contentType("application/json"); + BodySerDe serializers = new ConjureBodySerDe(ImmutableList.of(WeightedEncoding.of(BrokenEncoding.INSTANCE))); + assertThatThrownBy(() -> serializers.deserializer(TYPE).deserialize(response)) + .isInstanceOf(SafeRuntimeException.class) + .hasMessage("brokenEncoding is broken"); + assertThat(response.body().isClosed()) + .describedAs("inputstream should be closed") + .isTrue(); + assertThat(response.isClosed()).describedAs("response should be closed").isTrue(); + } + + enum BrokenEncoding implements Encoding { + INSTANCE; + + @Override + public Serializer serializer(TypeMarker _type) { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public Deserializer deserializer(TypeMarker _type) { + return new Deserializer() { + @Override + public T deserialize(InputStream _input) { + throw new SafeRuntimeException("brokenEncoding is broken"); + } + }; + } + + @Override + public String getContentType() { + return "application/json"; + } + + @Override + public boolean supportsContentType(String _contentType) { + return true; + } } /** Deserializes requests as the configured content type. */ @@ -236,33 +255,4 @@ public String toString() { return "StubEncoding{" + contentType + '}'; } } - - private static final class TestResponse implements Response { - - private InputStream body = new ByteArrayInputStream(new byte[] {}); - private int code = 0; - private ListMultimap headers = ImmutableListMultimap.of(); - - @Override - public InputStream body() { - return body; - } - - @Override - public int code() { - return code; - } - - @Override - public ListMultimap headers() { - return headers; - } - - @Override - public void close() {} - - public void contentType(String contentType) { - this.headers = ImmutableListMultimap.of(HttpHeaders.CONTENT_TYPE, contentType); - } - } } diff --git a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java index 6d8878730..1715222bf 100644 --- a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java +++ b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java @@ -28,12 +28,17 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.palantir.dialogue.BodySerDe; import com.palantir.dialogue.Channel; +import com.palantir.dialogue.CloseRecordingInputStream; import com.palantir.dialogue.Deserializer; import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.RequestBody; import com.palantir.dialogue.Response; +import com.palantir.dialogue.TestResponse; +import java.io.IOException; +import java.io.InputStream; import java.util.Optional; import java.util.concurrent.ExecutionException; import org.junit.Test; @@ -52,20 +57,20 @@ public final class DefaultClientsTest { @Mock private Endpoint endpoint; - @Mock - private Response response; - @Mock private Deserializer deserializer; @Captor private ArgumentCaptor requestCaptor; + private Response response = new TestResponse(); + private BodySerDe bodySerde = new ConjureBodySerDe(DefaultConjureRuntime.DEFAULT_ENCODINGS); + private final SettableFuture responseFuture = SettableFuture.create(); + @Test public void testCall() throws ExecutionException, InterruptedException { Request request = Request.builder().build(); when(deserializer.deserialize(eq(response))).thenReturn("value"); - SettableFuture responseFuture = SettableFuture.create(); when(channel.execute(eq(endpoint), eq(request))).thenReturn(responseFuture); ListenableFuture result = DefaultClients.INSTANCE.call(channel, endpoint, request, deserializer); assertThat(result).isNotDone(); @@ -97,7 +102,6 @@ public void testCallClosesRequestOnCompletion_success() { RequestBody body = mock(RequestBody.class); Request request = Request.builder().body(body).build(); when(deserializer.deserialize(eq(response))).thenReturn("value"); - SettableFuture responseFuture = SettableFuture.create(); when(channel.execute(eq(endpoint), eq(request))).thenReturn(responseFuture); ListenableFuture result = DefaultClients.INSTANCE.call(channel, endpoint, request, deserializer); @@ -110,11 +114,67 @@ public void testCallClosesRequestOnCompletion_success() { verify(body).close(); } + @Test + public void testBinaryResponse_inputStreamRemainsUnclosed() throws IOException { + when(channel.execute(eq(endpoint), any())).thenReturn(responseFuture); + + ListenableFuture future = DefaultClients.INSTANCE.call( + channel, endpoint, Request.builder().build(), bodySerde.inputStreamDeserializer()); + + TestResponse testResponse = new TestResponse().contentType("application/octet-stream"); + responseFuture.set(testResponse); + + try (CloseRecordingInputStream inputStream = (CloseRecordingInputStream) Futures.getUnchecked(future)) { + assertThat(inputStream.available()) + .describedAs("Content should be empty") + .isEqualTo(0); + inputStream.assertNotClosed(); + assertThat(testResponse.isClosed()).describedAs("Response").isFalse(); + } + + assertThat(testResponse.body().isClosed()) + .describedAs("User has closed it now") + .isTrue(); + assertThat(testResponse.isClosed()) + .describedAs( + "Response#close was never called, but no big deal because the body is the only resource worth" + + " closing") + .isFalse(); + } + + @Test + public void testOptionalBinaryResponse_inputStreamRemainsUnclosed() throws IOException { + when(channel.execute(eq(endpoint), any())).thenReturn(responseFuture); + + ListenableFuture> future = DefaultClients.INSTANCE.call( + channel, endpoint, Request.builder().build(), bodySerde.optionalInputStreamDeserializer()); + + TestResponse testResponse = new TestResponse().contentType("application/octet-stream"); + responseFuture.set(testResponse); + + Optional maybeInputStream = Futures.getUnchecked(future); + try (CloseRecordingInputStream inputStream = (CloseRecordingInputStream) maybeInputStream.get()) { + assertThat(inputStream.available()) + .describedAs("Content should be empty") + .isEqualTo(0); + inputStream.assertNotClosed(); + assertThat(testResponse.isClosed()).describedAs("Response").isFalse(); + } + + assertThat(testResponse.body().isClosed()) + .describedAs("User has closed it now") + .isTrue(); + assertThat(testResponse.isClosed()) + .describedAs( + "Response#close was never called, but no big deal because the body is the only resource worth" + + " closing") + .isFalse(); + } + @Test public void testCallClosesRequestOnCompletion_failure() { RequestBody body = mock(RequestBody.class); Request request = Request.builder().body(body).build(); - SettableFuture responseFuture = SettableFuture.create(); when(channel.execute(eq(endpoint), eq(request))).thenReturn(responseFuture); ListenableFuture result = DefaultClients.INSTANCE.call(channel, endpoint, request, deserializer); diff --git a/dialogue-target/src/main/java/com/palantir/dialogue/Deserializer.java b/dialogue-target/src/main/java/com/palantir/dialogue/Deserializer.java index fcf5faddb..51be50efe 100644 --- a/dialogue-target/src/main/java/com/palantir/dialogue/Deserializer.java +++ b/dialogue-target/src/main/java/com/palantir/dialogue/Deserializer.java @@ -21,7 +21,10 @@ /** Reads objects from a response. */ public interface Deserializer { - /** Deserializes the response body. */ + /** + * Deserializes the response body into a useful type. Closes the {@link Response#body()} inputStream unless + * the return type requires an {@code InputStream} to be returned in a readable state. + */ T deserialize(Response response); /** diff --git a/dialogue-target/src/main/java/com/palantir/dialogue/Response.java b/dialogue-target/src/main/java/com/palantir/dialogue/Response.java index 02e77bbb2..f767e1e7d 100644 --- a/dialogue-target/src/main/java/com/palantir/dialogue/Response.java +++ b/dialogue-target/src/main/java/com/palantir/dialogue/Response.java @@ -43,6 +43,9 @@ default Optional getFirstHeader(String header) { * Releases all resources associated with this response. If the {@link #body()} is still open, {@link #close()} * should {@link InputStream#close() close the stream}. * Implementations must not throw, preferring to catch and log internally. + * + * Note that in the case of binary and optional binary endpoints, this method may not be called as the user is + * expected to close the {@link #body()} themselves. */ @Override void close(); diff --git a/versions.lock b/versions.lock index 86311a032..ee27da156 100644 --- a/versions.lock +++ b/versions.lock @@ -59,6 +59,7 @@ com.spotify.dataenum:dataenum:1.3.2 (1 constraints: e91058c1) com.squareup.okhttp3:mockwebserver:3.13.1 (1 constraints: 3a053f3b) de.erichseifert.vectorgraphics2d:VectorGraphics2D:0.13 (1 constraints: 8c0a80bb) de.rototor.pdfbox:graphics2d:0.25 (1 constraints: 8f0a84bb) +io.undertow:undertow-core:2.0.30.Final (1 constraints: 4f070761) junit:junit:4.12 (2 constraints: e213d85a) net.bytebuddy:byte-buddy:1.10.5 (1 constraints: 410b37de) net.bytebuddy:byte-buddy-agent:1.10.5 (1 constraints: 410b37de) @@ -77,6 +78,9 @@ org.awaitility:awaitility:4.0.2 (1 constraints: 08050136) org.hamcrest:hamcrest:2.2 (3 constraints: f41da570) org.hamcrest:hamcrest-core:2.2 (3 constraints: 2f17d637) org.hamcrest:hamcrest-library:2.1 (1 constraints: 1507415c) +org.jboss.logging:jboss-logging:3.4.0.Final (1 constraints: bd0d7630) +org.jboss.xnio:xnio-api:3.3.8.Final (2 constraints: 6f1a4d45) +org.jboss.xnio:xnio-nio:3.3.8.Final (1 constraints: c40da530) org.jmock:jmock:2.12.0 (1 constraints: 3705353b) org.jmock:jmock-testjar:2.12.0 (1 constraints: a507a272) org.junit:junit-bom:5.6.1 (6 constraints: 406265f6) diff --git a/versions.props b/versions.props index 9eb36ad3a..ac45863cd 100644 --- a/versions.props +++ b/versions.props @@ -33,6 +33,8 @@ org.awaitility:awaitility = 4.0.2 org.jmock:jmock = 2.12.0 org.knowm.xchart:xchart = 3.6.1 com.palantir.conjure.verification:* = 0.18.3 +io.undertow:undertow-core = 2.0.30.Final +com.squareup.okhttp3:mockwebserver = 3.13.1 # dependency-upgrader:OFF # Match conjure-java-runtime okhttp version