The following tests fail because the cancel/dispose from the downstream arrives before the actual connection is established, which leaves the connectable source in a disposed state in which it refuses to take in new subscribers/observers:
/**
 * Reproduces the refCount re-subscription problem with {@code Observable}:
 * a first observer that is stopped via {@code takeUntil} is followed by a
 * second observer that must still receive the value pushed into the subject.
 */
@Test
public void test() {
    final BehaviorSubject<Integer> source = BehaviorSubject.create();
    final Observable<Integer> shared = source.replay(1).refCount();

    // First observer: bounded by a stop signal built from a single ready value.
    final Observable<Integer> stopSignal = Observable.just(1);
    shared.takeUntil(stopSignal).test();

    // Push a value after the first observer has come and gone.
    source.onNext(2);

    // Second observer on the same shared sequence is expected to see that value.
    shared.take(1).test().assertResult(2);
}
/**
 * Reproduces the refCount re-subscription problem with {@code Flowable}:
 * a first subscriber that is stopped via {@code takeUntil} is followed by a
 * second subscriber that must still receive the value pushed into the processor.
 */
@Test
public void test2() {
    final BehaviorProcessor<Integer> source = BehaviorProcessor.create();
    final Flowable<Integer> shared = source.replay(1).refCount();

    // This first subscription is what makes the test fail: it is bounded by
    // a stop signal built from a single ready value.
    final Flowable<Integer> stopSignal = Flowable.just(1);
    shared.takeUntil(stopSignal).test();

    // Push a value after the first subscriber has come and gone.
    source.onNext(2);

    // Second subscriber on the same shared sequence is expected to see that value.
    shared.take(1).test().assertResult(2);
}