Skip to content

Commit

Permalink
fix(buffer): subscribe to source and closingNotifier in proper order
Browse files Browse the repository at this point in the history
In buffer operator subscribe to source observable first, so that when
closingNotifier emits value, all source values emited before land in buffer

Closes #1610
  • Loading branch information
mpodlasin committed Dec 14, 2016
1 parent 5f93f81 commit 5f3c1bd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
13 changes: 13 additions & 0 deletions spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,17 @@ describe('Observable.prototype.buffer', () => {
expectObservable(a.buffer(b).take(1)).toBe(expected, expectedValues);
expectSubscriptions(b.subscriptions).toBe(bsubs);
});

it('should work with filtered source as closingNotifier', () => {
const values = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8};

const source = hot('-0-1-2-3-4-5-6-7-8-|', values);
const expected = '-a---b---c---d---e-|';

const expectedValues = {a: [0], b: [1, 2], c: [3, 4], d: [5, 6], e: [7, 8]};
const filteredSource = source.filter(x => x % 2 === 0);

const result = source.buffer(filteredSource);
expectObservable(result).toBe(expected, expectedValues);
});
});
13 changes: 10 additions & 3 deletions src/operator/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { TeardownLogic } from '../Subscription';
import { Observable } from '../Observable';

import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -47,8 +48,11 @@ class BufferOperator<T> implements Operator<T, T[]> {
constructor(private closingNotifier: Observable<any>) {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier));
call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
const bufferSubscriber = new BufferSubscriber(subscriber);
const subscription = source.subscribe(bufferSubscriber);
bufferSubscriber.subscribeToClosingNotifier(this.closingNotifier);
return subscription;
}
}

Expand All @@ -60,8 +64,11 @@ class BufferOperator<T> implements Operator<T, T[]> {
class BufferSubscriber<T> extends OuterSubscriber<T, any> {
private buffer: T[] = [];

constructor(destination: Subscriber<T[]>, closingNotifier: Observable<any>) {
constructor(destination: Subscriber<T[]>) {
super(destination);
}

subscribeToClosingNotifier(closingNotifier: Observable<any>) {
this.add(subscribeToResult(this, closingNotifier));
}

Expand Down

0 comments on commit 5f3c1bd

Please sign in to comment.