Skip to content

Commit

Permalink
feat(onErrorResumeNext): add onErrorResumeNext operator
Browse files Browse the repository at this point in the history
closes #1665
  • Loading branch information
kwonoj committed May 3, 2016
1 parent 7a551cc commit 49b3ce7
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 0 deletions.
40 changes: 40 additions & 0 deletions spec/observables/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;

describe('Observable.onErrorResumeNext', () => {
it('should continue with observables', () => {
const source = hot('--a--b--#');
const next1 = cold( '--c--d--#');
const next2 = cold( '--e--#');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue array of observables', () => {
const source = hot('--a--b--#');
const next = [ source,
cold( '--c--d--#'),
cold( '--e--#'),
cold( '--f--g--|')];
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should complete single observable throws', () => {
const source = hot('#');
const subs = '(^!)';
const expected = '|';

expectObservable(Observable.onErrorResumeNext(source)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
120 changes: 120 additions & 0 deletions spec/operators/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;

describe('Observable.prototype.onErrorResumeNext', () => {
asDiagram('onErrorResumeNext')('should continue observable sequence with next observable', () => {
const source = hot('--a--b--#');
const next = cold( '--c--d--|');
const subs = '^ !';
const expected = '--a--b----c--d--|';

expectObservable(source.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue with hot observables', () => {
const source = hot('--a--b--#');
const next = hot('-----x----c--d--|');
const subs = '^ !';
const expected = '--a--b----c--d--|';

expectObservable(source.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue with array of multiple observables throw error', () => {
const source = hot('--a--b--#');
const next = [cold( '--c--d--#'),
cold( '--e--#'),
cold( '--f--g--|')];
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(source.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue with multiple observables throw error', () => {
const source = hot('--a--b--#');
const next1 = cold( '--c--d--#');
const next2 = cold( '--e--#');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue with multiple observables does not throw error', () => {
const source = hot('--a--b--|');
const next1 = cold( '--c--d--|');
const next2 = cold( '--e--|');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue after empty observable', () => {
const source = hot('|');
const next1 = cold('--c--d--|');
const next2 = cold( '--e--#');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const expected = '--c--d----e----f--g--|';

expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should not complete with observble does not ends', () => {
const source = hot('--a--b--|');
const next1 = cold( '--');
const subs = '^ ';
const expected = '--a--b----';

expectObservable(source.onErrorResumeNext(next1)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should not continue with observble does not ends', () => {
const source = hot('--');
const next1 = cold( '-a--b-');
const subs = '^ ';
const expected = '-';

expectObservable(source.onErrorResumeNext(next1)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should complete observable with next observable throws', () => {
const source = hot('--a--b--#');
const next = cold( '--c--d--#');
const subs = '^ !';
const expected = '--a--b----c--d--|';

expectObservable(source.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should work with promise', (done: MochaDone) => {
const expected = [1, 2];
const source = Observable.concat(Observable.of(1), Observable.throw('meh'));

source.onErrorResumeNext(Promise.resolve(2))
.subscribe(x => {
expect(expected.shift()).to.equal(x);
}, (err: any) => {
done(new Error('should not be called'));
}, () => {
expect(expected).to.be.empty;
done();
});
});
});
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import './add/observable/merge';
import './add/observable/race';
import './add/observable/never';
import './add/observable/of';
import './add/observable/onErrorResumeNext';
import './add/observable/range';
import './add/observable/using';
import './add/observable/throw';
Expand Down Expand Up @@ -91,6 +92,7 @@ import './add/operator/mergeScan';
import './add/operator/min';
import './add/operator/multicast';
import './add/operator/observeOn';
import './add/operator/onErrorResumeNext';
import './add/operator/pairwise';
import './add/operator/partition';
import './add/operator/pluck';
Expand Down
10 changes: 10 additions & 0 deletions src/add/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Observable} from '../../Observable';
import {onErrorResumeNextStatic} from '../../operator/onErrorResumeNext';

Observable.onErrorResumeNext = onErrorResumeNextStatic;

declare module '../../Observable' {
namespace Observable {
export let onErrorResumeNext: typeof onErrorResumeNextStatic;
}
}
10 changes: 10 additions & 0 deletions src/add/operator/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Observable} from '../../Observable';
import {onErrorResumeNext, OnErrorResumeNextSignature} from '../../operator/onErrorResumeNext';

Observable.prototype.onErrorResumeNext = onErrorResumeNext;

declare module '../../Observable' {
interface Observable<T> {
onErrorResumeNext: OnErrorResumeNextSignature<T>;
}
}
96 changes: 96 additions & 0 deletions src/operator/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import {Observable, ObservableInput} from '../Observable';
import {FromObservable} from '../observable/FromObservable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {isArray} from '../util/isArray';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<Observable<any>>>nextSources[0];
}

return this.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}

/* tslint:disable:max-line-length */
export interface OnErrorResumeNextSignature<T> {
<R>(v: ObservableInput<R>): Observable<R>;
<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;

<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
<R>(array: ObservableInput<any>[]): Observable<R>;
}
/* tslint:enable:max-line-length */

/* tslint:disable:max-line-length */
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;

export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
/* tslint:enable:max-line-length */

export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let source: ObservableInput<any> = null;

if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<ObservableInput<any>>>nextSources[0];
}
source = nextSources.shift();

return new FromObservable(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}

class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
constructor(private nextSources: Array<ObservableInput<any>>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
}
}

class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
constructor(protected destination: Subscriber<T>,
private nextSources: Array<ObservableInput<any>>) {
super(destination);
}

notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
}

notifyComplete(innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
}

protected _error(err: any): void {
this.subscribeToNextSource();
}

protected _complete(): void {
this.subscribeToNextSource();
}

private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (next) {
this.add(subscribeToResult(this, next));
} else {
this.destination.complete();
}
}
}

0 comments on commit 49b3ce7

Please sign in to comment.