Skip to content

Commit badec6a

Browse files
committed
feat(onErrorResumeNext): add higher-order lettable version of onErrorResumeNext
1 parent fde7205 commit badec6a

File tree

4 files changed

+160
-78
lines changed

4 files changed

+160
-78
lines changed

src/observable/onErrorResumeNext.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
import { onErrorResumeNextStatic } from '../operator/onErrorResumeNext';
1+
import { onErrorResumeNextStatic } from '../operators/onErrorResumeNext';
22

33
export const onErrorResumeNext = onErrorResumeNextStatic;

src/operator/onErrorResumeNext.ts

Lines changed: 2 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
import { Observable, ObservableInput } from '../Observable';
2-
import { FromObservable } from '../observable/FromObservable';
3-
import { Operator } from '../Operator';
4-
import { Subscriber } from '../Subscriber';
5-
import { isArray } from '../util/isArray';
6-
import { OuterSubscriber } from '../OuterSubscriber';
7-
import { InnerSubscriber } from '../InnerSubscriber';
8-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { onErrorResumeNext as higherOrder } from '../operators/onErrorResumeNext';
93

104
/* tslint:disable:max-line-length */
115
export function onErrorResumeNext<T, R>(this: Observable<T>, v: ObservableInput<R>): Observable<R>;
@@ -82,74 +76,5 @@ export function onErrorResumeNext<T, R>(this: Observable<T>, array: ObservableIn
8276
export function onErrorResumeNext<T, R>(this: Observable<T>, ...nextSources: Array<ObservableInput<any> |
8377
Array<ObservableInput<any>> |
8478
((...values: Array<any>) => R)>): Observable<R> {
85-
if (nextSources.length === 1 && isArray(nextSources[0])) {
86-
nextSources = <Array<Observable<any>>>nextSources[0];
87-
}
88-
89-
return this.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
90-
}
91-
92-
/* tslint:disable:max-line-length */
93-
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
94-
export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
95-
export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
96-
export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
97-
export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
98-
99-
export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
100-
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
101-
/* tslint:enable:max-line-length */
102-
103-
export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
104-
Array<ObservableInput<any>> |
105-
((...values: Array<any>) => R)>): Observable<R> {
106-
let source: ObservableInput<any> = null;
107-
108-
if (nextSources.length === 1 && isArray(nextSources[0])) {
109-
nextSources = <Array<ObservableInput<any>>>nextSources[0];
110-
}
111-
source = nextSources.shift();
112-
113-
return new FromObservable(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
114-
}
115-
116-
class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
117-
constructor(private nextSources: Array<ObservableInput<any>>) {
118-
}
119-
120-
call(subscriber: Subscriber<R>, source: any): any {
121-
return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
122-
}
123-
}
124-
125-
class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
126-
constructor(protected destination: Subscriber<T>,
127-
private nextSources: Array<ObservableInput<any>>) {
128-
super(destination);
129-
}
130-
131-
notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
132-
this.subscribeToNextSource();
133-
}
134-
135-
notifyComplete(innerSub: InnerSubscriber<T, any>): void {
136-
this.subscribeToNextSource();
137-
}
138-
139-
protected _error(err: any): void {
140-
this.subscribeToNextSource();
141-
}
142-
143-
protected _complete(): void {
144-
this.subscribeToNextSource();
145-
}
146-
147-
private subscribeToNextSource(): void {
148-
const next = this.nextSources.shift();
149-
if (next) {
150-
this.add(subscribeToResult(this, next));
151-
} else {
152-
this.destination.complete();
153-
}
154-
}
79+
return higherOrder(...nextSources)(this);
15580
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export { mergeScan } from './mergeScan';
4545
export { min } from './min';
4646
export { multicast } from './multicast';
4747
export { observeOn } from './observeOn';
48+
export { onErrorResumeNext } from './onErrorResumeNext';
4849
export { publish } from './publish';
4950
export { race } from './race';
5051
export { reduce } from './reduce';

