Skip to content

Commit 9664a38

Browse files
committed
fix(observeOn): seal memory leak involving old notifications
fixes #2244
1 parent 375d4a5 commit 9664a38

File tree

3 files changed

+62
-7
lines changed

3 files changed

+62
-7
lines changed

spec/operators/observeOn-spec.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import * as Rx from '../../dist/cjs/Rx';
2+
import { expect } from 'chai';
3+
24
declare const {hot, asDiagram, expectObservable, expectSubscriptions};
35

46
declare const rxTestScheduler: Rx.TestScheduler;
@@ -77,4 +79,46 @@ describe('Observable.prototype.observeOn', () => {
7779
expectObservable(result, unsub).toBe(expected);
7880
expectSubscriptions(e1.subscriptions).toBe(sub);
7981
});
82+
83+
it('should clean up subscriptions created by async scheduling (prevent memory leaks #2244)', (done) => {
84+
//HACK: Deep introspection to make sure we're cleaning up notifications in scheduling.
85+
// as the architecture changes, this test may become brittle.
86+
const results = [];
87+
const subscription: any = new Observable(observer => {
88+
let i = 1;
89+
const id = setInterval(() => {
90+
if (i > 3) {
91+
observer.complete();
92+
} else {
93+
observer.next(i++);
94+
}
95+
}, 0);
96+
97+
return () => clearInterval(id);
98+
})
99+
.observeOn(Rx.Scheduler.asap)
100+
.subscribe(
101+
x => {
102+
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
103+
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
104+
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
105+
.to.equal('N');
106+
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
107+
.to.equal(x);
108+
results.push(x);
109+
},
110+
err => done(err),
111+
() => {
112+
// now that the last nexted value is done, there should only be a complete notification scheduled
113+
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
114+
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
115+
// only this completion notification should remain.
116+
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
117+
.to.equal('C');
118+
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
119+
expect(results).to.deep.equal([1, 2, 3]);
120+
done();
121+
}
122+
);
123+
});
80124
});

src/operator/observeOn.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { Operator } from '../Operator';
44
import { PartialObserver } from '../Observer';
55
import { Subscriber } from '../Subscriber';
66
import { Notification } from '../Notification';
7-
import { TeardownLogic } from '../Subscription';
7+
import { TeardownLogic, Subscription } from '../Subscription';
8+
import { Action } from '../scheduler/Action';
89

910
/**
1011
* @see {@link Notification}
@@ -34,9 +35,12 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
3435
* @extends {Ignored}
3536
*/
3637
export class ObserveOnSubscriber<T> extends Subscriber<T> {
37-
static dispatch(arg: ObserveOnMessage) {
38-
const { notification, destination } = arg;
38+
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
39+
const { notification, destination, subscription } = arg;
3940
notification.observe(destination);
41+
if (subscription) {
42+
subscription.unsubscribe();
43+
}
4044
}
4145

4246
constructor(destination: Subscriber<T>,
@@ -46,10 +50,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
4650
}
4751

4852
private scheduleMessage(notification: Notification<any>): void {
49-
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch,
50-
this.delay,
51-
new ObserveOnMessage(notification, this.destination)));
52-
}
53+
const message = new ObserveOnMessage(notification, this.destination);
54+
message.subscription = this.add(
55+
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
56+
);
57+
}
5358

5459
protected _next(value: T): void {
5560
this.scheduleMessage(Notification.createNext(value));
@@ -65,6 +70,8 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
6570
}
6671

6772
export class ObserveOnMessage {
73+
public subscription: Subscription;
74+
6875
constructor(public notification: Notification<any>,
6976
public destination: PartialObserver<any>) {
7077
}

src/scheduler/VirtualTimeScheduler.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ export class VirtualAction<T> extends AsyncAction<T> {
5858
return super.schedule(state, delay);
5959
}
6060

61+
// If an action is rescheduled, we save allocations by mutating its state,
62+
// pushing it to the end of the scheduler queue, and recycling the action.
63+
// But since the VirtualTimeScheduler is used for testing, VirtualActions
64+
// must be immutable so they can be inspected later.
6165
const action = new VirtualAction(this.scheduler, this.work);
6266
this.add(action);
6367
return action.schedule(state, delay);

0 commit comments

Comments
 (0)