Skip to content

Commit

Permalink
FromInputStreamPublisher: avoid extra allocation of a buffer
Browse files Browse the repository at this point in the history
Motivation:

In #2949 we optimized a case when `available()` is not implemented and
always returns `0`. However, we de-optimized a use-case when it's
implemented because after that change the last call to `available()`
always returns 0, but we still allocate a buffer of size `readChunkSize`
that won't be used at all.

Modifications:
- Enhance `doNotFailOnInputStreamWithBrokenAvailableCall(...)` test
before any changes to have better test coverage.
- Remove `byte[] buffer` as a class variable. It can be a local
variable because it's never reused in practice. Only the last `buffer`
won't be nullified, but we don't need it after that point.
- When `available()` returns `0`, try reading a single byte and then
check availability again instead of always falling back to
`readChunkSize`.
- Adjust `doNotFailOnInputStreamWithBrokenAvailableCall()` test to
account for the 2nd call to `available()`.
- Add `singleReadTriggersMoreAvailability()` test to simulate the case when the
2nd call to `available()` returns a positive value.

Result:

1. No allocation of a `buffer` that won't be used at the EOF.
2. Account for new availability if it appears after a `read()`.
  • Loading branch information
idelpivnitskiy committed Jun 13, 2024
1 parent c4288fd commit 70383fa
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
Expand Down Expand Up @@ -132,8 +131,6 @@ private static final class InputStreamPublisherSubscription implements Subscript
* Subscription} are terminated.
*/
private long requested;
@Nullable
private byte[] buffer;
private int writeIdx;
private boolean ignoreRequests;

