diff --git a/spec/helpers/doNotUnsubscribe.ts b/spec/helpers/doNotUnsubscribe.ts new file mode 100644 index 0000000000..22752739dc --- /dev/null +++ b/spec/helpers/doNotUnsubscribe.ts @@ -0,0 +1,16 @@ +/// +import * as Rx from '../../dist/cjs/Rx'; + +export function doNotUnsubscribe(ob: Rx.Observable): Rx.Observable { + return ob.lift(new DoNotUnsubscribeOperator()); +} + +class DoNotUnsubscribeOperator implements Rx.Operator { + call(subscriber: Rx.Subscriber, source: any): any { + return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber)); + } +} + +class DoNotUnsubscribeSubscriber extends Rx.Subscriber { + unsubscribe() {} // tslint:disable-line no-empty +} \ No newline at end of file diff --git a/spec/operators/takeWhile-spec.ts b/spec/operators/takeWhile-spec.ts index 067187df46..cc484a3905 100644 --- a/spec/operators/takeWhile-spec.ts +++ b/spec/operators/takeWhile-spec.ts @@ -1,6 +1,7 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports +import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe'; declare const { asDiagram }; declare const hot: typeof marbleTestingSignature.hot; @@ -205,4 +206,37 @@ describe('Observable.prototype.takeWhile', () => { expectObservable(result, unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should unsubscribe from source Observable after complete,' + + ' even if destination does not unsubscribe', () => { + const e1 = hot('--a-^-b--c--d--e--|'); + const e1subs = '^-------! '; + const expected = '--b--c--|'; + + function predicate(value) { + return value !== 'd'; + } + + const result = e1.takeWhile(predicate).let(doNotUnsubscribe); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should unsubscribe from source Observable after error,' + + ' even if destination does not unsubscribe', () => { + const e1 = hot('--a-^-b--c--d--e--|'); + const e1subs = '^----! '; + const expected = '--b--#'; + + function predicate(value) { + if (value === 'c') { throw 'error'; } + return value !== 'd'; + } + + const result = e1.takeWhile(predicate).let(doNotUnsubscribe); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/src/operator/takeWhile.ts b/src/operator/takeWhile.ts index a2a45b50c7..6f668b7c86 100644 --- a/src/operator/takeWhile.ts +++ b/src/operator/takeWhile.ts @@ -72,6 +72,7 @@ class TakeWhileSubscriber extends Subscriber { result = this.predicate(value, this.index++); } catch (err) { destination.error(err); + this.unsubscribe(); return; } this.nextOrComplete(value, result); @@ -83,6 +84,7 @@ class TakeWhileSubscriber extends Subscriber { destination.next(value); } else { destination.complete(); + this.unsubscribe(); } } }