Skip to content

Commit 564d7ec

Browse files
committed
Merge pull request #3637 from davidmoten/skipWhile-exception-handling
handle predicate exceptions properly in skipWhile
2 parents 71d3d0f + aaeadf7 commit 564d7ec

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

src/main/java/rx/internal/operators/OperatorSkipWhile.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.Exceptions;
2021
import rx.functions.Func1;
2122
import rx.functions.Func2;
2223

@@ -40,7 +41,14 @@ public void onNext(T t) {
4041
if (!skipping) {
4142
child.onNext(t);
4243
} else {
43-
if (!predicate.call(t, index++)) {
44+
final boolean skip;
45+
try {
46+
skip = predicate.call(t, index++);
47+
} catch (Throwable e) {
48+
Exceptions.throwOrReport(e, child, t);
49+
return;
50+
}
51+
if (!skip) {
4452
skipping = false;
4553
child.onNext(t);
4654
} else {

src/test/java/rx/internal/operators/OperatorSkipWhileTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertFalse;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.anyInt;
2021
import static org.mockito.Mockito.inOrder;
@@ -23,12 +24,17 @@
2324
import static org.mockito.Mockito.times;
2425
import static org.mockito.Mockito.verify;
2526

27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
2629
import org.junit.Test;
2730
import org.mockito.InOrder;
2831

2932
import rx.Observable;
3033
import rx.Observer;
34+
import rx.functions.Action1;
3135
import rx.functions.Func1;
36+
import rx.observers.Subscribers;
37+
import rx.observers.TestSubscriber;
3238

3339
public class OperatorSkipWhileTest {
3440

@@ -51,6 +57,20 @@ public Boolean call(Integer value) {
5157
return index++ < 3;
5258
}
5359
};
60+
61+
private static final Func1<Integer, Boolean> THROWS_NON_FATAL = new Func1<Integer, Boolean>() {
62+
@Override
63+
public Boolean call(Integer values) {
64+
throw new RuntimeException();
65+
}
66+
};
67+
68+
private static final Func1<Integer, Boolean> THROWS_FATAL = new Func1<Integer, Boolean>() {
69+
@Override
70+
public Boolean call(Integer values) {
71+
throw new OutOfMemoryError();
72+
}
73+
};
5474

5575
@Test
5676
public void testSkipWithIndex() {
@@ -120,6 +140,33 @@ public void testSkipError() {
120140
inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
121141
}
122142

143+
@Test
144+
public void testPredicateRuntimeError() {
145+
Observable.just(1).skipWhile(THROWS_NON_FATAL).subscribe(w);
146+
InOrder inOrder = inOrder(w);
147+
inOrder.verify(w, never()).onNext(anyInt());
148+
inOrder.verify(w, never()).onCompleted();
149+
inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
150+
}
151+
152+
@Test(expected = OutOfMemoryError.class)
153+
public void testPredicateFatalError() {
154+
Observable.just(1).skipWhile(THROWS_FATAL).unsafeSubscribe(Subscribers.empty());
155+
}
156+
157+
@Test
158+
public void testPredicateRuntimeErrorDoesNotGoUpstreamFirst() {
159+
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
160+
TestSubscriber<Integer> ts = TestSubscriber.create();
161+
Observable.just(1).doOnError(new Action1<Throwable>() {
162+
@Override
163+
public void call(Throwable t) {
164+
errorOccurred.set(true);
165+
}
166+
}).skipWhile(THROWS_NON_FATAL).subscribe(ts);
167+
assertFalse(errorOccurred.get());
168+
}
169+
123170
@Test
124171
public void testSkipManySubscribers() {
125172
Observable<Integer> src = Observable.range(1, 10).skipWhile(LESS_THAN_FIVE);

0 commit comments

Comments
 (0)