From d01f87253f9334a37d2d1276121bb9f33be75481 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 14 Nov 2024 20:36:37 -0800 Subject: [PATCH] simplify end loop --- .../api/PublisherConcatMapIterable.java | 23 +++++++------------ .../api/PublisherConcatMapIterableTest.java | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java index 0a0a9c0cf4..997e646946 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java @@ -246,26 +246,19 @@ private void tryDrainIterator(ErrorHandlingStrategyInDrain errorHandlingStrategy doCancel(); } } else if (terminalNotification == null && !hasNext) { - for (;;) { - final Iterator nextIter = iterator; - if (nextIter == currIter && currRequestN > 0 && - (currIter != EmptyIterator.instance() || thrown)) { + final Iterator nextIter = iterator; + if (nextIter == currIter && currRequestN > 0 && + (currIter != EmptyIterator.instance() || thrown) && // We only request 1 at a time, and therefore we don't have outstanding demand. // We will not be getting an onNext call, so we write the currIter variable // before we unlock emitting so visibility to other threads is taken care of // by the write to emitting below (and later read). - if (iterUpdater.compareAndSet(this, currIter, EmptyIterator.instance())) { - if (sourceSubscription != null) { - sourceSubscription.request(1); - } - break; - } - } else { - // if nextIter != currIter -> outer loop will re-read "iterator" state and - // attempt to drain from it. - break; - } + iterUpdater.compareAndSet(this, currIter, EmptyIterator.instance()) && + sourceSubscription != null) { + sourceSubscription.request(1); } + // if the CAS fails or nextIter != currIter, the outer loop will re-read iterator + // and try to emit if items are present and demand allows it. } } finally { // The lock must be released after we interact with the subscription for thread safety diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherConcatMapIterableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherConcatMapIterableTest.java index 94c02802ed..ba45f17230 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherConcatMapIterableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherConcatMapIterableTest.java @@ -428,7 +428,7 @@ void exceptionFromSubscriptionRequestNIsPropagated() { } @Test - void testFlatMapConcatIterable() throws Exception { + void concurrencyEmitsInOrder() throws Exception { try (BlockingIterator iterable = fromIterable(() -> range(0, 10000).iterator()) .publishOn(Executors.global()) .flatMapConcatIterable(Collections::singletonList)