Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: FlowableScanSeed - prevent multiple terminal events #4899

Merged
merged 1 commit into from
Dec 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber
private static final long serialVersionUID = -1776795561228106469L;

final BiFunction<R, ? super T, R> accumulator;

boolean done;

ScanSeedSubscriber(Subscriber<? super R> actual, BiFunction<R, ? super T, R> accumulator, R value) {
super(actual);
Expand All @@ -60,6 +62,10 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber

@Override
public void onNext(T t) {
if (done) {
return;
}

R v = value;

R u;
Expand All @@ -80,12 +86,20 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
return;
}
done = true;
value = null;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
complete(value);
}
}
Expand Down
85 changes: 85 additions & 0 deletions src/test/java/io/reactivex/flowable/FlowableScanTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@

package io.reactivex.flowable;

import static org.junit.Assert.assertEquals;

import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.flowable.FlowableEventStream.Event;
import io.reactivex.functions.*;

Expand All @@ -41,4 +47,83 @@ public void accept(HashMap<String, String> v) {
}
});
}

@Test
public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).error(e).scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create().scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testFlowableScanSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
final AtomicInteger count = new AtomicInteger();
Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
count.incrementAndGet();
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
assertEquals(1, count.get());
}

@Test
public void testFlowableScanSeedCompletesNormally() {
Flowable.just(1,2,3).scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}})
.test()
.assertValues(0, 1, 3, 6)
.assertComplete();
}

@Test
public void testFlowableScanSeedWhenScanSeedProviderThrows() {
final RuntimeException e = new RuntimeException();
Flowable.just(1,2,3).scanWith(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw e;
}
},
new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}
})
.test()
.assertError(e)
.assertNoValues();
}
}