Skip to content

Commit

Permalink
fix(buffer): change behavior of buffer to more closely match RxJS 4
Browse files Browse the repository at this point in the history
The behavior of buffer operator was different from RxJS 4. This patch changes the behavior of buffer and adds tests from RxJS 4 and tests with never, empty and error.
  • Loading branch information
adrianomelo authored and benlesh committed Oct 6, 2015
1 parent 3d833f3 commit b66592d
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 28 deletions.
112 changes: 106 additions & 6 deletions spec/operators/buffer-spec.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,112 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.buffer', function () {
it('should work with empty and empty selector', function () {
var a = Observable.empty();
var b = Observable.empty();
var expected = '|';
expectObservable(a.buffer(b)).toBe(expected);
});
it('should work with empty and non-empty selector', function () {
var a = Observable.empty();
var b = hot('-----a-----');
var expected = '|';
expectObservable(a.buffer(b)).toBe(expected);
});
it('should work with non-empty and empty selector', function () {
var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
var b = Observable.empty();
var expected = '|';
expectObservable(a.buffer(b)).toBe(expected);
});
it('should work with never and never selector', function () {
var a = Observable.never();
var b = Observable.never();
var expected = '-';
expectObservable(a.buffer(b)).toBe(expected);
});
it('should work with never and empty selector', function () {
var a = Observable.never();
var b = Observable.empty();
var expected = '|';
expectObservable(a.buffer(b)).toBe(expected);
});
it('should work with empty and never selector', function () {
var a = Observable.empty();
var b = Observable.never();
var expected = '|';
expectObservable(a.buffer(b)).toBe(expected);
});
it('should emit buffers that close and reopen', function () {
var e1 = hot('-a-b-c-d-e-f-g-h-i-|');
var expected = '-----x-----y-----z-|';
var interval = hot('-----1-----2-----3-|');

expectObservable(e1.buffer(interval)).toBe(expected, {x: ['a','b','c'], y: ['d','e','f'], z: ['g','h','i']});
var a = hot('-a-b-c-d-e-f-g-h-i-|');
var b = hot('-----1-----2-----3-|');
var expected = '-----x-----y-----z-|';
expectObservable(a.buffer(b)).toBe(expected, {x: ['a','b','c'], y: ['d','e','f'], z: ['g','h','i']});
});
it('should work with non-empty and throw selector', function () {
var a = hot('---^--a--');
var b = Observable.throw(new Error('too bad'));
var expected = '#';
expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad'));
});
it('should work with throw and non-empty selector', function () {
var a = Observable.throw(new Error('too bad'));
var b = hot('---^--a--');
var expected = '#';
expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad'));
});
it('should work with error', function () {
var a = hot('---^-------#', null, new Error('too bad'));
var b = hot('---^--------')
var expected = '--------#';
expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad'));
});
it('should work with error and non-empty selector', function () {
var a = hot('---^-------#', null, new Error('too bad'));
var b = hot('---^---a----')
var expected = '----a---#';
expectObservable(a.buffer(b)).toBe(expected, { a: [] }, new Error('too bad'));
});
it('should work with selector', function () {
// Buffer Boundaries Simple (RxJS 4)
var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
var b = hot('--------^--a-------b---cd---------e---f---|')
var expected = '---a-------b---cd---------e---f-|';
expectObservable(a.buffer(b)).toBe(expected,
{ a: ['3'], b: ['4', '5'], c: ['6'], d: [], e: ['7', '8', '9'], f: ['0'] });
});
it('should work with selector completed', function () {
// Buffer Boundaries onCompletedBoundaries (RxJS 4)
var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
var b = hot('--------^--a-------b---cd|')
var expected = '---a-------b---cd|';
expectObservable(a.buffer(b)).toBe(expected,
{ a: ['3'], b: ['4', '5'], c: ['6'], d: [] });
});
it('should work with non-empty and selector error', function () {
// Buffer Boundaries onErrorSource (RxJS 4)
var a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad'));
var b = hot('--------^--a--b---')
var expected = '---a--b--#';
expectObservable(a.buffer(b)).toBe(expected,
{ a: [3], b: [] }, new Error('too bad'));
});
it('should work with non-empty and empty selector error', function () {
var obj = { a: true, b: true, c: true };
var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
var b = hot('--------^----------------#', null, new Error('too bad'))
var expected = '-----------------#';
expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad'));
});
it('should work with non-empty and selector error', function () {
// Buffer Boundaries onErrorBoundaries (RxJS 4)
var obj = { a: true, b: true, c: true };
var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
var b = hot('--------^--a-------b---c-#', obj, new Error('too bad'))
var expected = '---a-------b---c-#';
expectObservable(a.buffer(b)).toBe(expected,
{ a: ['3'], b: ['4', '5'], c: ['6'] }, new Error('too bad'));
});
});
});
38 changes: 16 additions & 22 deletions src/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

/**
* buffers the incoming observable values until the passed `closingNotifier`
* emits a value, at which point it emits the buffer on the returned observable
* and starts a new buffer internally, awaiting the next time `closingNotifier`
* emits.
*
* @param {Observable<any>} closingNotifier an observable, that signals the buffer
* to be emitted from the returned observable
* buffers the incoming observable values until the passed `closingNotifier` emits a value, at which point
* it emits the buffer on the returned observable and starts a new buffer internally, awaiting the
* next time `closingNotifier` emits
*
* @param {Observable<any>} closingNotifier an observable, that signals the buffer to be emitted from the returned observable
* @returns {Observable<T[]>} an observable of buffers, which are arrays of values
*/
export default function buffer<T>(closingNotifier: Observable<any>): Observable<T[]> {
Expand All @@ -33,49 +31,45 @@ class BufferOperator<T, R> implements Operator<T, R> {

class BufferSubscriber<T> extends Subscriber<T> {
buffer: T[] = [];

constructor(destination: Subscriber<T>, closingNotifier: Observable<any>) {
super(destination);
this.add(closingNotifier._subscribe(new BufferClosingNotifierSubscriber(this)));
}

_next(value: T) {
this.buffer.push(value);
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
this.flushBuffer();
this.destination.complete();
}

flushBuffer() {
const buffer = this.buffer;
this.buffer = [];

if (buffer.length > 0) {
this.destination.next(buffer);
}
this.destination.next(buffer);
}
}

class BufferClosingNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferSubscriber<any>) {
super(null);
}

_next(value: T) {
this.parent.flushBuffer();
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
// noop
this.parent.complete();
}
}
}

0 comments on commit b66592d

Please sign in to comment.