Skip to content

Commit

Permalink
3.x: Fix parallel() on grouped flowable not replenishing properly (#…
Browse files Browse the repository at this point in the history
…6719)

* 3.x: Fix parallel() on grouped flowable not replenishing properly

* Remove accidental import

* Avoid calling `isEmpty`

* Undo some of the parallel changes

* Undo all changes to ParallelFromPublisher

* Again, undo
  • Loading branch information
akarnokd authored Nov 20, 2019
1 parent 5026999 commit 7c0793d
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -743,19 +743,27 @@ public T poll() {
produced++;
return v;
}
tryReplenish();
return null;
}

@Override
public boolean isEmpty() {
if (queue.isEmpty()) {
tryReplenish();
return true;
}
return false;
}

void tryReplenish() {
int p = produced;
if (p != 0) {
produced = 0;
if ((once.get() & ABANDONED) == 0) {
parent.upstream.request(p);
}
}
return null;
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2413,4 +2413,34 @@ public void run() {
}
}
}

@Test
public void fusedParallelGroupProcessing() {
Flowable.range(0, 500000)
.subscribeOn(Schedulers.single())
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer i) throws Throwable {
return i % 2;
}
})
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) {
return g.getKey() == 0
? g
.parallel()
.runOn(Schedulers.computation())
.map(Functions.<Integer>identity())
.sequential()
: g.map(Functions.<Integer>identity()) // no need to use hide
;
}
})
.test()
.awaitDone(20, TimeUnit.SECONDS)
.assertValueCount(500000)
.assertComplete()
.assertNoErrors();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1972,4 +1972,20 @@ public void fusedNoConcurrentCleanDueToCancel() {
}
}
}

@Test
public void fusedParallelProcessing() {
Flowable.range(0, 500000)
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.computation())
.parallel()
.runOn(Schedulers.computation())
.map(Functions.<Integer>identity())
.sequential()
.test()
.awaitDone(20, TimeUnit.SECONDS)
.assertValueCount(500000)
.assertComplete()
.assertNoErrors();
}
}

0 comments on commit 7c0793d

Please sign in to comment.