diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 80ac9a6a0f..243be1b852 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -49,7 +49,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = EMPTY; - const expected = '|'; + const expected = ' --------------------------------|'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -67,7 +67,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = NEVER; const b = EMPTY; - const expected = '|'; + const expected = '-'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -139,9 +139,9 @@ describe('Observable.prototype.buffer', () => { // Buffshoulder Boundaries onCompletedBoundaries (RxJS 4) testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); - const subs = ' ^----------------! '; + const subs = ' ^-------------------------------!'; const b = hot('--------^--a-------b---cd| '); - const expected = ' ---a-------b---cd| '; + const expected = ' ---a-------b---cd---------------|'; const expectedValues = { a: ['3'], b: ['4', '5'], diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 56e4015ce0..a9deb9ae81 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -50,12 +51,19 @@ export function buffer(closingNotifier: Observable): OperatorFunction { - // Start a new buffer and emit the previous one. - const b = currentBuffer; - currentBuffer = []; - subscriber.next(b); - }) + new OperatorSubscriber( + subscriber, + () => { + // Start a new buffer and emit the previous one. + const b = currentBuffer; + currentBuffer = []; + subscriber.next(b); + }, + // Pass all errors to the consumer. + undefined, + // Closing notifier should not complete the resulting observable. + noop + ) ); return () => {