Skip to content

Commit f3357b9

Browse files
committed
fix(window): don't track internal window subjects as subscriptions.
1 parent 993a2c3 commit f3357b9

File tree

8 files changed

+49
-116
lines changed

8 files changed

+49
-116
lines changed

spec/operators/window-spec.ts

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import {expect} from 'chai';
21
import * as Rx from '../../dist/cjs/Rx';
32
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};
43

@@ -57,6 +56,20 @@ describe('Observable.prototype.window', () => {
5756
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
5857
});
5958

59+
it('should return a single empty window if source is sync empty and closing is sync empty', () => {
60+
const source = cold('(|)');
61+
const sourceSubs = '(^!)';
62+
const expected = '(w|)';
63+
const w = cold('|');
64+
const expectedValues = { w: w };
65+
66+
const result = source.window(Observable.empty());
67+
68+
expectObservable(result).toBe(expected, expectedValues);
69+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
70+
// expectSubscriptions(closings.subscriptions).toBe(closingSubs);
71+
});
72+
6073
it('should split a Just source into a single window identical to source, using a Never closing',
6174
() => {
6275
const source = cold('(a|)');
@@ -201,28 +214,6 @@ describe('Observable.prototype.window', () => {
201214
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
202215
});
203216

204-
it('should dispose window Subjects if the outer is unsubscribed early', () => {
205-
const source = hot('--a--b--c--d--e--f--g--h--|');
206-
const sourceSubs = '^ ! ';
207-
const expected = 'x--------- ';
208-
const x = cold( '--a--b--c- ');
209-
const unsub = ' ! ';
210-
const late = time('---------------| ');
211-
const values = { x: x };
212-
213-
let window;
214-
const result = source.window(Observable.never())
215-
.do((w: any) => { window = w; });
216-
217-
expectObservable(result, unsub).toBe(expected, values);
218-
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
219-
rxTestScheduler.schedule(() => {
220-
expect(() => {
221-
window.subscribe();
222-
}).to.throw(Rx.ObjectUnsubscribedError);
223-
}, late);
224-
});
225-
226217
it('should make outer emit error when closing throws', () => {
227218
const source = hot('-1-2-^3-4-5-6-7-8-9-#');
228219
const subs = '^ ! ';
@@ -256,4 +247,4 @@ describe('Observable.prototype.window', () => {
256247
expectSubscriptions(source.subscriptions).toBe(subs);
257248
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
258249
});
259-
});
250+
});

spec/operators/windowCount-spec.ts

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import {expect} from 'chai';
21
import * as Rx from '../../dist/cjs/Rx';
32
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};
43

@@ -131,28 +130,6 @@ describe('Observable.prototype.windowCount', () => {
131130
expectSubscriptions(source.subscriptions).toBe(subs);
132131
});
133132

134-
it('should dispose window Subjects if the outer is unsubscribed early', () => {
135-
const source = hot('--a--b--c--d--e--f--g--h--|');
136-
const sourceSubs = '^ ! ';
137-
const expected = 'x--------- ';
138-
const x = cold( '--a--b--c- ');
139-
const unsub = ' ! ';
140-
const late = time('---------------| ');
141-
const values = { x: x };
142-
143-
let window;
144-
const result = source.windowCount(10, 10)
145-
.do((w: any) => { window = w; });
146-
147-
expectObservable(result, unsub).toBe(expected, values);
148-
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
149-
rxTestScheduler.schedule(() => {
150-
expect(() => {
151-
window.subscribe();
152-
}).to.throw(Rx.ObjectUnsubscribedError);
153-
}, late);
154-
});
155-
156133
it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
157134
const source = hot('^-a--b--c--d--|');
158135
const subs = '^ ! ';
@@ -172,4 +149,4 @@ describe('Observable.prototype.windowCount', () => {
172149
expectObservable(result, unsub).toBe(expected, values);
173150
expectSubscriptions(source.subscriptions).toBe(subs);
174151
});
175-
});
152+
});

