Skip to content

Commit

Permalink
fix(find): force unsubscribe when it completes or errors
Browse files Browse the repository at this point in the history
Force unsubscribe when resulting Observable completes or errors,
even when following operator does not unsubscribe reliably,
so that source Observable is not being subscribed unnecessarily.
  • Loading branch information
mpodlasin committed Apr 12, 2017
1 parent ee1ce30 commit c975b2b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
16 changes: 16 additions & 0 deletions spec/helpers/doNotUnsubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
///<reference path='../../typings/index.d.ts'/>
import * as Rx from '../../dist/cjs/Rx';

export function doNotUnsubscribe<T>(ob: Rx.Observable<T>): Rx.Observable<T> {
return ob.lift(new DoNotUnsubscribeOperator());
}

class DoNotUnsubscribeOperator<T, R> implements Rx.Operator<T, R> {
call(subscriber: Rx.Subscriber<R>, source: any): any {
return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber));
}
}

class DoNotUnsubscribeSubscriber<T> extends Rx.Subscriber<T> {
unsubscribe() {} // tslint:disable-line no-empty
}
28 changes: 28 additions & 0 deletions spec/operators/find-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe';

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
Expand Down Expand Up @@ -162,6 +163,33 @@ describe('Observable.prototype.find', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe from source when complete, even if following operator does not unsubscribe', () => {
const values = {a: 3, b: 9, c: 15, d: 20};
const source = hot('---a--b--c--d---|', values);
const subs = '^ ! ';
const expected = '---------(c|) ';

const predicate = function (x) { return x % 5 === 0; };

expectObservable((<any>source).find(predicate).let(doNotUnsubscribe)).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe from source when predicate function errors,' +
' even if followring operator does not unsubscribe', () => {

const source = hot('--a--b--c--|');
const subs = '^ !';
const expected = '--#';

const predicate = function (value) {
throw 'error';
};

expectObservable((<any>source).find(predicate).let(doNotUnsubscribe)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should support type guards without breaking previous behavior', () => {
// tslint:disable no-unused-variable

Expand Down
2 changes: 2 additions & 0 deletions src/operator/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export class FindValueSubscriber<T> extends Subscriber<T> {

destination.next(value);
destination.complete();
this.unsubscribe();
}

protected _next(value: T): void {
Expand All @@ -97,6 +98,7 @@ export class FindValueSubscriber<T> extends Subscriber<T> {
}
} catch (err) {
this.destination.error(err);
this.unsubscribe();
}
}

Expand Down

0 comments on commit c975b2b

Please sign in to comment.