diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromBlockingIterablePublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromBlockingIterablePublisher.java index ba58e8e7ec..73b5c9fa84 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromBlockingIterablePublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromBlockingIterablePublisher.java @@ -33,14 +33,21 @@ final class FromBlockingIterablePublisher extends AbstractSynchronousPublishe private final LongSupplier timeoutSupplier; private final TimeUnit unit; - FromBlockingIterablePublisher(final BlockingIterable iterable, - final LongSupplier timeoutSupplier, - final TimeUnit unit) { + private FromBlockingIterablePublisher( + final BlockingIterable iterable, final LongSupplier timeoutSupplier, final TimeUnit unit) { this.iterable = requireNonNull(iterable); this.timeoutSupplier = requireNonNull(timeoutSupplier); this.unit = requireNonNull(unit); } + @SuppressWarnings("unchecked") + static Publisher fromBlockingIterable0( + BlockingIterable iterable, LongSupplier timeoutSupplier, TimeUnit unit) { + // Unwrap and grab the Publisher directly if possible to avoid conversion layers. + return iterable instanceof PublisherAsBlockingIterable ? ((PublisherAsBlockingIterable) iterable).original : + new FromBlockingIterablePublisher<>(iterable, timeoutSupplier, unit); + } + @Override void doSubscribe(final Subscriber subscriber) { try { diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromIterablePublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromIterablePublisher.java index 5dd0e24012..f3c38fd6e0 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromIterablePublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromIterablePublisher.java @@ -33,10 +33,17 @@ final class FromIterablePublisher extends AbstractSynchronousPublisher { private final Iterable iterable; - FromIterablePublisher(Iterable iterable) { + private FromIterablePublisher(Iterable iterable) { this.iterable = requireNonNull(iterable); } + @SuppressWarnings("unchecked") + static Publisher fromIterable0(Iterable iterable) { + // Unwrap and grab the Publisher directly if possible to avoid conversion layers. + return iterable instanceof PublisherAsBlockingIterable ? ((PublisherAsBlockingIterable) iterable).original : + new FromIterablePublisher<>(iterable); + } + @Override void doSubscribe(final Subscriber subscriber) { try { diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index b96c402a5a..6d4b2f9e9a 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -3827,7 +3827,7 @@ public static Publisher from(T... values) { * and emit all values to the {@link Subscriber} and then {@link Subscriber#onComplete()}. */ public static Publisher fromIterable(Iterable iterable) { - return new FromIterablePublisher<>(iterable); + return FromIterablePublisher.fromIterable0(iterable); } /** @@ -3855,7 +3855,7 @@ public static Publisher fromIterable(Iterable iterable) { public static Publisher fromBlockingIterable(BlockingIterable iterable, LongSupplier timeoutSupplier, TimeUnit unit) { - return new FromBlockingIterablePublisher<>(iterable, timeoutSupplier, unit); + return FromBlockingIterablePublisher.fromBlockingIterable0(iterable, timeoutSupplier, unit); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java index 9fce10aa0b..4ba4ba35ef 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java @@ -49,7 +49,7 @@ * @param Type of items emitted by the {@link Publisher} from which this {@link BlockingIterable} is created. */ final class PublisherAsBlockingIterable implements BlockingIterable { - private final Publisher original; + final Publisher original; private final int queueCapacityHint; PublisherAsBlockingIterable(final Publisher original) { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/BlockingStreamingHttpClientTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/BlockingStreamingHttpClientTest.java new file mode 100644 index 0000000000..a26500255f --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/BlockingStreamingHttpClientTest.java @@ -0,0 +1,151 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.http.api.BlockingStreamingHttpClient; +import io.servicetalk.http.api.BlockingStreamingHttpResponse; +import io.servicetalk.http.api.HttpPayloadWriter; +import io.servicetalk.transport.api.ServerContext; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; + +import static io.servicetalk.http.api.HttpSerializers.appSerializerAsciiFixLen; +import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; +import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; +import static io.servicetalk.utils.internal.PlatformDependent.throwException; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +final class BlockingStreamingHttpClientTest { + @ParameterizedTest + @ValueSource(ints = {1, 100}) + void iterableSequentialConsumptionDoesNotDeadLock(int numItems) throws Exception { + try (ServerContext ctx = HttpServers.forAddress(localAddress(0)) + .listenBlockingStreamingAndAwait((ctx1, request, response) -> { + HttpPayloadWriter output = response.sendMetaData(appSerializerAsciiFixLen()); + for (String input : request.payloadBody(appSerializerAsciiFixLen())) { + output.write(input); + } + output.close(); + }); + BlockingStreamingHttpClient client = HttpClients.forResolvedAddress(serverHostAndPort(ctx)) + .buildBlockingStreaming()) { + NextSuppliers nextSuppliers = NextSuppliers.stringSuppliers(numItems); + // Allow first item in Iterator to return from next(). + nextSuppliers.countDownNextLatch(); + BlockingStreamingHttpResponse resp = client.request(client.get("/") + .payloadBody(new TestBlockingIterable<>(nextSuppliers, nextSuppliers), appSerializerAsciiFixLen())); + StringBuilder responseBody = new StringBuilder(numItems); + int i = 0; + for (String respChunk : resp.payloadBody(appSerializerAsciiFixLen())) { + // Goal is to ensure we can write each individual chunk independently without blocking threads or having + // to batch multiple items. As each chunk is echoed back, unblock the next one. + nextSuppliers.countDownNextLatch(); + responseBody.append(respChunk); + ++i; + } + assertThat("num items: " + i + " responseBody: " + responseBody, i, equalTo(numItems)); + } + } + + private static final class NextSuppliers implements Supplier, BooleanSupplier { + private final List> items; + private final AtomicInteger nextIndex = new AtomicInteger(); + private final AtomicInteger latchIndex = new AtomicInteger(); + + NextSuppliers(List> items) { + this.items = items; + } + + static NextSuppliers stringSuppliers(final int numItems) { + List> items = new ArrayList<>(numItems); + for (int i = 0; i < numItems; ++i) { + items.add(new NextSupplier<>(String.valueOf(i))); + } + return new NextSuppliers<>(items); + } + + void countDownNextLatch() { + final int i = latchIndex.getAndIncrement(); + if (i < items.size()) { + items.get(i).latch.countDown(); + } + } + + @Override + public boolean getAsBoolean() { + return nextIndex.get() < items.size(); + } + + @Override + public T get() { + return items.get(nextIndex.getAndIncrement()).get(); + } + } + + private static final class NextSupplier implements Supplier { + final CountDownLatch latch = new CountDownLatch(1); + final T next; + + private NextSupplier(final T next) { + this.next = next; + } + + @Override + public T get() { + try { + latch.await(); + } catch (InterruptedException e) { + throwException(e); + } + return next; + } + } + + private static final class TestBlockingIterable implements Iterable { + private final BooleanSupplier hasNextSupplier; + private final Supplier nextSupplier; + + private TestBlockingIterable(final BooleanSupplier hasNextSupplier, final Supplier nextSupplier) { + this.hasNextSupplier = hasNextSupplier; + this.nextSupplier = nextSupplier; + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return hasNextSupplier.getAsBoolean(); + } + + @Override + public T next() { + return nextSupplier.get(); + } + }; + } + } +}