diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java index 6ddbb00cdb..a19ee402bc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java @@ -51,6 +51,8 @@ static final class ScanSeedSubscriber extends SinglePostCompleteSubscriber private static final long serialVersionUID = -1776795561228106469L; final BiFunction accumulator; + + boolean done = false; ScanSeedSubscriber(Subscriber actual, BiFunction accumulator, R value) { super(actual); @@ -80,12 +82,20 @@ public void onNext(T t) { @Override public void onError(Throwable t) { + if (done) { + return; + } + done = true; value = null; actual.onError(t); } @Override public void onComplete() { + if (done) { + return; + } + done = true; complete(value); } } diff --git a/src/test/java/io/reactivex/flowable/FlowableScanTests.java b/src/test/java/io/reactivex/flowable/FlowableScanTests.java index 6502f4c135..46825fad4d 100644 --- a/src/test/java/io/reactivex/flowable/FlowableScanTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableScanTests.java @@ -14,9 +14,11 @@ package io.reactivex.flowable; import java.util.HashMap; +import java.util.concurrent.Callable; import org.junit.Test; +import io.reactivex.Flowable; import io.reactivex.flowable.FlowableEventStream.Event; import io.reactivex.functions.*; @@ -41,4 +43,66 @@ public void accept(HashMap v) { } }); } + + @Test + public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + Burst.item(1).error(e).scan(0, new BiFunction() { + + @Override + public Integer apply(Integer n1, Integer n2) throws Exception { + throw e; + }}) + .test() + .assertNoValues() + .assertError(e); + } + + @Test + public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + Burst.item(1).create().scan(0, new BiFunction() { + + @Override + public Integer apply(Integer n1, Integer n2) throws Exception { + throw e; + }}) + .test() + .assertNoValues() + .assertError(e); + } + + @Test + public void testFlowableScanSeedCompletesNormally() { + Flowable.just(1,2,3).scan(0, new BiFunction() { + + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + }}) + .test() + .assertValues(0, 1, 3, 6) + .assertComplete(); + } + + @Test + public void testFlowableScanSeedWhenScanSeedProviderThrows() { + final RuntimeException e = new RuntimeException(); + Flowable.just(1,2,3).scanWith(new Callable() { + @Override + public Integer call() throws Exception { + throw e; + } + }, + new BiFunction() { + + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + }) + .test() + .assertError(e) + .assertNoValues(); + } } \ No newline at end of file