diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java index 60b9437f05..6b76558a5e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java @@ -743,6 +743,20 @@ public T poll() { produced++; return v; } + tryReplenish(); + return null; + } + + @Override + public boolean isEmpty() { + if (queue.isEmpty()) { + tryReplenish(); + return true; + } + return false; + } + + void tryReplenish() { int p = produced; if (p != 0) { produced = 0; @@ -750,12 +764,6 @@ public T poll() { parent.upstream.request(p); } } - return null; - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); } @Override diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java index 825c11354e..f5bf461dda 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java @@ -2413,4 +2413,34 @@ public void run() { } } } + + @Test + public void fusedParallelGroupProcessing() { + Flowable.range(0, 500000) + .subscribeOn(Schedulers.single()) + .groupBy(new Function() { + @Override + public Integer apply(Integer i) throws Throwable { + return i % 2; + } + }) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) { + return g.getKey() == 0 + ? g + .parallel() + .runOn(Schedulers.computation()) + .map(Functions.identity()) + .sequential() + : g.map(Functions.identity()) // no need to use hide + ; + } + }) + .test() + .awaitDone(20, TimeUnit.SECONDS) + .assertValueCount(500000) + .assertComplete() + .assertNoErrors(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java index 1c8763b33f..624ecee97a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java @@ -1972,4 +1972,20 @@ public void fusedNoConcurrentCleanDueToCancel() { } } } + + @Test + public void fusedParallelProcessing() { + Flowable.range(0, 500000) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()) + .parallel() + .runOn(Schedulers.computation()) + .map(Functions.identity()) + .sequential() + .test() + .awaitDone(20, TimeUnit.SECONDS) + .assertValueCount(500000) + .assertComplete() + .assertNoErrors(); + } }