From fe0eb37aa06123e6fa59751aa5ff8950dab3e23a Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Thu, 3 Dec 2015 13:04:07 -0800 Subject: [PATCH] feat(mergeScan): support concurrency parameter for mergeScan - expose concurrency parameter to interface of mergeScan - expand test coverage to test concurrency works closes #868 --- spec/operators/mergeScan-spec.js | 148 ++++++++++++++++++++++++++++- src/Rx.KitchenSink.ts | 2 +- src/operator/extended/mergeScan.ts | 10 +- 3 files changed, 154 insertions(+), 6 deletions(-) diff --git a/spec/operators/mergeScan-spec.js b/spec/operators/mergeScan-spec.js index c531438ca1..29fbd0fb14 100644 --- a/spec/operators/mergeScan-spec.js +++ b/spec/operators/mergeScan-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */ +/* globals describe, it, expect, rxTestScheduler, expectObservable, expectSubscriptions, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; @@ -225,4 +225,150 @@ describe('Observable.prototype.mergeScan()', function () { expectObservable(source, sub).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(sub); }); + + it('should mergescan projects cold Observable with single concurrency', function () { + var e1 = hot('--a--b--c--|'); + var e1subs = '^ !'; + + var inner = [ + cold( '--d--e--f--| '), + cold( '--g--h--i--| '), + cold( '--j--k--l--|') + ]; + + var xsubs = ' ^ !'; + var ysubs = ' ^ !'; + var zsubs = ' ^ !'; + + var expected = '--x-d--e--f--f-g--h--i--i-j--k--l--|'; + + var index = 0; + var source = e1.mergeScan(function (acc, x) { + var value = inner[index++]; + return value.startWith(acc); + }, 'x', 1); + + expectObservable(source).toBe(expected); + + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(inner[0].subscriptions).toBe(xsubs); + expectSubscriptions(inner[1].subscriptions).toBe(ysubs); + expectSubscriptions(inner[2].subscriptions).toBe(zsubs); + }); + + it('should emit accumulator if inner completes without value', function () { + var e1 = hot('--a--^--b--c--d--e--f--g--|'); + var e1subs = '^ !'; + var expected = '---------------------(x|)'; + + var source = e1.mergeScan(function (acc, x) { + return Observable.empty(); + }, ['1']); + + expectObservable(source).toBe(expected, {x: ['1']}); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit accumulator if inner completes without value after source completes', function () { + var e1 = hot('--a--^--b--c--d--e--f--g--|'); + var e1subs = '^ !'; + var expected = '-----------------------(x|)'; + + var source = e1.mergeScan(function (acc, x) { + return Observable.empty().delay(50, rxTestScheduler); + }, ['1']); + + expectObservable(source).toBe(expected, {x: ['1']}); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should mergescan projects hot Observable with single concurrency', function () { + var e1 = hot('---a---b---c---|'); + var e1subs = '^ !'; + + var inner = [ + hot( '--d--e--f--|'), + hot( '----g----h----i----|'), + hot( '------j------k-------l------|') + ]; + + var xsubs = ' ^ !'; + var ysubs = ' ^ !'; + var zsubs = ' ^ !'; + + var expected = '---x-e--f--f--i----i-l------|'; + + var index = 0; + var source = e1.mergeScan(function (acc, x) { + var value = inner[index++]; + return value.startWith(acc); + }, 'x', 1); + + expectObservable(source).toBe(expected); + + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(inner[0].subscriptions).toBe(xsubs); + expectSubscriptions(inner[1].subscriptions).toBe(ysubs); + expectSubscriptions(inner[2].subscriptions).toBe(zsubs); + }); + + it('should mergescan projects cold Observable with dual concurrency', function () { + var e1 = hot('----a----b----c----|'); + var e1subs = '^ !'; + + var inner = [ + cold( '---d---e---f---| '), + cold( '---g---h---i---| '), + cold( '---j---k---l---|') + ]; + + var xsubs = ' ^ !'; + var ysubs = ' ^ !'; + var zsubs = ' ^ !'; + + var expected = '----x--d-d-eg--fh--hi-j---k---l---|'; + + var index = 0; + var source = e1.mergeScan(function (acc, x) { + var value = inner[index++]; + return value.startWith(acc); + }, 'x', 2); + + expectObservable(source).toBe(expected); + + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(inner[0].subscriptions).toBe(xsubs); + expectSubscriptions(inner[1].subscriptions).toBe(ysubs); + expectSubscriptions(inner[2].subscriptions).toBe(zsubs); + }); + + it('should mergescan projects hot Observable with dual concurrency', function () { + var e1 = hot('---a---b---c---|'); + var e1subs = '^ !'; + + var inner = [ + hot( '--d--e--f--|'), + hot( '----g----h----i----|'), + hot( '------j------k-------l------|') + ]; + + var xsubs = ' ^ !'; + var ysubs = ' ^ !'; + var zsubs = ' ^ !'; + + var expected = '---x-e-efh-h-ki------l------|'; + + var index = 0; + var source = e1.mergeScan(function (acc, x) { + var value = inner[index++]; + return value.startWith(acc); + }, 'x', 2); + + expectObservable(source).toBe(expected); + + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(inner[0].subscriptions).toBe(xsubs); + expectSubscriptions(inner[1].subscriptions).toBe(ysubs); + expectSubscriptions(inner[2].subscriptions).toBe(zsubs); + }); }); diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 518bb1233f..3b74aadd04 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -14,7 +14,7 @@ export interface KitchenSinkOperators extends CoreOperators { max?: (comparer?: (x: R, y: T) => R) => Observable; min?: (comparer?: (x: R, y: T) => R) => Observable; timeInterval?: (scheduler?: IScheduler) => Observable; - mergeScan?: (project: (acc: R, x: T) => Observable, seed: R) => Observable; + mergeScan?: (project: (acc: R, x: T) => Observable, seed: R, concurrent?: number) => Observable; switchFirst?: () => Observable; switchMapFirst?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; diff --git a/src/operator/extended/mergeScan.ts b/src/operator/extended/mergeScan.ts index d251fe500e..813964ad6b 100644 --- a/src/operator/extended/mergeScan.ts +++ b/src/operator/extended/mergeScan.ts @@ -7,14 +7,16 @@ import {errorObject} from '../../util/errorObject'; import {subscribeToResult} from '../../util/subscribeToResult'; import {OuterSubscriber} from '../../OuterSubscriber'; -export function mergeScan(project: (acc: R, x: T) => Observable, seed: R) { - return this.lift(new MergeScanOperator(project, seed)); +export function mergeScan(project: (acc: R, x: T) => Observable, + seed: R, + concurrent: number = Number.POSITIVE_INFINITY) { + return this.lift(new MergeScanOperator(project, seed, concurrent)); } export class MergeScanOperator implements Operator { constructor(private project: (acc: R, x: T) => Observable, private seed: R, - private concurrent: number = Number.POSITIVE_INFINITY) { + private concurrent: number) { } call(subscriber: Subscriber): Subscriber { @@ -34,7 +36,7 @@ export class MergeScanSubscriber extends OuterSubscriber { constructor(destination: Subscriber, private project: (acc: R, x: T) => Observable, private acc: R, - private concurrent: number = Number.POSITIVE_INFINITY) { + private concurrent: number) { super(destination); }