diff --git a/spec/observables/onErrorResumeNext-spec.ts b/spec/observables/onErrorResumeNext-spec.ts index 54e354b0fe..3f9c8a9e5d 100644 --- a/spec/observables/onErrorResumeNext-spec.ts +++ b/spec/observables/onErrorResumeNext-spec.ts @@ -1,4 +1,4 @@ -import * as Rx from '../../src/Rx'; +import { onErrorResumeNext } from '../../src/create'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const hot: typeof marbleTestingSignature.hot; @@ -6,32 +6,41 @@ declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -const Observable = Rx.Observable; - -describe('Observable.onErrorResumeNext', () => { +describe('onErrorResumeNext', () => { it('should continue with observables', () => { - const source = hot('--a--b--#'); - const next1 = cold( '--c--d--#'); - const next2 = cold( '--e--#'); - const next3 = cold( '--f--g--|'); - const subs = '^ !'; - const expected = '--a--b----c--d----e----f--g--|'; - - expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(subs); + const s1 = hot('--a--b--#'); + const s2 = cold( '--c--d--#'); + const s3 = cold( '--e--#'); + const s4 = cold( '--f--g--|'); + const subs1 = '^ !'; + const subs2 = ' ^ !'; + const subs3 = ' ^ !'; + const subs4 = ' ^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(onErrorResumeNext(s1, s2, s3, s4)).toBe(expected); + expectSubscriptions(s1.subscriptions).toBe(subs1); + expectSubscriptions(s2.subscriptions).toBe(subs2); + expectSubscriptions(s3.subscriptions).toBe(subs3); + expectSubscriptions(s4.subscriptions).toBe(subs4); }); it('should continue array of observables', () => { - const source = hot('--a--b--#'); - const next = [ source, - cold( '--c--d--#'), - cold( '--e--#'), - cold( '--f--g--|')]; - const subs = '^ !'; - const expected = '--a--b----c--d----e----f--g--|'; - - expectObservable(Observable.onErrorResumeNext(next)).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(subs); + const s1 = hot('--a--b--#'); + const s2 = cold( '--c--d--#'); + const s3 = cold( '--e--#'); + const s4 = cold( '--f--g--|'); + const subs1 = '^ !'; + const subs2 = ' ^ !'; + const subs3 = ' ^ !'; + const subs4 = ' ^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(onErrorResumeNext([s1, s2, s3, s4])).toBe(expected); + expectSubscriptions(s1.subscriptions).toBe(subs1); + expectSubscriptions(s2.subscriptions).toBe(subs2); + expectSubscriptions(s3.subscriptions).toBe(subs3); + expectSubscriptions(s4.subscriptions).toBe(subs4); }); it('should complete single observable throws', () => { @@ -39,7 +48,7 @@ describe('Observable.onErrorResumeNext', () => { const subs = '(^!)'; const expected = '|'; - expectObservable(Observable.onErrorResumeNext(source)).toBe(expected); + expectObservable(onErrorResumeNext(source)).toBe(expected); expectSubscriptions(source.subscriptions).toBe(subs); }); }); diff --git a/src/internal/observable/onErrorResumeNext.ts b/src/internal/observable/onErrorResumeNext.ts index d1c89e65b9..edd468cd82 100644 --- a/src/internal/observable/onErrorResumeNext.ts +++ b/src/internal/observable/onErrorResumeNext.ts @@ -1,11 +1,7 @@ import { Observable, ObservableInput } from '../Observable'; import { from } from './from'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { isArray } from '..//util/isArray'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '..//util/subscribeToResult'; +import { isArray } from '../util/isArray'; +import { EMPTY } from './empty'; /* tslint:disable:max-line-length */ export function onErrorResumeNext(v: ObservableInput): Observable; @@ -73,62 +69,35 @@ export function onErrorResumeNext(array: ObservableInput[]): Observable< * @see {@link concat} * @see {@link catch} * - * @param {...ObservableInput} observables Observables passed either directly or as an array. + * @param {...ObservableInput} sources Observables passed either directly or as an array. * @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes * to the next passed Observable and so on, until it completes or runs out of Observables. * @method onErrorResumeNext * @owner Observable */ -export function onErrorResumeNext(...nextSources: Array | +export function onErrorResumeNext(...sources: Array | Array> | ((...values: Array) => R)>): Observable { - let source: ObservableInput = null; - if (nextSources.length === 1 && isArray(nextSources[0])) { - nextSources = >>nextSources[0]; + if (sources.length === 0) { + return EMPTY; } - source = nextSources.shift(); - return from(source, null).lift(new OnErrorResumeNextOperator(nextSources)); -} - -class OnErrorResumeNextOperator implements Operator { - constructor(private nextSources: Array>) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources)); - } -} - -class OnErrorResumeNextSubscriber extends OuterSubscriber { - constructor(protected destination: Subscriber, - private nextSources: Array>) { - super(destination); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this.subscribeToNextSource(); - } + const [ first, ...remainder ] = sources; - notifyComplete(innerSub: InnerSubscriber): void { - this.subscribeToNextSource(); + if (sources.length === 1 && isArray(first)) { + return onErrorResumeNext(...first); } - protected _error(err: any): void { - this.subscribeToNextSource(); - } + return new Observable(subscriber => { + const subNext = () => subscriber.add( + onErrorResumeNext(...remainder).subscribe(subscriber) + ); - protected _complete(): void { - this.subscribeToNextSource(); - } - - private subscribeToNextSource(): void { - const next = this.nextSources.shift(); - if (next) { - this.add(subscribeToResult(this, next)); - } else { - this.destination.complete(); - } - } + return from(first).subscribe({ + next(value) { subscriber.next(value); }, + error: subNext, + complete: subNext, + }); + }); }