From f3357b9e636c058e70ffbb980ae4fca3a8c0ec52 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Thu, 12 May 2016 11:35:57 -0700 Subject: [PATCH] fix(window): don't track internal window subjects as subscriptions. --- spec/operators/window-spec.ts | 39 ++++++++++++------------------ spec/operators/windowCount-spec.ts | 25 +------------------ spec/operators/windowTime-spec.ts | 24 +----------------- spec/operators/windowWhen-spec.ts | 26 +------------------- src/operator/window.ts | 21 ++++++++++------ src/operator/windowCount.ts | 28 ++++++++++++--------- src/operator/windowTime.ts | 1 - src/operator/windowWhen.ts | 1 - 8 files changed, 49 insertions(+), 116 deletions(-) diff --git a/spec/operators/window-spec.ts b/spec/operators/window-spec.ts index dfb27cbb55..3446fbfb9e 100644 --- a/spec/operators/window-spec.ts +++ b/spec/operators/window-spec.ts @@ -1,4 +1,3 @@ -import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions}; @@ -57,6 +56,20 @@ describe('Observable.prototype.window', () => { expectSubscriptions(closings.subscriptions).toBe(closingSubs); }); + it('should return a single empty window if source is sync empty and closing is sync empty', () => { + const source = cold('(|)'); + const sourceSubs = '(^!)'; + const expected = '(w|)'; + const w = cold('|'); + const expectedValues = { w: w }; + + const result = source.window(Observable.empty()); + + expectObservable(result).toBe(expected, expectedValues); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + // expectSubscriptions(closings.subscriptions).toBe(closingSubs); + }); + it('should split a Just source into a single window identical to source, using a Never closing', () => { const source = cold('(a|)'); @@ -201,28 +214,6 @@ describe('Observable.prototype.window', () => { expectSubscriptions(closings.subscriptions).toBe(closingSubs); }); - it('should dispose window Subjects if the outer is unsubscribed early', () => { - const source = hot('--a--b--c--d--e--f--g--h--|'); - const sourceSubs = '^ ! '; - const expected = 'x--------- '; - const x = cold( '--a--b--c- '); - const unsub = ' ! '; - const late = time('---------------| '); - const values = { x: x }; - - let window; - const result = source.window(Observable.never()) - .do((w: any) => { window = w; }); - - expectObservable(result, unsub).toBe(expected, values); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - rxTestScheduler.schedule(() => { - expect(() => { - window.subscribe(); - }).to.throw(Rx.ObjectUnsubscribedError); - }, late); - }); - it('should make outer emit error when closing throws', () => { const source = hot('-1-2-^3-4-5-6-7-8-9-#'); const subs = '^ ! '; @@ -256,4 +247,4 @@ describe('Observable.prototype.window', () => { expectSubscriptions(source.subscriptions).toBe(subs); expectSubscriptions(closings.subscriptions).toBe(closingSubs); }); -}); \ No newline at end of file +}); diff --git a/spec/operators/windowCount-spec.ts b/spec/operators/windowCount-spec.ts index 2c57db802b..6eb9c03a53 100644 --- a/spec/operators/windowCount-spec.ts +++ b/spec/operators/windowCount-spec.ts @@ -1,4 +1,3 @@ -import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions}; @@ -131,28 +130,6 @@ describe('Observable.prototype.windowCount', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); - it('should dispose window Subjects if the outer is unsubscribed early', () => { - const source = hot('--a--b--c--d--e--f--g--h--|'); - const sourceSubs = '^ ! '; - const expected = 'x--------- '; - const x = cold( '--a--b--c- '); - const unsub = ' ! '; - const late = time('---------------| '); - const values = { x: x }; - - let window; - const result = source.windowCount(10, 10) - .do((w: any) => { window = w; }); - - expectObservable(result, unsub).toBe(expected, values); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - rxTestScheduler.schedule(() => { - expect(() => { - window.subscribe(); - }).to.throw(Rx.ObjectUnsubscribedError); - }, late); - }); - it('should not break unsubscription chains when result is unsubscribed explicitly', () => { const source = hot('^-a--b--c--d--|'); const subs = '^ ! '; @@ -172,4 +149,4 @@ describe('Observable.prototype.windowCount', () => { expectObservable(result, unsub).toBe(expected, values); expectSubscriptions(source.subscriptions).toBe(subs); }); -}); \ No newline at end of file +}); diff --git a/spec/operators/windowTime-spec.ts b/spec/operators/windowTime-spec.ts index d0f048aa6a..5aff47d00c 100644 --- a/spec/operators/windowTime-spec.ts +++ b/spec/operators/windowTime-spec.ts @@ -1,4 +1,3 @@ -import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions}; @@ -172,27 +171,6 @@ describe('Observable.prototype.windowTime', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); - it('should dispose window Subjects if the outer is unsubscribed early', () => { - const source = hot('--a--b--c--d--e--f--g--h--|'); - const sourceSubs = '^ ! '; - const expected = 'x--------- '; - const x = cold( '--a--b--c- '); - const unsub = ' ! '; - const values = { x: x }; - - let window; - const result = source.windowTime(1000, 1000, rxTestScheduler) - .do((w: any) => { window = w; }); - - expectObservable(result, unsub).toBe(expected, values); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - rxTestScheduler.schedule(() => { - expect(() => { - window.subscribe(); - }).to.throw(Rx.ObjectUnsubscribedError); - }, 150); - }); - it('should not break unsubscription chains when result is unsubscribed explicitly', () => { const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|'); const sourcesubs = '^ ! '; @@ -216,4 +194,4 @@ describe('Observable.prototype.windowTime', () => { expectObservable(result, unsub).toBe(expected, values); expectSubscriptions(source.subscriptions).toBe(sourcesubs); }); -}); \ No newline at end of file +}); diff --git a/spec/operators/windowWhen-spec.ts b/spec/operators/windowWhen-spec.ts index 8956608c7d..934c21308a 100644 --- a/spec/operators/windowWhen-spec.ts +++ b/spec/operators/windowWhen-spec.ts @@ -1,4 +1,3 @@ -import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions}; @@ -154,29 +153,6 @@ describe('Observable.prototype.windowWhen', () => { expectSubscriptions(closings[1].subscriptions).toBe(closeSubs[1]); }); - it('should dispose window Subjects if the outer is unsubscribed early', () => { - const source = hot('--a--b--c--d--e--f--g--h--|'); - const sourceSubs = '^ ! '; - const expected = 'x--------- '; - const x = cold( '--a--b--c- '); - const unsub = ' ! '; - const late = time('---------------| '); - const values = { x: x }; - - let window; - const result = source - .windowWhen(() => Observable.never()) - .do((w: any) => { window = w; }); - - expectObservable(result, unsub).toBe(expected, values); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - rxTestScheduler.schedule(() => { - expect(() => { - window.subscribe(); - }).to.throw(Rx.ObjectUnsubscribedError); - }, late); - }); - it('should propagate error thrown from closingSelector', () => { const e1 = hot('--a--^---b---c---d---e---f---g---h------| '); const e1subs = '^ ! '; @@ -355,4 +331,4 @@ describe('Observable.prototype.windowWhen', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); -}); \ No newline at end of file +}); diff --git a/src/operator/window.ts b/src/operator/window.ts index 1e0dbce9fa..e2514e465f 100644 --- a/src/operator/window.ts +++ b/src/operator/window.ts @@ -57,7 +57,12 @@ class WindowOperator implements Operator> { } call(subscriber: Subscriber>, source: any): any { - return source._subscribe(new WindowSubscriber(subscriber, this.windowBoundaries)); + const windowSubscriber = new WindowSubscriber(subscriber); + const sourceSubscription = source._subscribe(windowSubscriber); + if (!sourceSubscription.isUnsubscribed) { + windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries)); + } + return sourceSubscription; } } @@ -67,13 +72,12 @@ class WindowOperator implements Operator> { * @extends {Ignored} */ class WindowSubscriber extends OuterSubscriber { - private window: Subject; - constructor(protected destination: Subscriber>, - private windowBoundaries: Observable) { + private window: Subject = new Subject(); + + constructor(destination: Subscriber>) { super(destination); - this.add(subscribeToResult(this, windowBoundaries)); - this.openWindow(); + destination.next(this.window); } notifyNext(outerValue: T, innerValue: any, @@ -104,6 +108,10 @@ class WindowSubscriber extends OuterSubscriber { this.destination.complete(); } + protected _unsubscribe() { + this.window = null; + } + private openWindow(): void { const prevWindow = this.window; if (prevWindow) { @@ -111,7 +119,6 @@ class WindowSubscriber extends OuterSubscriber { } const destination = this.destination; const newWindow = this.window = new Subject(); - destination.add(newWindow); destination.next(newWindow); } } diff --git a/src/operator/windowCount.ts b/src/operator/windowCount.ts index 4970034fc7..f0245029e4 100644 --- a/src/operator/windowCount.ts +++ b/src/operator/windowCount.ts @@ -84,9 +84,7 @@ class WindowCountSubscriber extends Subscriber { private windowSize: number, private startWindowEvery: number) { super(destination); - const firstWindow = this.windows[0]; - destination.add(firstWindow); - destination.next(firstWindow); + destination.next(this.windows[0]); } protected _next(value: T) { @@ -96,34 +94,42 @@ class WindowCountSubscriber extends Subscriber { const windows = this.windows; const len = windows.length; - for (let i = 0; i < len; i++) { + for (let i = 0; i < len && !this.isUnsubscribed; i++) { windows[i].next(value); } const c = this.count - windowSize + 1; - if (c >= 0 && c % startWindowEvery === 0) { + if (c >= 0 && c % startWindowEvery === 0 && !this.isUnsubscribed) { windows.shift().complete(); } - if (++this.count % startWindowEvery === 0) { + if (++this.count % startWindowEvery === 0 && !this.isUnsubscribed) { const window = new Subject(); windows.push(window); - destination.add(window); destination.next(window); } } protected _error(err: any) { const windows = this.windows; - while (windows.length > 0) { - windows.shift().error(err); + if (windows) { + while (windows.length > 0 && !this.isUnsubscribed) { + windows.shift().error(err); + } } this.destination.error(err); } protected _complete() { const windows = this.windows; - while (windows.length > 0) { - windows.shift().complete(); + if (windows) { + while (windows.length > 0 && !this.isUnsubscribed) { + windows.shift().complete(); + } } this.destination.complete(); } + + protected _unsubscribe() { + this.count = 0; + this.windows = null; + } } diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index 60c1488d43..5b18595e92 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -147,7 +147,6 @@ class WindowTimeSubscriber extends Subscriber { const window = new Subject(); this.windows.push(window); const destination = this.destination; - destination.add(window); destination.next(window); return window; } diff --git a/src/operator/windowWhen.ts b/src/operator/windowWhen.ts index 2871a3ca79..2832d87aa7 100644 --- a/src/operator/windowWhen.ts +++ b/src/operator/windowWhen.ts @@ -138,7 +138,6 @@ class WindowSubscriber extends OuterSubscriber { this.window.error(err); } else { this.add(this.closingNotification = subscribeToResult(this, closingNotifier)); - this.add(window); } } }