Skip to content

Commit

Permalink
fix: useDeprecatedSynchronousErrorThrowing honored for flattened sync…
Browse files Browse the repository at this point in the history
… sources

fixes ReactiveX#5983
  • Loading branch information
benlesh committed Jan 26, 2021
1 parent 9b62582 commit 73f76eb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 40 deletions.
17 changes: 15 additions & 2 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Observer, TeardownLogic } from '../src/internal/types';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError } from 'rxjs/operators';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -625,9 +625,22 @@ describe('Observable', () => {
)
}
})
}).to.throw();
}).to.throw('hi there!');
});

it('should rethrow synchronous errors from flattened observables', () => {
expect(() => {
of(1)
.pipe(concatMap(() => throwError(new Error('Ahoy! An error!'))))
.subscribe(console.log);
}).to.throw('Ahoy! An error!');

expect(() => {
of(1)
.pipe(switchMap(() => throwError(new Error('Avast! Thar be a new error!'))))
.subscribe(console.log);
}).to.throw('Avast! Thar be a new error!');
})

afterEach(() => {
config.useDeprecatedSynchronousErrorHandling = false;
Expand Down
70 changes: 32 additions & 38 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,46 +132,40 @@ export class SafeSubscriber<T> extends Subscriber<T> {
) {
super();

// If we don't have arguments, or the observer passed is already EMPTY_OBSERVER,
// use EMPTY_OBSERVER. This is just to save a little on object allocations.
this.destination = EMPTY_OBSERVER;
if ((observerOrNext || error || complete) && observerOrNext !== EMPTY_OBSERVER) {
// We've got either functions or an observer to deal with
// let's figure that out here.

let next: ((value: T) => void) | undefined;
if (isFunction(observerOrNext)) {
next = observerOrNext;
} else if (observerOrNext) {
// Even if it's an observer, we have to pull the handlers off and
// capture the owner object as the context. That is because we're
// going to put them all in a new destination with ensured methods
// for `next`, `error`, and `complete`. That's part of what makes this
// the "Safe" Subscriber.
({ next, error, complete } = observerOrNext);
let context: any;
if (this && config.useDeprecatedNextContext) {
// This is a deprecated path that made `this.unsubscribe()` available in
// next handler functions passed to subscribe. This only exists behind a flag
// now, as it is *very* slow.
context = Object.create(observerOrNext);
context.unsubscribe = () => this.unsubscribe();
} else {
context = observerOrNext;
}
next = next?.bind(context);
error = error?.bind(context);
complete = complete?.bind(context);
let next: ((value: T) => void) | undefined;
if (isFunction(observerOrNext)) {
// The first argument is a function, not an observer. The next
// two arguments *could* be observers, or they could be empty.
next = observerOrNext;
} else if (observerOrNext) {
// The first argument is an observer object, we have to pull the handlers
// off and capture the owner object as the context. That is because we're
// going to put them all in a new destination with ensured methods
// for `next`, `error`, and `complete`. That's part of what makes this
// the "Safe" Subscriber.
({ next, error, complete } = observerOrNext);
let context: any;
if (this && config.useDeprecatedNextContext) {
// This is a deprecated path that made `this.unsubscribe()` available in
// next handler functions passed to subscribe. This only exists behind a flag
// now, as it is *very* slow.
context = Object.create(observerOrNext);
context.unsubscribe = () => this.unsubscribe();
} else {
context = observerOrNext;
}

// Once we set the destination, the superclass `Subscriber` will
// do it's magic in the `_next`, `_error`, and `_complete` methods.
this.destination = {
next: next ? maybeWrapForDeprecatedSyncErrorHandling(next, this) : noop,
error: error ? maybeWrapForDeprecatedSyncErrorHandling(error, this) : defaultErrorHandler,
complete: complete ? maybeWrapForDeprecatedSyncErrorHandling(complete, this) : noop,
};
next = next?.bind(context);
error = error?.bind(context);
complete = complete?.bind(context);
}

// Once we set the destination, the superclass `Subscriber` will
// do it's magic in the `_next`, `_error`, and `_complete` methods.
this.destination = {
next: next ? maybeWrapForDeprecatedSyncErrorHandling(next, this) : noop,
error: maybeWrapForDeprecatedSyncErrorHandling(error ? error : defaultErrorHandler, this),
complete: complete ? maybeWrapForDeprecatedSyncErrorHandling(complete, this) : noop,
};
}
}

Expand Down

0 comments on commit 73f76eb

Please sign in to comment.