Skip to content

Commit

Permalink
BlockingStreamingHttp[Response|Response] to support InputStream setter (
Browse files Browse the repository at this point in the history
#1175)

Motivation:
BlockingStreamingHttp[Response|Response] allow a user to get the payload
body as an InputStream. However the user is not able to set the payload
body as an InputStream. This requires manual conversion from the user
perspective and raises the bar to entry.

Modifications:
- BlockingStreamingHttp[Response|Response] supports an additional method
to set the payloadBody as an InputStream

Result:
BlockingStreamingHttp[Response|Response] APIs are more InputStream
friendly.
  • Loading branch information
Scottmitch authored Oct 12, 2020
1 parent db7d8fb commit a323b86
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ default <T> BlockingIterable<T> payloadBody(HttpDeserializer<T> deserializer) {
/**
* Returns a {@link BlockingStreamingHttpRequest} with its underlying payload set to {@code payloadBody}.
* <p>
* A best effort will be made to apply back pressure to the existing {@link Iterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(UnaryOperator)} for more fine grain
* control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link Iterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body.
* @return {@code this}
*/
Expand All @@ -71,26 +71,40 @@ default <T> BlockingIterable<T> payloadBody(HttpDeserializer<T> deserializer) {
/**
* Returns a {@link BlockingStreamingHttpRequest} with its underlying payload set to {@code payloadBody}.
* <p>
* A best effort will be made to apply back pressure to the existing {@link CloseableIterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(UnaryOperator)} for more fine grain
* control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link CloseableIterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body.
* @return {@code this}
*/
BlockingStreamingHttpRequest payloadBody(CloseableIterable<Buffer> payloadBody);

/**
* Returns a {@link BlockingStreamingHttpRequest} with its underlying payload set to {@code payloadBody}.
* <p>
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(UnaryOperator)} for more fine grain
* control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body.
* @return {@code this}
*/
BlockingStreamingHttpRequest payloadBody(InputStream payloadBody);

/**
* Returns a {@link BlockingStreamingHttpRequest} with its underlying payload set to the result of serialization.
* <p>
* A best effort will be made to apply back pressure to the existing {@link Iterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(Function, HttpSerializer)} for more
* fine grain control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link Iterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body, prior to serialization.
* @param serializer Used to serialize the payload body.
* @param <T> The type of objects to serialize.
Expand All @@ -101,12 +115,12 @@ default <T> BlockingIterable<T> payloadBody(HttpDeserializer<T> deserializer) {
/**
* Returns a {@link BlockingStreamingHttpRequest} with its underlying payload set to the result of serialization.
* <p>
* A best effort will be made to apply back pressure to the existing {@link CloseableIterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(Function, HttpSerializer)} for more
* fine grain control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link CloseableIterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body, prior to serialization.
* @param serializer Used to serialize the payload body.
* @param <T> The type of objects to serialize.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ default <T> BlockingIterable<T> payloadBody(HttpDeserializer<T> deserializer) {
/**
* Returns a {@link BlockingStreamingHttpResponse} with its underlying payload set to {@code payloadBody}.
* <p>
* A best effort will be made to apply back pressure to the existing {@link Iterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(UnaryOperator)} for more fine grain
* control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link Iterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body.
* @return {@code this}
*/
Expand All @@ -71,26 +71,40 @@ default <T> BlockingIterable<T> payloadBody(HttpDeserializer<T> deserializer) {
/**
* Returns a {@link BlockingStreamingHttpResponse} with its underlying payload set to {@code payloadBody}.
* <p>
* A best effort will be made to apply back pressure to the existing {@link CloseableIterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(UnaryOperator)} for more fine grain
* control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link CloseableIterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body.
* @return {@code this}
*/
BlockingStreamingHttpResponse payloadBody(CloseableIterable<Buffer> payloadBody);

/**
* Returns a {@link BlockingStreamingHttpResponse} with its underlying payload set to {@code payloadBody}.
* <p>
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(UnaryOperator)} for more fine grain
* control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body.
* @return {@code this}
*/
BlockingStreamingHttpResponse payloadBody(InputStream payloadBody);

/**
* Returns a {@link BlockingStreamingHttpResponse} with its underlying payload set to the result of serialization.
* <p>
* A best effort will be made to apply back pressure to the existing {@link Iterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(Function, HttpSerializer)} for more
* fine grain control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link Iterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body, prior to serialization.
* @param serializer Used to serialize the payload body.
* @param <T> The type of objects to serialize.
Expand All @@ -101,12 +115,12 @@ default <T> BlockingIterable<T> payloadBody(HttpDeserializer<T> deserializer) {
/**
* Returns a {@link BlockingStreamingHttpResponse} with its underlying payload set to the result of serialization.
* <p>
* A best effort will be made to apply back pressure to the existing {@link CloseableIterable} payload body. If this
* A best effort will be made to apply back pressure to the existing payload body which is being replaced. If this
* default policy is not sufficient you can use {@link #transformPayloadBody(Function, HttpSerializer)} for more
* fine grain control.
* <p>
* This method reserves the right to delay completion/consumption of {@code payloadBody}. This may occur due to the
* combination with the existing {@link CloseableIterable} payload body.
* combination with the existing payload body that is being replaced.
* @param payloadBody The new payload body, prior to serialization.
* @param serializer Used to serialize the payload body.
* @param <T> The type of objects to serialize.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Single;

import java.io.InputStream;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static io.servicetalk.concurrent.api.Publisher.fromInputStream;
import static io.servicetalk.concurrent.api.Publisher.fromIterable;

final class DefaultBlockingStreamingHttpRequest extends AbstractDelegatingHttpRequest
Expand Down Expand Up @@ -127,6 +129,13 @@ public BlockingStreamingHttpRequest payloadBody(final CloseableIterable<Buffer>
return this;
}

@Override
public BlockingStreamingHttpRequest payloadBody(final InputStream payloadBody) {
original.payloadBody(fromInputStream(payloadBody)
.map(bytes -> original.payloadHolder().allocator().wrap(bytes)));
return this;
}

@Override
public <T> BlockingStreamingHttpRequest payloadBody(final Iterable<T> payloadBody,
final HttpSerializer<T> serializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Single;

import java.io.InputStream;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static io.servicetalk.concurrent.api.Publisher.fromInputStream;
import static io.servicetalk.concurrent.api.Publisher.fromIterable;

final class DefaultBlockingStreamingHttpResponse extends AbstractDelegatingHttpResponse
Expand All @@ -49,6 +51,13 @@ public BlockingStreamingHttpResponse payloadBody(final CloseableIterable<Buffer>
return this;
}

@Override
public BlockingStreamingHttpResponse payloadBody(final InputStream payloadBody) {
original.payloadBody(fromInputStream(payloadBody)
.map(bytes -> original.payloadHolder().allocator().wrap(bytes)));
return this;
}

@Override
public <T> BlockingStreamingHttpResponse payloadBody(final Iterable<T> payloadBody,
final HttpSerializer<T> serializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -142,6 +143,40 @@ public void receivePayloadBody() throws Exception {
assertThat(receivedPayloadBody.toString(), is(HELLO_WORLD));
}

@Test
public void clientRequestInputStreamPayloadBody() throws Exception {
StringBuilder receivedPayloadBody = new StringBuilder();

BlockingStreamingHttpClient client = context((ctx, request, response) -> {
request.payloadBody().forEach(chunk -> receivedPayloadBody.append(chunk.toString(US_ASCII)));
response.sendMetaData().close();
});

BlockingStreamingHttpResponse response = client.request(client.post("/")
.payloadBody(new ByteArrayInputStream(HELLO_WORLD.getBytes(US_ASCII))));
assertResponse(response);
assertThat(response.toResponse().toFuture().get().payloadBody(), is(EMPTY_BUFFER));
assertThat(receivedPayloadBody.toString(), is(HELLO_WORLD));
}

@Test
public void clientResponseInputStreamPayloadBody() throws Exception {
StringBuilder receivedPayloadBody = new StringBuilder();

BlockingStreamingHttpClient client = context((ctx, request, response) -> {
request.payloadBody().forEach(chunk -> receivedPayloadBody.append(chunk.toString(US_ASCII)));
response.sendMetaData().close();
});

String expectedBody = "overwritten";
BlockingStreamingHttpResponse response = client.request(client.post("/")
.payloadBody(new ByteArrayInputStream(HELLO_WORLD.getBytes(US_ASCII))));
assertResponse(response);
response.payloadBody(new ByteArrayInputStream(expectedBody.getBytes(US_ASCII)));
assertThat(response.toResponse().toFuture().get().payloadBody().toString(US_ASCII), is(expectedBody));
assertThat(receivedPayloadBody.toString(), is(HELLO_WORLD));
}

@Test
public void respondWithPayloadBodyAndTrailersUsingPayloadWriter() throws Exception {
respondWithPayloadBodyAndTrailers((ctx, request, response) -> {
Expand Down

0 comments on commit a323b86

Please sign in to comment.