From 49b3ce71d1fc223ec5360b28e9f075f5ac115de6 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Mon, 2 May 2016 22:36:46 -0700 Subject: [PATCH] feat(onErrorResumeNext): add onErrorResumeNext operator closes #1665 --- spec/observables/onErrorResumeNext-spec.ts | 40 +++++++ spec/operators/onErrorResumeNext-spec.ts | 120 +++++++++++++++++++++ src/Rx.ts | 2 + src/add/observable/onErrorResumeNext.ts | 10 ++ src/add/operator/onErrorResumeNext.ts | 10 ++ src/operator/onErrorResumeNext.ts | 96 +++++++++++++++++ 6 files changed, 278 insertions(+) create mode 100644 spec/observables/onErrorResumeNext-spec.ts create mode 100644 spec/operators/onErrorResumeNext-spec.ts create mode 100644 src/add/observable/onErrorResumeNext.ts create mode 100644 src/add/operator/onErrorResumeNext.ts create mode 100644 src/operator/onErrorResumeNext.ts diff --git a/spec/observables/onErrorResumeNext-spec.ts b/spec/observables/onErrorResumeNext-spec.ts new file mode 100644 index 0000000000..0e966e7851 --- /dev/null +++ b/spec/observables/onErrorResumeNext-spec.ts @@ -0,0 +1,40 @@ +import * as Rx from '../../dist/cjs/Rx'; +declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; + +const Observable = Rx.Observable; + +describe('Observable.onErrorResumeNext', () => { + it('should continue with observables', () => { + const source = hot('--a--b--#'); + const next1 = cold( '--c--d--#'); + const next2 = cold( '--e--#'); + const next3 = cold( '--f--g--|'); + const subs = '^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should continue array of observables', () => { + const source = hot('--a--b--#'); + const next = [ source, + cold( '--c--d--#'), + cold( '--e--#'), + cold( '--f--g--|')]; + const subs = '^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(Observable.onErrorResumeNext(next)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should complete single observable throws', () => { + const source = hot('#'); + const subs = '(^!)'; + const expected = '|'; + + expectObservable(Observable.onErrorResumeNext(source)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); +}); \ No newline at end of file diff --git a/spec/operators/onErrorResumeNext-spec.ts b/spec/operators/onErrorResumeNext-spec.ts new file mode 100644 index 0000000000..420d1de7fd --- /dev/null +++ b/spec/operators/onErrorResumeNext-spec.ts @@ -0,0 +1,120 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; +declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; + +const Observable = Rx.Observable; + +describe('Observable.prototype.onErrorResumeNext', () => { + asDiagram('onErrorResumeNext')('should continue observable sequence with next observable', () => { + const source = hot('--a--b--#'); + const next = cold( '--c--d--|'); + const subs = '^ !'; + const expected = '--a--b----c--d--|'; + + expectObservable(source.onErrorResumeNext(next)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should continue with hot observables', () => { + const source = hot('--a--b--#'); + const next = hot('-----x----c--d--|'); + const subs = '^ !'; + const expected = '--a--b----c--d--|'; + + expectObservable(source.onErrorResumeNext(next)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should continue with array of multiple observables throw error', () => { + const source = hot('--a--b--#'); + const next = [cold( '--c--d--#'), + cold( '--e--#'), + cold( '--f--g--|')]; + const subs = '^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(source.onErrorResumeNext(next)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should continue with multiple observables throw error', () => { + const source = hot('--a--b--#'); + const next1 = cold( '--c--d--#'); + const next2 = cold( '--e--#'); + const next3 = cold( '--f--g--|'); + const subs = '^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should continue with multiple observables does not throw error', () => { + const source = hot('--a--b--|'); + const next1 = cold( '--c--d--|'); + const next2 = cold( '--e--|'); + const next3 = cold( '--f--g--|'); + const subs = '^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should continue after empty observable', () => { + const source = hot('|'); + const next1 = cold('--c--d--|'); + const next2 = cold( '--e--#'); + const next3 = cold( '--f--g--|'); + const subs = '^ !'; + const expected = '--c--d----e----f--g--|'; + + expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should not complete with observble does not ends', () => { + const source = hot('--a--b--|'); + const next1 = cold( '--'); + const subs = '^ '; + const expected = '--a--b----'; + + expectObservable(source.onErrorResumeNext(next1)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should not continue with observble does not ends', () => { + const source = hot('--'); + const next1 = cold( '-a--b-'); + const subs = '^ '; + const expected = '-'; + + expectObservable(source.onErrorResumeNext(next1)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should complete observable with next observable throws', () => { + const source = hot('--a--b--#'); + const next = cold( '--c--d--#'); + const subs = '^ !'; + const expected = '--a--b----c--d--|'; + + expectObservable(source.onErrorResumeNext(next)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should work with promise', (done: MochaDone) => { + const expected = [1, 2]; + const source = Observable.concat(Observable.of(1), Observable.throw('meh')); + + source.onErrorResumeNext(Promise.resolve(2)) + .subscribe(x => { + expect(expected.shift()).to.equal(x); + }, (err: any) => { + done(new Error('should not be called')); + }, () => { + expect(expected).to.be.empty; + done(); + }); + }); +}); diff --git a/src/Rx.ts b/src/Rx.ts index 5462589f15..8aa4aac74d 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -26,6 +26,7 @@ import './add/observable/merge'; import './add/observable/race'; import './add/observable/never'; import './add/observable/of'; +import './add/observable/onErrorResumeNext'; import './add/observable/range'; import './add/observable/using'; import './add/observable/throw'; @@ -91,6 +92,7 @@ import './add/operator/mergeScan'; import './add/operator/min'; import './add/operator/multicast'; import './add/operator/observeOn'; +import './add/operator/onErrorResumeNext'; import './add/operator/pairwise'; import './add/operator/partition'; import './add/operator/pluck'; diff --git a/src/add/observable/onErrorResumeNext.ts b/src/add/observable/onErrorResumeNext.ts new file mode 100644 index 0000000000..609596bd6a --- /dev/null +++ b/src/add/observable/onErrorResumeNext.ts @@ -0,0 +1,10 @@ +import {Observable} from '../../Observable'; +import {onErrorResumeNextStatic} from '../../operator/onErrorResumeNext'; + +Observable.onErrorResumeNext = onErrorResumeNextStatic; + +declare module '../../Observable' { + namespace Observable { + export let onErrorResumeNext: typeof onErrorResumeNextStatic; + } +} \ No newline at end of file diff --git a/src/add/operator/onErrorResumeNext.ts b/src/add/operator/onErrorResumeNext.ts new file mode 100644 index 0000000000..00e8ea6757 --- /dev/null +++ b/src/add/operator/onErrorResumeNext.ts @@ -0,0 +1,10 @@ +import {Observable} from '../../Observable'; +import {onErrorResumeNext, OnErrorResumeNextSignature} from '../../operator/onErrorResumeNext'; + +Observable.prototype.onErrorResumeNext = onErrorResumeNext; + +declare module '../../Observable' { + interface Observable { + onErrorResumeNext: OnErrorResumeNextSignature; + } +} \ No newline at end of file diff --git a/src/operator/onErrorResumeNext.ts b/src/operator/onErrorResumeNext.ts new file mode 100644 index 0000000000..cbeb252e6d --- /dev/null +++ b/src/operator/onErrorResumeNext.ts @@ -0,0 +1,96 @@ +import {Observable, ObservableInput} from '../Observable'; +import {FromObservable} from '../observable/FromObservable'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {isArray} from '../util/isArray'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; + +export function onErrorResumeNext(...nextSources: Array | + Array> | + ((...values: Array) => R)>): Observable { + if (nextSources.length === 1 && isArray(nextSources[0])) { + nextSources = >>nextSources[0]; + } + + return this.lift(new OnErrorResumeNextOperator(nextSources)); +} + +/* tslint:disable:max-line-length */ +export interface OnErrorResumeNextSignature { + (v: ObservableInput): Observable; + (v2: ObservableInput, v3: ObservableInput): Observable; + (v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): Observable; + (v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): Observable; + (v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): Observable; + + (...observables: Array | ((...values: Array) => R)>): Observable; + (array: ObservableInput[]): Observable; +} +/* tslint:enable:max-line-length */ + +/* tslint:disable:max-line-length */ +export function onErrorResumeNextStatic(v: ObservableInput): Observable; +export function onErrorResumeNextStatic(v2: ObservableInput, v3: ObservableInput): Observable; +export function onErrorResumeNextStatic(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): Observable; +export function onErrorResumeNextStatic(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): Observable; +export function onErrorResumeNextStatic(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): Observable; + +export function onErrorResumeNextStatic(...observables: Array | ((...values: Array) => R)>): Observable; +export function onErrorResumeNextStatic(array: ObservableInput[]): Observable; +/* tslint:enable:max-line-length */ + +export function onErrorResumeNextStatic(...nextSources: Array | + Array> | + ((...values: Array) => R)>): Observable { + let source: ObservableInput = null; + + if (nextSources.length === 1 && isArray(nextSources[0])) { + nextSources = >>nextSources[0]; + } + source = nextSources.shift(); + + return new FromObservable(source, null).lift(new OnErrorResumeNextOperator(nextSources)); +} + +class OnErrorResumeNextOperator implements Operator { + constructor(private nextSources: Array>) { + } + + call(subscriber: Subscriber, source: any): any { + return source._subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources)); + } +} + +class OnErrorResumeNextSubscriber extends OuterSubscriber { + constructor(protected destination: Subscriber, + private nextSources: Array>) { + super(destination); + } + + notifyError(error: any, innerSub: InnerSubscriber): void { + this.subscribeToNextSource(); + } + + notifyComplete(innerSub: InnerSubscriber): void { + this.subscribeToNextSource(); + } + + protected _error(err: any): void { + this.subscribeToNextSource(); + } + + protected _complete(): void { + this.subscribeToNextSource(); + } + + private subscribeToNextSource(): void { + const next = this.nextSources.shift(); + if (next) { + this.add(subscribeToResult(this, next)); + } else { + this.destination.complete(); + } + } +} \ No newline at end of file