From 67f9623385299f374905b0e2cbfaacd6d731d840 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 15 Sep 2015 13:14:36 -0700 Subject: [PATCH] fix(expand): fix expand operator to match Rx3 - adds virtual time tests for expand - refactors expand to be its own operator, reducing inheritence --- spec/operators/expand-spec.js | 64 +++++++++++++++------ src/operators/expand.ts | 105 +++++++++++++++++++++------------- 2 files changed, 112 insertions(+), 57 deletions(-) diff --git a/spec/operators/expand-spec.js b/spec/operators/expand-spec.js index ef8d7ef83c..6ca103432d 100644 --- a/spec/operators/expand-spec.js +++ b/spec/operators/expand-spec.js @@ -1,29 +1,57 @@ +/* globals describe, it, expect, expectObservable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.prototype.expand()', function () { - it('should map and recursively flatten', function (done) { - var expected = [1, 2, 3, 4, 5]; - Observable.of(0).expand(function (x) { - if (x > 4) { + it('should map and recursively flatten', function() { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + } + var e1 = hot('a', values); + /* + expectation explanation: (conjunction junction?) ... + + since `cold('---(z|)')` emits `x + x` and completes on frame 30 + but the next "expanded" return value is synchronously subscribed to in + that same frame, it stacks like so: + + a + ---(b|) + ---(c|) + ---(d|) + ---(e|) (...which flattens into:) + a--b--c--d--(e|) + */ + var expected = 'a--b--c--d--(e|)'; + + expectObservable(e1.expand(function(x) { + if(x === 16) { return Observable.empty(); } - return Observable.of(x + 1); - }) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + return cold('---(z|)', { z: x + x }); + })).toBe(expected, values); }); - it('should map and recursively flatten with ScalarObservables', function (done) { - var expected = [1, 2, 3, 4, 5]; - Observable.of(0).expand(function (x) { - if (x > 4) { + + it('should map and recursively flatten with scalars', function() { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + } + var e1 = hot('a', values); + var expected = '(abcde|)'; + + expectObservable(e1.expand(function(x) { + if(x === 16) { return Observable.empty(); } - return Observable.of(x + 1); - }) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + return Observable.of(x + x); // scalar + })).toBe(expected, values); }); }); \ No newline at end of file diff --git a/src/operators/expand.ts b/src/operators/expand.ts index b93fe997b1..1442521061 100644 --- a/src/operators/expand.ts +++ b/src/operators/expand.ts @@ -2,69 +2,96 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; +import Subscription from '../Subscription'; -import { MergeSubscriber, MergeInnerSubscriber } from './merge-support'; +// import { MergeAllSubscriber, MergeAllInnerSubscriber } from './mergeAll-support'; import EmptyObservable from '../observables/EmptyObservable'; import ScalarObservable from '../observables/ScalarObservable'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; -export default function expand(project: (x: T, ix: number) => Observable): Observable { - return this.lift(new ExpandOperator(project)); +export default function expand(project: (value: T, index: number) => Observable, + concurrent: number = Number.POSITIVE_INFINITY): Observable { + return this.lift(new ExpandOperator(project, concurrent)); } class ExpandOperator implements Operator { - - project: (x: T, ix: number) => Observable; - - constructor(project: (x: T, ix: number) => Observable) { - this.project = project; + constructor(private project: (value: T, index: number) => Observable, + private concurrent: number = Number.POSITIVE_INFINITY) { } call(subscriber: Subscriber): Subscriber { - return new ExpandSubscriber(subscriber, this.project); + return new ExpandSubscriber(subscriber, this.project, this.concurrent); } } -class ExpandSubscriber extends MergeSubscriber { - - project: (x: T, ix: number) => Observable; - - constructor(destination: Observer, - project: (x: T, ix: number) => Observable) { - super(destination, Number.POSITIVE_INFINITY); - this.project = project; - } - - _project(value, index) { - const observable = tryCatch(this.project).call(this, value, index); - if (observable === errorObject) { - this.error(errorObject.e); - return null; +class ExpandSubscriber extends Subscriber { + private index: number = 0; + private active: number = 0; + private hasCompleted: boolean = true; + private buffer: T[]; + + constructor(destination: Observer, private project: (value: T, index: number) => Observable, + private concurrent: number = Number.POSITIVE_INFINITY) { + super(destination); + if(concurrent < Number.POSITIVE_INFINITY) { + this.buffer = []; } - return observable; } - - _subscribeInner(observable, value, index) { - if(observable._isScalar) { - this.destination.next((> observable).value); - this._innerComplete(); - this._next((observable).value); - } else if(observable instanceof EmptyObservable) { - this._innerComplete(); + + _next(value: T) { + const index = this.index++; + this.destination.next(value); + if(this.active < this.concurrent) { + let result = tryCatch(this.project)(value, index); + if(result === errorObject) { + this.destination.error(result.e); + } else { + if(result._isScalar) { + this._next(result.value); + } else { + let innerSub = new Subscription(); + this.active++; + innerSub.add(result.subscribe(new ExpandInnerSubscriber(this.destination, this, innerSub))); + this.add(innerSub); + } + } } else { - return observable._subscribe(new ExpandInnerSubscriber(this.destination, this)); + this.buffer.push(value); + } + } + + _complete() { + this.hasCompleted = true; + if(this.hasCompleted && this.active === 0) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if(buffer && buffer.length > 0) { + this._next(buffer.shift()); + } + if(this.hasCompleted && this.active === 0) { + this.destination.complete(); } } } -class ExpandInnerSubscriber extends MergeInnerSubscriber { - constructor(destination: Observer, parent: ExpandSubscriber) { - super(destination, parent); +class ExpandInnerSubscriber extends Subscriber { + constructor(destination: Observer, private parent: ExpandSubscriber, private innerSub: Subscription) { + super(destination); } + _next(value) { - this.destination.next(value); - this.parent.next(value); + this.parent._next(value); + } + + _complete() { + this.parent.notifyComplete(this.innerSub); } }