From 434da832118f72b4ea8869dd6704a6a186d33090 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 28 Jun 2024 12:38:13 -0700 Subject: [PATCH] Add `Publisher.fromInputStream(InputStream, ByteArrayMapper)` (#2989) Motivation: Existing `Publisher.fromInputStream(InputStream)` contract has a side effect of an extra allocation in case the pre-allocated byte-array was not completely full of data. Modifications: - Add `Publisher.fromInputStream(InputStream, ByteArrayMapper)` that allows users to decide how to use the buffer region with data; - Deprecate pre-existing `Publisher.fromInputStream(InputStream)` and `Publisher.fromInputStream(InputStream, int)` overloads; - Update all existing use-cases to use `BufferAllocator.wrap` as a method reference as a `ByteArrayMapper`; Result: No impact on throughput and latency for `BlockingStreamingHttpClient` requests with `InputStream` payload (8Kb, 16Kb, 24Kb), but significant reduction in memory allocations and number of GC runs per benchmark: Allocations: ~2Gb/s -> ~1.4Gb/s Young Collection GC Count: 29 -> 23 Alloc Outside TLABs: 7.33 GiB -> 5.04 GiB --- .../concurrent/api/ByteArrayMapper.java | 79 +++++++++++++ .../api/FromInputStreamPublisher.java | 107 ++++++++++-------- .../servicetalk/concurrent/api/Publisher.java | 43 ++++++- .../api/FromInputStreamPublisherTest.java | 9 +- ...ksonSerializerMessageBodyReaderWriter.java | 2 +- ...obufSerializerMessageBodyReaderWriter.java | 2 +- .../http/jaxrs/HelloWorldJaxRsResource.java | 2 +- .../DefaultBlockingStreamingHttpRequest.java | 3 +- .../DefaultBlockingStreamingHttpResponse.java | 3 +- .../AbstractMessageBodyReaderWriter.java | 2 +- 10 files changed, 191 insertions(+), 61 deletions(-) create mode 100644 servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ByteArrayMapper.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ByteArrayMapper.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ByteArrayMapper.java new file mode 100644 index 0000000000..05f35ca055 --- /dev/null +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ByteArrayMapper.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.api.FromInputStreamPublisher.ToByteArrayMapper; + +import static io.servicetalk.concurrent.api.FromInputStreamPublisher.DEFAULT_MAX_BUFFER_SIZE; +import static io.servicetalk.concurrent.api.FromInputStreamPublisher.ToByteArrayMapper.DEFAULT_TO_BYTE_ARRAY_MAPPER; + +/** + * A mapper to transform {@code byte[]} buffer regions into a desired type {@code T}. + * + * @param Type of the result of this mapper + */ +@FunctionalInterface +public interface ByteArrayMapper { + + /** + * Maps a specified {@code byte[]} buffer region into a {@code T}. + *

+ * The mapper can operate only within the specified region of the {@code buffer}, which can be safely used without a + * need to copy data. Access to other parts of the buffer may lead to unexpected results and due care should be + * taken to avoid leaking that data through the returned type {@code T}. + * + * @param buffer {@code byte[]} buffer with data + * @param offset the offset of the region + * @param length the length of the region + * @return result of type {@code T} + */ + T map(byte[] buffer, int offset, int length); + + /** + * Returns the maximum allowed buffer size for the {@link #map(byte[], int, int)} operation. + *

+ * Must be a positive number. + * + * @return the maximum allowed buffer size for the {@link #map(byte[], int, int)} operation + */ + default int maxBufferSize() { + return DEFAULT_MAX_BUFFER_SIZE; + } + + /** + * Mapper from the buffer region to an independent {@code byte[]} buffer. + *

+ * Returns {@link #toByteArray(int)} with default {@link #maxBufferSize()}. + * + * @return a mapper from the buffer region to an independent {@code byte[]} buffer + */ + static ByteArrayMapper toByteArray() { + return DEFAULT_TO_BYTE_ARRAY_MAPPER; + } + + /** + * Mapper from the buffer region to an independent {@code byte[]} buffer. + *

+ * Returns the original {@code byte[]} buffer as-is if it was completely full of data or allocates a new buffer for + * the specified length and copies data. Returned {@code byte[]} buffer is always completely full. + * + * @param maxBufferSize the value for {@link #maxBufferSize()} + * @return a mapper from the buffer region to an independent {@code byte[]} buffer + */ + static ByteArrayMapper toByteArray(final int maxBufferSize) { + return new ToByteArrayMapper(maxBufferSize); + } +} diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java index e499266378..c6bee691b3 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java @@ -30,6 +30,7 @@ import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; +import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; import static java.lang.Math.min; import static java.lang.System.arraycopy; import static java.util.Objects.requireNonNull; @@ -37,12 +38,14 @@ /** * A {@link Publisher} created from an {@link InputStream} such that any data requested from the {@link Publisher} is * read from the {@link InputStream} until it terminates. - * + *

* Given that {@link InputStream} is a blocking API, requesting data from the {@link Publisher} can block on {@link * Subscription#request(long)} until there is sufficient data available. The implementation attempts to minimize * blocking, however by reading data faster than the writer is sending, blocking is inevitable. + * + * @param Type of items emitted to the {@link PublisherSource.Subscriber}. */ -final class FromInputStreamPublisher extends Publisher implements PublisherSource { +final class FromInputStreamPublisher extends Publisher implements PublisherSource { private static final Logger LOGGER = LoggerFactory.getLogger(FromInputStreamPublisher.class); // While sun.nio.ch.FileChannelImpl and java.io.InputStream.transferTo(...) use 8Kb chunks, // we use 16Kb-32B because 16Kb is: @@ -53,7 +56,8 @@ final class FromInputStreamPublisher extends Publisher implements Publis // write hits SslHandler. This helps utilize the full potential of the transport without fragmentation at TLS/HTTP/2 // layers or introducing too many flushes (they are expensive!) for large payloads. Benchmarks confirmed that // subtraction of 32B significantly improves throughput and latency for TLS and has no effect on plaintext traffic. - private static final int DEFAULT_READ_CHUNK_SIZE = 16 * 1024 - 32; + static final int DEFAULT_MAX_BUFFER_SIZE = 16 * 1024 - 32; + @SuppressWarnings("rawtypes") private static final AtomicIntegerFieldUpdater subscribedUpdater = AtomicIntegerFieldUpdater.newUpdater(FromInputStreamPublisher.class, "subscribed"); @@ -71,42 +75,30 @@ final class FromInputStreamPublisher extends Publisher implements Publis private volatile int subscribed; private final InputStream stream; - private final int readChunkSize; + private final ByteArrayMapper mapper; /** * A new instance. * * @param stream the {@link InputStream} to expose as a {@link Publisher} + * @param mapper a mapper to transform a {@code byte[]} buffer into a desired type {@code T} that will be emitted by + * the {@link Publisher} */ - FromInputStreamPublisher(final InputStream stream) { - this(stream, DEFAULT_READ_CHUNK_SIZE); - } - - /** - * A new instance. - * - * @param stream the {@link InputStream} to expose as a {@link Publisher} - * @param readChunkSize the maximum length of {@code byte[]} chunks which will be read from the {@link InputStream} - * and emitted by the {@link Publisher}. - */ - FromInputStreamPublisher(final InputStream stream, final int readChunkSize) { + FromInputStreamPublisher(final InputStream stream, final ByteArrayMapper mapper) { this.stream = requireNonNull(stream); - if (readChunkSize <= 0) { - throw new IllegalArgumentException("readChunkSize: " + readChunkSize + " (expected: >0)"); - } - this.readChunkSize = readChunkSize; + this.mapper = requireNonNull(mapper); } @Override - public void subscribe(final Subscriber subscriber) { + public void subscribe(final Subscriber subscriber) { subscribeInternal(subscriber); } @Override - protected void handleSubscribe(final Subscriber subscriber) { + protected void handleSubscribe(final Subscriber subscriber) { if (subscribedUpdater.compareAndSet(this, 0, 1)) { try { - subscriber.onSubscribe(new InputStreamPublisherSubscription(stream, subscriber, readChunkSize)); + subscriber.onSubscribe(new InputStreamPublisherSubscription<>(stream, subscriber, mapper)); } catch (Throwable t) { handleExceptionFromOnSubscribe(subscriber, t); } @@ -115,7 +107,7 @@ protected void handleSubscribe(final Subscriber subscriber) { } } - private static final class InputStreamPublisherSubscription implements Subscription { + private static final class InputStreamPublisherSubscription implements Subscription { private static final int END_OF_FILE = -1; /** @@ -124,8 +116,8 @@ private static final class InputStreamPublisherSubscription implements Subscript private static final int TERMINAL_SENT = -1; private final InputStream stream; - private final Subscriber subscriber; - private final int readChunkSize; + private final Subscriber subscriber; + private final ByteArrayMapper mapper; /** * Contains the outstanding demand or {@link #TERMINAL_SENT} indicating when {@link InputStream} and {@link * Subscription} are terminated. @@ -134,11 +126,11 @@ private static final class InputStreamPublisherSubscription implements Subscript private int writeIdx; private boolean ignoreRequests; - InputStreamPublisherSubscription(final InputStream stream, final Subscriber subscriber, - final int readChunkSize) { + InputStreamPublisherSubscription(final InputStream stream, final Subscriber subscriber, + final ByteArrayMapper mapper) { this.stream = stream; this.subscriber = subscriber; - this.readChunkSize = readChunkSize; + this.mapper = mapper; } @Override @@ -170,7 +162,7 @@ public void cancel() { } } - private void readAndDeliver(final Subscriber subscriber) { + private void readAndDeliver(final Subscriber subscriber) { try { do { // Initialize readByte with a negative value different from END_OF_FILE as an indicator that it was @@ -191,8 +183,8 @@ private void readAndDeliver(final Subscriber subscriber) { if (available == 0) { // This InputStream either does not implement available() method at all, or does not honor // the 0 == EOF contract, or does not prefetch data in larger chunks. - // In this case, we attempt to read based on the configured readChunkSize: - available = readChunkSize; + // In this case, we attempt to read based on the configured maxBufferSize: + available = mapper.maxBufferSize(); } } available = readAvailableAndEmit(available, readByte); @@ -207,9 +199,10 @@ private void readAndDeliver(final Subscriber subscriber) { } private int readAvailableAndEmit(final int available, final int readByte) throws IOException { + final int readChunkSize = mapper.maxBufferSize(); final byte[] buffer; if (readByte >= 0) { - buffer = new byte[available < readChunkSize ? available + 1 : readChunkSize]; + buffer = new byte[min(available + 1, readChunkSize)]; buffer[writeIdx++] = (byte) readByte; } else { buffer = new byte[min(available, readChunkSize)]; @@ -233,7 +226,7 @@ private int fillBuffer(final byte[] buffer, int available) throws IOException { return available; } - private void emitSingleBuffer(final Subscriber subscriber, + private void emitSingleBuffer(final Subscriber subscriber, final byte[] buffer, final int remainingLength) { if (writeIdx < 1) { assert remainingLength == END_OF_FILE : @@ -241,21 +234,13 @@ private void emitSingleBuffer(final Subscriber subscriber, return; } assert writeIdx <= buffer.length : "writeIdx can not be grater than buffer.length"; - final byte[] b; - if (writeIdx == buffer.length) { - b = buffer; - } else { - // this extra copy is necessary when we read the last chunk and total number of bytes read before EOF - // is less than guesstimated buffer size - b = new byte[writeIdx]; - arraycopy(buffer, 0, b, 0, writeIdx); - } + final T item = mapper.map(buffer, 0, writeIdx); requested--; writeIdx = 0; - subscriber.onNext(b); + subscriber.onNext(item); } - private void sendOnComplete(final Subscriber subscriber) { + private void sendOnComplete(final Subscriber subscriber) { closeStream(subscriber); if (trySetTerminalSent()) { try { @@ -266,7 +251,7 @@ private void sendOnComplete(final Subscriber subscriber) { } } - private void sendOnError(final Subscriber subscriber, final Throwable t) { + private void sendOnError(final Subscriber subscriber, final Throwable t) { if (trySetTerminalSent()) { try { subscriber.onError(t); @@ -285,7 +270,7 @@ private Throwable closeStreamOnError(Throwable t) { return t; } - private void closeStream(final Subscriber subscriber) { + private void closeStream(final Subscriber subscriber) { try { stream.close(); } catch (Throwable e) { @@ -306,4 +291,32 @@ private boolean trySetTerminalSent() { return true; } } + + static final class ToByteArrayMapper implements ByteArrayMapper { + + static final ByteArrayMapper DEFAULT_TO_BYTE_ARRAY_MAPPER = + new ToByteArrayMapper(DEFAULT_MAX_BUFFER_SIZE); + + private final int maxBufferSize; + + ToByteArrayMapper(final int maxBufferSize) { + this.maxBufferSize = ensurePositive(maxBufferSize, "maxBufferSize"); + } + + @Override + public byte[] map(final byte[] buffer, final int offset, final int length) { + if (offset == 0 && length == buffer.length) { + return buffer; + } else { + final byte[] partial = new byte[length]; + arraycopy(buffer, offset, partial, 0, length); + return partial; + } + } + + @Override + public int maxBufferSize() { + return maxBufferSize; + } + } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 96e2e6c4e6..3a79c5adbb 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -4522,9 +4522,12 @@ public static Publisher fromBlockingIterable(BlockingIterable fromInputStream(InputStream stream) { - return new FromInputStreamPublisher(stream); + return fromInputStream(stream, ByteArrayMapper.toByteArray()); } /** @@ -4552,9 +4555,45 @@ public static Publisher fromInputStream(InputStream stream) { * and emitted by the returned {@link Publisher}. * @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the * {@link Subscriber} and then {@link Subscriber#onComplete()}. + * @deprecated Use {@link #fromInputStream(InputStream, ByteArrayMapper)} with + * {@link ByteArrayMapper#toByteArray(int)}. */ + @Deprecated // FIXME: 0.43 - remove deprecated method public static Publisher fromInputStream(InputStream stream, int readChunkSize) { - return new FromInputStreamPublisher(stream, readChunkSize); + return fromInputStream(stream, ByteArrayMapper.toByteArray(readChunkSize)); + } + + /** + * Create a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the + * {@link Subscriber} as a mapped type {@code T} and then {@link Subscriber#onComplete()}. + *

+ * The resulting publisher is not replayable and supports only a single {@link Subscriber}. + *

+ * After a returned {@link Publisher} is subscribed, it owns the passed {@link InputStream}, meaning that the + * {@link InputStream} will be automatically closed when the {@link Publisher} is cancelled or terminated. Not + * necessary to close the {@link InputStream} after subscribe, but it should be closed when control flow never + * subscribes to the returned {@link Publisher}. + *

+ * The Reactive Streams specification provides two criteria ( + * 3.4, and + * 3.5) stating + * the {@link Subscription} should be "responsive". The responsiveness of the associated {@link Subscription}s will + * depend upon the behavior of the {@code stream} below. Make sure the {@link Executor} for this execution chain + * can tolerate this responsiveness and any blocking behavior. + *

+ * Given the blocking nature of {@link InputStream}, assume {@link Subscription#request(long)} can block when the + * underlying {@link InputStream} blocks on {@link InputStream#read(byte[], int, int)}. + * + * @param stream provides the data in the form of {@code byte[]} buffer regions for the specified + * {@link ByteArrayMapper}. + * @param mapper a mapper to transform raw {@code byte[]} buffer regions into a desired type {@code T} to be emitted + * to the {@link Subscriber} by the returned {@link Publisher}. + * @param Type of the items emitted by the returned {@link Publisher}. + * @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the + * {@link Subscriber} as a mapped type {@code T} and then {@link Subscriber#onComplete()}. + */ + public static Publisher fromInputStream(InputStream stream, ByteArrayMapper mapper) { + return new FromInputStreamPublisher<>(stream, mapper); } /** diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java index 41a50e91a7..c80d4ff6b4 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; +import static io.servicetalk.concurrent.api.Publisher.fromInputStream; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static java.lang.Integer.MAX_VALUE; @@ -69,7 +70,7 @@ class FromInputStreamPublisherTest { @BeforeEach void setup() { inputStream = mock(InputStream.class); - pub = new FromInputStreamPublisher(inputStream); + pub = fromInputStream(inputStream); } @Test @@ -374,7 +375,7 @@ void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Thr // calls to "broken" available() should consistently return `0`. initChunkedStream(bigBuff, of(3, 0, 0, 4, 0, 0, 5, 0, 0, 2, 0, 0, 0, 0, 4, 0), of(3, 7, 4, 4, 5, 2, 2, 2, 1, 2, 1, 1, 1, 1, 1, 4, 0)); - pub = new FromInputStreamPublisher(inputStream, readChunkSize); + pub = fromInputStream(inputStream, readChunkSize); if (readChunkSize > bigBuff.length) { byte[][] items = { @@ -418,7 +419,7 @@ void singleReadTriggersMoreAvailability() throws Throwable { // returns a non zero value, we should return a chunk value of "chunks[idx - 1] - number of read bytes". initChunkedStream(bigBuff, of(0, 1, 0, 7, 0, 8, 1, 0, 17, 10, 2, 0), of(2, 8, 9, 1, 18, 10, 2, 0)); - pub = new FromInputStreamPublisher(inputStream, 8); + pub = fromInputStream(inputStream, 8); byte[][] items = { // available < readChunkSize @@ -444,7 +445,7 @@ void singleReadTriggersMoreAvailability() throws Throwable { void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable { initChunkedStream(bigBuff, ofAll(0), ofAll(chunkSize)); int readChunkSize = 5; - pub = new FromInputStreamPublisher(inputStream, readChunkSize); + pub = fromInputStream(inputStream, readChunkSize); // expect 8 emitted items // [ 0, 1, 2, 3, 4] diff --git a/servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializerMessageBodyReaderWriter.java b/servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializerMessageBodyReaderWriter.java index 33db71a35d..c7c3603b28 100644 --- a/servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializerMessageBodyReaderWriter.java +++ b/servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializerMessageBodyReaderWriter.java @@ -231,7 +231,7 @@ private JacksonSerializerFactory getJacksonSerializerFactory(final MediaType med } private static Publisher toBufferPublisher(final InputStream is, final BufferAllocator a) { - return fromInputStream(is).map(a::wrap); + return fromInputStream(is, a::wrap); } // FIXME: 0.43 - Remove deprecations diff --git a/servicetalk-data-protobuf-jersey/src/main/java/io/servicetalk/data/protobuf/jersey/ProtobufSerializerMessageBodyReaderWriter.java b/servicetalk-data-protobuf-jersey/src/main/java/io/servicetalk/data/protobuf/jersey/ProtobufSerializerMessageBodyReaderWriter.java index e11d0efd73..39bf7aa273 100644 --- a/servicetalk-data-protobuf-jersey/src/main/java/io/servicetalk/data/protobuf/jersey/ProtobufSerializerMessageBodyReaderWriter.java +++ b/servicetalk-data-protobuf-jersey/src/main/java/io/servicetalk/data/protobuf/jersey/ProtobufSerializerMessageBodyReaderWriter.java @@ -168,7 +168,7 @@ private ProtobufSerializerFactory getSerializerFactory(final MediaType mediaType } private static Publisher toBufferPublisher(final InputStream is, final BufferAllocator a) { - return fromInputStream(is).map(a::wrap); + return fromInputStream(is, a::wrap); } private static Single deserialize( diff --git a/servicetalk-examples/http/jaxrs/src/main/java/io/servicetalk/examples/http/jaxrs/HelloWorldJaxRsResource.java b/servicetalk-examples/http/jaxrs/src/main/java/io/servicetalk/examples/http/jaxrs/HelloWorldJaxRsResource.java index e1d118aac1..b66895bd5d 100644 --- a/servicetalk-examples/http/jaxrs/src/main/java/io/servicetalk/examples/http/jaxrs/HelloWorldJaxRsResource.java +++ b/servicetalk-examples/http/jaxrs/src/main/java/io/servicetalk/examples/http/jaxrs/HelloWorldJaxRsResource.java @@ -219,7 +219,7 @@ public Single multipartHello(@Context final ConnectionContext ctx, @FormDataParam("file") InputStream file) { final BufferAllocator allocator = ctx.executionContext().bufferAllocator(); return from(allocator.fromAscii("Hello multipart! Content: ")) - .concat(fromInputStream(file).map(allocator::wrap)) + .concat(fromInputStream(file, allocator::wrap)) .collect(allocator::newCompositeBuffer, (collector, item) -> ((CompositeBuffer) collector).addBuffer(item)); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java index 8675aa724d..c71293a4e2 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpRequest.java @@ -197,8 +197,7 @@ public BlockingStreamingHttpRequest payloadBody(final Iterable payloadBo @Override public BlockingStreamingHttpRequest payloadBody(final InputStream payloadBody) { - original.payloadBody(fromInputStream(payloadBody) - .map(bytes -> original.payloadHolder().allocator().wrap(bytes))); + original.payloadBody(fromInputStream(payloadBody, original.payloadHolder().allocator()::wrap)); return this; } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java index 61b8137ae7..68a44650ca 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultBlockingStreamingHttpResponse.java @@ -81,8 +81,7 @@ public BlockingStreamingHttpResponse payloadBody(final Iterable payloadB @Override public BlockingStreamingHttpResponse payloadBody(final InputStream payloadBody) { - original.payloadBody(fromInputStream(payloadBody) - .map(bytes -> original.payloadHolder().allocator().wrap(bytes))); + original.payloadBody(fromInputStream(payloadBody, original.payloadHolder().allocator()::wrap)); return this; } diff --git a/servicetalk-http-router-jersey/src/main/java/io/servicetalk/http/router/jersey/AbstractMessageBodyReaderWriter.java b/servicetalk-http-router-jersey/src/main/java/io/servicetalk/http/router/jersey/AbstractMessageBodyReaderWriter.java index a8a28fdcb3..c8e810e62e 100644 --- a/servicetalk-http-router-jersey/src/main/java/io/servicetalk/http/router/jersey/AbstractMessageBodyReaderWriter.java +++ b/servicetalk-http-router-jersey/src/main/java/io/servicetalk/http/router/jersey/AbstractMessageBodyReaderWriter.java @@ -89,7 +89,7 @@ final SourceOfT readFrom(final InputStream entityStream, return handleEntityStream(entityStream, allocator, bodyFunction, (is, a) -> bodyFunction .andThen(sourceFunction) - .apply(fromInputStream(is).map(a::wrap), a)); + .apply(fromInputStream(is, a::wrap), a)); } @Override