Description
The following unit test fails because the sequence doesn't complete:
public void publishConcat() {
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Observable.range(1, 3)
        .publish(o -> o.take(5).concatWith(o.takeLast(5)))
        .subscribe(ts);
    ts.assertValues(1, 2, 3);
    ts.assertNoErrors();
    ts.assertCompleted();
}
The problem lies in the publish() operator. When a published source completes, the operator switches back to a ready state in which new subscribers can subscribe, and they all receive values from the upstream on the next connect(). Otherwise, more than one subscriber could not observe the sequence again from the beginning.
In the test, the source is short, and by the time take(5) completes and makes concat subscribe to o (the published source), the underlying OperatorPublish is already in the ready state and awaits another connect(), which never arrives. This leaves takeLast(5) hanging forever.
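The hang can be reproduced with a toy model that needs no RxJava at all. The sketch below is not the OperatorPublish source: ToyPublish, Recorder and their methods are hypothetical names, and backpressure is ignored entirely. It only mimics the "reset to ready on completion" behavior described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT RxJava code) of a connectable source that resets to "ready" on completion. */
class ToyPublish {
    /** Minimal observer: records values and whether completion was seen. */
    static class Recorder {
        final List<Integer> values = new ArrayList<>();
        boolean completed;
    }

    private final int[] source;
    private List<Recorder> subscribers = new ArrayList<>();

    ToyPublish(int... source) {
        this.source = source;
    }

    /** Subscribers registered here only hear from the NEXT connect(). */
    void subscribe(Recorder r) {
        subscribers.add(r);
    }

    /** Runs the source once for the current subscribers, then is back in the ready state. */
    void connect() {
        List<Recorder> current = subscribers;
        subscribers = new ArrayList<>(); // ready again: latecomers wait for another connect()
        for (int v : source) {
            for (Recorder r : current) {
                r.values.add(v);
            }
        }
        for (Recorder r : current) {
            r.completed = true;
        }
    }

    public static void main(String[] args) {
        ToyPublish p = new ToyPublish(1, 2, 3);
        Recorder first = new Recorder();  // plays the role of take(5)
        p.subscribe(first);
        p.connect();
        Recorder late = new Recorder();   // plays the role of takeLast(5), subscribed by concat
        p.subscribe(late);
        System.out.println(first.values + " completed=" + first.completed); // [1, 2, 3] completed=true
        System.out.println(late.values + " completed=" + late.completed);   // [] completed=false
    }
}
```

The late Recorder never sees a completion signal because nothing calls connect() a second time, which is the same situation takeLast(5) ends up in.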
This anomaly has been present since publish() was rewritten to support backpressure, specifically in the mode where it doubles as a multicast source. Previously, a PublishSubject was the mediator between the upstream and the places where o is used in the lambda. A completed PublishSubject completes any latecomer subscriber, such as the one from takeLast(), so the unit test passed.
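For contrast, here is a minimal sketch of the "sticky terminal state" that made the old behavior work. ToySubject and Listener are hypothetical names used for illustration only; this is a simplified stand-in, not the PublishSubject implementation, and it is neither thread-safe nor backpressure-aware.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in (NOT the real PublishSubject) showing a terminal state that latecomers observe. */
class ToySubject {
    interface Listener {
        void onNext(int v);
        void onCompleted();
    }

    private final List<Listener> listeners = new ArrayList<>();
    private boolean done;

    void subscribe(Listener l) {
        if (done) {
            l.onCompleted(); // already terminated: complete the latecomer immediately
            return;
        }
        listeners.add(l);
    }

    void onNext(int v) {
        if (done) {
            return;
        }
        for (Listener l : new ArrayList<>(listeners)) {
            l.onNext(v);
        }
    }

    void onCompleted() {
        if (done) {
            return;
        }
        done = true; // stays set, unlike the reset-to-ready behavior of publish()
        List<Listener> current = new ArrayList<>(listeners);
        listeners.clear();
        for (Listener l : current) {
            l.onCompleted();
        }
    }

    public static void main(String[] args) {
        ToySubject s = new ToySubject();
        s.onNext(1);
        s.onCompleted();
        boolean[] lateCompleted = {false};
        s.subscribe(new Listener() {
            public void onNext(int v) { }
            public void onCompleted() { lateCompleted[0] = true; }
        });
        System.out.println("late subscriber completed: " + lateCompleted[0]); // true
    }
}
```

A latecomer receives no values but is completed right away, so a downstream concat can finish instead of waiting.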
The solution would be to write a request-coordinating internal PublishSubject variant and dispatch upstream values over it in the multicast scenario.
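One possible shape of such a variant is sketched below, purely as an illustration. It assumes that "request-coordinating" means asking the upstream only for as many items as the slowest current subscriber has requested. ToyCoordinator, Sub and upstreamDemand() are hypothetical names, and the sketch is single-threaded, so it says nothing about how the real operator would handle concurrency.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy sketch (an assumption, NOT RxJava code) of a subject that coordinates requests. */
class ToyCoordinator {
    static class Sub {
        long requested;                              // outstanding demand of this consumer
        final List<Integer> received = new ArrayList<>();
        boolean completed;
    }

    private final List<Sub> subs = new ArrayList<>();
    private boolean done;

    Sub subscribe() {
        Sub s = new Sub();
        if (done) {
            s.completed = true;                      // sticky terminal state for latecomers
        } else {
            subs.add(s);
        }
        return s;
    }

    /** Number of items that may be requested from upstream: the smallest outstanding demand. */
    long upstreamDemand() {
        if (done || subs.isEmpty()) {
            return 0;
        }
        long min = Long.MAX_VALUE;
        for (Sub s : subs) {
            min = Math.min(min, s.requested);
        }
        return min;
    }

    /** The caller is expected to emit at most upstreamDemand() items before asking again. */
    void onNext(int v) {
        if (done) {
            return;
        }
        for (Sub s : subs) {
            s.received.add(v);
            s.requested--;
        }
    }

    void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        for (Sub s : subs) {
            s.completed = true;
        }
        subs.clear();
    }
}
```

With two subscribers requesting 2 and 5 items, upstreamDemand() returns 2; after two onNext calls it drops to 0 until the slower subscriber requests more. After onCompleted(), a new subscribe() call returns an already completed Sub, which is the property the failing test depends on.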
(Note that unbounded replay has no such problem because it stays in the completed state until the next connect() call, so latecomers still receive the original sequence. Because it buffers all values, there is no penalty for latecomers after a new connect() call.)