Skip to content

Commit

Permalink
feat(windowTime): add higher-order lettable version of windowTime
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jun 26, 2017
1 parent 9f6373e commit 29ffa1b
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 161 deletions.
163 changes: 2 additions & 161 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import { IScheduler } from '../Scheduler';
import { Action } from '../scheduler/Action';
import { Subject } from '../Subject';
import { Operator } from '../Operator';
import { async } from '../scheduler/async';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { isNumeric } from '../util/isNumeric';
import { isScheduler } from '../util/isScheduler';
import { windowTime as higherOrder } from '../operators';

/**
* Branch out the source Observable values as a nested Observable periodically
Expand Down Expand Up @@ -102,160 +98,5 @@ export function windowTime<T>(this: Observable<T>,
windowCreationInterval = arguments[1];
}

return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler));
}

class WindowTimeOperator<T> implements Operator<T, Observable<T>> {

constructor(private windowTimeSpan: number,
private windowCreationInterval: number | null,
private maxWindowSize: number,
private scheduler: IScheduler) {
}

call(subscriber: Subscriber<Observable<T>>, source: any): any {
return source.subscribe(new WindowTimeSubscriber(
subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler
));
}
}

interface CreationState<T> {
windowTimeSpan: number;
windowCreationInterval: number;
subscriber: WindowTimeSubscriber<T>;
scheduler: IScheduler;
}

interface TimeSpanOnlyState<T> {
window: CountedSubject<T>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}

interface CloseWindowContext<T> {
action: Action<CreationState<T>>;
subscription: Subscription;
}

interface CloseState<T> {
subscriber: WindowTimeSubscriber<T>;
window: CountedSubject<T>;
context: CloseWindowContext<T>;
}

class CountedSubject<T> extends Subject<T> {
private _numberOfNextedValues: number = 0;

next(value?: T): void {
this._numberOfNextedValues++;
super.next(value);
}

get numberOfNextedValues(): number {
return this._numberOfNextedValues;
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class WindowTimeSubscriber<T> extends Subscriber<T> {
private windows: CountedSubject<T>[] = [];

constructor(protected destination: Subscriber<Observable<T>>,
private windowTimeSpan: number,
private windowCreationInterval: number | null,
private maxWindowSize: number,
private scheduler: IScheduler) {
super(destination);

const window = this.openWindow();
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
const closeState: CloseState<T> = { subscriber: this, window, context: <any>null };
const creationState: CreationState<T> = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
} else {
const timeSpanOnlyState: TimeSpanOnlyState<T> = { subscriber: this, window, windowTimeSpan };
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
}
}

protected _next(value: T): void {
const windows = this.windows;
const len = windows.length;
for (let i = 0; i < len; i++) {
const window = windows[i];
if (!window.closed) {
window.next(value);
if (window.numberOfNextedValues >= this.maxWindowSize) {
this.closeWindow(window);
}
}
}
}

protected _error(err: any): void {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().error(err);
}
this.destination.error(err);
}

protected _complete(): void {
const windows = this.windows;
while (windows.length > 0) {
const window = windows.shift();
if (!window.closed) {
window.complete();
}
}
this.destination.complete();
}

public openWindow(): CountedSubject<T> {
const window = new CountedSubject<T>();
this.windows.push(window);
const destination = this.destination;
destination.next(window);
return window;
}

public closeWindow(window: CountedSubject<T>): void {
window.complete();
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
}
}

function dispatchWindowTimeSpanOnly<T>(this: Action<TimeSpanOnlyState<T>>, state: TimeSpanOnlyState<T>): void {
const { subscriber, windowTimeSpan, window } = state;
if (window) {
subscriber.closeWindow(window);
}
state.window = subscriber.openWindow();
this.schedule(state, windowTimeSpan);
}

function dispatchWindowCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>): void {
const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
const window = subscriber.openWindow();
const action = this;
let context: CloseWindowContext<T> = { action, subscription: <any>null };
const timeSpanState: CloseState<T> = { subscriber, window, context };
context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState);
action.add(context.subscription);
action.schedule(state, windowCreationInterval);
}

function dispatchWindowClose<T>(state: CloseState<T>): void {
const { subscriber, window, context } = state;
if (context && context.action && context.subscription) {
context.action.remove(context.subscription);
}
subscriber.closeWindow(window);
return higherOrder(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)(this);
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ export { switchMap } from './switchMap';
export { takeLast } from './takeLast';
export { tap } from './tap';
export { window } from './window';
export { windowTime } from './windowTime';
export { windowToggle } from './windowToggle';
Loading

0 comments on commit 29ffa1b

Please sign in to comment.