spec/operators/windowTime-spec.ts

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import {expect} from 'chai';
21
import * as Rx from '../../dist/cjs/Rx';
32
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};
43

@@ -172,27 +171,6 @@ describe('Observable.prototype.windowTime', () => {
172171
expectSubscriptions(source.subscriptions).toBe(subs);
173172
});
174173

175-
it('should dispose window Subjects if the outer is unsubscribed early', () => {
176-
const source = hot('--a--b--c--d--e--f--g--h--|');
177-
const sourceSubs = '^ ! ';
178-
const expected = 'x--------- ';
179-
const x = cold( '--a--b--c- ');
180-
const unsub = ' ! ';
181-
const values = { x: x };
182-
183-
let window;
184-
const result = source.windowTime(1000, 1000, rxTestScheduler)
185-
.do((w: any) => { window = w; });
186-
187-
expectObservable(result, unsub).toBe(expected, values);
188-
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
189-
rxTestScheduler.schedule(() => {
190-
expect(() => {
191-
window.subscribe();
192-
}).to.throw(Rx.ObjectUnsubscribedError);
193-
}, 150);
194-
});
195-
196174
it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
197175
const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|');
198176
const sourcesubs = '^ ! ';
@@ -216,4 +194,4 @@ describe('Observable.prototype.windowTime', () => {
216194
expectObservable(result, unsub).toBe(expected, values);
217195
expectSubscriptions(source.subscriptions).toBe(sourcesubs);
218196
});
219-
});
197+
});

spec/operators/windowWhen-spec.ts

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import {expect} from 'chai';
21
import * as Rx from '../../dist/cjs/Rx';
32
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};
43

@@ -154,29 +153,6 @@ describe('Observable.prototype.windowWhen', () => {
154153
expectSubscriptions(closings[1].subscriptions).toBe(closeSubs[1]);
155154
});
156155

157-
it('should dispose window Subjects if the outer is unsubscribed early', () => {
158-
const source = hot('--a--b--c--d--e--f--g--h--|');
159-
const sourceSubs = '^ ! ';
160-
const expected = 'x--------- ';
161-
const x = cold( '--a--b--c- ');
162-
const unsub = ' ! ';
163-
const late = time('---------------| ');
164-
const values = { x: x };
165-
166-
let window;
167-
const result = source
168-
.windowWhen(() => Observable.never())
169-
.do((w: any) => { window = w; });
170-
171-
expectObservable(result, unsub).toBe(expected, values);
172-
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
173-
rxTestScheduler.schedule(() => {
174-
expect(() => {
175-
window.subscribe();
176-
}).to.throw(Rx.ObjectUnsubscribedError);
177-
}, late);
178-
});
179-
180156
it('should propagate error thrown from closingSelector', () => {
181157
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
182158
const e1subs = '^ ! ';
@@ -355,4 +331,4 @@ describe('Observable.prototype.windowWhen', () => {
355331
expectSubscriptions(e1.subscriptions).toBe(e1subs);
356332
expectSubscriptions(e2.subscriptions).toBe(e2subs);
357333
});
358-
});
334+
});

src/operator/window.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ class WindowOperator<T> implements Operator<T, Observable<T>> {
5757
}
5858

5959
call(subscriber: Subscriber<Observable<T>>, source: any): any {
60-
return source._subscribe(new WindowSubscriber(subscriber, this.windowBoundaries));
60+
const windowSubscriber = new WindowSubscriber(subscriber);
61+
const sourceSubscription = source._subscribe(windowSubscriber);
62+
if (!sourceSubscription.isUnsubscribed) {
63+
windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries));
64+
}
65+
return sourceSubscription;
6166
}
6267
}
6368

