Skip to content

Commit

Permalink
fix(bufferWhen): don't signal on complete
Browse files Browse the repository at this point in the history
BREAKING CHANGE: the observable returned by the bufferWhen operator's
closing selector must emit a next notification to close the buffer.
Complete notifications no longer close the buffer.
  • Loading branch information
cartant authored and benlesh committed Nov 3, 2020
1 parent c919c68 commit a2ba364
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
18 changes: 8 additions & 10 deletions spec/operators/bufferWhen-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('bufferWhen operator', () => {
});
});

it('should emit buffers using varying empty delayed closings', () => {
it('should not emit buffers using varying empty delayed closings', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const subs = ' ^----------------------------------! ';
Expand All @@ -98,14 +98,12 @@ describe('bufferWhen operator', () => {
];
const closeSubs = [
' ^--------------! ',
' ---------------^---------! ',
' -------------------------^---------! '
' ',
' ',
];
const expected = ' ---------------x---------y---------(z|)';
const expected = ' -----------------------------------(x|)';
const values = {
x: ['b', 'c', 'd'],
y: ['e', 'f', 'g'],
z: ['h']
x: ['b', 'c', 'd', 'e', 'f', 'g', 'h']
};

let i = 0;
Expand Down Expand Up @@ -359,13 +357,13 @@ describe('bufferWhen operator', () => {
});
});

// bufferWhen is not supposed to handle a factory that returns always empty
// bufferWhen is not supposed to handle a factory that returns always sync
// closing Observables, because doing such would constantly recreate a new
// buffer in a synchronous infinite loop until the stack overflows. This also
// happens with buffer in RxJS 4.
it('should NOT handle hot inner empty', (done: MochaDone) => {
it('should NOT handle synchronous inner', (done: MochaDone) => {
const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9);
const closing = EMPTY;
const closing = of(1);
const TOO_MANY_INVOCATIONS = 30;

source.pipe(
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { Subscriber } from '../Subscriber';
import { ObservableInput, OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/from';

Expand Down Expand Up @@ -68,8 +69,7 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
b && subscriber.next(b);

// Get a new closing notifier and subscribe to it.
// TODO: We probably want to stop counting `completion` as a notification here.
innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, undefined, openBuffer)));
innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, undefined, noop)));
};

// Start the first buffer.
Expand Down

0 comments on commit a2ba364

Please sign in to comment.