Skip to content

Commit

Permalink
perf: removed code that would bind functions passed with observers …
Browse files Browse the repository at this point in the history
…to `subscribe`. (#6815)

* refactor(SafeSubscriber): optimize perf for ordinary observers

No longer require function binding if we aren't using the deprecated next context. This should improve performance in the common path of consumers subscribing with an object or even with a function.

Adds a simple class `ConsumerObserver` which is mostly meant to optimize the number of function refrences created. We should never expose this externally.

Related #6783

* chore: update comments

* refactor(Subscriber): reduce property access
  • Loading branch information
benlesh authored Feb 9, 2022
1 parent 481313d commit fb375a0
Showing 1 changed file with 71 additions and 42 deletions.
113 changes: 71 additions & 42 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,49 @@ function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn {
return _bind.call(fn, thisArg);
}

/**
* Internal optimization only, DO NOT EXPOSE.
* @internal
*/
class ConsumerObserver<T> implements Observer<T> {
constructor(private partialObserver: Partial<Observer<T>>) {}

next(value: T): void {
const { partialObserver } = this;
if (partialObserver.next) {
try {
partialObserver.next(value);
} catch (error) {
handleUnhandledError(error);
}
}
}

error(err: any): void {
const { partialObserver } = this;
if (partialObserver.error) {
try {
partialObserver.error(err);
} catch (error) {
handleUnhandledError(error);
}
} else {
handleUnhandledError(err);
}
}

complete(): void {
const { partialObserver } = this;
if (partialObserver.complete) {
try {
partialObserver.complete();
} catch (error) {
handleUnhandledError(error);
}
}
}
}

export class SafeSubscriber<T> extends Subscriber<T> {
constructor(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
Expand All @@ -155,65 +198,51 @@ export class SafeSubscriber<T> extends Subscriber<T> {
) {
super();

let next: ((value: T) => void) | undefined;
if (isFunction(observerOrNext)) {
let partialObserver: Partial<Observer<T>>;
if (isFunction(observerOrNext) || !observerOrNext) {
// The first argument is a function, not an observer. The next
// two arguments *could* be observers, or they could be empty.
next = observerOrNext;
} else if (observerOrNext) {
// The first argument is an observer object, we have to pull the handlers
// off and capture the owner object as the context. That is because we're
// going to put them all in a new destination with ensured methods
// for `next`, `error`, and `complete`. That's part of what makes this
// the "Safe" Subscriber.
({ next, error, complete } = observerOrNext);
partialObserver = {
next: observerOrNext ?? undefined,
error: error ?? undefined,
complete: complete ?? undefined,
};
} else {
// The first argument is a partial observer.
let context: any;
if (this && config.useDeprecatedNextContext) {
// This is a deprecated path that made `this.unsubscribe()` available in
// next handler functions passed to subscribe. This only exists behind a flag
// now, as it is *very* slow.
context = Object.create(observerOrNext);
context.unsubscribe = () => this.unsubscribe();
partialObserver = {
next: observerOrNext.next && bind(observerOrNext.next, context),
error: observerOrNext.error && bind(observerOrNext.error, context),
complete: observerOrNext.complete && bind(observerOrNext.complete, context),
};
} else {
context = observerOrNext;
// The "normal" path. Just use the partial observer directly.
partialObserver = observerOrNext;
}
next = next && bind(next, context);
error = error && bind(error, context);
complete = complete && bind(complete, context);
}

// Once we set the destination, the superclass `Subscriber` will
// do it's magic in the `_next`, `_error`, and `_complete` methods.
this.destination = {
next: next ? wrapForErrorHandling(next, this) : noop,
error: wrapForErrorHandling(error ?? defaultErrorHandler, this),
complete: complete ? wrapForErrorHandling(complete, this) : noop,
};
// Wrap the partial observer to ensure it's a full observer, and
// make sure proper error handling is accounted for.
this.destination = new ConsumerObserver(partialObserver);
}
}

/**
* Wraps a user-provided handler (or our {@link defaultErrorHandler} in one case) to
* ensure that any thrown errors are caught and handled appropriately.
*
* @param handler The handler to wrap
* @param instance The SafeSubscriber instance we're going to mark if there's an error.
*/
function wrapForErrorHandling(handler: (arg?: any) => void, instance: SafeSubscriber<any>) {
return (...args: any[]) => {
try {
handler(...args);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
captureError(err);
} else {
// Ideal path, we report this as an unhandled error,
// which is thrown on a new call stack.
reportUnhandledError(err);
}
}
};
function handleUnhandledError(error: any) {
if (config.useDeprecatedSynchronousErrorHandling) {
captureError(error);
} else {
// Ideal path, we report this as an unhandled error,
// which is thrown on a new call stack.
reportUnhandledError(error);
}
}

/**
* An error handler used when no error handler was supplied
* to the SafeSubscriber -- meaning no error handler was supplied
Expand Down

0 comments on commit fb375a0

Please sign in to comment.