Skip to content

Commit

Permalink
fix(buffer): emit last buffer on close
Browse files Browse the repository at this point in the history
Resolves #3990
  • Loading branch information
benlesh committed Sep 26, 2018
1 parent 7f00e90 commit 61b1767
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 51 deletions.
35 changes: 19 additions & 16 deletions src/internal/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptionsTo }) => {
const a = hot('-a-b-c-d-e-f-g-h-i-|');
const b = hot('-----B-----B-----B-|');
const expected = '-----x-----y-----z-|';
const expected = '-----x-----y-----z-(a|)';
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
z: ['g', 'h', 'i']
z: ['g', 'h', 'i'],
a: [] as string[],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand All @@ -32,26 +33,26 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptionsTo }) => {
const a = EMPTY;
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(x|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { x: [] });
});
});

it('should work with empty and non-empty selector', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptionsTo }) => {
const a = EMPTY;
const b = hot('-----a-----');
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(x|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { x: [] });
})
});

it('should work with non-empty and empty selector', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptionsTo }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(x|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { x: [] });
});
});

Expand All @@ -68,17 +69,17 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptionsTo }) => {
const a = NEVER;
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(x|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { x: [] });
});
});

it('should work with empty and never selector', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptionsTo }) => {
const a = EMPTY;
const b = NEVER;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(x|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { x: [] });
});
});

Expand Down Expand Up @@ -123,14 +124,15 @@ describe('Observable.prototype.buffer', () => {
// Buffer Boundaries Simple (RxJS 4)
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = hot('--------^--a-------b---cd---------e---f---|');
const expected = '---a-------b---cd---------e---f-|';
const expected = '---a-------b---cd---------e---f-(x|)';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[],
e: ['7', '8', '9'],
f: ['0']
f: ['0'],
x: [] as string[],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand All @@ -142,12 +144,13 @@ describe('Observable.prototype.buffer', () => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const subs = '^ ! ';
const b = hot('--------^--a-------b---cd| ');
const expected = '---a-------b---cd| ';
const expected = '---a-------b---cd(x|) ';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[]
d: [] as string[],
x: [] as string[],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
expectSubscriptionsTo(a).toBe(subs);
Expand Down
38 changes: 8 additions & 30 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Observable } from '../Observable';
import { Operation, Sink, FOType, SinkArg } from '../types';
import { Operation } from '../types';
import { pipe } from '../util/pipe';
import { lift } from 'rxjs/internal/util/lift';
import { Subscription } from '../Subscription';
import { window } from './window';
import { mergeMap } from './mergeMap';
import { toArray } from './derived/toArray';

/**
* Buffers the source Observable values until `closingNotifier` emits.
Expand Down Expand Up @@ -42,31 +43,8 @@ import { Subscription } from '../Subscription';
* @owner Observable
*/
export function buffer<T>(closingNotifier: Observable<any>): Operation<T, T[]> {
return lift((source: Observable<T>, dest: Sink<T[]>, subs: Subscription) => {
const closingSubs = new Subscription();
let buffer: T[] = [];

subs.add(closingSubs);
closingNotifier(FOType.SUBSCRIBE, (t: FOType, v: SinkArg<any>, closingSubs: Subscription) => {
if (t === FOType.NEXT) {
const copy = buffer.slice();
buffer = [];
dest(FOType.NEXT, copy, subs);
} else {
dest(t, v, subs);
subs.unsubscribe();
}
}, closingSubs);

if (!subs.closed) {
source(FOType.SUBSCRIBE, (t: FOType, v: SinkArg<T>, subs: Subscription) => {
if (t === FOType.NEXT) {
buffer.push(v);
} else {
dest(t, v, subs);
subs.unsubscribe();
}
}, subs);
}
});
return pipe(
window(closingNotifier),
mergeMap(toArray()),
);
}
9 changes: 4 additions & 5 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,16 @@ export function mergeMap<T, R>(

source(FOType.SUBSCRIBE, (t: FOType, v: SinkArg<T>) => {
switch (t) {
case FOType.SUBSCRIBE:
subs = v;
break;
case FOType.NEXT:
let outerIndex = counter++;
buffer.push({ outerValue: v, outerIndex });
startNextInner();
break;
case FOType.ERROR:
dest(FOType.ERROR, v, subs);
subs.unsubscribe();
if (!subs.closed) {
dest(FOType.ERROR, v, subs);
subs.unsubscribe();
}
break;
case FOType.COMPLETE:
outerComplete = true;
Expand Down

0 comments on commit 61b1767

Please sign in to comment.