Skip to content

Commit

Permalink
feat(operator): add retryWhen operator. closes #129
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Aug 13, 2015
1 parent 1f36d99 commit 65eb50e
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 1 deletion.
56 changes: 56 additions & 0 deletions spec/operators/retryWhen-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.retryWhen()', function () {
it('should retry when notified via returned notifier on thrown error', function (done) {
var retried = false;
var expected = [1, 2, 1, 2];
var i = 0;
Observable.of(1, 2, 3)
.map(function (n) {
if (n === 3) {
throw 'bad';
}
return n;
})
.retryWhen(function (errors) {
return errors.map(function (x) {
expect(x).toBe('bad');
if (retried) {
throw 'done';
}
retried = true;
return x;
});
})
.subscribe(function (x) {
expect(x).toBe(expected[i++]);
},
function (err) {
expect(err).toBe('done');
done();
})
});

it('should retry when notified and complete on returned completion', function (done) {
var expected = [1, 2, 1, 2];
Observable.of(1, 2, 3)
.map(function (n) {
if (n === 3) {
throw 'bad';
}
return n;
})
.retryWhen(function (errors) {
return errors.take(1);
})
.subscribe(function (n) {
expect(n).toBe(expected.shift());
}, function (err) {
throw 'error should not be called';
}, function () {
done();
});
});
});
1 change: 0 additions & 1 deletion spec/subject-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ describe('Subject', function () {
});

it('should clean out unsubscribed subscribers', function (done) {
debugger;
var subject = new Subject();

var sub1 = subject.subscribe(function (x) {
Expand Down
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,5 @@ export default class Observable<T> {
multicast: (subjectFactory: () => Subject<T>) => ConnectableObservable<T>;

catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
}
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ import partition from './operators/partition';
observableProto.partition = partition;

import _catch from './operators/catch';
import retryWhen from './operators/retryWhen';

observableProto.catch = _catch;
observableProto.retryWhen = retryWhen;

export default {
Subject,
Expand Down
74 changes: 74 additions & 0 deletions src/operators/retryWhen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';
import Subscription from '../Subscription';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export default function retryWhen<T>(notifier: (errors:Observable<any>) => Observable<any>) {
return this.lift(new RetryWhenOperator(notifier, this));
}

export class RetryWhenOperator<T, R> extends Operator<T, R> {
constructor(protected notifier: (errors: Observable<any>) => Observable<any>, protected original:Observable<T>) {
super();
}

call(observer: Observer<T>): Observer<T> {
return new RetryWhenSubscriber<T>(observer, this.notifier, this.original);
}
}

export class RetryWhenSubscriber<T> extends Subscriber<T> {
errors: Subject<any>;
retryNotifications: Observable<any>;
retryNotificationSubscription: Subscription<any>;

constructor(destination: Observer<T>, public notifier: (errors: Observable<any>) => Observable<any>, public original: Observable<T>) {
super(destination);
}

_error(err: any) {
if (!this.retryNotifications) {
this.errors = new Subject();
const notifications = tryCatch(this.notifier).call(this, this.errors);
if (notifications === errorObject) {
this.destination.error(errorObject.e);
} else {
this.retryNotifications = notifications;
this.retryNotificationSubscription = notifications.subscribe(new RetryNotificationSubscriber(this));
this.add(this.retryNotificationSubscription);
}
}
this.errors.next(err);
}

finalError(err: any) {
this.destination.error(err);
}

resubscribe() {
this.original.subscribe(this);
}
}

export class RetryNotificationSubscriber<T> extends Subscriber<T> {
constructor(public parent: RetryWhenSubscriber<any>) {
super(null);
}

_next(value: T) {
this.parent.resubscribe();
}

_error(err: any) {
this.parent.finalError(err);
}

_complete() {
this.parent.complete();
}
}

0 comments on commit 65eb50e

Please sign in to comment.