Closed
Description
This is the simplest reproduction I could conjure up of this problem:
Observable.just(1)
    .flatMap(n -> {
        return Observable.just(null, null)
            .filter(o -> o != null)
            .switchIfEmpty(Observable.empty().switchIfEmpty(Observable.just("Hello")));
    })
    .subscribe(System.out::println);
This will spit out onError with the error message "more items arrived than were requested".
This appears to be a regression. It works on RxJava 1.0.11 but fails on every version since then (including 1.0.15, the latest).
Adding in a take(1) seems to fix the problem (as a workaround).
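For readers unfamiliar with the error message, here is a rough sketch of the kind of request accounting that produces it. This is a simplified, hypothetical model written for illustration (the class and method names `RequestAccountingSketch`, `SimpleArbiter` and `outstanding` are made up); it is not RxJava's actual ProducerArbiter and it does not diagnose the regression itself. It only shows how an item arriving without a matching request would trip this check.

```java
// Hypothetical, simplified model of request accounting.
// NOT RxJava's real ProducerArbiter; names here are illustrative only.
public class RequestAccountingSketch {

    /** Tracks how many items downstream has requested but not yet received. */
    static final class SimpleArbiter {
        private long requested;

        /** Downstream asks for n more items. */
        void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n > 0 required");
            }
            long sum = requested + n;
            // Cap at Long.MAX_VALUE on overflow, which is treated as "unbounded".
            requested = sum < 0 ? Long.MAX_VALUE : sum;
        }

        /** Upstream reports that n items were delivered. */
        void produced(long n) {
            if (requested == Long.MAX_VALUE) {
                return; // unbounded demand, nothing to account for
            }
            long remaining = requested - n;
            if (remaining < 0) {
                throw new IllegalStateException("more items arrived than were requested");
            }
            requested = remaining;
        }

        /** Number of requested items still outstanding. */
        long outstanding() {
            return requested;
        }
    }

    public static void main(String[] args) {
        SimpleArbiter arbiter = new SimpleArbiter();
        arbiter.request(1);  // downstream requests one item
        arbiter.produced(1); // first item is accounted for
        try {
            arbiter.produced(1); // second item arrives with no outstanding request
        } catch (IllegalStateException e) {
            System.out.println("onError: " + e.getMessage());
        }
    }
}
```

In this model the second produced(1) call fails because nothing is outstanding. Whether the nested switchIfEmpty in the reproduction hits the check for the same reason is an open question here.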
For completeness, here is the exception:
Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: more items arrived than were requested
at rx.Observable$27.onError(Observable.java:7996)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:158)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:119)
at rx.internal.util.ScalarSynchronousObservable$2$1.onError(ScalarSynchronousObservable.java:140)
at rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError(OperatorSwitchIfEmpty.java:116)
at rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError(OperatorSwitchIfEmpty.java:116)
at rx.Observable.unsafeSubscribe(Observable.java:8178)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate(OperatorSwitchIfEmpty.java:78)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:71)
at rx.Observable$EmptyHolder$1.call(Observable.java:1073)
at rx.Observable$EmptyHolder$1.call(Observable.java:1070)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8171)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate(OperatorSwitchIfEmpty.java:78)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:71)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastpath(OnSubscribeFromIterable.java:129)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:70)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.setProducer(OperatorSwitchIfEmpty.java:63)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8171)
at rx.internal.util.ScalarSynchronousObservable$2.call(ScalarSynchronousObservable.java:133)
at rx.internal.util.ScalarSynchronousObservable$2.call(ScalarSynchronousObservable.java:125)
at rx.Observable.subscribe(Observable.java:8266)
at rx.Observable.subscribe(Observable.java:8233)
at rx.Observable.subscribe(Observable.java:7987)
at net.danlew.experiments.Tester.main(Tester.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IllegalStateException: more items arrived than were requested
at rx.internal.producers.ProducerArbiter.produced(ProducerArbiter.java:98)
at rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onNext(OperatorSwitchIfEmpty.java:122)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:8171)
... 33 more