src/operators/onErrorResumeNext.ts

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import { Observable, ObservableInput } from '../Observable';
2+
import { FromObservable } from '../observable/FromObservable';
3+
import { Operator } from '../Operator';
4+
import { Subscriber } from '../Subscriber';
5+
import { isArray } from '../util/isArray';
6+
import { OuterSubscriber } from '../OuterSubscriber';
7+
import { InnerSubscriber } from '../InnerSubscriber';
8+
import { subscribeToResult } from '../util/subscribeToResult';
9+
import { OperatorFunction } from '../interfaces';
10+
11+
/* tslint:disable:max-line-length */
12+
export function onErrorResumeNext<T, R>(v: ObservableInput<R>): OperatorFunction<T, R>;
13+
export function onErrorResumeNext<T, T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): OperatorFunction<T, R>;
14+
export function onErrorResumeNext<T, T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): OperatorFunction<T, R>;
15+
export function onErrorResumeNext<T, T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): OperatorFunction<T, R>;
16+
export function onErrorResumeNext<T, T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): OperatorFunction<T, R> ;
17+
export function onErrorResumeNext<T, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R>;
18+
export function onErrorResumeNext<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, R>;
19+
/* tslint:enable:max-line-length */
20+
21+
/**
22+
* When any of the provided Observable emits an complete or error notification, it immediately subscribes to the next one
23+
* that was passed.
24+
*
25+
* <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
26+
*
27+
* <img src="./img/onErrorResumeNext.png" width="100%">
28+
*
29+
* `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
30+
* arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
31+
* as the source.
32+
*
33+
* `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
34+
* When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
35+
* will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
36+
* its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
37+
* Observable in provided series, no matter if previous Observable completed or ended with an error. This will
38+
* be happening until there is no more Observables left in the series, at which point returned Observable will
39+
* complete - even if the last subscribed stream ended with an error.
40+
*
41+
* `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
42+
* when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
43+
* in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
44+
* an error.
45+
*
46+
* Note that you do not get any access to errors emitted by the Observables. In particular do not
47+
* expect these errors to appear in error callback passed to {@link subscribe}. If you want to take
48+
* specific actions based on what error was emitted by an Observable, you should try out {@link catch} instead.
49+
*
50+
*
51+
* @example <caption>Subscribe to the next Observable after map fails</caption>
52+
* Rx.Observable.of(1, 2, 3, 0)
53+
* .map(x => {
54+
* if (x === 0) { throw Error(); }
55+
return 10 / x;
56+
* })
57+
* .onErrorResumeNext(Rx.Observable.of(1, 2, 3))
58+
* .subscribe(
59+
* val => console.log(val),
60+
* err => console.log(err), // Will never be called.
61+
* () => console.log('that\'s it!')
62+
* );
63+
*
64+
* // Logs:
65+
* // 10
66+
* // 5
67+
* // 3.3333333333333335
68+
* // 1
69+
* // 2
70+
* // 3
71+
* // "that's it!"
72+
*
73+
* @see {@link concat}
74+
* @see {@link catch}
75+
*
76+
* @param {...ObservableInput} observables Observables passed either directly or as an array.
77+
* @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
78+
* to the next passed Observable and so on, until it completes or runs out of Observables.
79+
* @method onErrorResumeNext
80+
* @owner Observable
81+
*/
82+
83+
export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
84+
Array<ObservableInput<any>> |
85+
((...values: Array<any>) => R)>): OperatorFunction<T, R> {
86+
if (nextSources.length === 1 && isArray(nextSources[0])) {
87+
nextSources = <Array<Observable<any>>>nextSources[0];
88+
}
89+
90+
return (source: Observable<T>) => source.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
91+
}
92+
93+
/* tslint:disable:max-line-length */
94+
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
95+
export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
96+
export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
97+
export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
98+
export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
99+
100+
export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
101+
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
102+
/* tslint:enable:max-line-length */
103+
104+
export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
105+
Array<ObservableInput<any>> |
106+
((...values: Array<any>) => R)>): Observable<R> {
107+
let source: ObservableInput<any> = null;
108+
109+
if (nextSources.length === 1 && isArray(nextSources[0])) {
110+
nextSources = <Array<ObservableInput<any>>>nextSources[0];
111+
}
112+
source = nextSources.shift();
113+
114+
return new FromObservable(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
115+
}
116+
117+
class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
118+
constructor(private nextSources: Array<ObservableInput<any>>) {
119+
}
120+
121+
call(subscriber: Subscriber<R>, source: any): any {
122+
return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
123+
}
124+
}
125+
126+
class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
127+
constructor(protected destination: Subscriber<T>,
128+
private nextSources: Array<ObservableInput<any>>) {
129+
super(destination);
130+
}
131+
132+
notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
133+
this.subscribeToNextSource();
134+
}
135+
136+
notifyComplete(innerSub: InnerSubscriber<T, any>): void {
137+
this.subscribeToNextSource();
138+
}
139+
140+
protected _error(err: any): void {
141+
this.subscribeToNextSource();
142+
}
143+
144+
protected _complete(): void {
145+
this.subscribeToNextSource();
146+
}
147+
148+
private subscribeToNextSource(): void {
149+
const next = this.nextSources.shift();
150+
if (next) {
151+
this.add(subscribeToResult(this, next));
152+
} else {
153+
this.destination.complete();
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)