Skip to content

Commit

Permalink
Add Publisher.fromInputStream(InputStream, ByteArrayMapper) (#2989)
Browse files Browse the repository at this point in the history
Motivation:

Existing `Publisher.fromInputStream(InputStream)` contract has a side effect of an extra allocation in case the pre-allocated byte-array was not completely full of data.

Modifications:

- Add `Publisher.fromInputStream(InputStream, ByteArrayMapper)` that allows users to decide how to use the buffer region with data;
- Deprecate pre-existing `Publisher.fromInputStream(InputStream)` and `Publisher.fromInputStream(InputStream, int)` overloads;
- Update all existing use-cases to use `BufferAllocator.wrap` as a method reference as a `ByteArrayMapper`;

Result:

No impact on throughput and latency for `BlockingStreamingHttpClient` requests with `InputStream` payload (8Kb, 16Kb, 24Kb), but significant reduction in memory allocations and number of GC runs per benchmark:

Allocations: ~2Gb/s -> ~1.4Gb/s
Young Collection GC Count: 29 -> 23
Alloc Outside TLABs: 7.33 GiB -> 5.04 GiB
  • Loading branch information
idelpivnitskiy authored Jun 28, 2024
1 parent 63089ee commit 434da83
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.api.FromInputStreamPublisher.ToByteArrayMapper;

import static io.servicetalk.concurrent.api.FromInputStreamPublisher.DEFAULT_MAX_BUFFER_SIZE;
import static io.servicetalk.concurrent.api.FromInputStreamPublisher.ToByteArrayMapper.DEFAULT_TO_BYTE_ARRAY_MAPPER;

/**
* A mapper to transform {@code byte[]} buffer regions into a desired type {@code T}.
*
* @param <T> Type of the result of this mapper
*/
@FunctionalInterface
public interface ByteArrayMapper<T> {

/**
* Maps a specified {@code byte[]} buffer region into a {@code T}.
* <p>
* The mapper can operate only within the specified region of the {@code buffer}, which can be safely used without a
* need to copy data. Access to other parts of the buffer may lead to unexpected results and due care should be
* taken to avoid leaking that data through the returned type {@code T}.
*
* @param buffer {@code byte[]} buffer with data
* @param offset the offset of the region
* @param length the length of the region
* @return result of type {@code T}
*/
T map(byte[] buffer, int offset, int length);

/**
* Returns the maximum allowed buffer size for the {@link #map(byte[], int, int)} operation.
* <p>
* Must be a positive number.
*
* @return the maximum allowed buffer size for the {@link #map(byte[], int, int)} operation
*/
default int maxBufferSize() {
return DEFAULT_MAX_BUFFER_SIZE;
}

/**
* Mapper from the buffer region to an independent {@code byte[]} buffer.
* <p>
* Returns {@link #toByteArray(int)} with default {@link #maxBufferSize()}.
*
* @return a mapper from the buffer region to an independent {@code byte[]} buffer
*/
static ByteArrayMapper<byte[]> toByteArray() {
return DEFAULT_TO_BYTE_ARRAY_MAPPER;
}

/**
* Mapper from the buffer region to an independent {@code byte[]} buffer.
* <p>
* Returns the original {@code byte[]} buffer as-is if it was completely full of data or allocates a new buffer for
* the specified length and copies data. Returned {@code byte[]} buffer is always completely full.
*
* @param maxBufferSize the value for {@link #maxBufferSize()}
* @return a mapper from the buffer region to an independent {@code byte[]} buffer
*/
static ByteArrayMapper<byte[]> toByteArray(final int maxBufferSize) {
return new ToByteArrayMapper(maxBufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.lang.Math.min;
import static java.lang.System.arraycopy;
import static java.util.Objects.requireNonNull;

/**
* A {@link Publisher} created from an {@link InputStream} such that any data requested from the {@link Publisher} is
* read from the {@link InputStream} until it terminates.
*
* <p>
* Given that {@link InputStream} is a blocking API, requesting data from the {@link Publisher} can block on {@link
* Subscription#request(long)} until there is sufficient data available. The implementation attempts to minimize
* blocking, however by reading data faster than the writer is sending, blocking is inevitable.
*
* @param <T> Type of items emitted to the {@link PublisherSource.Subscriber}.
*/
final class FromInputStreamPublisher extends Publisher<byte[]> implements PublisherSource<byte[]> {
final class FromInputStreamPublisher<T> extends Publisher<T> implements PublisherSource<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(FromInputStreamPublisher.class);
// While sun.nio.ch.FileChannelImpl and java.io.InputStream.transferTo(...) use 8Kb chunks,
// we use 16Kb-32B because 16Kb is:
Expand All @@ -53,7 +56,8 @@ final class FromInputStreamPublisher extends Publisher<byte[]> implements Publis
// write hits SslHandler. This helps utilize the full potential of the transport without fragmentation at TLS/HTTP/2
// layers or introducing too many flushes (they are expensive!) for large payloads. Benchmarks confirmed that
// subtraction of 32B significantly improves throughput and latency for TLS and has no effect on plaintext traffic.
private static final int DEFAULT_READ_CHUNK_SIZE = 16 * 1024 - 32;
static final int DEFAULT_MAX_BUFFER_SIZE = 16 * 1024 - 32;
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<FromInputStreamPublisher> subscribedUpdater =
AtomicIntegerFieldUpdater.newUpdater(FromInputStreamPublisher.class, "subscribed");

Expand All @@ -71,42 +75,30 @@ final class FromInputStreamPublisher extends Publisher<byte[]> implements Publis
private volatile int subscribed;

private final InputStream stream;
private final int readChunkSize;
private final ByteArrayMapper<T> mapper;

/**
* A new instance.
*
* @param stream the {@link InputStream} to expose as a {@link Publisher}
* @param mapper a mapper to transform a {@code byte[]} buffer into a desired type {@code T} that will be emitted by
* the {@link Publisher}
*/
FromInputStreamPublisher(final InputStream stream) {
this(stream, DEFAULT_READ_CHUNK_SIZE);
}

/**
* A new instance.
*
* @param stream the {@link InputStream} to expose as a {@link Publisher}
* @param readChunkSize the maximum length of {@code byte[]} chunks which will be read from the {@link InputStream}
* and emitted by the {@link Publisher}.
*/
FromInputStreamPublisher(final InputStream stream, final int readChunkSize) {
FromInputStreamPublisher(final InputStream stream, final ByteArrayMapper<T> mapper) {
this.stream = requireNonNull(stream);
if (readChunkSize <= 0) {
throw new IllegalArgumentException("readChunkSize: " + readChunkSize + " (expected: >0)");
}
this.readChunkSize = readChunkSize;
this.mapper = requireNonNull(mapper);
}

@Override
public void subscribe(final Subscriber<? super byte[]> subscriber) {
public void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}

@Override
protected void handleSubscribe(final Subscriber<? super byte[]> subscriber) {
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
if (subscribedUpdater.compareAndSet(this, 0, 1)) {
try {
subscriber.onSubscribe(new InputStreamPublisherSubscription(stream, subscriber, readChunkSize));
subscriber.onSubscribe(new InputStreamPublisherSubscription<>(stream, subscriber, mapper));
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
}
Expand All @@ -115,7 +107,7 @@ protected void handleSubscribe(final Subscriber<? super byte[]> subscriber) {
}
}

private static final class InputStreamPublisherSubscription implements Subscription {
private static final class InputStreamPublisherSubscription<T> implements Subscription {

private static final int END_OF_FILE = -1;
/**
Expand All @@ -124,8 +116,8 @@ private static final class InputStreamPublisherSubscription implements Subscript
private static final int TERMINAL_SENT = -1;

private final InputStream stream;
private final Subscriber<? super byte[]> subscriber;
private final int readChunkSize;
private final Subscriber<? super T> subscriber;
private final ByteArrayMapper<T> mapper;
/**
* Contains the outstanding demand or {@link #TERMINAL_SENT} indicating when {@link InputStream} and {@link
* Subscription} are terminated.
Expand All @@ -134,11 +126,11 @@ private static final class InputStreamPublisherSubscription implements Subscript
private int writeIdx;
private boolean ignoreRequests;

InputStreamPublisherSubscription(final InputStream stream, final Subscriber<? super byte[]> subscriber,
final int readChunkSize) {
InputStreamPublisherSubscription(final InputStream stream, final Subscriber<? super T> subscriber,
final ByteArrayMapper<T> mapper) {
this.stream = stream;
this.subscriber = subscriber;
this.readChunkSize = readChunkSize;
this.mapper = mapper;
}

@Override
Expand Down Expand Up @@ -170,7 +162,7 @@ public void cancel() {
}
}

private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
private void readAndDeliver(final Subscriber<? super T> subscriber) {
try {
do {
// Initialize readByte with a negative value different from END_OF_FILE as an indicator that it was
Expand All @@ -191,8 +183,8 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
if (available == 0) {
// This InputStream either does not implement available() method at all, or does not honor
// the 0 == EOF contract, or does not prefetch data in larger chunks.
// In this case, we attempt to read based on the configured readChunkSize:
available = readChunkSize;
// In this case, we attempt to read based on the configured maxBufferSize:
available = mapper.maxBufferSize();
}
}
available = readAvailableAndEmit(available, readByte);
Expand All @@ -207,9 +199,10 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
}

private int readAvailableAndEmit(final int available, final int readByte) throws IOException {
final int readChunkSize = mapper.maxBufferSize();
final byte[] buffer;
if (readByte >= 0) {
buffer = new byte[available < readChunkSize ? available + 1 : readChunkSize];
buffer = new byte[min(available + 1, readChunkSize)];
buffer[writeIdx++] = (byte) readByte;
} else {
buffer = new byte[min(available, readChunkSize)];
Expand All @@ -233,29 +226,21 @@ private int fillBuffer(final byte[] buffer, int available) throws IOException {
return available;
}

private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber,
private void emitSingleBuffer(final Subscriber<? super T> subscriber,
final byte[] buffer, final int remainingLength) {
if (writeIdx < 1) {
assert remainingLength == END_OF_FILE :
"unexpected writeIdx == 0 while we still have some remaining data to read";
return;
}
assert writeIdx <= buffer.length : "writeIdx can not be grater than buffer.length";
final byte[] b;
if (writeIdx == buffer.length) {
b = buffer;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
b = new byte[writeIdx];
arraycopy(buffer, 0, b, 0, writeIdx);
}
final T item = mapper.map(buffer, 0, writeIdx);
requested--;
writeIdx = 0;
subscriber.onNext(b);
subscriber.onNext(item);
}

private void sendOnComplete(final Subscriber<? super byte[]> subscriber) {
private void sendOnComplete(final Subscriber<? super T> subscriber) {
closeStream(subscriber);
if (trySetTerminalSent()) {
try {
Expand All @@ -266,7 +251,7 @@ private void sendOnComplete(final Subscriber<? super byte[]> subscriber) {
}
}

private void sendOnError(final Subscriber<? super byte[]> subscriber, final Throwable t) {
private void sendOnError(final Subscriber<? super T> subscriber, final Throwable t) {
if (trySetTerminalSent()) {
try {
subscriber.onError(t);
Expand All @@ -285,7 +270,7 @@ private Throwable closeStreamOnError(Throwable t) {
return t;
}

private void closeStream(final Subscriber<? super byte[]> subscriber) {
private void closeStream(final Subscriber<? super T> subscriber) {
try {
stream.close();
} catch (Throwable e) {
Expand All @@ -306,4 +291,32 @@ private boolean trySetTerminalSent() {
return true;
}
}

static final class ToByteArrayMapper implements ByteArrayMapper<byte[]> {

static final ByteArrayMapper<byte[]> DEFAULT_TO_BYTE_ARRAY_MAPPER =
new ToByteArrayMapper(DEFAULT_MAX_BUFFER_SIZE);

private final int maxBufferSize;

ToByteArrayMapper(final int maxBufferSize) {
this.maxBufferSize = ensurePositive(maxBufferSize, "maxBufferSize");
}

@Override
public byte[] map(final byte[] buffer, final int offset, final int length) {
if (offset == 0 && length == buffer.length) {
return buffer;
} else {
final byte[] partial = new byte[length];
arraycopy(buffer, offset, partial, 0, length);
return partial;
}
}

@Override
public int maxBufferSize() {
return maxBufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4522,9 +4522,12 @@ public static <T> Publisher<T> fromBlockingIterable(BlockingIterable<? extends T
* InputStream#read(byte[], int, int)}.
* @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} and then {@link Subscriber#onComplete()}.
* @deprecated Use {@link #fromInputStream(InputStream, ByteArrayMapper)} with
* {@link ByteArrayMapper#toByteArray()}.
*/
@Deprecated // FIXME: 0.43 - remove deprecated method
public static Publisher<byte[]> fromInputStream(InputStream stream) {
return new FromInputStreamPublisher(stream);
return fromInputStream(stream, ByteArrayMapper.toByteArray());
}

/**
Expand Down Expand Up @@ -4552,9 +4555,45 @@ public static Publisher<byte[]> fromInputStream(InputStream stream) {
* and emitted by the returned {@link Publisher}.
* @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} and then {@link Subscriber#onComplete()}.
* @deprecated Use {@link #fromInputStream(InputStream, ByteArrayMapper)} with
* {@link ByteArrayMapper#toByteArray(int)}.
*/
@Deprecated // FIXME: 0.43 - remove deprecated method
public static Publisher<byte[]> fromInputStream(InputStream stream, int readChunkSize) {
return new FromInputStreamPublisher(stream, readChunkSize);
return fromInputStream(stream, ByteArrayMapper.toByteArray(readChunkSize));
}

/**
* Create a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} as a mapped type {@code T} and then {@link Subscriber#onComplete()}.
* <p>
* The resulting publisher is not replayable and supports only a single {@link Subscriber}.
* <p>
* After a returned {@link Publisher} is subscribed, it owns the passed {@link InputStream}, meaning that the
* {@link InputStream} will be automatically closed when the {@link Publisher} is cancelled or terminated. Not
* necessary to close the {@link InputStream} after subscribe, but it should be closed when control flow never
* subscribes to the returned {@link Publisher}.
* <p>
* The Reactive Streams specification provides two criteria (
* <a href="https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.4">3.4</a>, and
* <a href="https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5">3.5</a>) stating
* the {@link Subscription} should be "responsive". The responsiveness of the associated {@link Subscription}s will
* depend upon the behavior of the {@code stream} below. Make sure the {@link Executor} for this execution chain
* can tolerate this responsiveness and any blocking behavior.
* <p>
* Given the blocking nature of {@link InputStream}, assume {@link Subscription#request(long)} can block when the
* underlying {@link InputStream} blocks on {@link InputStream#read(byte[], int, int)}.
*
* @param stream provides the data in the form of {@code byte[]} buffer regions for the specified
* {@link ByteArrayMapper}.
* @param mapper a mapper to transform raw {@code byte[]} buffer regions into a desired type {@code T} to be emitted
* to the {@link Subscriber} by the returned {@link Publisher}.
* @param <T> Type of the items emitted by the returned {@link Publisher}.
* @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} as a mapped type {@code T} and then {@link Subscriber#onComplete()}.
*/
public static <T> Publisher<T> fromInputStream(InputStream stream, ByteArrayMapper<T> mapper) {
return new FromInputStreamPublisher<>(stream, mapper);
}

/**
Expand Down
Loading

0 comments on commit 434da83

Please sign in to comment.