Skip to content

Commit

Permalink
Closing binary InputStreams triggers Response to close
Browse files Browse the repository at this point in the history
  • Loading branch information
iamdanfox committed Mar 27, 2020
1 parent 2e7f992 commit 78a4edf
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.palantir.dialogue.TypeMarker;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

Expand Down Expand Up @@ -71,9 +73,9 @@ enum OptionalInputStreamDeserializer implements Deserializer<Optional<InputStrea
INSTANCE;

@Override
public Optional<InputStream> deserialize(InputStream input) {
// intentionally not closing this, otherwise users wouldn't be able to get any data out of it!
return Optional.of(input);
public Optional<InputStream> deserialize(InputStream input, Closeable response) {
// intentionally not closing just yet, otherwise users wouldn't be able to get any data out of it!
return Optional.of(new ResponseClosingInputStream(input, response));
}

@Override
Expand All @@ -86,14 +88,36 @@ enum InputStreamDeserializer implements Deserializer<InputStream> {
INSTANCE;

@Override
public InputStream deserialize(InputStream input) {
// intentionally not closing this, otherwise users wouldn't be able to get any data out of it!
return input;
public InputStream deserialize(InputStream input, Closeable response) {
// intentionally not closing just yet, otherwise users wouldn't be able to get any data out of it!
return new ResponseClosingInputStream(input, response);
}

@Override
public String toString() {
return "InputStreamDeserializer{}";
}
}

static class ResponseClosingInputStream extends ForwardingInputStream {
private final InputStream delegate;
private final Closeable response;

ResponseClosingInputStream(InputStream delegate, Closeable response) {
this.delegate = delegate;
this.response = response;
}

@Override
InputStream delegate() {
return delegate;
}

@Override
public void close() throws IOException {
// TODO(dfox): try-catch?
super.close();
response.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -234,7 +235,7 @@ public T deserialize(Response response) {
SafeArg.of("received", response.headers().keySet()));
}
Encoding.Deserializer<T> deserializer = getResponseDeserializer(contentType.get());
return deserializer.deserialize(response.body());
return deserializer.deserialize(response.body(), response);
}

@Override
Expand All @@ -258,12 +259,17 @@ Encoding.Deserializer<T> getResponseDeserializer(String contentType) {
private Encoding.Deserializer<T> throwingDeserializer(String contentType) {
return new Encoding.Deserializer<T>() {
@Override
public T deserialize(InputStream input) {
public T deserialize(InputStream input, Closeable response) {
try {
input.close();
} catch (RuntimeException | IOException e) {
// empty
}
try {
response.close();
} catch (RuntimeException | IOException e) {
// empty
}
throw new SafeRuntimeException(
"Unsupported Content-Type",
SafeArg.of("received", contentType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.palantir.conjure.java.dialogue.serde;

import com.palantir.dialogue.TypeMarker;
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;

Expand Down Expand Up @@ -63,7 +64,7 @@ interface Deserializer<T> {
* <p>Format-related deserialization errors surface as {@link IllegalArgumentException}. Inputs and outputs
* must never be null.
*/
T deserialize(InputStream input);
T deserialize(InputStream input, Closeable response);
}

interface Serializer<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -61,9 +62,10 @@ public final <T> Serializer<T> serializer(TypeMarker<T> type) {
@Override
public final <T> Deserializer<T> deserializer(TypeMarker<T> type) {
ObjectReader reader = mapper.readerFor(mapper.constructType(type.getType()));
return input -> {
try (InputStream inputStream = input) {
T value = reader.readValue(inputStream);
return (InputStream input, Closeable response) -> {
try (InputStream closeMe = input;
Closeable closeMeToo = response) {
T value = reader.readValue(input);
// Bad input should result in a 4XX response status, throw IAE rather than NPE.
Preconditions.checkArgument(value != null, "cannot deserialize a JSON null value");
return value;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.conjure.java.dialogue.serde;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;

abstract class ForwardingInputStream extends InputStream {

/** All methods delegate to this instance. */
@VisibleForTesting
abstract InputStream delegate();

@Override
public int read() throws IOException {
return delegate().read();
}

@Override
public int read(byte[] bytes) throws IOException {
return delegate().read(bytes);
}

@Override
public int read(byte[] bytes, int off, int len) throws IOException {
return delegate().read(bytes, off, len);
}

@Override
public long skip(long num) throws IOException {
return delegate().skip(num);
}

@Override
public int available() throws IOException {
return delegate().available();
}

@Override
public void close() throws IOException {
delegate().close();
}

@Override
public void mark(int readlimit) {
delegate().mark(readlimit);
}

@Override
public void reset() throws IOException {
delegate().reset();
}

@Override
public boolean markSupported() {
return delegate().markSupported();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
Expand Down Expand Up @@ -216,7 +217,7 @@ public <T> Serializer<T> serializer(TypeMarker<T> _type) {
@Override
@SuppressWarnings("unchecked")
public <T> Deserializer<T> deserializer(TypeMarker<T> type) {
return input -> {
return (InputStream input, Closeable response) -> {
Preconditions.checkArgument(TYPE.equals(type), "This stub encoding only supports String");
return (T) getContentType();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,23 @@ public void testBinaryResponse_inputStreamRemainsUnclosed() throws IOException {
TestResponse testResponse = new TestResponse().contentType("application/octet-stream");
responseFuture.set(testResponse);

try (CloseRecordingInputStream inputStream = (CloseRecordingInputStream) Futures.getUnchecked(future)) {
try (InputStream inputStream = Futures.getUnchecked(future)) {
assertThat(inputStream.available())
.describedAs("Content should be empty")
.isEqualTo(0);
inputStream.assertNotClosed();
asCloseRecording(inputStream).assertNotClosed();
assertThat(testResponse.isClosed())
.describedAs("TODO(dfox): what do we do with the actual response at this point??")
.describedAs("It's ok for the Response to remain open for now, it'll be closed when the user "
+ "closes the InputStream")
.isFalse();
}

assertThat(testResponse.body().isClosed())
.describedAs("User has closed it now")
.isTrue();
assertThat(testResponse.isClosed())
.describedAs("TODO(dfox): I think this should magically be closed by now")
.isFalse();
.describedAs("Closing the InputStream also closed the Response")
.isTrue();
}

@Test
Expand All @@ -153,22 +154,23 @@ public void testOptionalBinaryResponse_inputStreamRemainsUnclosed() throws IOExc
responseFuture.set(testResponse);

Optional<InputStream> maybeInputStream = Futures.getUnchecked(future);
try (CloseRecordingInputStream inputStream = (CloseRecordingInputStream) maybeInputStream.get()) {
try (InputStream inputStream = maybeInputStream.get()) {
assertThat(inputStream.available())
.describedAs("Content should be empty")
.isEqualTo(0);
inputStream.assertNotClosed();
asCloseRecording(inputStream).assertNotClosed();
assertThat(testResponse.isClosed())
.describedAs("TODO(dfox): what do we do with the actual response at this point??")
.describedAs("It's ok for the Response to remain open for now, it'll be closed when the user "
+ "closes the InputStream")
.isFalse();
}

assertThat(testResponse.body().isClosed())
.describedAs("User has closed it now")
.isTrue();
assertThat(testResponse.isClosed())
.describedAs("TODO(dfox): I think this should magically be closed by now")
.isFalse();
.describedAs("Closing the InputStream also closed the Response")
.isTrue();
}

@Test
Expand All @@ -186,4 +188,8 @@ public void testCallClosesRequestOnCompletion_failure() {
assertThat(result).isDone();
verify(body).close();
}

private static CloseRecordingInputStream asCloseRecording(InputStream inputStream) {
return (CloseRecordingInputStream) ((ForwardingInputStream) inputStream).delegate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ private void serialize(Object object, OutputStream stream) throws IOException {
}

private <T> T deserialize(InputStream stream, TypeMarker<T> token) throws IOException {
return json.deserializer(token).deserialize(stream);
return json.deserializer(token).deserialize(stream, () -> {
// empty
});
}
}

0 comments on commit 78a4edf

Please sign in to comment.