diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index 5df99b2585..ccff8fc663 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -301,7 +301,14 @@ public void requestMore(long n) { @Override public void onNext(T t) { - child.onNext(combinator.call(t)); + final R value; + try { + value = combinator.call(t); + } catch (Throwable e) { + Exceptions.throwOrReport(e, child); + return; + } + child.onNext(value); } @Override diff --git a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java index c28606cae0..1a4043b683 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -883,5 +884,32 @@ public void onNext(Integer t) { }}); assertTrue(latch.await(10, TimeUnit.SECONDS)); } + + @Test + public void testNonFatalExceptionThrownByCombinatorForSingleSourceIsNotReportedByUpstreamOperator() { + final AtomicBoolean errorOccurred = new AtomicBoolean(false); + TestSubscriber ts = TestSubscriber.create(1); + Observable source = Observable.just(1) + // if haven't caught exception in combineLatest operator then would incorrectly + // be picked up by this call to doOnError + .doOnError(new Action1() { + @Override + public void call(Throwable t) { + errorOccurred.set(true); + } + }); + Observable + .combineLatest(Collections.singletonList(source), THROW_NON_FATAL) + .subscribe(ts); + assertFalse(errorOccurred.get()); + } + + private static final FuncN THROW_NON_FATAL = new FuncN() { + @Override + public Integer call(Object... args) { + throw new RuntimeException(); + } + + }; }