From 43b63cc841c5febca238faa74ff4e8ea13897ef5 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 14 Sep 2015 21:37:54 -0700 Subject: [PATCH] fix(mergeAll): merge all will properly handle async observables --- spec/operators/merge-all-spec.js | 8 ++++ src/operators/mergeAll.ts | 68 ++++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/spec/operators/merge-all-spec.js b/spec/operators/merge-all-spec.js index 16e808a57f..2a1813377e 100644 --- a/spec/operators/merge-all-spec.js +++ b/spec/operators/merge-all-spec.js @@ -33,4 +33,12 @@ describe('mergeAll', function () { done(); }); }); + + it('should handle merging a hot observable of observables', function (){ + var x = cold( 'a---b---c---|'); + var y = cold( 'd---e---f---|'); + var e1 = hot('--x--y--|', { x: x, y: y }); + var expected = '--a--db--ec--f---|'; + expectObservable(e1.mergeAll()).toBe(expected); + }); }); \ No newline at end of file diff --git a/src/operators/mergeAll.ts b/src/operators/mergeAll.ts index de6ba0fafd..b45e47427f 100644 --- a/src/operators/mergeAll.ts +++ b/src/operators/mergeAll.ts @@ -1,6 +1,68 @@ import Observable from '../Observable'; -import { MergeOperator } from './merge-support'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; +import Subscription from '../Subscription'; -export default function mergeAll(concurrent?: any): Observable { - return this.lift(new MergeOperator(concurrent)); +export default function mergeAll(concurrent: number = Number.POSITIVE_INFINITY): Observable { + return this.lift(new MergeAllOperator(concurrent)); } + +class MergeAllOperator implements Operator { + constructor(private concurrent: number) { + + } + + call(observer: Observer) { + return new MergeAllSubscriber(observer, this.concurrent); + } +} + +class MergeAllSubscriber extends Subscriber { + private hasCompleted: boolean = false; + private buffer: Observable[] = []; + private active: number = 0; + constructor(destination: Observer, private concurrent:number) { + super(destination); + } + + _next(value: any) { + if(this.active < this.concurrent) { + const innerSub = new Subscription(); + this.add(innerSub); + this.active++; + innerSub.add(value.subscribe(new MergeAllInnerSubscriber(this.destination, this, innerSub))); + } else { + this.buffer.push(value); + } + } + + _complete() { + this.hasCompleted = true; + if(this.active === 0 && this.buffer.length === 0) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if(buffer.length > 0) { + this._next(buffer.shift()); + } else if (this.active === 0 && this.hasCompleted) { + this.destination.complete(); + } + } +} + +class MergeAllInnerSubscriber extends Subscriber { + constructor(destination: Observer, private parent: MergeAllSubscriber, + private innerSub: Subscription ) { + super(destination); + } + + _complete() { + this.parent.notifyComplete(this.innerSub); + } +} \ No newline at end of file