Skip to content

Commit 0ae2ed5

Browse files
committed
feat(bufferCount): add higher-order lettable version of bufferCount
1 parent d8ca9de commit 0ae2ed5

File tree

3 files changed

+149
-97
lines changed

3 files changed

+149
-97
lines changed

src/operator/bufferCount.ts

Lines changed: 3 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
1+
32
import { Observable } from '../Observable';
4-
import { TeardownLogic } from '../Subscription';
3+
import { bufferCount as higherOrder } from '../operators';
54

65
/**
76
* Buffers the source Observable values until the size hits the maximum
@@ -45,98 +44,5 @@ import { TeardownLogic } from '../Subscription';
4544
* @owner Observable
4645
*/
4746
export function bufferCount<T>(this: Observable<T>, bufferSize: number, startBufferEvery: number = null): Observable<T[]> {
48-
return this.lift(new BufferCountOperator<T>(bufferSize, startBufferEvery));
49-
}
50-
51-
class BufferCountOperator<T> implements Operator<T, T[]> {
52-
private subscriberClass: any;
53-
54-
constructor(private bufferSize: number, private startBufferEvery: number) {
55-
if (!startBufferEvery || bufferSize === startBufferEvery) {
56-
this.subscriberClass = BufferCountSubscriber;
57-
} else {
58-
this.subscriberClass = BufferSkipCountSubscriber;
59-
}
60-
}
61-
62-
call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
63-
return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery));
64-
}
65-
}
66-
67-
/**
68-
* We need this JSDoc comment for affecting ESDoc.
69-
* @ignore
70-
* @extends {Ignored}
71-
*/
72-
class BufferCountSubscriber<T> extends Subscriber<T> {
73-
private buffer: T[] = [];
74-
75-
constructor(destination: Subscriber<T[]>, private bufferSize: number) {
76-
super(destination);
77-
}
78-
79-
protected _next(value: T): void {
80-
const buffer = this.buffer;
81-
82-
buffer.push(value);
83-
84-
if (buffer.length == this.bufferSize) {
85-
this.destination.next(buffer);
86-
this.buffer = [];
87-
}
88-
}
89-
90-
protected _complete(): void {
91-
const buffer = this.buffer;
92-
if (buffer.length > 0) {
93-
this.destination.next(buffer);
94-
}
95-
super._complete();
96-
}
97-
}
98-
99-
/**
100-
* We need this JSDoc comment for affecting ESDoc.
101-
* @ignore
102-
* @extends {Ignored}
103-
*/
104-
class BufferSkipCountSubscriber<T> extends Subscriber<T> {
105-
private buffers: Array<T[]> = [];
106-
private count: number = 0;
107-
108-
constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
109-
super(destination);
110-
}
111-
112-
protected _next(value: T): void {
113-
const { bufferSize, startBufferEvery, buffers, count } = this;
114-
115-
this.count++;
116-
if (count % startBufferEvery === 0) {
117-
buffers.push([]);
118-
}
119-
120-
for (let i = buffers.length; i--; ) {
121-
const buffer = buffers[i];
122-
buffer.push(value);
123-
if (buffer.length === bufferSize) {
124-
buffers.splice(i, 1);
125-
this.destination.next(buffer);
126-
}
127-
}
128-
}
129-
130-
protected _complete(): void {
131-
const { buffers, destination } = this;
132-
133-
while (buffers.length > 0) {
134-
let buffer = buffers.shift();
135-
if (buffer.length > 0) {
136-
destination.next(buffer);
137-
}
138-
}
139-
super._complete();
140-
}
141-
47+
return higherOrder(bufferSize, startBufferEvery)(this);
14248
}

