-
Notifications
You must be signed in to change notification settings - Fork 119
/
Copy pathHandlingErrors.java
128 lines (92 loc) · 3.11 KB
/
HandlingErrors.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.packtpub.reactive.chapter05;
import static com.packtpub.reactive.common.Helpers.subscribePrint;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import com.packtpub.reactive.common.Program;
/**
* Demonstrates working with {@link Observable#onErrorReturn}, {@link Observable#onErrorResumeNext} and {@link Observable#onExceptionResumeNext}
* as well as retrying with {@link Observable#retry} and {@link Observable#retryWhen}.
*
* @author meddle
*/
public class HandlingErrors implements Program {
class FooException extends RuntimeException {
private static final long serialVersionUID = 1L;
public FooException() {
super("Foo!");
}
}
class BooException extends RuntimeException {
private static final long serialVersionUID = 1L;
public BooException() {
super("Boo!");
}
}
class ErrorEmitter implements OnSubscribe<Integer> {
private int throwAnErrorCounter = 5;
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
if (throwAnErrorCounter > 4) {
throwAnErrorCounter--;
subscriber.onError(new FooException());
return;
}
if (throwAnErrorCounter > 0) {
throwAnErrorCounter--;
subscriber.onError(new BooException());
return;
}
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
}
}
@Override
public String name() {
return "Examples of handling errors";
}
@Override
public int chapter() {
return 5;
}
@Override
public void run() {
Observable<String> numbers = Observable.just("1", "2", "three", "4", "5");
Observable<Integer> n = numbers.map(Integer::parseInt).onErrorReturn(e -> -1);
subscribePrint(n, "error returned");
Observable<Integer> defaultOnError = Observable.just(5, 4, 3, 2, 1);
n = numbers.map(Integer::parseInt).onErrorResumeNext(defaultOnError);
subscribePrint(n, "on error resume next");
n = numbers.map(Integer::parseInt).onExceptionResumeNext(defaultOnError);
subscribePrint(n, "on exception resume next");
n = numbers.doOnNext(number -> {
assert !number.equals("three");
}).map(Integer::parseInt).onExceptionResumeNext(defaultOnError);
subscribePrint(n, "on exception resume next 2");
n = numbers.doOnNext(number -> {
assert !number.equals("three");
}).map(Integer::parseInt).onErrorResumeNext(defaultOnError);
subscribePrint(n, "on error resume next 2");
subscribePrint(Observable.create(new ErrorEmitter()).retry(), "Retry");
Observable<Integer> when = Observable.create(new ErrorEmitter()).retryWhen(attempts -> {
return attempts.flatMap(error -> {
if (error instanceof FooException) {
System.err.println("Delaying...");
return Observable.timer(1L, TimeUnit.SECONDS, Schedulers.immediate());
}
return Observable.error(error);
});
}).retry((attempts, error) -> {
return (error instanceof BooException) && attempts < 3;
});
subscribePrint(when, "retryWhen");
}
public static void main(String[] args) {
new HandlingErrors().run();
}
}