Skip to content

Commit

Permalink
fix(takeWhile): force unsubscribe when it completes or errors (#2470)
Browse files Browse the repository at this point in the history
Force unsubscribe on `takeWhile` when it completes or errors,
even when used with operator that does not unsubscribe reliably,
so that source Observable is never subscribed longer than it
needs to.
  • Loading branch information
mpodlasin authored and benlesh committed Jun 14, 2017
1 parent e36adba commit 2b4a96c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
16 changes: 16 additions & 0 deletions spec/helpers/doNotUnsubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
///<reference path='../../typings/index.d.ts'/>
import * as Rx from '../../dist/cjs/Rx';

export function doNotUnsubscribe<T>(ob: Rx.Observable<T>): Rx.Observable<T> {
return ob.lift(new DoNotUnsubscribeOperator());
}

class DoNotUnsubscribeOperator<T, R> implements Rx.Operator<T, R> {
call(subscriber: Rx.Subscriber<R>, source: any): any {
return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber));
}
}

class DoNotUnsubscribeSubscriber<T> extends Rx.Subscriber<T> {
unsubscribe() {} // tslint:disable-line no-empty
}
34 changes: 34 additions & 0 deletions spec/operators/takeWhile-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe';

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
Expand Down Expand Up @@ -205,4 +206,37 @@ describe('Observable.prototype.takeWhile', () => {
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from source Observable after complete,' +
' even if destination does not unsubscribe', () => {
const e1 = hot('--a-^-b--c--d--e--|');
const e1subs = '^-------! ';
const expected = '--b--c--|';

function predicate(value) {
return value !== 'd';
}

const result = e1.takeWhile(predicate).let(doNotUnsubscribe);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from source Observable after error,' +
' even if destination does not unsubscribe', () => {
const e1 = hot('--a-^-b--c--d--e--|');
const e1subs = '^----! ';
const expected = '--b--#';

function predicate(value) {
if (value === 'c') { throw 'error'; }
return value !== 'd';
}

const result = e1.takeWhile(predicate).let(doNotUnsubscribe);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
2 changes: 2 additions & 0 deletions src/operator/takeWhile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TakeWhileSubscriber<T> extends Subscriber<T> {
result = this.predicate(value, this.index++);
} catch (err) {
destination.error(err);
this.unsubscribe();
return;
}
this.nextOrComplete(value, result);
Expand All @@ -83,6 +84,7 @@ class TakeWhileSubscriber<T> extends Subscriber<T> {
destination.next(value);
} else {
destination.complete();
this.unsubscribe();
}
}
}

0 comments on commit 2b4a96c

Please sign in to comment.