From 5f3c1bdb11e8f199d1e25870f6bae0c6121c849a Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Wed, 14 Dec 2016 18:27:33 +0100 Subject: [PATCH] fix(buffer): subscribe to source and closingNotifier in proper order In buffer operator subscribe to source observable first, so that when closingNotifier emits value, all source values emited before land in buffer Closes #1610 --- spec/operators/buffer-spec.ts | 13 +++++++++++++ src/operator/buffer.ts | 13 ++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 6912aaf7b2..b6d3bfebe0 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -219,4 +219,17 @@ describe('Observable.prototype.buffer', () => { expectObservable(a.buffer(b).take(1)).toBe(expected, expectedValues); expectSubscriptions(b.subscriptions).toBe(bsubs); }); + + it('should work with filtered source as closingNotifier', () => { + const values = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}; + + const source = hot('-0-1-2-3-4-5-6-7-8-|', values); + const expected = '-a---b---c---d---e-|'; + + const expectedValues = {a: [0], b: [1, 2], c: [3, 4], d: [5, 6], e: [7, 8]}; + const filteredSource = source.filter(x => x % 2 === 0); + + const result = source.buffer(filteredSource); + expectObservable(result).toBe(expected, expectedValues); + }); }); diff --git a/src/operator/buffer.ts b/src/operator/buffer.ts index ca43bec9f2..e90aeb2dc1 100644 --- a/src/operator/buffer.ts +++ b/src/operator/buffer.ts @@ -1,5 +1,6 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; import { Observable } from '../Observable'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -47,8 +48,11 @@ class BufferOperator implements Operator { constructor(private closingNotifier: Observable) { } - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); + call(subscriber: Subscriber, source: any): TeardownLogic { + const bufferSubscriber = new BufferSubscriber(subscriber); + const subscription = source.subscribe(bufferSubscriber); + bufferSubscriber.subscribeToClosingNotifier(this.closingNotifier); + return subscription; } } @@ -60,8 +64,11 @@ class BufferOperator implements Operator { class BufferSubscriber extends OuterSubscriber { private buffer: T[] = []; - constructor(destination: Subscriber, closingNotifier: Observable) { + constructor(destination: Subscriber) { super(destination); + } + + subscribeToClosingNotifier(closingNotifier: Observable) { this.add(subscribeToResult(this, closingNotifier)); }