Expand Down Expand Up @@ -176,14 +173,27 @@ public void cancel() {
private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
try {
do {
int readByte = -1;
// Can't fully trust available(), but it's a reasonable hint to mitigate blocking on read().
int available = stream.available();
if (available == 0) {
// Work around InputStreams that don't strictly honor the 0 == EOF contract.
available = buffer != null ? buffer.length : readChunkSize;
// This can be an indicator of EOF or a signal that no bytes are available to read without
// blocking. To avoid unnecessary allocation, we first probe for EOF:
readByte = stream.read();
if (readByte == END_OF_FILE) {
sendOnComplete(subscriber);
return;
}
// There is a chance a single read triggered availability of more bytes, let's check:
available = stream.available();
if (available == 0) {
// This InputStream either does not implement available() method at all, or does not honor
// the 0 == EOF contract, or does not prefetch data in larger chunks.
// In this case, we attempt to read based on the configured readChunkSize:
available = readChunkSize;
}
}
available = fillBufferAvoidingBlocking(available);
emitSingleBuffer(subscriber);
available = readAvailableAndEmit(available, readByte);
if (available == END_OF_FILE) {
sendOnComplete(subscriber);
return;
Expand All @@ -194,11 +204,21 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
}
}

// This method honors the estimated available bytes that can be read without blocking
private int fillBufferAvoidingBlocking(int available) throws IOException {
if (buffer == null) {
private int readAvailableAndEmit(final int available, final int readByte) throws IOException {
final byte[] buffer;
if (readByte >= 0) {
buffer = new byte[available < readChunkSize ? available + 1 : readChunkSize];
buffer[writeIdx++] = (byte) readByte;
} else {
buffer = new byte[min(available, readChunkSize)];
}
final int remainingLength = fillBuffer(buffer, available);
emitSingleBuffer(subscriber, buffer, remainingLength);
return remainingLength;
}

// This method honors the estimated available bytes that can be read without blocking
private int fillBuffer(final byte[] buffer, int available) throws IOException {
while (writeIdx != buffer.length && available > 0) {
int len = min(buffer.length - writeIdx, available);
int readActual = stream.read(buffer, writeIdx, len); // may block if len > available
Expand All @@ -211,15 +231,17 @@ private int fillBufferAvoidingBlocking(int available) throws IOException {
return available;
}

private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber) {
private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber,
final byte[] buffer, final int remainingLength) {
if (writeIdx < 1) {
assert remainingLength == END_OF_FILE :
"unexpected writeIdx == 0 while we still have some remaining data to read";
return;
}
assert buffer != null : "should have a buffer when writeIdx > 0";
assert writeIdx <= buffer.length : "writeIdx can not be grater than buffer.length";
final byte[] b;
if (writeIdx == buffer.length) {
b = buffer;
buffer = null;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
Expand All @@ -242,7 +264,7 @@ private void sendOnComplete(final Subscriber<? super byte[]> subscriber) {
}
}

private <T extends Throwable> void sendOnError(final Subscriber<? super byte[]> subscriber, final T t) {
private void sendOnError(final Subscriber<? super byte[]> subscriber, final Throwable t) {
if (trySetTerminalSent()) {
try {
subscriber.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,10 @@ void readsAllBytesWhenAvailableNotImplemented() throws Throwable {
@ParameterizedTest(name = "{displayName} [{index}] readChunkSize={0}")
@ValueSource(ints = {4, 1024})
void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Throwable {
initChunkedStream(bigBuff, of(3, 0, 4, 0, 5, 0, 2, 0, 0, 4, 0),
of(3, 7, 4, 4, 5, 2, 2, 2, 1, 2, 1, 1, 1, 1, 1, 4, 0));
// We use double "0, 0" because FromInputStreamPublisher does two calls to available() now. For this test, both
// calls to "broken" available() should consistently return `0`.
initChunkedStream(bigBuff, of(3, 0, 0, 4, 0, 0, 5, 0, 0, 2, 0, 0, 0, 0, 4, 0),
of(3, 7, 4, 4, 5, 2, 2, 2, 1, 2, 1, 1, 1, 1, 1, 4, 0));
pub = new FromInputStreamPublisher(inputStream, readChunkSize);

if (readChunkSize > bigBuff.length) {
Expand Down Expand Up @@ -409,6 +411,34 @@ void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Thr
}
}

@Test
void singleReadTriggersMoreAvailability() throws Throwable {
// We simulate a case when a single stream.read() triggers a read of a larger chunk and then the next call to
// available() returns "chunk - 1". To accommodate mock behavior, if the 3rd in a row call to available()
// returns a non zero value, we should return a chunk value of "chunks[idx - 1] - number of read bytes".
initChunkedStream(bigBuff, of(0, 1, 0, 7, 0, 8, 1, 0, 17, 10, 2, 0),
of(2, 8, 9, 1, 18, 10, 2, 0));
pub = new FromInputStreamPublisher(inputStream, 8);

byte[][] items = {
// available < readChunkSize
new byte[]{0, 1},
// available == readChunkSize
new byte[]{2, 3, 4, 5, 6, 7, 8, 9},
// available > readChunkSize -> limit by readChunkSize
new byte[]{10, 11, 12, 13, 14, 15, 16, 17},
// available == 1 - unread remaining from the previous chunk of 9
new byte[]{18},
// available > 2x readChunkSize -> limit by readChunkSize
new byte[]{19, 20, 21, 22, 23, 24, 25, 26},
// available == 10 > readChunkSize - unread remaining from the previous chunk of 18
new byte[]{27, 28, 29, 30, 31, 32, 33, 34},
// available == 2 -> unread remaining from the previous chunk of 18
new byte[]{35, 36},
};
verifySuccess(items);
}

@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable {
Expand Down Expand Up @@ -488,6 +518,7 @@ private IntStream ofAll(int i) {

private void initEmptyStream() throws IOException {
when(inputStream.available()).thenReturn(0);
when(inputStream.read()).thenReturn(-1);
when(inputStream.read(any(), anyInt(), anyInt())).thenReturn(-1);
}

Expand All @@ -501,13 +532,23 @@ private void initChunkedStream(final byte[] data, final IntStream avails, final
AtomicInteger readIdx = new AtomicInteger();
OfInt availSizes = avails.iterator();
OfInt chunkSizes = chunks.iterator();
AtomicBoolean readOneByte = new AtomicBoolean();
try {
when(inputStream.available()).then(inv -> availSizes.nextInt());
when(inputStream.read()).then(inv -> {
if (data.length == readIdx.get()) {
return -1;
}
readOneByte.set(true);
return (int) data[readIdx.getAndIncrement()];
});
when(inputStream.read(any(), anyInt(), anyInt())).then(inv -> {
byte[] b = inv.getArgument(0);
int pos = inv.getArgument(1);
int len = inv.getArgument(2);
int read = min(min(len, data.length - readIdx.get()), chunkSizes.nextInt());
// subtract 1 byte from the next chunk if a single byte was already read
final int chunkSize = chunkSizes.nextInt() - (readOneByte.getAndSet(false) ? 1 : 0);
int read = min(min(len, data.length - readIdx.get()), chunkSize);
if (read == 0) {
return data.length == readIdx.get() ? -1 : 0;
}
Expand Down

0 comments on commit 70383fa

Please sign in to comment.