From b66592d2d8ed62462b582b193107f84101d9a853 Mon Sep 17 00:00:00 2001 From: Adriano Melo Date: Fri, 2 Oct 2015 23:32:38 -0300 Subject: [PATCH] fix(buffer): change behavior of buffer to more closely match RxJS 4 The behavior of buffer operator was different from RxJS 4. This patch changes the behavior of buffer and adds tests from RxJS 4 and tests with never, empty and error. --- spec/operators/buffer-spec.js | 112 ++++++++++++++++++++++++++++++++-- src/operators/buffer.ts | 38 +++++------- 2 files changed, 122 insertions(+), 28 deletions(-) diff --git a/spec/operators/buffer-spec.js b/spec/operators/buffer-spec.js index f500d93900..1be480ca0a 100644 --- a/spec/operators/buffer-spec.js +++ b/spec/operators/buffer-spec.js @@ -1,12 +1,112 @@ /* globals describe, it, expect, expectObservable, hot */ var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; describe('Observable.prototype.buffer', function () { + it('should work with empty and empty selector', function () { + var a = Observable.empty(); + var b = Observable.empty(); + var expected = '|'; + expectObservable(a.buffer(b)).toBe(expected); + }); + it('should work with empty and non-empty selector', function () { + var a = Observable.empty(); + var b = hot('-----a-----'); + var expected = '|'; + expectObservable(a.buffer(b)).toBe(expected); + }); + it('should work with non-empty and empty selector', function () { + var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); + var b = Observable.empty(); + var expected = '|'; + expectObservable(a.buffer(b)).toBe(expected); + }); + it('should work with never and never selector', function () { + var a = Observable.never(); + var b = Observable.never(); + var expected = '-'; + expectObservable(a.buffer(b)).toBe(expected); + }); + it('should work with never and empty selector', function () { + var a = Observable.never(); + var b = Observable.empty(); + var expected = '|'; + expectObservable(a.buffer(b)).toBe(expected); + }); + it('should work with empty and never selector', function () { + var a = Observable.empty(); + var b = Observable.never(); + var expected = '|'; + expectObservable(a.buffer(b)).toBe(expected); + }); it('should emit buffers that close and reopen', function () { - var e1 = hot('-a-b-c-d-e-f-g-h-i-|'); - var expected = '-----x-----y-----z-|'; - var interval = hot('-----1-----2-----3-|'); - - expectObservable(e1.buffer(interval)).toBe(expected, {x: ['a','b','c'], y: ['d','e','f'], z: ['g','h','i']}); + var a = hot('-a-b-c-d-e-f-g-h-i-|'); + var b = hot('-----1-----2-----3-|'); + var expected = '-----x-----y-----z-|'; + expectObservable(a.buffer(b)).toBe(expected, {x: ['a','b','c'], y: ['d','e','f'], z: ['g','h','i']}); + }); + it('should work with non-empty and throw selector', function () { + var a = hot('---^--a--'); + var b = Observable.throw(new Error('too bad')); + var expected = '#'; + expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad')); + }); + it('should work with throw and non-empty selector', function () { + var a = Observable.throw(new Error('too bad')); + var b = hot('---^--a--'); + var expected = '#'; + expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad')); + }); + it('should work with error', function () { + var a = hot('---^-------#', null, new Error('too bad')); + var b = hot('---^--------') + var expected = '--------#'; + expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad')); + }); + it('should work with error and non-empty selector', function () { + var a = hot('---^-------#', null, new Error('too bad')); + var b = hot('---^---a----') + var expected = '----a---#'; + expectObservable(a.buffer(b)).toBe(expected, { a: [] }, new Error('too bad')); + }); + it('should work with selector', function () { + // Buffer Boundaries Simple (RxJS 4) + var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); + var b = hot('--------^--a-------b---cd---------e---f---|') + var expected = '---a-------b---cd---------e---f-|'; + expectObservable(a.buffer(b)).toBe(expected, + { a: ['3'], b: ['4', '5'], c: ['6'], d: [], e: ['7', '8', '9'], f: ['0'] }); + }); + it('should work with selector completed', function () { + // Buffer Boundaries onCompletedBoundaries (RxJS 4) + var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); + var b = hot('--------^--a-------b---cd|') + var expected = '---a-------b---cd|'; + expectObservable(a.buffer(b)).toBe(expected, + { a: ['3'], b: ['4', '5'], c: ['6'], d: [] }); + }); + it('should work with non-empty and selector error', function () { + // Buffer Boundaries onErrorSource (RxJS 4) + var a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad')); + var b = hot('--------^--a--b---') + var expected = '---a--b--#'; + expectObservable(a.buffer(b)).toBe(expected, + { a: [3], b: [] }, new Error('too bad')); + }); + it('should work with non-empty and empty selector error', function () { + var obj = { a: true, b: true, c: true }; + var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); + var b = hot('--------^----------------#', null, new Error('too bad')) + var expected = '-----------------#'; + expectObservable(a.buffer(b)).toBe(expected, null, new Error('too bad')); + }); + it('should work with non-empty and selector error', function () { + // Buffer Boundaries onErrorBoundaries (RxJS 4) + var obj = { a: true, b: true, c: true }; + var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); + var b = hot('--------^--a-------b---c-#', obj, new Error('too bad')) + var expected = '---a-------b---c-#'; + expectObservable(a.buffer(b)).toBe(expected, + { a: ['3'], b: ['4', '5'], c: ['6'] }, new Error('too bad')); }); -}); \ No newline at end of file +}); diff --git a/src/operators/buffer.ts b/src/operators/buffer.ts index 7af6c63050..bc0d735bf8 100644 --- a/src/operators/buffer.ts +++ b/src/operators/buffer.ts @@ -8,13 +8,11 @@ import {errorObject} from '../util/errorObject'; import bindCallback from '../util/bindCallback'; /** - * buffers the incoming observable values until the passed `closingNotifier` - * emits a value, at which point it emits the buffer on the returned observable - * and starts a new buffer internally, awaiting the next time `closingNotifier` - * emits. - * - * @param {Observable} closingNotifier an observable, that signals the buffer - * to be emitted from the returned observable + * buffers the incoming observable values until the passed `closingNotifier` emits a value, at which point + * it emits the buffer on the returned observable and starts a new buffer internally, awaiting the + * next time `closingNotifier` emits + * + * @param {Observable} closingNotifier an observable, that signals the buffer to be emitted from the returned observable * @returns {Observable} an observable of buffers, which are arrays of values */ export default function buffer(closingNotifier: Observable): Observable { @@ -33,32 +31,28 @@ class BufferOperator implements Operator { class BufferSubscriber extends Subscriber { buffer: T[] = []; - + constructor(destination: Subscriber, closingNotifier: Observable) { super(destination); this.add(closingNotifier._subscribe(new BufferClosingNotifierSubscriber(this))); } - + _next(value: T) { this.buffer.push(value); } - + _error(err: any) { this.destination.error(err); } - + _complete() { - this.flushBuffer(); this.destination.complete(); } - + flushBuffer() { const buffer = this.buffer; this.buffer = []; - - if (buffer.length > 0) { - this.destination.next(buffer); - } + this.destination.next(buffer); } } @@ -66,16 +60,16 @@ class BufferClosingNotifierSubscriber extends Subscriber { constructor(private parent: BufferSubscriber) { super(null); } - + _next(value: T) { this.parent.flushBuffer(); } - + _error(err: any) { this.parent.error(err); } - + _complete() { - // noop + this.parent.complete(); } -} \ No newline at end of file +}