Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Improve memory pressure #5613

Merged
merged 1 commit into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions src/internal/innerSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/** @prettier */
import { Subscription } from './Subscription';
import { Subscriber } from './Subscriber';
import { Observable } from './Observable';
import { subscribeTo } from './util/subscribeTo';

interface SimpleOuterSubscriberLike<T> {
/**
* A handler for inner next notifications from the inner subscription
* @param innerValue the value nexted by the inner producer
*/
notifyNext(innerValue: T): void;
/**
* A handler for inner error notifications from the inner subscription
* @param err the error from the inner producer
*/
notifyError(err: any): void;
/**
* A handler for inner complete notifications from the inner subscription.
*/
notifyComplete(): void;
}

export class SimpleInnerSubscriber<T> extends Subscriber<T> {
constructor(private parent: SimpleOuterSubscriberLike<any>) {
super();
}

protected _next(value: T): void {
this.parent.notifyNext(value);
}

protected _error(error: any): void {
this.parent.notifyError(error);
this.unsubscribe();
}

protected _complete(): void {
this.parent.notifyComplete();
this.unsubscribe();
}
}

export class ComplexInnerSubscriber<T, R> extends Subscriber<R> {
constructor(private parent: ComplexOuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) {
super();
}

protected _next(value: R): void {
this.parent.notifyNext(this.outerValue, value, this.outerIndex, this);
}

protected _error(error: any): void {
this.parent.notifyError(error);
this.unsubscribe();
}

protected _complete(): void {
this.parent.notifyComplete(this);
this.unsubscribe();
}
}

export class SimpleOuterSubscriber<T, R> extends Subscriber<T> implements SimpleOuterSubscriberLike<R> {
notifyNext(innerValue: R): void {
this.destination.next(innerValue);
}

notifyError(err: any): void {
this.destination.error(err);
}

notifyComplete(): void {
this.destination.complete();
}
}

/**
* DO NOT USE (formerly "OuterSubscriber")
* TODO: We want to refactor this and remove it. It is retaining values it shouldn't for long
* periods of time.
*/
export class ComplexOuterSubscriber<T, R> extends Subscriber<T> {
/**
* @param _outerValue Used by: bufferToggle, delayWhen, windowToggle
* @param innerValue Used by: subclass default, combineLatest, race, bufferToggle, windowToggle, withLatestFrom
* @param _outerIndex Used by: combineLatest, race, withLatestFrom
* @param _innerSub Used by: delayWhen
*/
notifyNext(_outerValue: T, innerValue: R, _outerIndex: number, _innerSub: ComplexInnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}

notifyError(error: any): void {
this.destination.error(error);
}

/**
* @param _innerSub Used by: race, bufferToggle, delayWhen, windowToggle, windowWhen
*/
notifyComplete(_innerSub: ComplexInnerSubscriber<T, R>): void {
this.destination.complete();
}
}

export function innerSubscribe(result: any, innerSubscriber: Subscriber<any>): Subscription | undefined {
if (innerSubscriber.closed) {
return undefined;
}
if (result instanceof Observable) {
return result.subscribe(innerSubscriber);
}
return subscribeTo(result)(innerSubscriber) as Subscription;
}
27 changes: 13 additions & 14 deletions src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((
export function combineLatest<O extends ObservableInput<any>, R>(
...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]
): Observable<R> {
let resultSelector: (...values: Array<any>) => R = null;
let scheduler: SchedulerLike = null;
let resultSelector: ((...values: Array<any>) => R) | undefined = undefined;
let scheduler: SchedulerLike|undefined = undefined;

if (isScheduler(observables[observables.length - 1])) {
scheduler = observables.pop() as SchedulerLike;
Expand All @@ -243,7 +243,7 @@ export function combineLatest<O extends ObservableInput<any>, R>(
observables = observables[0] as any;
}

return fromArray(observables, scheduler).lift(new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector));
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
Expand All @@ -264,7 +264,7 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private values: any[] = [];
private observables: any[] = [];
private toRespond: number;
private toRespond?: number;

constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
super(destination);
Expand All @@ -279,26 +279,25 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
const observables = this.observables;
const len = observables.length;
if (len === 0) {
this.destination.complete();
this.destination.complete!();
} else {
this.active = len;
this.toRespond = len;
for (let i = 0; i < len; i++) {
const observable = observables[i];
this.add(subscribeToResult(this, observable, observable, i));
this.add(subscribeToResult(this, observable, undefined, i));
}
}
}

notifyComplete(unused: Subscriber<R>): void {
if ((this.active -= 1) === 0) {
this.destination.complete();
this.destination.complete!();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
notifyNext(_outerValue: T, innerValue: R,
outerIndex: number): void {
const values = this.values;
const oldVal = values[outerIndex];
const toRespond = !this.toRespond
Expand All @@ -310,19 +309,19 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
if (this.resultSelector) {
this._tryResultSelector(values);
} else {
this.destination.next(values.slice());
this.destination.next!(values.slice());
}
}
}

private _tryResultSelector(values: any[]) {
let result: any;
try {
result = this.resultSelector.apply(this, values);
result = this.resultSelector!.apply(this, values);
} catch (err) {
this.destination.error(err);
this.destination.error!(err);
return;
}
this.destination.next(result);
this.destination.next!(result);
}
}
17 changes: 8 additions & 9 deletions src/internal/observable/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,23 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
const len = observables.length;

if (len === 0) {
this.destination.complete();
this.destination.complete!();
} else {
for (let i = 0; i < len && !this.hasFirst; i++) {
let observable = observables[i];
let subscription = subscribeToResult(this, observable, observable as any, i);
const observable = observables[i];
const subscription = subscribeToResult(this, observable, undefined, i)!;

if (this.subscriptions) {
this.subscriptions.push(subscription);
}
this.add(subscription);
}
this.observables = null;
this.observables = null!;
}
}

notifyNext(outerValue: T, innerValue: T,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, T>): void {
notifyNext(_outerValue: T, innerValue: T,
outerIndex: number): void {
if (!this.hasFirst) {
this.hasFirst = true;

Expand All @@ -132,9 +131,9 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
}
}

this.subscriptions = null;
this.subscriptions = null!;
}

this.destination.next(innerValue);
this.destination.next!(innerValue);
}
}
Loading