Skip to content

Commit

Permalink
fix(bufferCount): will behave as expected when startBufferEvery is …
Browse files Browse the repository at this point in the history
…less than `bufferSize`

- fixed issue where internal `buffers` store was keeping an additional buffer for no good reason
- improved logic and performance around updating internal `buffers` list
- adds a test to ensure proper behavior

fixes #2062
  • Loading branch information
benlesh committed Oct 25, 2016
1 parent 260d335 commit 1787b6e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
21 changes: 21 additions & 0 deletions spec/operators/bufferCount-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as Rx from '../../dist/cjs/Rx';
import { expect } from 'chai';
declare const {hot, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;
Expand Down Expand Up @@ -31,6 +32,26 @@ describe('Observable.prototype.bufferCount', () => {
expectObservable(e1.bufferCount(2)).toBe(expected, values);
});

it('should buffer properly (issue #2062)', () => {
const item$ = new Rx.Subject();
const results = [];
item$
.bufferCount(3, 1)
.subscribe(value => {
results.push(value);

if (value.join() === '1,2,3') {
item$.next(4);
}
});

item$.next(1);
item$.next(2);
item$.next(3);

expect(results).to.deep.equal([[1, 2, 3], [2, 3, 4]]);
});

it('should emit partial buffers if source completes before reaching specified buffer count', () => {
const e1 = hot('--a--b--c--d--|');
const expected = '--------------(x|)';
Expand Down
22 changes: 7 additions & 15 deletions src/operator/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,38 +62,30 @@ class BufferCountOperator<T> implements Operator<T, T[]> {
* @extends {Ignored}
*/
class BufferCountSubscriber<T> extends Subscriber<T> {
private buffers: Array<T[]> = [[]];
private buffers: Array<T[]> = [];
private count: number = 0;

constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
super(destination);
}

protected _next(value: T) {
const count = (this.count += 1);
const destination = this.destination;
const bufferSize = this.bufferSize;
const startBufferEvery = (this.startBufferEvery == null) ? bufferSize : this.startBufferEvery;
const buffers = this.buffers;
const len = buffers.length;
let remove = -1;
const count = this.count++;
const { destination, bufferSize, startBufferEvery, buffers } = this;
const startOn = (startBufferEvery == null) ? bufferSize : startBufferEvery;

if (count % startBufferEvery === 0) {
if (count % startOn === 0) {
buffers.push([]);
}

for (let i = 0; i < len; i++) {
for (let i = buffers.length; i--; ) {
const buffer = buffers[i];
buffer.push(value);
if (buffer.length === bufferSize) {
remove = i;
buffers.splice(i, 1);
destination.next(buffer);
}
}

if (remove !== -1) {
buffers.splice(remove, 1);
}
}

protected _complete() {
Expand Down

0 comments on commit 1787b6e

Please sign in to comment.