Skip to content

Commit

Permalink
refactor(Observable): simplify subscription and subscriber code (#5703)
Browse files Browse the repository at this point in the history
* refactor(Observable): simplify subscription and subscriber code

- Gets rid of weirdness with syncErrorThrowable, et al

* chore: update golden files
  • Loading branch information
benlesh authored Sep 8, 2020
1 parent 3485dd5 commit c570750
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 69 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ export declare type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<ne

export declare class Subscriber<T> extends Subscription implements Observer<T> {
protected destination: Observer<any> | Subscriber<any>;
protected isStopped: boolean; syncErrorThrowable: boolean; syncErrorThrown: boolean; syncErrorValue: any;
protected isStopped: boolean;
constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void) | null, error?: ((e?: any) => void) | null, complete?: (() => void) | null);
protected _complete(): void;
protected _error(err: any): void;
Expand Down
2 changes: 1 addition & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ describe('Observable', () => {
expect(() => throwError(new Error('thrown error')).subscribe()).to.throw(Error, 'thrown error');
});

it('should rethrow if sink has syncErrorThrowable = false', () => {
it('should rethrow if next handler throws', () => {
const observable = new Observable((observer) => {
observer.next(1);
});
Expand Down
68 changes: 21 additions & 47 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,37 +208,25 @@ export class Observable<T> implements Subscribable<T> {
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
const subscriber = toSubscriber(observerOrNext, error, complete);

if (operator) {
sink.add(operator.call(sink, this.source));
} else {
// If we have a source, we know this observable is the result of one of our lifted
// operators. We choose the trusted path of `_subscribe`. Otherwise,
// if we do not have a source, that means this is a user-created observable, and the
// subscribe function may error, so we need to wrap it in a try-catch. Likewise,
// if the user has chosen to use the deprecated synchronous error throwning behavior,
// AND the current subscriber is in a syncErrorThrowable state, meaning we're in the act
// of subscribing, we need to wrap the subscribe in a try-catch so we can flag
// synchronously thrown errors and throw them again.
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable)
? this._subscribe(sink)
: this._trySubscribe(sink)
);
}

if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
// If we have an operator, it's the result of a lift, and we let the lift
// mechanism do the subscription for us in the operator call. Otherwise,
// if we have a source, it's a trusted observable we own, and we can call
// the _subscribe without wrapping it in a try/catch. If we are supposed to
// use the deprecated sync error handling, then we don't need the try/catch either
// otherwise, it may be from a user-made observable instance, and we want to
// wrap it in a try/catch so we can handle errors appropriately.
const { operator, source } = this;
subscriber.add(
operator
? operator.call(subscriber, source)
: source || config.useDeprecatedSynchronousErrorHandling
? this._subscribe(subscriber)
: this._trySubscribe(subscriber)
);

return sink;
return subscriber;
}

/** @deprecated This is an internal implementation detail, do not use. */
Expand All @@ -247,8 +235,7 @@ export class Observable<T> implements Subscribable<T> {
return this._subscribe(sink);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
throw err;
} else {
if (canReportError(sink)) {
sink.error(err);
Expand Down Expand Up @@ -346,8 +333,7 @@ export class Observable<T> implements Subscribable<T> {

/** @internal This is an internal implementation detail, do not use. */
protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
const { source } = this;
return source && source.subscribe(subscriber);
return this.source?.subscribe(subscriber);
}

/**
Expand Down Expand Up @@ -450,11 +436,7 @@ export class Observable<T> implements Subscribable<T> {
* ```
*/
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
if (operations.length === 0) {
return this as any;
}

return pipeFromArray(operations)(this);
return operations.length ? pipeFromArray(operations)(this) : this;
}

/* tslint:disable:max-line-length */
Expand Down Expand Up @@ -506,15 +488,7 @@ export class Observable<T> implements Subscribable<T> {
* @param promiseCtor The optional promise constructor to passed by consuming code
*/
function getPromiseCtor(promiseCtor: PromiseConstructorLike | undefined) {
if (!promiseCtor) {
promiseCtor = config.Promise || Promise;
}

if (!promiseCtor) {
throw new Error('no Promise impl found');
}

return promiseCtor;
return promiseCtor ?? config.Promise ?? Promise;
}

/**
Expand Down
25 changes: 5 additions & 20 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,9 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
static create<T>(next?: (x?: T) => void,
error?: (e?: any) => void,
complete?: () => void): Subscriber<T> {
const subscriber = new Subscriber(next, error, complete);
subscriber.syncErrorThrowable = false;
return subscriber;
return new Subscriber(next, error, complete);
}

/** @internal */ syncErrorValue: any = null;
/** @internal */ syncErrorThrown: boolean = false;
/** @internal */ syncErrorThrowable: boolean = false;

protected isStopped: boolean = false;
protected destination: Observer<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)

Expand Down Expand Up @@ -67,17 +61,14 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
this.destination = destinationOrNext;
destinationOrNext.add(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
}
break;
}
default:
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
break;
}
Expand Down Expand Up @@ -173,9 +164,9 @@ export class SafeSubscriber<T> extends Subscriber<T> {
} else {
context = observerOrNext;
}
next = next && next.bind(context);
error = error && error.bind(context);
complete = complete && complete.bind(context);
next = next?.bind(context);
error = error?.bind(context);
complete = complete?.bind(context);
if (isSubscription(observerOrNext)) {
observerOrNext.add(this.unsubscribe.bind(this));
}
Expand Down Expand Up @@ -216,13 +207,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
private _throw(err: any) {
this.unsubscribe();
if (config.useDeprecatedSynchronousErrorHandling) {
const { _parentSubscriber } = this;
if (_parentSubscriber?.syncErrorThrowable) {
_parentSubscriber.syncErrorValue = err;
_parentSubscriber.syncErrorThrown = true;
} else {
throw err;
}
throw err;
} else {
reportUnhandledError(err);
}
Expand Down

0 comments on commit c570750

Please sign in to comment.