src/operators/bufferCount.ts

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable } from '../Observable';
4+
import { TeardownLogic } from '../Subscription';
5+
import { OperatorFunction } from '../interfaces';
6+
7+
/**
8+
* Buffers the source Observable values until the size hits the maximum
9+
* `bufferSize` given.
10+
*
11+
* <span class="informal">Collects values from the past as an array, and emits
12+
* that array only when its size reaches `bufferSize`.</span>
13+
*
14+
* <img src="./img/bufferCount.png" width="100%">
15+
*
16+
* Buffers a number of values from the source Observable by `bufferSize` then
17+
* emits the buffer and clears it, and starts a new buffer each
18+
* `startBufferEvery` values. If `startBufferEvery` is not provided or is
19+
* `null`, then new buffers are started immediately at the start of the source
20+
* and when each buffer closes and is emitted.
21+
*
22+
* @example <caption>Emit the last two click events as an array</caption>
23+
* var clicks = Rx.Observable.fromEvent(document, 'click');
24+
* var buffered = clicks.bufferCount(2);
25+
* buffered.subscribe(x => console.log(x));
26+
*
27+
* @example <caption>On every click, emit the last two click events as an array</caption>
28+
* var clicks = Rx.Observable.fromEvent(document, 'click');
29+
* var buffered = clicks.bufferCount(2, 1);
30+
* buffered.subscribe(x => console.log(x));
31+
*
32+
* @see {@link buffer}
33+
* @see {@link bufferTime}
34+
* @see {@link bufferToggle}
35+
* @see {@link bufferWhen}
36+
* @see {@link pairwise}
37+
* @see {@link windowCount}
38+
*
39+
* @param {number} bufferSize The maximum size of the buffer emitted.
40+
* @param {number} [startBufferEvery] Interval at which to start a new buffer.
41+
* For example if `startBufferEvery` is `2`, then a new buffer will be started
42+
* on every other value from the source. A new buffer is started at the
43+
* beginning of the source by default.
44+
* @return {Observable<T[]>} An Observable of arrays of buffered values.
45+
* @method bufferCount
46+
* @owner Observable
47+
*/
48+
export function bufferCount<T>(bufferSize: number, startBufferEvery: number = null): OperatorFunction<T, T[]> {
49+
return function bufferCountOperatorFunction(source: Observable<T>) {
50+
return source.lift(new BufferCountOperator<T>(bufferSize, startBufferEvery));
51+
};
52+
}
53+
54+
class BufferCountOperator<T> implements Operator<T, T[]> {
55+
private subscriberClass: any;
56+
57+
constructor(private bufferSize: number, private startBufferEvery: number) {
58+
if (!startBufferEvery || bufferSize === startBufferEvery) {
59+
this.subscriberClass = BufferCountSubscriber;
60+
} else {
61+
this.subscriberClass = BufferSkipCountSubscriber;
62+
}
63+
}
64+
65+
call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
66+
return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery));
67+
}
68+
}
69+
70+
/**
71+
* We need this JSDoc comment for affecting ESDoc.
72+
* @ignore
73+
* @extends {Ignored}
74+
*/
75+
class BufferCountSubscriber<T> extends Subscriber<T> {
76+
private buffer: T[] = [];
77+
78+
constructor(destination: Subscriber<T[]>, private bufferSize: number) {
79+
super(destination);
80+
}
81+
82+
protected _next(value: T): void {
83+
const buffer = this.buffer;
84+
85+
buffer.push(value);
86+
87+
if (buffer.length == this.bufferSize) {
88+
this.destination.next(buffer);
89+
this.buffer = [];
90+
}
91+
}
92+
93+
protected _complete(): void {
94+
const buffer = this.buffer;
95+
if (buffer.length > 0) {
96+
this.destination.next(buffer);
97+
}
98+
super._complete();
99+
}
100+
}
101+
102+
/**
103+
* We need this JSDoc comment for affecting ESDoc.
104+
* @ignore
105+
* @extends {Ignored}
106+
*/
107+
class BufferSkipCountSubscriber<T> extends Subscriber<T> {
108+
private buffers: Array<T[]> = [];
109+
private count: number = 0;
110+
111+
constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
112+
super(destination);
113+
}
114+
115+
protected _next(value: T): void {
116+
const { bufferSize, startBufferEvery, buffers, count } = this;
117+
118+
this.count++;
119+
if (count % startBufferEvery === 0) {
120+
buffers.push([]);
121+
}
122+
123+
for (let i = buffers.length; i--; ) {
124+
const buffer = buffers[i];
125+
buffer.push(value);
126+
if (buffer.length === bufferSize) {
127+
buffers.splice(i, 1);
128+
this.destination.next(buffer);
129+
}
130+
}
131+
}
132+
133+
protected _complete(): void {
134+
const { buffers, destination } = this;
135+
136+
while (buffers.length > 0) {
137+
let buffer = buffers.shift();
138+
if (buffer.length > 0) {
139+
destination.next(buffer);
140+
}
141+
}
142+
super._complete();
143+
}
144+
145+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export { audit } from './audit';
22
export { auditTime } from './auditTime';
33
export { buffer } from './buffer';
4+
export { bufferCount } from './bufferCount';
45
export { catchError } from './catchError';
56
export { concat } from './concat';
67
export { concatAll } from './concatAll';

0 commit comments

Comments
 (0)