Skip to content

Commit

Permalink
refactor(timeout): Getting rid of _unsubscribeAndRecycle (#5626)
Browse files Browse the repository at this point in the history
In an effort to simplify the code base, along with the other work to retry, retryWhen, repeat, and repeatWhen, this should help us get rid of `_unsubscribeAndRecycle` and move us toward simplifying Subscriber in the long run. There are obviously some memory trade offs using functions on POJOs over on prototypes, but given other recent memory improvements, I think that is probably not a big deal.
  • Loading branch information
benlesh authored Aug 5, 2020
1 parent 0ca8a65 commit af1824e
Showing 1 changed file with 61 additions and 85 deletions.
146 changes: 61 additions & 85 deletions src/internal/operators/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/** @prettier */
import { asyncScheduler } from '../scheduler/async';
import { MonoTypeOperatorFunction, SchedulerLike, OperatorFunction, ObservableInput, TeardownLogic } from '../types';
import { MonoTypeOperatorFunction, SchedulerLike, OperatorFunction, ObservableInput } from '../types';
import { isValidDate } from '../util/isDate';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { lift } from '../util/lift';
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
import { from } from '../observable/from';

export interface TimeoutConfig<T, R = T, M = unknown> {
/**
Expand Down Expand Up @@ -336,7 +335,65 @@ export function timeout<T, R, M>(
throw new TypeError('No timeout provided.');
}

return lift(source, new TimeoutWithOperator(first, each, _with, scheduler, meta));
return lift(source, function (this: Subscriber<T | R>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription;
let timerSubscription: Subscription | null = null;
let lastValue: T | null = null;
let seen = 0;
const startTimer = (delay: number) => {
subscription.add(
(timerSubscription = scheduler!.schedule(() => {
let withObservable: Observable<R>;
const info: TimeoutInfo<T, M> = {
meta,
lastValue,
seen,
};
try {
withObservable = from(_with!(info));
} catch (err) {
subscriber.error(err);
return;
}
innerSub.unsubscribe();
subscription.add(withObservable.subscribe(subscriber));
}, delay))
);
};

subscription.add(
(innerSub = source.subscribe({
next: (value) => {
timerSubscription?.unsubscribe();
timerSubscription = null;
seen++;
lastValue = value;
if (each != null && each > 0) {
startTimer(each);
}
subscriber.next(value);
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
}))
);

let firstTimer: number;
if (first != null) {
if (typeof first === 'number') {
firstTimer = first;
} else {
firstTimer = +first - scheduler!.now();
}
} else {
firstTimer = each!;
}
startTimer(firstTimer);

return subscription;
});
};
}

Expand All @@ -348,84 +405,3 @@ export function timeout<T, R, M>(
function timeoutErrorFactory(info: TimeoutInfo<any>): Observable<never> {
throw new TimeoutError(info);
}

class TimeoutWithOperator<T, R, M> implements Operator<T, R> {
constructor(
private first: number | Date | undefined,
private each: number | undefined,
private _with: (info: TimeoutInfo<T, M>) => ObservableInput<R>,
private scheduler: SchedulerLike,
private meta: any
) {}

call(subscriber: Subscriber<T | R>, source: any): TeardownLogic {
return source.subscribe(new TimeoutWithSubscriber(subscriber, this.first, this.each, this._with, this.scheduler, this.meta));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class TimeoutWithSubscriber<T, R, M> extends SimpleOuterSubscriber<T, R> {
private _timerSubscription?: Subscription;
private lastValue?: T;
private seen = 0;

constructor(
destination: Subscriber<T>,
first: number | Date | undefined,
private each: number | undefined,
private _with: (info: TimeoutInfo<T, M>) => ObservableInput<R>,
private scheduler: SchedulerLike,
private meta: any
) {
super(destination);

let firstTimer: number;

if (first != null) {
if (typeof first === 'number') {
firstTimer = first;
} else {
firstTimer = +first - scheduler.now();
}
} else {
firstTimer = each!;
}

this.startTimer(firstTimer);
}

private startTimer(delay: number): void {
this._timerSubscription = this.scheduler.schedule(() => {
const { meta, seen, lastValue = null } = this;
let result: ObservableInput<R>;
try {
result = this._with({
meta,
seen,
lastValue,
});
} catch (err) {
this.destination.error(err);
return;
}
this._unsubscribeAndRecycle();
this.add(innerSubscribe(result, new SimpleInnerSubscriber(this)));
}, delay);
this.add(this._timerSubscription);
}

protected _next(value: T): void {
const { each } = this;
this._timerSubscription?.unsubscribe();
this.seen++;
this.lastValue = value;
if (each != null) {
this.startTimer(each);
}
super._next(value);
}
}

0 comments on commit af1824e

Please sign in to comment.