Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(SafeSubscriber): avoid using Object.create #5646

Merged
merged 2 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ export declare function concat<O1 extends ObservableInput<any>, O2 extends Obser
export declare function concat<A extends ObservableInput<any>[]>(...observables: A): Observable<ObservedValueUnionFromArray<A>>;

export declare const config: {
quietBadConfig: boolean;
Promise: PromiseConstructorLike;
useDeprecatedSynchronousErrorHandling: boolean;
useDeprecatedNextContext: boolean;
};

export declare class ConnectableObservable<T> extends Observable<T> {
Expand Down
94 changes: 91 additions & 3 deletions spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';
import { Subscriber, Observable } from 'rxjs';
import { Subscriber, Observable, config, of } from 'rxjs';
import { asInteropSubscriber } from './helpers/interop-helper';
import { getRegisteredTeardowns } from './helpers/subscription';

Expand Down Expand Up @@ -157,7 +157,6 @@ describe('Subscriber', () => {
expect(getRegisteredTeardowns(subscriber).length).to.equal(0);
});


it('should teardown and unregister all teardowns after complete', () => {
let isTornDown = false;
const subscriber = new Subscriber();
Expand All @@ -166,4 +165,93 @@ describe('Subscriber', () => {
expect(isTornDown).to.be.true;
expect(getRegisteredTeardowns(subscriber).length).to.equal(0);
});
});

it('should NOT break this context on next methods from unfortunate consumers', () => {
// This is a contrived class to illustrate that we can pass another
// object that is "observer shaped" and not have it lose its context
// as it would have in v5 - v6.
class CustomConsumer {
valuesProcessed: string[] = [];

// In here, we access instance state and alter it.
next(value: string) {
if (value === 'reset') {
this.valuesProcessed = [];
} else {
this.valuesProcessed.push(value);
}
}
};

const consumer = new CustomConsumer();

of('old', 'old', 'reset', 'new', 'new').subscribe(consumer);

expect(consumer.valuesProcessed).not.to.equal(['new', 'new']);
});

describe('deprecated next context mode', () => {
beforeEach(() => {
config.quietBadConfig = true;
config.useDeprecatedNextContext = true;
});

afterEach(() => {
config.useDeprecatedNextContext = false;
config.quietBadConfig = false;
});

it('should allow changing the context of `this` in a POJO subscriber', () => {
const results: any[] = [];

const source = new Observable<number>(subscriber => {
for (let i = 0; i < 10 && !subscriber.closed; i++) {
subscriber.next(i);
}
subscriber.complete();

return () => {
results.push('teardown');
}
});

source.subscribe({
next: function (this: any, value) {
expect(this.unsubscribe).to.be.a('function');
results.push(value);
if (value === 3) {
this.unsubscribe();
}
},
complete() {
throw new Error('should not be called');
}
});

expect(results).to.deep.equal([0, 1, 2, 3, 'teardown'])
});

it('should NOT break this context on next methods from unfortunate consumers', () => {
// This is a contrived class to illustrate that we can pass another
// object that is "observer shaped"
class CustomConsumer {
valuesProcessed: string[] = [];

// In here, we access instance state and alter it.
next(value: string) {
if (value === 'reset') {
this.valuesProcessed = [];
} else {
this.valuesProcessed.push(value);
}
}
};

const consumer = new CustomConsumer();

of('old', 'old', 'reset', 'new', 'new').subscribe(consumer);

expect(consumer.valuesProcessed).not.to.equal(['new', 'new']);
});
});
});
31 changes: 17 additions & 14 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,33 +153,37 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
*/
export class SafeSubscriber<T> extends Subscriber<T> {

private _context: any;

constructor(private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
error?: ((e?: any) => void) | null,
complete?: (() => void) | null) {
super();

let next: ((value: T) => void) | undefined;
let context: any = this;

if (isFunction(observerOrNext)) {
next = (<((value: T) => void)> observerOrNext);
next = observerOrNext;
} else if (observerOrNext) {
next = (<PartialObserver<T>> observerOrNext).next;
error = (<PartialObserver<T>> observerOrNext).error;
complete = (<PartialObserver<T>> observerOrNext).complete;
next = observerOrNext.next;
error = observerOrNext.error;
complete = observerOrNext.complete;
if (observerOrNext !== emptyObserver) {
context = Object.create(observerOrNext);
let context: any;
if (config.useDeprecatedNextContext) {
context = Object.create(observerOrNext);
context.unsubscribe = this.unsubscribe.bind(this);
} else {
context = observerOrNext;
}
next = next && next.bind(context);
error = error && error.bind(context);
complete = complete && complete.bind(context);
if (isSubscription(observerOrNext)) {
observerOrNext.add(this.unsubscribe.bind(this));
}
context.unsubscribe = this.unsubscribe.bind(this);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to remove this statement without affecting any tests; this makes me a bit unsure on what this was trying to achieve and if it's safe to remove it, although I can't think of what this was supposed to do.

Copy link
Contributor Author

@JoostK JoostK Aug 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I realized after opening this that without this assignment, it becomes possible to just use context = observerOrNext as an alternative to binding the subscriber callbacks. If that is indeed possible (again, no tests start to fail) then I think switching to use that approach makes sense.

Edit: this assignment does allow anonymous subscribers to call this.unsubscribe(), which this PR would break. Since there's no tests for this, how intentional is that? Furthermore, I'd consider this to cause inconsistencies for anonymous objects that do have unsubscribe themselves. <- Actually, that appears to work, resulting in unsubscription and the anonymous' object's unsubscribe to be called. Here's a small StackBlitz sample.

}
}

this._context = context;
this._next = next!;
this._error = error!;
this._complete = complete!;
Expand Down Expand Up @@ -230,7 +234,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
if (!this.isStopped) {
const { _parentSubscriber } = this;
if (this._complete) {
const wrappedComplete = () => this._complete.call(this._context);
const wrappedComplete = () => this._complete.call(this);
Copy link
Collaborator

@cartant cartant Aug 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: I think this wrappedComplete business should also be conditional on config.useDeprecatedNextContext. I don't think we should provide the subscriber to the caller - via the context - for plain callbacks either.

const wrappedComplete = config.useDeprecatedNextContext
  ? () => this._complete.call(this)
  : this._complete;

And I think we need tests for plain callbacks, too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am about to make a pass through all of this. I'm going to merge this as-is, and address the rest in another PR.


if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(wrappedComplete);
Expand All @@ -247,7 +251,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {

private __tryOrUnsub(fn: Function, value?: any): void {
try {
fn.call(this._context, value);
fn(value);
} catch (err) {
this.unsubscribe();
if (config.useDeprecatedSynchronousErrorHandling) {
Expand All @@ -263,7 +267,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
throw new Error('bad call');
}
try {
fn.call(this._context, value);
fn(value);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
parent.syncErrorValue = err;
Expand All @@ -280,7 +284,6 @@ export class SafeSubscriber<T> extends Subscriber<T> {
/** @internal This is an internal implementation detail, do not use. */
_unsubscribe(): void {
const { _parentSubscriber } = this;
this._context = null;
this._parentSubscriber = null!;
_parentSubscriber.unsubscribe();
}
Expand Down
59 changes: 54 additions & 5 deletions src/internal/config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
/** @prettier */
let _enable_super_gross_mode_that_will_cause_bad_things = false;
let _enable_deoptimized_subscriber_creation = false;

/**
* The global configuration object for RxJS, used to configure things
* like what Promise contructor should used to create Promises
*/
export const config = {
/**
* If true, console logs for deprecation warnings will not be emitted.
* @deprecated this will be removed in v8 when all deprecated settings are removed.
*/
quietBadConfig: false,

/**
* The promise constructor used by default for methods such as
* {@link toPromise} and {@link forEach}
Expand All @@ -28,11 +36,13 @@ export const config = {
* behaviors described above.
*/
set useDeprecatedSynchronousErrorHandling(value: boolean) {
if (value) {
const error = new Error();
console.warn('DEPRECATED! RxJS was set to use deprecated synchronous error handling behavior by code at: \n' + error.stack);
} else if (_enable_super_gross_mode_that_will_cause_bad_things) {
console.log('RxJS: Back to a better error behavior. Thank you. <3');
if (!this.quietBadConfig) {
if (value) {
const error = new Error();
console.warn('DEPRECATED! RxJS was set to use deprecated synchronous error handling behavior by code at: \n' + error.stack);
} else if (_enable_super_gross_mode_that_will_cause_bad_things) {
console.log('RxJS: Back to a better error behavior. Thank you. <3');
}
}
_enable_super_gross_mode_that_will_cause_bad_things = value;
},
Expand All @@ -45,4 +55,43 @@ export const config = {
get useDeprecatedSynchronousErrorHandling() {
return _enable_super_gross_mode_that_will_cause_bad_things;
},

/**
* If true, enables an as-of-yet undocumented feature from v5: The ability to access
* `unsubscribe()` via `this` context in `next` functions created in observers passed
* to `subscribe`.
*
* This is being removed because the performance was severely problematic, and it could also cause
* issues when types other than POJOs are passed to subscribe as subscribers, as they will likely have
* their `this` context overwritten.
*
* @deprecated remove in v8. As of version 8, RxJS will no longer support altering the
* context of next functions provided as part of an observer to Subscribe. Instead,
* you will have access to a subscription or a signal or token that will allow you to do things like
* unsubscribe and test closed status.
*/
set useDeprecatedNextContext(value: boolean) {
if (!this.quietBadConfig) {
if (value) {
const error = new Error();
console.warn(
'DEPRECATED! RxJS was set to use deprecated next context. This will result in deoptimizations when creating any new subscription. \n' +
error.stack
);
} else if (_enable_deoptimized_subscriber_creation) {
console.log('RxJS: back to more optimized subscription creation. Thank you. <3');
}
}
_enable_deoptimized_subscriber_creation = value;
},

/**
* @deprecated remove in v8. As of version 8, RxJS will no longer support altering the
* context of next functions provided as part of an observer to Subscribe. Instead,
* you will have access to a subscription or a signal or token that will allow you to do things like
* unsubscribe and test closed status.
*/
get useDeprecatedNextContext(): boolean {
return _enable_deoptimized_subscriber_creation;
},
};