Skip to content

Commit dc67d21

Browse files
trxcllntbenlesh
authored andcommitted
fix(Subscriber): adds unsubscription when errors are thrown from user-land handlers.
- slightly improves perf by improving shape of SafeSubscriber so JIT can optimize call patterns. related #1186
1 parent 04c42b3 commit dc67d21

File tree

5 files changed

+181
-47
lines changed

5 files changed

+181
-47
lines changed

spec/Observable-spec.js

Lines changed: 112 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* globals describe, it, expect */
22
var Rx = require('../dist/cjs/Rx');
33
var Promise = require('promise');
4+
var Subscriber = Rx.Subscriber;
45
var Observable = Rx.Observable;
56

67
function expectFullObserver(val) {
@@ -118,6 +119,15 @@ describe('Observable', function () {
118119
expect(mutatedByComplete).toBe(true);
119120
});
120121

122+
it('should work when subscribe is called with no arguments', function () {
123+
var source = new Observable(function (subscriber) {
124+
subscriber.next('foo');
125+
subscriber.complete();
126+
});
127+
128+
source.subscribe();
129+
});
130+
121131
it('should return a Subscription that calls the unsubscribe function returned by the subscriber', function () {
122132
var unsubscribeCalled = false;
123133

@@ -136,36 +146,128 @@ describe('Observable', function () {
136146
expect(unsubscribeCalled).toBe(true);
137147
});
138148

149+
it('should not run unsubscription logic when an error is thrown sending messages synchronously', function () {
150+
var messageError = false;
151+
var messageErrorValue = false;
152+
var unsubscribeCalled = false;
153+
154+
var sub;
155+
var source = new Observable(function (observer) {
156+
observer.next('boo!');
157+
return function () {
158+
unsubscribeCalled = true;
159+
};
160+
});
161+
162+
try {
163+
sub = source.subscribe(function (x) { throw x; });
164+
} catch (e) {
165+
messageError = true;
166+
messageErrorValue = e;
167+
}
168+
169+
expect(sub).toBe(undefined);
170+
expect(unsubscribeCalled).toBe(false);
171+
expect(messageError).toBe(true);
172+
expect(messageErrorValue).toBe('boo!');
173+
});
174+
175+
it('should dispose of the subscriber when an error is thrown sending messages synchronously', function () {
176+
var messageError = false;
177+
var messageErrorValue = false;
178+
var unsubscribeCalled = false;
179+
180+
var sub;
181+
var subscriber = new Subscriber(function (x) { throw x; });
182+
var source = new Observable(function (observer) {
183+
observer.next('boo!');
184+
return function () {
185+
unsubscribeCalled = true;
186+
};
187+
});
188+
189+
try {
190+
sub = source.subscribe(subscriber);
191+
} catch (e) {
192+
messageError = true;
193+
messageErrorValue = e;
194+
}
195+
196+
expect(sub).toBe(undefined);
197+
expect(subscriber.isUnsubscribed).toBe(true);
198+
expect(unsubscribeCalled).toBe(false);
199+
expect(messageError).toBe(true);
200+
expect(messageErrorValue).toBe('boo!');
201+
});
202+
139203
describe('when called with an anonymous observer', function () {
140-
it('should accept an anonymous observer with just a next function', function () {
141-
Observable.of(1).subscribe({
204+
it('should accept an anonymous observer with just a next function and call the next function in the context of the anonymous observer', function () {
205+
var o = {
142206
next: function next(x) {
207+
expect(this).toBe(o);
143208
expect(x).toBe(1);
144209
}
145-
});
210+
};
211+
Observable.of(1).subscribe(o);
146212
});
147213

148-
it('should accept an anonymous observer with just an error function', function () {
149-
Observable.throw('bad').subscribe({
214+
it('should accept an anonymous observer with just an error function and call the error function in the context of the anonymous observer', function () {
215+
var o = {
150216
error: function error(err) {
217+
expect(this).toBe(o);
151218
expect(err).toBe('bad');
152219
}
153-
});
220+
};
221+
Observable.throw('bad').subscribe(o);
154222
});
155223

156-
it('should accept an anonymous observer with just a complete function', function (done) {
157-
Observable.empty().subscribe({
224+
it('should accept an anonymous observer with just a complete function and call the complete function in the context of the anonymous observer', function (done) {
225+
var o = {
158226
complete: function complete() {
227+
expect(this).toBe(o);
159228
done();
160229
}
161-
});
230+
};
231+
Observable.empty().subscribe(o);
162232
});
163233

164234
it('should accept an anonymous observer with no functions at all', function () {
165235
expect(function testEmptyObject() {
166236
Observable.empty().subscribe({});
167237
}).not.toThrow();
168238
});
239+
240+
it('should not run unsubscription logic when an error is thrown sending messages synchronously to an anonymous observer', function () {
241+
var messageError = false;
242+
var messageErrorValue = false;
243+
var unsubscribeCalled = false;
244+
245+
var o = {
246+
next: function next(x) {
247+
expect(this).toBe(o);
248+
throw x;
249+
}
250+
};
251+
var sub;
252+
var source = new Observable(function (observer) {
253+
observer.next('boo!');
254+
return function () {
255+
unsubscribeCalled = true;
256+
};
257+
});
258+
259+
try {
260+
sub = source.subscribe(o);
261+
} catch (e) {
262+
messageError = true;
263+
messageErrorValue = e;
264+
}
265+
266+
expect(sub).toBe(undefined);
267+
expect(unsubscribeCalled).toBe(false);
268+
expect(messageError).toBe(true);
269+
expect(messageErrorValue).toBe('boo!');
270+
});
169271
});
170272
});
171273
});
@@ -188,4 +290,4 @@ describe('Observable.create', function () {
188290
result.subscribe(function () { });
189291
expect(called).toBe(true);
190292
});
191-
});
293+
});

src/Observable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ export class Observable<T> implements CoreOperators<T> {
113113
const subscriber = toSubscriber(observerOrNext, error, complete);
114114

115115
if (operator) {
116-
subscriber.add(this._subscribe(this.operator.call(subscriber)));
116+
subscriber.add(this._subscribe(operator.call(subscriber)));
117117
} else {
118118
subscriber.add(this._subscribe(subscriber));
119119
}

src/Subscriber.ts

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import {noop} from './util/noop';
2-
import {throwError} from './util/throwError';
3-
import {tryOrThrowError} from './util/tryOrThrowError';
1+
import {isFunction} from './util/isFunction';
2+
import {tryCatch} from './util/tryCatch';
3+
import {errorObject} from './util/errorObject';
44

55
import {Observer} from './Observer';
66
import {Subscription} from './Subscription';
@@ -12,26 +12,38 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
1212
static create<T>(next?: (x?: T) => void,
1313
error?: (e?: any) => void,
1414
complete?: () => void): Subscriber<T> {
15-
return new SafeSubscriber<T>(next, error, complete);
15+
return new Subscriber(next, error, complete);
1616
}
1717

1818
protected isStopped: boolean = false;
1919
protected destination: Observer<any>;
2020

21-
constructor(destination: Observer<any> = emptyObserver) {
21+
constructor(destinationOrNext?: Observer<any> | ((value: T) => void),
22+
error?: (e?: any) => void,
23+
complete?: () => void) {
2224
super();
2325

24-
this.destination = destination;
25-
26-
if (!destination ||
27-
(destination instanceof Subscriber) ||
28-
(destination === emptyObserver)) {
29-
return;
26+
switch (arguments.length) {
27+
case 0:
28+
this.destination = emptyObserver;
29+
break;
30+
case 1:
31+
if (!destinationOrNext) {
32+
this.destination = emptyObserver;
33+
break;
34+
}
35+
if (typeof destinationOrNext === 'object') {
36+
if (destinationOrNext instanceof Subscriber) {
37+
this.destination = (<Observer<any>> destinationOrNext);
38+
} else {
39+
this.destination = new SafeSubscriber<T>(this, <Observer<any>> destinationOrNext);
40+
}
41+
break;
42+
}
43+
default:
44+
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
45+
break;
3046
}
31-
32-
if (typeof destination.next !== 'function') { destination.next = noop; }
33-
if (typeof destination.error !== 'function') { destination.error = throwError; }
34-
if (typeof destination.complete !== 'function') { destination.complete = noop; }
3547
}
3648

3749
next(value?: T): void {
@@ -83,25 +95,48 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
8395

8496
class SafeSubscriber<T> extends Subscriber<T> {
8597

86-
constructor(next?: (x?: T) => void,
98+
private _context: any;
99+
100+
constructor(private _parent: Subscriber<T>,
101+
observerOrNext?: Observer<T> | ((value: T) => void),
87102
error?: (e?: any) => void,
88103
complete?: () => void) {
89104
super();
90-
this._next = (typeof next === 'function') && tryOrThrowError(next) || null;
91-
this._error = (typeof error === 'function') && tryOrThrowError(error) || throwError;
92-
this._complete = (typeof complete === 'function') && tryOrThrowError(complete) || null;
105+
106+
let next: ((value: T) => void);
107+
let context: any = this;
108+
109+
if (isFunction(observerOrNext)) {
110+
next = (<((value: T) => void)> observerOrNext);
111+
} else if (observerOrNext) {
112+
context = observerOrNext;
113+
next = (<Observer<T>> observerOrNext).next;
114+
error = (<Observer<T>> observerOrNext).error;
115+
complete = (<Observer<T>> observerOrNext).complete;
116+
}
117+
118+
this._context = context;
119+
this._next = next;
120+
this._error = error;
121+
this._complete = complete;
93122
}
94123

95124
next(value?: T): void {
96125
if (!this.isStopped && this._next) {
97-
this._next(value);
126+
if (tryCatch(this._next).call(this._context, value) === errorObject) {
127+
this.unsubscribe();
128+
throw errorObject.e;
129+
}
98130
}
99131
}
100132

101133
error(err?: any): void {
102134
if (!this.isStopped) {
103135
if (this._error) {
104-
this._error(err);
136+
if (tryCatch(this._error).call(this._context, err) === errorObject) {
137+
this.unsubscribe();
138+
throw errorObject.e;
139+
}
105140
}
106141
this.unsubscribe();
107142
}
@@ -110,9 +145,19 @@ class SafeSubscriber<T> extends Subscriber<T> {
110145
complete(): void {
111146
if (!this.isStopped) {
112147
if (this._complete) {
113-
this._complete();
148+
if (tryCatch(this._complete).call(this._context) === errorObject) {
149+
this.unsubscribe();
150+
throw errorObject.e;
151+
}
114152
}
115153
this.unsubscribe();
116154
}
117155
}
156+
157+
protected _unsubscribe(): void {
158+
const { _parent } = this;
159+
this._context = null;
160+
this._parent = null;
161+
_parent.unsubscribe();
162+
}
118163
}

src/util/toSubscriber.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ export function toSubscriber<T>(
1212
return (<Subscriber<T>> next);
1313
} else if (typeof next[rxSubscriber] === 'function') {
1414
return next[rxSubscriber]();
15-
} else {
16-
return new Subscriber(<Observer<T>> next);
1715
}
1816
}
1917

20-
return Subscriber.create(<((value: T) => void)> next, error, complete);
18+
return new Subscriber(next, error, complete);
2119
}

src/util/tryOrThrowError.ts

Lines changed: 0 additions & 11 deletions
This file was deleted.

0 commit comments

Comments
 (0)