diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java index f3bb20936..06a93105d 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/BinaryEncoding.java @@ -19,6 +19,8 @@ import com.palantir.dialogue.TypeMarker; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import java.io.Closeable; +import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -71,9 +73,9 @@ enum OptionalInputStreamDeserializer implements Deserializer deserialize(InputStream input) { - // intentionally not closing this, otherwise users wouldn't be able to get any data out of it! - return Optional.of(input); + public Optional deserialize(InputStream input, Closeable response) { + // intentionally not closing just yet, otherwise users wouldn't be able to get any data out of it! + return Optional.of(new ResponseClosingInputStream(input, response)); } @Override @@ -86,9 +88,9 @@ enum InputStreamDeserializer implements Deserializer { INSTANCE; @Override - public InputStream deserialize(InputStream input) { - // intentionally not closing this, otherwise users wouldn't be able to get any data out of it! - return input; + public InputStream deserialize(InputStream input, Closeable response) { + // intentionally not closing just yet, otherwise users wouldn't be able to get any data out of it! + return new ResponseClosingInputStream(input, response); } @Override @@ -96,4 +98,26 @@ public String toString() { return "InputStreamDeserializer{}"; } } + + static class ResponseClosingInputStream extends ForwardingInputStream { + private final InputStream delegate; + private final Closeable response; + + ResponseClosingInputStream(InputStream delegate, Closeable response) { + this.delegate = delegate; + this.response = response; + } + + @Override + InputStream delegate() { + return delegate; + } + + @Override + public void close() throws IOException { + // TODO(dfox): try-catch? + super.close(); + response.close(); + } + } } diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java index 461c0958f..82406c1b2 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDe.java @@ -31,6 +31,7 @@ import com.palantir.logsafe.UnsafeArg; import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; import com.palantir.logsafe.exceptions.SafeRuntimeException; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -234,7 +235,7 @@ public T deserialize(Response response) { SafeArg.of("received", response.headers().keySet())); } Encoding.Deserializer deserializer = getResponseDeserializer(contentType.get()); - return deserializer.deserialize(response.body()); + return deserializer.deserialize(response.body(), response); } @Override @@ -258,12 +259,17 @@ Encoding.Deserializer getResponseDeserializer(String contentType) { private Encoding.Deserializer throwingDeserializer(String contentType) { return new Encoding.Deserializer() { @Override - public T deserialize(InputStream input) { + public T deserialize(InputStream input, Closeable response) { try { input.close(); } catch (RuntimeException | IOException e) { // empty } + try { + response.close(); + } catch (RuntimeException | IOException e) { + // empty + } throw new SafeRuntimeException( "Unsupported Content-Type", SafeArg.of("received", contentType), diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java index b4e4fc695..edddd6988 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encoding.java @@ -17,6 +17,7 @@ package com.palantir.conjure.java.dialogue.serde; import com.palantir.dialogue.TypeMarker; +import java.io.Closeable; import java.io.InputStream; import java.io.OutputStream; @@ -63,7 +64,7 @@ interface Deserializer { *

Format-related deserialization errors surface as {@link IllegalArgumentException}. Inputs and outputs * must never be null. */ - T deserialize(InputStream input); + T deserialize(InputStream input, Closeable response); } interface Serializer { diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java index bd0816c42..abfbe3a4f 100644 --- a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/Encodings.java @@ -26,6 +26,7 @@ import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeRuntimeException; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import javax.annotation.Nullable; @@ -61,9 +62,10 @@ public final Serializer serializer(TypeMarker type) { @Override public final Deserializer deserializer(TypeMarker type) { ObjectReader reader = mapper.readerFor(mapper.constructType(type.getType())); - return input -> { - try (InputStream inputStream = input) { - T value = reader.readValue(inputStream); + return (InputStream input, Closeable response) -> { + try (InputStream closeMe = input; + Closeable closeMeToo = response) { + T value = reader.readValue(input); // Bad input should result in a 4XX response status, throw IAE rather than NPE. Preconditions.checkArgument(value != null, "cannot deserialize a JSON null value"); return value; diff --git a/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ForwardingInputStream.java b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ForwardingInputStream.java new file mode 100644 index 000000000..b7cb864a3 --- /dev/null +++ b/dialogue-serde/src/main/java/com/palantir/conjure/java/dialogue/serde/ForwardingInputStream.java @@ -0,0 +1,73 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.conjure.java.dialogue.serde; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; + +abstract class ForwardingInputStream extends InputStream { + + /** All methods delegate to this instance. */ + @VisibleForTesting + abstract InputStream delegate(); + + @Override + public int read() throws IOException { + return delegate().read(); + } + + @Override + public int read(byte[] bytes) throws IOException { + return delegate().read(bytes); + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + return delegate().read(bytes, off, len); + } + + @Override + public long skip(long num) throws IOException { + return delegate().skip(num); + } + + @Override + public int available() throws IOException { + return delegate().available(); + } + + @Override + public void close() throws IOException { + delegate().close(); + } + + @Override + public void mark(int readlimit) { + delegate().mark(readlimit); + } + + @Override + public void reset() throws IOException { + delegate().reset(); + } + + @Override + public boolean markSupported() { + return delegate().markSupported(); + } +} diff --git a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java index 92d51161b..ae5544644 100644 --- a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java +++ b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/ConjureBodySerDeTest.java @@ -33,6 +33,7 @@ import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; import com.palantir.logsafe.exceptions.SafeRuntimeException; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -216,7 +217,7 @@ public Serializer serializer(TypeMarker _type) { @Override @SuppressWarnings("unchecked") public Deserializer deserializer(TypeMarker type) { - return input -> { + return (InputStream input, Closeable response) -> { Preconditions.checkArgument(TYPE.equals(type), "This stub encoding only supports String"); return (T) getContentType(); }; diff --git a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java index 1335aa5a0..ddcc15712 100644 --- a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java +++ b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/DefaultClientsTest.java @@ -124,13 +124,14 @@ public void testBinaryResponse_inputStreamRemainsUnclosed() throws IOException { TestResponse testResponse = new TestResponse().contentType("application/octet-stream"); responseFuture.set(testResponse); - try (CloseRecordingInputStream inputStream = (CloseRecordingInputStream) Futures.getUnchecked(future)) { + try (InputStream inputStream = Futures.getUnchecked(future)) { assertThat(inputStream.available()) .describedAs("Content should be empty") .isEqualTo(0); - inputStream.assertNotClosed(); + asCloseRecording(inputStream).assertNotClosed(); assertThat(testResponse.isClosed()) - .describedAs("TODO(dfox): what do we do with the actual response at this point??") + .describedAs("It's ok for the Response to remain open for now, it'll be closed when the user " + + "closes the InputStream") .isFalse(); } @@ -138,8 +139,8 @@ public void testBinaryResponse_inputStreamRemainsUnclosed() throws IOException { .describedAs("User has closed it now") .isTrue(); assertThat(testResponse.isClosed()) - .describedAs("TODO(dfox): I think this should magically be closed by now") - .isFalse(); + .describedAs("Closing the InputStream also closed the Response") + .isTrue(); } @Test @@ -153,13 +154,14 @@ public void testOptionalBinaryResponse_inputStreamRemainsUnclosed() throws IOExc responseFuture.set(testResponse); Optional maybeInputStream = Futures.getUnchecked(future); - try (CloseRecordingInputStream inputStream = (CloseRecordingInputStream) maybeInputStream.get()) { + try (InputStream inputStream = maybeInputStream.get()) { assertThat(inputStream.available()) .describedAs("Content should be empty") .isEqualTo(0); - inputStream.assertNotClosed(); + asCloseRecording(inputStream).assertNotClosed(); assertThat(testResponse.isClosed()) - .describedAs("TODO(dfox): what do we do with the actual response at this point??") + .describedAs("It's ok for the Response to remain open for now, it'll be closed when the user " + + "closes the InputStream") .isFalse(); } @@ -167,8 +169,8 @@ public void testOptionalBinaryResponse_inputStreamRemainsUnclosed() throws IOExc .describedAs("User has closed it now") .isTrue(); assertThat(testResponse.isClosed()) - .describedAs("TODO(dfox): I think this should magically be closed by now") - .isFalse(); + .describedAs("Closing the InputStream also closed the Response") + .isTrue(); } @Test @@ -186,4 +188,8 @@ public void testCallClosesRequestOnCompletion_failure() { assertThat(result).isDone(); verify(body).close(); } + + private static CloseRecordingInputStream asCloseRecording(InputStream inputStream) { + return (CloseRecordingInputStream) ((ForwardingInputStream) inputStream).delegate(); + } } diff --git a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/EncodingsTest.java b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/EncodingsTest.java index 28405c675..cb7758254 100644 --- a/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/EncodingsTest.java +++ b/dialogue-serde/src/test/java/com/palantir/conjure/java/dialogue/serde/EncodingsTest.java @@ -79,6 +79,8 @@ private void serialize(Object object, OutputStream stream) throws IOException { } private T deserialize(InputStream stream, TypeMarker token) throws IOException { - return json.deserializer(token).deserialize(stream); + return json.deserializer(token).deserialize(stream, () -> { + // empty + }); } }