@@ -67,13 +72,12 @@ class WindowOperator<T> implements Operator<T, Observable<T>> {
6772
* @extends {Ignored}
6873
*/
6974
class WindowSubscriber<T> extends OuterSubscriber<T, any> {
70-
private window: Subject<T>;
7175

72-
constructor(protected destination: Subscriber<Observable<T>>,
73-
private windowBoundaries: Observable<any>) {
76+
private window: Subject<T> = new Subject<T>();
77+
78+
constructor(destination: Subscriber<Observable<T>>) {
7479
super(destination);
75-
this.add(subscribeToResult(this, windowBoundaries));
76-
this.openWindow();
80+
destination.next(this.window);
7781
}
7882

7983
notifyNext(outerValue: T, innerValue: any,
@@ -104,14 +108,17 @@ class WindowSubscriber<T> extends OuterSubscriber<T, any> {
104108
this.destination.complete();
105109
}
106110

111+
protected _unsubscribe() {
112+
this.window = null;
113+
}
114+
107115
private openWindow(): void {
108116
const prevWindow = this.window;
109117
if (prevWindow) {
110118
prevWindow.complete();
111119
}
112120
const destination = this.destination;
113121
const newWindow = this.window = new Subject<T>();
114-
destination.add(newWindow);
115122
destination.next(newWindow);
116123
}
117124
}

src/operator/windowCount.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ class WindowCountSubscriber<T> extends Subscriber<T> {
8484
private windowSize: number,
8585
private startWindowEvery: number) {
8686
super(destination);
87-
const firstWindow = this.windows[0];
88-
destination.add(firstWindow);
89-
destination.next(firstWindow);
87+
destination.next(this.windows[0]);
9088
}
9189

9290
protected _next(value: T) {
@@ -96,34 +94,42 @@ class WindowCountSubscriber<T> extends Subscriber<T> {
9694
const windows = this.windows;
9795
const len = windows.length;
9896

99-
for (let i = 0; i < len; i++) {
97+
for (let i = 0; i < len && !this.isUnsubscribed; i++) {
10098
windows[i].next(value);
10199
}
102100
const c = this.count - windowSize + 1;
103-
if (c >= 0 && c % startWindowEvery === 0) {
101+
if (c >= 0 && c % startWindowEvery === 0 && !this.isUnsubscribed) {
104102
windows.shift().complete();
105103
}
106-
if (++this.count % startWindowEvery === 0) {
104+
if (++this.count % startWindowEvery === 0 && !this.isUnsubscribed) {
107105
const window = new Subject<T>();
108106
windows.push(window);
109-
destination.add(window);
110107
destination.next(window);
111108
}
112109
}
113110

114111
protected _error(err: any) {
115112
const windows = this.windows;
116-
while (windows.length > 0) {
117-
windows.shift().error(err);
113+
if (windows) {
114+
while (windows.length > 0 && !this.isUnsubscribed) {
115+
windows.shift().error(err);
116+
}
118117
}
119118
this.destination.error(err);
120119
}
121120

122121
protected _complete() {
123122
const windows = this.windows;
124-
while (windows.length > 0) {
125-
windows.shift().complete();
123+
if (windows) {
124+
while (windows.length > 0 && !this.isUnsubscribed) {
125+
windows.shift().complete();
126+
}
126127
}
127128
this.destination.complete();
128129
}
130+
131+
protected _unsubscribe() {
132+
this.count = 0;
133+
this.windows = null;
134+
}
129135
}

src/operator/windowTime.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
147147
const window = new Subject<T>();
148148
this.windows.push(window);
149149
const destination = this.destination;
150-
destination.add(window);
151150
destination.next(window);
152151
return window;
153152
}

src/operator/windowWhen.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ class WindowSubscriber<T> extends OuterSubscriber<T, any> {
138138
this.window.error(err);
139139
} else {
140140
this.add(this.closingNotification = subscribeToResult(this, closingNotifier));
141-
this.add(window);
142141
}
143142
}
144143
}

0 commit comments

Comments
 (0)