Fallback to readChunkSize if InputStream.available() not implemented #2949

Merged: 1 commit, Jun 5, 2024
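For context, a minimal sketch (not part of this PR) of the kind of stream this change targets: an InputStream that never overrides available(), so the inherited implementation always reports 0 even while data remains. Before this change the publisher fell back to a 1-byte guess when no buffer was carried over; with this change it falls back to the configured readChunkSize. The class below is hypothetical and only illustrates the scenario.

import java.io.InputStream;

// Hypothetical stream for illustration: read() still yields data, but available() is not
// overridden, so InputStream's base implementation always returns 0.
final class NoAvailableInputStream extends InputStream {
    private int remaining = 37;

    @Override
    public int read() {
        // Return a constant payload byte until the 37 bytes are exhausted, then signal EOF.
        return remaining-- > 0 ? 42 : -1;
    }
}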
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2021, 2023-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -171,7 +171,7 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
int available = stream.available();
if (available == 0) {
// Work around InputStreams that don't strictly honor the 0 == EOF contract.
available = buffer != null ? buffer.length : 1;
available = buffer != null ? buffer.length : readChunkSize;
Contributor:
As you mentioned offline, the default value is 64 KB, which could be rough on GC in adversarial cases. Do you plan to change that as well in a different PR, or are we fine trying it as-is?

Member Author:
Yes, I already played with different values with wire-logging enabled; 64 KB is not a good default.
I still need to run some benchmarks and will update the value in a separate PR.

Contributor (@daschl, Jun 5, 2024):
As a data point, in AHC the InputStreamEntity uses OUTPUT_BUFFER_SIZE = 4096 as the chunk size for this. That said, the change dates back to 2013 and there doesn't seem to be an explanation for why 4k was chosen, so maybe we can come up with something better (larger?).
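To make the discussion concrete, here is a hedged sketch of how an application could pick a smaller chunk size itself, using the two-argument Publisher.fromInputStream(InputStream, int) overload that the TCK test in this PR also uses; the class name, the 4096 value, and the forEach consumption call are illustrative, not part of this change.

import io.servicetalk.concurrent.api.Publisher;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public final class ExplicitReadChunkSizeExample {
    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(new byte[64 * 1024]);
        // Pass an explicit readChunkSize (4096 here, mirroring the AHC data point above) so a
        // stream whose available() returns 0 falls back to 4 KB reads instead of the 64 KB default.
        Publisher<byte[]> chunks = Publisher.fromInputStream(in, 4096);
        chunks.forEach(chunk -> System.out.println("emitted " + chunk.length + " bytes"));
    }
}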

}
available = fillBufferAvoidingBlocking(available);
emitSingleBuffer(subscriber);
@@ -212,6 +212,8 @@ private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber) {
b = buffer;
buffer = null;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
b = new byte[writeIdx];
arraycopy(buffer, 0, b, 0, writeIdx);
}
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2021, 2023-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.io.InputStream;
@@ -200,7 +202,7 @@ void streamClosedAndErrorOnReadIOError() throws Exception {
void streamClosedAndErrorOnDeliveryError() throws Exception {
initChunkedStream(smallBuff, of(10), of(10));

Subscriber sub = mock(Subscriber.class);
Subscriber<byte[]> sub = mock(Subscriber.class);

doAnswer(inv -> {
Subscription s = inv.getArgument(0);
@@ -221,7 +223,7 @@ void streamClosedAndErrorOnDeliveryError() throws Exception {
void streamClosedAndErrorOnDeliveryErrorOnce() throws Exception {
initChunkedStream(smallBuff, ofAll(10), ofAll(10));

Subscriber sub = mock(Subscriber.class);
Subscriber<byte[]> sub = mock(Subscriber.class);

AtomicReference<Subscription> subRef = new AtomicReference<>();
doAnswer(inv -> {
@@ -246,7 +248,7 @@ void streamClosedAndErrorOnDeliveryErrorOnce() throws Exception {
void streamCanceledShouldCloseOnce() throws Exception {
initChunkedStream(smallBuff, ofAll(10), ofAll(10));

Subscriber sub = mock(Subscriber.class);
Subscriber<byte[]> sub = mock(Subscriber.class);

doAnswer(inv -> {
Subscription s = inv.getArgument(0);
@@ -351,21 +353,70 @@ void completeStreamIfEOFObservedDuringReadFromOverEstimatedAvailability() throws
}

@Test
void dontFailOnInputStreamWithBrokenAvailableCall() throws Throwable {
initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 5, 5, 0),
of(5, 1, 1, 10, 5, 5, 5, 5, 0));
void readsAllBytesWhenAvailableNotImplemented() throws Throwable {
// constrain publisher to 10 byte chunks with no data availability to enforce inner loops until buffer drained
initChunkedStream(bigBuff, ofAll(0), ofAll(10));

byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
new byte[]{5}, // avail == 0 -> override to 1
new byte[]{6}, // avail == 0 -> override to 1
new byte[]{7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
new byte[]{17, 18, 19, 20, 21},
new byte[]{22, 23, 24, 25, 26},
new byte[]{27, 28, 29, 30, 31},
new byte[]{32, 33, 34, 35, 36},
};
// expect single emitted item
// [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
// 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
// 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
// 30, 31, 32, 33, 34, 35, 36]

byte[][] items = chunked(bigBuff.length, bigBuff.length);
verifySuccess(items);
}

@ParameterizedTest(name = "{displayName} [{index}] readChunkSize={0}")
@ValueSource(ints = {7, 1024})
void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Throwable {
initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 1, 0),
of(5, 7, 7, 10, 5, 5, 1, 0));
pub = new FromInputStreamPublisher(inputStream, readChunkSize);

if (readChunkSize > bigBuff.length) {
byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
// avail == 0 -> override to readChunkSize
new byte[]{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
28, 29, 30, 31, 32, 33, 34, 35, 36},
};
verifySuccess(items);
} else {
byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
// avail == 0 -> override to readChunkSize
new byte[]{5, 6, 7, 8, 9, 10, 11},
// avail == 0 -> override to readChunkSize
new byte[]{12, 13, 14, 15, 16, 17, 18},
// readChunkSize < available
new byte[]{19, 20, 21, 22, 23, 24, 25},
new byte[]{26, 27, 28, 29, 30},
new byte[]{31, 32, 33, 34, 35},
new byte[]{36},
};
verifySuccess(items);
}
}

@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable {
initChunkedStream(bigBuff, ofAll(0), ofAll(chunkSize));
int readChunkSize = 5;
pub = new FromInputStreamPublisher(inputStream, readChunkSize);

// expect 8 emitted items
// [ 0, 1, 2, 3, 4]
// [ 5, 6, 7, 8, 9]
// [10, 11, 12, 13, 14]
// [15, 16, 17, 18, 19]
// [20, 21, 22, 23, 24]
// [25, 26, 27, 28, 29]
// [30, 31, 32, 33, 34]
// [35, 36]

byte[][] items = chunked(bigBuff.length, readChunkSize);
verifySuccess(items);
}

@@ -400,11 +451,12 @@ void keepReadingWhenAvailabilityPermits() throws Throwable {
verifySuccess(items);
}

@Test
void repeatedReadingWhenAvailabilityRunsOut() throws Throwable {
// constrain publisher to 10 byte chunks with only 5 byte availability per chunk to enforce multiple outer loops
// simulating multiple calls to IS.available()
initChunkedStream(bigBuff, ofAll(5), ofAll(5)); // 5 byte chunks per available() call
@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void repeatedReadingWhenAvailabilityRunsOut(int chunkSize) throws Throwable {
// constrain publisher to chunkSize byte chunks with only 5 byte availability per chunk to enforce multiple
// outer loops simulating multiple calls to IS.available()
initChunkedStream(bigBuff, ofAll(5), ofAll(chunkSize)); // chunkSize byte chunks per available() call

// expect 8 emitted items
// [ 0, 1, 2, 3, 4]
@@ -0,0 +1,36 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;

import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;

@Test
public class PublisherFromInputStreamTckTest extends AbstractPublisherTckTest<byte[]> {

@Override
protected Publisher<byte[]> createServiceTalkPublisher(long elements) {
return Publisher.fromInputStream(new ByteArrayInputStream(new byte[(int) elements]), 1);
}

@Override
public long maxElementsFromPublisher() {
return TckUtils.maxElementsFromPublisher();
}
}