forked from ReactiveX/rxjs
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(onErrorResumeNext): add onErrorResumeNext operator
closes ReactiveX#1665
- Loading branch information
Showing
6 changed files
with
284 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import * as Rx from '../../dist/cjs/Rx'; | ||
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; | ||
|
||
const Observable = Rx.Observable; | ||
|
||
describe('Observable.onErrorResumeNext', () => { | ||
it('should continue with observables', () => { | ||
const source = hot('--a--b--#'); | ||
const next1 = cold( '--c--d--#'); | ||
const next2 = cold( '--e--#'); | ||
const next3 = cold( '--f--g--|'); | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d----e----f--g--|'; | ||
|
||
expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should continue array of observables', () => { | ||
const source = hot('--a--b--#'); | ||
const next = [ source, | ||
cold( '--c--d--#'), | ||
cold( '--e--#'), | ||
cold( '--f--g--|')]; | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d----e----f--g--|'; | ||
|
||
expectObservable(Observable.onErrorResumeNext(next)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should complete single observable throws', () => { | ||
const source = hot('#'); | ||
const subs = '(^!)'; | ||
const expected = '|'; | ||
|
||
expectObservable(Observable.onErrorResumeNext(source)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
import {expect} from 'chai'; | ||
import * as Rx from '../../dist/cjs/Rx'; | ||
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; | ||
|
||
const Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.onErrorResumeNext', () => { | ||
asDiagram('onErrorResumeNext')('should continue observable sequence with next observable', () => { | ||
const source = hot('--a--b--#'); | ||
const next = cold( '--c--d--|'); | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should continue with hot observables', () => { | ||
const source = hot('--a--b--#'); | ||
const next = hot('-----x----c--d--|'); | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should continue with array of multiple observables throw error', () => { | ||
const source = hot('--a--b--#'); | ||
const next = [cold( '--c--d--#'), | ||
cold( '--e--#'), | ||
cold( '--f--g--|')]; | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d----e----f--g--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should continue with multiple observables throw error', () => { | ||
const source = hot('--a--b--#'); | ||
const next1 = cold( '--c--d--#'); | ||
const next2 = cold( '--e--#'); | ||
const next3 = cold( '--f--g--|'); | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d----e----f--g--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should continue with multiple observables does not throw error', () => { | ||
const source = hot('--a--b--|'); | ||
const next1 = cold( '--c--d--|'); | ||
const next2 = cold( '--e--|'); | ||
const next3 = cold( '--f--g--|'); | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d----e----f--g--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should continue after empty observable', () => { | ||
const source = hot('|'); | ||
const next1 = cold('--c--d--|'); | ||
const next2 = cold( '--e--#'); | ||
const next3 = cold( '--f--g--|'); | ||
const subs = '^ !'; | ||
const expected = '--c--d----e----f--g--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should not complete with observble does not ends', () => { | ||
const source = hot('--a--b--|'); | ||
const next1 = cold( '--'); | ||
const subs = '^ '; | ||
const expected = '--a--b----'; | ||
|
||
expectObservable(source.onErrorResumeNext(next1)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should not continue with observble does not ends', () => { | ||
const source = hot('--'); | ||
const next1 = cold( '-a--b-'); | ||
const subs = '^ '; | ||
const expected = '-'; | ||
|
||
expectObservable(source.onErrorResumeNext(next1)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should complete observable with next observable throws', () => { | ||
const source = hot('--a--b--#'); | ||
const next = cold( '--c--d--#'); | ||
const subs = '^ !'; | ||
const expected = '--a--b----c--d--|'; | ||
|
||
expectObservable(source.onErrorResumeNext(next)).toBe(expected); | ||
expectSubscriptions(source.subscriptions).toBe(subs); | ||
}); | ||
|
||
it('should work with promise', (done: MochaDone) => { | ||
const expected = [1, 2]; | ||
const source = Observable.concat(Observable.of(1), Observable.throw('meh')); | ||
|
||
source.onErrorResumeNext(Promise.resolve(2)) | ||
.subscribe(x => { | ||
expect(expected.shift()).to.equal(x); | ||
}, (err: any) => { | ||
done(new Error('should not be called')); | ||
}, () => { | ||
expect(expected).to.be.empty; | ||
done(); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import {Observable} from '../../Observable'; | ||
import {onErrorResumeNextStatic} from '../../operator/onErrorResumeNext'; | ||
|
||
Observable.onErrorResumeNext = onErrorResumeNextStatic; | ||
|
||
declare module '../../Observable' { | ||
namespace Observable { | ||
export let onErrorResumeNext: typeof onErrorResumeNextStatic; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import {Observable} from '../../Observable'; | ||
import {onErrorResumeNext, OnErrorResumeNextSignature} from '../../operator/onErrorResumeNext'; | ||
|
||
Observable.prototype.onErrorResumeNext = onErrorResumeNext; | ||
|
||
declare module '../../Observable' { | ||
interface Observable<T> { | ||
onErrorResumeNext: OnErrorResumeNextSignature<T>; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import {Observable, ObservableInput} from '../Observable'; | ||
import {FromObservable} from '../observable/FromObservable'; | ||
import {Operator} from '../Operator'; | ||
import {Subscriber} from '../Subscriber'; | ||
import {isArray} from '../util/isArray'; | ||
import {OuterSubscriber} from '../OuterSubscriber'; | ||
import {InnerSubscriber} from '../InnerSubscriber'; | ||
import {subscribeToResult} from '../util/subscribeToResult'; | ||
|
||
export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> | | ||
Array<ObservableInput<any>> | | ||
((...values: Array<any>) => R)>): Observable<R> { | ||
if (nextSources.length === 1 && isArray(nextSources[0])) { | ||
nextSources = <Array<Observable<any>>>nextSources[0]; | ||
} | ||
|
||
return this.lift(new OnErrorResumeNextOperator<T, R>(nextSources)); | ||
} | ||
|
||
/* tslint:disable:max-line-length */ | ||
export interface OnErrorResumeNextSignature<T> { | ||
<R>(v: ObservableInput<R>): Observable<R>; | ||
<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>; | ||
<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>; | ||
<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>; | ||
<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>; | ||
|
||
<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>; | ||
<R>(array: ObservableInput<any>[]): Observable<R>; | ||
} | ||
/* tslint:enable:max-line-length */ | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>; | ||
export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>; | ||
export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>; | ||
export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>; | ||
export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>; | ||
|
||
export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>; | ||
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> | | ||
Array<ObservableInput<any>> | | ||
((...values: Array<any>) => R)>): Observable<R> { | ||
let source: ObservableInput<any> = null; | ||
|
||
if (nextSources.length === 1 && isArray(nextSources[0])) { | ||
nextSources = <Array<ObservableInput<any>>>nextSources[0]; | ||
} | ||
source = nextSources.shift(); | ||
|
||
return new FromObservable(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources)); | ||
} | ||
|
||
class OnErrorResumeNextOperator<T, R> implements Operator<T, R> { | ||
constructor(private nextSources: Array<ObservableInput<any>>) { | ||
} | ||
|
||
call(subscriber: Subscriber<R>, source: any): any { | ||
return source._subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources)); | ||
} | ||
} | ||
|
||
class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
constructor(protected destination: Subscriber<T>, | ||
private nextSources: Array<ObservableInput<any>>) { | ||
super(destination); | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: any, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, any>): void { | ||
this.destination.next(innerValue); | ||
} | ||
|
||
notifyError(error: any, innerSub: InnerSubscriber<T, any>): void { | ||
this.subscribeToNextSource(); | ||
} | ||
|
||
notifyComplete(innerSub: InnerSubscriber<T, any>): void { | ||
this.subscribeToNextSource(); | ||
} | ||
|
||
protected _error(err: any): void { | ||
this.subscribeToNextSource(); | ||
} | ||
|
||
protected _complete(): void { | ||
this.subscribeToNextSource(); | ||
} | ||
|
||
private subscribeToNextSource(): void { | ||
const next = this.nextSources.shift(); | ||
if (next) { | ||
this.add(subscribeToResult(this, next)); | ||
} else { | ||
this.destination.complete(); | ||
} | ||
} | ||
} |