Skip to content

Commit

Permalink
fix(catch): fix catch to dispose old subscriptions
Browse files Browse the repository at this point in the history
Fix catch operator to not have anymore a shared underlying Subscription, and instead reset the
subscription for each new observable replacing the caught error. This fixes a potential memory leak
if catch is used as an infinite retry, because subscriptions would be retained since the beginning,
and would increasing each time a catch is performed.

Resolves issue ReactiveX#763.
  • Loading branch information
staltz committed Nov 24, 2015
1 parent a134d15 commit 3f7b8c5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 33 deletions.
57 changes: 37 additions & 20 deletions spec/operators/catch-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('Observable.prototype.catch()', function () {

it('should catch error and replace it with a cold Observable', function () {
var e1 = hot('--a--b--#----| ');
var e1subs = '^ !';
var e1subs = '^ ! ';
var e2 = cold( '1-2-3-4-5-|');
var e2subs = ' ^ !';
var expected = '--a--b--1-2-3-4-5-|';
Expand All @@ -37,9 +37,23 @@ describe('Observable.prototype.catch()', function () {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow unsubscribing explicitly and early', function () {
var e1 = hot('--1-2-3-4-5-6---#');
var unsub = ' ! ';
var e1subs = '^ ! ';
var expected = '--1-2-3- ';

var result = e1.catch(function () {
return Observable.of('X', 'Y', 'Z');
});

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should catch error and replace it with a hot Observable', function () {
var e1 = hot('--a--b--#----| ');
var e1subs = '^ !';
var e1subs = '^ ! ';
var e2 = hot('1-2-3-4-5-6-7-8-9-|');
var e2subs = ' ^ !';
var expected = '--a--b--5-6-7-8-9-|';
Expand All @@ -54,8 +68,8 @@ describe('Observable.prototype.catch()', function () {
it('should catch and allow the cold observable to be repeated with the third ' +
'(caught) argument', function () {
var e1 = cold('--a--b--c--------| ');
var subs = ['^ !',
' ^ !',
var subs = ['^ ! ',
' ^ ! ',
' ^ !'];
var expected = '--a--b----a--b----a--b--#';

Expand All @@ -81,7 +95,7 @@ describe('Observable.prototype.catch()', function () {
it('should catch and allow the hot observable to proceed with the third ' +
'(caught) argument', function () {
var e1 = hot('--a--b--c----d---|');
var subs = ['^ !',
var subs = ['^ ! ',
' ^ !'];
var expected = '--a--b-------d---|';

Expand Down Expand Up @@ -132,41 +146,44 @@ describe('Observable.prototype.catch()', function () {

it('should complete if you return Observable.empty()', function () {
var e1 = hot('--a--b--#');
var subs = '^ !';
var e1subs = '^ !';
var e2 = cold( '|');
var e2subs = ' (^!)';
var expected = '--a--b--|';

var result = e1.catch(function (err) {
return Observable.empty();
});
var result = e1.catch(function () { return e2; });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should raise error if you return Observable.throw()', function () {
var e1 = hot('--a--b--#');
var subs = '^ !';
var e1subs = '^ !';
var e2 = cold( '#');
var e2subs = ' (^!)';
var expected = '--a--b--#';

var result = e1.catch(function (err) {
return Observable.throw('error');
});
var result = e1.catch(function () { return e2; });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should never terminate if you return Observable.never()', function () {
var e1 = hot('--a--b--#');
var subs = '^ ';
var e1subs = '^ !';
var e2 = cold( '-');
var e2subs = ' ^';
var expected = '--a--b---';

var result = e1.catch(function (err) {
return Observable.never();
});
var result = e1.catch(function () { return e2; });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should pass the error as the first argument', function (done) {
Expand Down
36 changes: 23 additions & 13 deletions src/operators/catch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

Expand All @@ -20,12 +21,9 @@ export function _catch<T>(selector: (err: any, caught: Observable<any>) => Obser
}

class CatchOperator<T, R> implements Operator<T, R> {
selector: (err: any, caught: Observable<any>) => Observable<any>;
caught: Observable<any>;
source: Observable<T>;

constructor(selector: (err: any, caught: Observable<any>) => Observable<any>) {
this.selector = selector;
constructor(private selector: (err: any, caught: Observable<any>) => Observable<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
Expand All @@ -34,23 +32,35 @@ class CatchOperator<T, R> implements Operator<T, R> {
}

class CatchSubscriber<T> extends Subscriber<T> {
selector: (err: any, caught: Observable<any>) => Observable<any>;
caught: Observable<any>;
private lastSubscription: Subscription<T>;

constructor(public destination: Subscriber<T>,
private selector: (err: any, caught: Observable<any>) => Observable<any>,
private caught: Observable<any>) {
super(null);
this.lastSubscription = this;
}

constructor(destination: Subscriber<T>,
selector: (err: any, caught: Observable<any>) => Observable<any>,
caught: Observable<any>) {
super(destination);
this.selector = selector;
this.caught = caught;
_next(value: T) {
this.destination.next(value);
}

_error(err) {
const result = tryCatch(this.selector)(err, this.caught);
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
this.add(result.subscribe(this.destination));
this.lastSubscription.unsubscribe();
this.lastSubscription = result.subscribe(this.destination);
}
}

_complete() {
this.lastSubscription.unsubscribe();
this.destination.complete();
}

_unsubscribe() {
this.lastSubscription.unsubscribe();
}
}

0 comments on commit 3f7b8c5

Please sign in to comment.