From c975b2b404438d398945be2931479a226bf73618 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Wed, 12 Apr 2017 18:55:33 +0200 Subject: [PATCH] fix(find): force unsubscribe when it completes or errors Force unsubscribe when resulting Observable completes or errors, even when following operator does not unsubscribe reliably, so that source Observable is not being subscribed unnecessarily. --- spec/helpers/doNotUnsubscribe.ts | 16 ++++++++++++++++ spec/operators/find-spec.ts | 28 ++++++++++++++++++++++++++++ src/operator/find.ts | 2 ++ 3 files changed, 46 insertions(+) create mode 100644 spec/helpers/doNotUnsubscribe.ts diff --git a/spec/helpers/doNotUnsubscribe.ts b/spec/helpers/doNotUnsubscribe.ts new file mode 100644 index 0000000000..29341119a6 --- /dev/null +++ b/spec/helpers/doNotUnsubscribe.ts @@ -0,0 +1,16 @@ +/// +import * as Rx from '../../dist/cjs/Rx'; + +export function doNotUnsubscribe(ob: Rx.Observable): Rx.Observable { + return ob.lift(new DoNotUnsubscribeOperator()); +} + +class DoNotUnsubscribeOperator implements Rx.Operator { + call(subscriber: Rx.Subscriber, source: any): any { + return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber)); + } +} + +class DoNotUnsubscribeSubscriber extends Rx.Subscriber { + unsubscribe() {} // tslint:disable-line no-empty +} diff --git a/spec/operators/find-spec.ts b/spec/operators/find-spec.ts index b9094cef02..56d7748bb3 100644 --- a/spec/operators/find-spec.ts +++ b/spec/operators/find-spec.ts @@ -1,6 +1,7 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports +import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe'; declare const { asDiagram }; declare const hot: typeof marbleTestingSignature.hot; @@ -162,6 +163,33 @@ describe('Observable.prototype.find', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); + it('should unsubscribe from source when complete, even if following operator does not unsubscribe', () => { + const values = {a: 3, b: 9, c: 15, d: 20}; + const source = hot('---a--b--c--d---|', values); + const subs = '^ ! '; + const expected = '---------(c|) '; + + const predicate = function (x) { return x % 5 === 0; }; + + expectObservable((source).find(predicate).let(doNotUnsubscribe)).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should unsubscribe from source when predicate function errors,' + + ' even if followring operator does not unsubscribe', () => { + + const source = hot('--a--b--c--|'); + const subs = '^ !'; + const expected = '--#'; + + const predicate = function (value) { + throw 'error'; + }; + + expectObservable((source).find(predicate).let(doNotUnsubscribe)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + it('should support type guards without breaking previous behavior', () => { // tslint:disable no-unused-variable diff --git a/src/operator/find.ts b/src/operator/find.ts index 110c813958..5e2f825c24 100644 --- a/src/operator/find.ts +++ b/src/operator/find.ts @@ -85,6 +85,7 @@ export class FindValueSubscriber extends Subscriber { destination.next(value); destination.complete(); + this.unsubscribe(); } protected _next(value: T): void { @@ -97,6 +98,7 @@ export class FindValueSubscriber extends Subscriber { } } catch (err) { this.destination.error(err); + this.unsubscribe(); } }