From a55f8dac1a876093c459265f11514f1c5b52a87d Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 12 Mar 2017 14:49:46 +0100 Subject: [PATCH] fix(first): force unsubscription logic on complete and error Force unsubscription logic when operator completes or errors, so that source Observable is only subscribed as long as it has to, even when combined with operators that do not immediately unsubscribe from 'first' when it completes or errors. Closes #2455 --- spec/helpers/doNotUnsubscribe.ts | 16 ++++++++++++++ spec/operators/first-spec.ts | 36 ++++++++++++++++++++++++++++++++ src/operator/first.ts | 3 +++ 3 files changed, 55 insertions(+) create mode 100644 spec/helpers/doNotUnsubscribe.ts diff --git a/spec/helpers/doNotUnsubscribe.ts b/spec/helpers/doNotUnsubscribe.ts new file mode 100644 index 0000000000..29341119a6 --- /dev/null +++ b/spec/helpers/doNotUnsubscribe.ts @@ -0,0 +1,16 @@ +/// +import * as Rx from '../../dist/cjs/Rx'; + +export function doNotUnsubscribe(ob: Rx.Observable): Rx.Observable { + return ob.lift(new DoNotUnsubscribeOperator()); +} + +class DoNotUnsubscribeOperator implements Rx.Operator { + call(subscriber: Rx.Subscriber, source: any): any { + return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber)); + } +} + +class DoNotUnsubscribeSubscriber extends Rx.Subscriber { + unsubscribe() {} // tslint:disable-line no-empty +} diff --git a/spec/operators/first-spec.ts b/spec/operators/first-spec.ts index 91f031dfbb..395d7ca322 100644 --- a/spec/operators/first-spec.ts +++ b/spec/operators/first-spec.ts @@ -1,6 +1,7 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports +import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe'; declare const { asDiagram }; declare const hot: typeof marbleTestingSignature.hot; @@ -220,6 +221,41 @@ describe('Observable.prototype.first', () => { expectSubscriptions(e1.subscriptions).toBe(sub); }); + it('should unsubscribe from the source after first value, even if destination doesn\'t unsubscribe', () => { + const e1 = hot('--^---a---|'); + const sub = '^---!'; + const expected = '----(a|)'; + + const result = e1.first().let(doNotUnsubscribe); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(sub); + }); + + it('should unsubscribe from the source after completion without value,' + + ' even if destination doesn\'t unsubscribe', () => { + const e1 = hot('--^---|'); + const sub = '^---!'; + const expected = '----(a|)'; + + const result = e1.first(undefined, undefined, 'a').let(doNotUnsubscribe); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(sub); + }); + + it('should unsubscribe from the source after erroring without value,' + + ' even if destination doesn\'t unsubscribe', () => { + const e1 = hot('--^---|'); + const sub = '^---!'; + const expected = '----#'; + + const result = e1.first().let(doNotUnsubscribe); + + expectObservable(result).toBe(expected, undefined, new Rx.EmptyError()); + expectSubscriptions(e1.subscriptions).toBe(sub); + }); + it('should support type guards without breaking previous behavior', () => { // tslint:disable no-unused-variable diff --git a/src/operator/first.ts b/src/operator/first.ts index cb6d09baf4..77180fb5e7 100644 --- a/src/operator/first.ts +++ b/src/operator/first.ts @@ -156,6 +156,7 @@ class FirstSubscriber extends Subscriber { this._emitted = true; destination.next(value); destination.complete(); + this.unsubscribe(); this.hasCompleted = true; } } @@ -165,8 +166,10 @@ class FirstSubscriber extends Subscriber { if (!this.hasCompleted && typeof this.defaultValue !== 'undefined') { destination.next(this.defaultValue); destination.complete(); + this.unsubscribe(); } else if (!this.hasCompleted) { destination.error(new EmptyError); + this.unsubscribe(); } } }