Skip to content

Commit 2a9e54c

Browse files
committed
feat(windowCount): add higher-order lettable version of windowCount
1 parent 29ffa1b commit 2a9e54c

File tree

3 files changed

+138
-78
lines changed

3 files changed

+138
-78
lines changed

src/operator/windowCount.ts

Lines changed: 3 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
1+
32
import { Observable } from '../Observable';
4-
import { Subject } from '../Subject';
3+
import { windowCount as higherOrder } from '../operators';
54

65
/**
76
* Branch out the source Observable values as a nested Observable with each
@@ -53,79 +52,5 @@ import { Subject } from '../Subject';
5352
*/
5453
export function windowCount<T>(this: Observable<T>, windowSize: number,
5554
startWindowEvery: number = 0): Observable<Observable<T>> {
56-
return this.lift(new WindowCountOperator<T>(windowSize, startWindowEvery));
57-
}
58-
59-
class WindowCountOperator<T> implements Operator<T, Observable<T>> {
60-
61-
constructor(private windowSize: number,
62-
private startWindowEvery: number) {
63-
}
64-
65-
call(subscriber: Subscriber<Observable<T>>, source: any): any {
66-
return source.subscribe(new WindowCountSubscriber(subscriber, this.windowSize, this.startWindowEvery));
67-
}
68-
}
69-
70-
/**
71-
* We need this JSDoc comment for affecting ESDoc.
72-
* @ignore
73-
* @extends {Ignored}
74-
*/
75-
class WindowCountSubscriber<T> extends Subscriber<T> {
76-
private windows: Subject<T>[] = [ new Subject<T>() ];
77-
private count: number = 0;
78-
79-
constructor(protected destination: Subscriber<Observable<T>>,
80-
private windowSize: number,
81-
private startWindowEvery: number) {
82-
super(destination);
83-
destination.next(this.windows[0]);
84-
}
85-
86-
protected _next(value: T) {
87-
const startWindowEvery = (this.startWindowEvery > 0) ? this.startWindowEvery : this.windowSize;
88-
const destination = this.destination;
89-
const windowSize = this.windowSize;
90-
const windows = this.windows;
91-
const len = windows.length;
92-
93-
for (let i = 0; i < len && !this.closed; i++) {
94-
windows[i].next(value);
95-
}
96-
const c = this.count - windowSize + 1;
97-
if (c >= 0 && c % startWindowEvery === 0 && !this.closed) {
98-
windows.shift().complete();
99-
}
100-
if (++this.count % startWindowEvery === 0 && !this.closed) {
101-
const window = new Subject<T>();
102-
windows.push(window);
103-
destination.next(window);
104-
}
105-
}
106-
107-
protected _error(err: any) {
108-
const windows = this.windows;
109-
if (windows) {
110-
while (windows.length > 0 && !this.closed) {
111-
windows.shift().error(err);
112-
}
113-
}
114-
this.destination.error(err);
115-
}
116-
117-
protected _complete() {
118-
const windows = this.windows;
119-
if (windows) {
120-
while (windows.length > 0 && !this.closed) {
121-
windows.shift().complete();
122-
}
123-
}
124-
this.destination.complete();
125-
}
126-
127-
protected _unsubscribe() {
128-
this.count = 0;
129-
this.windows = null;
130-
}
55+
return higherOrder(windowSize, startWindowEvery)(this);
13156
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ export { switchMap } from './switchMap';
2323
export { takeLast } from './takeLast';
2424
export { tap } from './tap';
2525
export { window } from './window';
26+
export { windowCount } from './windowCount';
2627
export { windowTime } from './windowTime';
2728
export { windowToggle } from './windowToggle';

src/operators/windowCount.ts

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable } from '../Observable';
4+
import { Subject } from '../Subject';
5+
import { OperatorFunction } from '../interfaces';
6+
7+
/**
8+
* Branch out the source Observable values as a nested Observable with each
9+
* nested Observable emitting at most `windowSize` values.
10+
*
11+
* <span class="informal">It's like {@link bufferCount}, but emits a nested
12+
* Observable instead of an array.</span>
13+
*
14+
* <img src="./img/windowCount.png" width="100%">
15+
*
16+
* Returns an Observable that emits windows of items it collects from the source
17+
* Observable. The output Observable emits windows every `startWindowEvery`
18+
* items, each containing no more than `windowSize` items. When the source
19+
* Observable completes or encounters an error, the output Observable emits
20+
* the current window and propagates the notification from the source
21+
* Observable. If `startWindowEvery` is not provided, then new windows are
22+
* started immediately at the start of the source and when each window completes
23+
* with size `windowSize`.
24+
*
25+
* @example <caption>Ignore every 3rd click event, starting from the first one</caption>
26+
* var clicks = Rx.Observable.fromEvent(document, 'click');
27+
* var result = clicks.windowCount(3)
28+
* .map(win => win.skip(1)) // skip first of every 3 clicks
29+
* .mergeAll(); // flatten the Observable-of-Observables
30+
* result.subscribe(x => console.log(x));
31+
*
32+
* @example <caption>Ignore every 3rd click event, starting from the third one</caption>
33+
* var clicks = Rx.Observable.fromEvent(document, 'click');
34+
* var result = clicks.windowCount(2, 3)
35+
* .mergeAll(); // flatten the Observable-of-Observables
36+
* result.subscribe(x => console.log(x));
37+
*
38+
* @see {@link window}
39+
* @see {@link windowTime}
40+
* @see {@link windowToggle}
41+
* @see {@link windowWhen}
42+
* @see {@link bufferCount}
43+
*
44+
* @param {number} windowSize The maximum number of values emitted by each
45+
* window.
46+
* @param {number} [startWindowEvery] Interval at which to start a new window.
47+
* For example if `startWindowEvery` is `2`, then a new window will be started
48+
* on every other value from the source. A new window is started at the
49+
* beginning of the source by default.
50+
* @return {Observable<Observable<T>>} An Observable of windows, which in turn
51+
* are Observable of values.
52+
* @method windowCount
53+
* @owner Observable
54+
*/
55+
export function windowCount<T>(windowSize: number,
56+
startWindowEvery: number = 0): OperatorFunction<T, Observable<T>> {
57+
return function windowCountOperatorFunction(source: Observable<T>) {
58+
return source.lift(new WindowCountOperator<T>(windowSize, startWindowEvery));
59+
};
60+
}
61+
62+
class WindowCountOperator<T> implements Operator<T, Observable<T>> {
63+
64+
constructor(private windowSize: number,
65+
private startWindowEvery: number) {
66+
}
67+
68+
call(subscriber: Subscriber<Observable<T>>, source: any): any {
69+
return source.subscribe(new WindowCountSubscriber(subscriber, this.windowSize, this.startWindowEvery));
70+
}
71+
}
72+
73+
/**
74+
* We need this JSDoc comment for affecting ESDoc.
75+
* @ignore
76+
* @extends {Ignored}
77+
*/
78+
class WindowCountSubscriber<T> extends Subscriber<T> {
79+
private windows: Subject<T>[] = [ new Subject<T>() ];
80+
private count: number = 0;
81+
82+
constructor(protected destination: Subscriber<Observable<T>>,
83+
private windowSize: number,
84+
private startWindowEvery: number) {
85+
super(destination);
86+
destination.next(this.windows[0]);
87+
}
88+
89+
protected _next(value: T) {
90+
const startWindowEvery = (this.startWindowEvery > 0) ? this.startWindowEvery : this.windowSize;
91+
const destination = this.destination;
92+
const windowSize = this.windowSize;
93+
const windows = this.windows;
94+
const len = windows.length;
95+
96+
for (let i = 0; i < len && !this.closed; i++) {
97+
windows[i].next(value);
98+
}
99+
const c = this.count - windowSize + 1;
100+
if (c >= 0 && c % startWindowEvery === 0 && !this.closed) {
101+
windows.shift().complete();
102+
}
103+
if (++this.count % startWindowEvery === 0 && !this.closed) {
104+
const window = new Subject<T>();
105+
windows.push(window);
106+
destination.next(window);
107+
}
108+
}
109+
110+
protected _error(err: any) {
111+
const windows = this.windows;
112+
if (windows) {
113+
while (windows.length > 0 && !this.closed) {
114+
windows.shift().error(err);
115+
}
116+
}
117+
this.destination.error(err);
118+
}
119+
120+
protected _complete() {
121+
const windows = this.windows;
122+
if (windows) {
123+
while (windows.length > 0 && !this.closed) {
124+
windows.shift().complete();
125+
}
126+
}
127+
this.destination.complete();
128+
}
129+
130+
protected _unsubscribe() {
131+
this.count = 0;
132+
this.windows = null;
133+
}
134+
}

0 commit comments

Comments
 (0)