Skip to content

Commit

Permalink
simplify end loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Nov 15, 2024
1 parent b951e36 commit d01f872
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,26 +246,19 @@ private void tryDrainIterator(ErrorHandlingStrategyInDrain errorHandlingStrategy
doCancel();
}
} else if (terminalNotification == null && !hasNext) {
for (;;) {
final Iterator<? extends U> nextIter = iterator;
if (nextIter == currIter && currRequestN > 0 &&
(currIter != EmptyIterator.instance() || thrown)) {
final Iterator<? extends U> nextIter = iterator;
if (nextIter == currIter && currRequestN > 0 &&
(currIter != EmptyIterator.instance() || thrown) &&
// We only request 1 at a time, and therefore we don't have outstanding demand.
// We will not be getting an onNext call, so we write the currIter variable
// before we unlock emitting so visibility to other threads is taken care of
// by the write to emitting below (and later read).
if (iterUpdater.compareAndSet(this, currIter, EmptyIterator.instance())) {
if (sourceSubscription != null) {
sourceSubscription.request(1);
}
break;
}
} else {
// if nextIter != currIter -> outer loop will re-read "iterator" state and
// attempt to drain from it.
break;
}
iterUpdater.compareAndSet(this, currIter, EmptyIterator.instance()) &&
sourceSubscription != null) {
sourceSubscription.request(1);
}
// if the CAS fails or nextIter != currIter, the outer loop will re-read iterator
// and try to emit if items are present and demand allows it.
}
} finally {
// The lock must be released after we interact with the subscription for thread safety
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ void exceptionFromSubscriptionRequestNIsPropagated() {
}

@Test
void testFlatMapConcatIterable() throws Exception {
void concurrencyEmitsInOrder() throws Exception {
try (BlockingIterator<Integer> iterable = fromIterable(() -> range(0, 10000).iterator())
.publishOn(Executors.global())
.flatMapConcatIterable(Collections::singletonList)
Expand Down

0 comments on commit d01f872

Please sign in to comment.