From 49a947a728eb694b47bc292b72fe93d72f95e503 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sat, 28 Jan 2017 22:39:38 -0800 Subject: [PATCH] fix(Observable): errors thrown during subscription are now properly sent down error channel - Fixes `Observable.create(fn)` and `new Observable(fn)` such that any error thrown in `fn` on subscription will be sent to the subscriber's error handler. - Fixes a subject test that was relying on the errant behavior. fixes #1833 --- spec/Observable-spec.ts | 24 ++++++++++++++++++++++++ spec/Subject-spec.ts | 4 +--- src/Observable.ts | 12 +++++++++++- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 6ea8a39fdb..9093ed8ae2 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -29,6 +29,18 @@ describe('Observable', () => { source.subscribe(function (x) { expect(x).to.equal(1); }, null, done); }); + it('should send errors thrown in the constructor down the error path', (done) => { + new Observable((observer) => { + throw new Error('this should be handled'); + }) + .subscribe({ + error(err) { + expect(err).to.deep.equal(new Error('this should be handled')); + done(); + } + }); + }); + describe('forEach', () => { it('should iterate and return a Promise', (done: MochaDone) => { const expected = [1, 2, 3]; @@ -582,6 +594,18 @@ describe('Observable.create', () => { }); expect(called).to.be.true; }); + + it('should send errors thrown in the passed function down the error path', (done) => { + Observable.create((observer) => { + throw new Error('this should be handled'); + }) + .subscribe({ + error(err) { + expect(err).to.deep.equal(new Error('this should be handled')); + done(); + } + }); + }); }); /** @test {Observable} */ diff --git a/spec/Subject-spec.ts b/spec/Subject-spec.ts index 74ec21fb63..f4f9faa16c 100644 --- a/spec/Subject-spec.ts +++ b/spec/Subject-spec.ts @@ -261,10 +261,8 @@ describe('Subject', () => { expect(() => { subject.subscribe( function (x) { results3.push(x); }, - function (e) { results3.push('E'); }, - () => { results3.push('C'); } ); - }).to.throw(); + }).to.throw(Rx.ObjectUnsubscribedError); expect(results1).to.deep.equal([1, 2, 3, 4, 5]); expect(results2).to.deep.equal([3, 4, 5]); diff --git a/src/Observable.ts b/src/Observable.ts index 5dc9402171..3ec05cfbd7 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -95,7 +95,7 @@ export class Observable implements Subscribable { if (operator) { operator.call(sink, this.source); } else { - sink.add(this._subscribe(sink)); + sink.add(this._trySubscribe(sink)); } if (sink.syncErrorThrowable) { @@ -108,6 +108,16 @@ export class Observable implements Subscribable { return sink; } + private _trySubscribe(sink: Subscriber): TeardownLogic { + try { + return this._subscribe(sink); + } catch (err) { + sink.syncErrorThrown = true; + sink.syncErrorValue = err; + sink.error(err); + } + } + /** * @method forEach * @param {Function} next a handler for each value emitted by the observable