Skip to content

Commit

Permalink
Merge pull request #2439 from saneyuki/clarify-the-type-delay
Browse files Browse the repository at this point in the history
refactor(delay): clarify the typing of DelaySubscriber
  • Loading branch information
kwonoj authored Mar 6, 2017
2 parents 7c41c08 + b680723 commit 69d051b
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/operator/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
import { IScheduler } from '../Scheduler';
import { Subscriber } from '../Subscriber';
import { Action } from '../scheduler/Action';
import { Notification } from '../Notification';
import { Observable } from '../Observable';
import { PartialObserver } from '../Observer';
import { TeardownLogic } from '../Subscription';

/**
Expand Down Expand Up @@ -63,17 +65,23 @@ class DelayOperator<T> implements Operator<T, T> {
}
}

interface DelayState<T> {
source: DelaySubscriber<T>;
destination: PartialObserver<T>;
scheduler: IScheduler;
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DelaySubscriber<T> extends Subscriber<T> {
private queue: Array<any> = [];
private queue: Array<DelayMessage<T>> = [];
private active: boolean = false;
private errored: boolean = false;

private static dispatch(state: any): void {
private static dispatch<T>(this: Action<DelayState<T>>, state: DelayState<T>): void {
const source = state.source;
const queue = source.queue;
const scheduler = state.scheduler;
Expand All @@ -85,7 +93,7 @@ class DelaySubscriber<T> extends Subscriber<T> {

if (queue.length > 0) {
const delay = Math.max(0, queue[0].time - scheduler.now());
(<any> this).schedule(state, delay);
this.schedule(state, delay);
} else {
source.active = false;
}
Expand All @@ -99,12 +107,12 @@ class DelaySubscriber<T> extends Subscriber<T> {

private _schedule(scheduler: IScheduler): void {
this.active = true;
this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, {
this.add(scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
source: this, destination: this.destination, scheduler: scheduler
}));
}

private scheduleNotification(notification: Notification<any>): void {
private scheduleNotification(notification: Notification<T>): void {
if (this.errored === true) {
return;
}
Expand Down Expand Up @@ -134,7 +142,7 @@ class DelaySubscriber<T> extends Subscriber<T> {
}

class DelayMessage<T> {
constructor(private time: number,
private notification: any) {
constructor(public readonly time: number,
public readonly notification: Notification<T>) {
}
}

0 comments on commit 69d051b

Please sign in to comment.