diff --git a/spec/observables/zip-spec.js b/spec/observables/zip-spec.js index ec41f43fa8..3d73ee6bf4 100644 --- a/spec/observables/zip-spec.js +++ b/spec/observables/zip-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; @@ -18,4 +18,74 @@ describe('Observable.zip', function(){ expect(x).toBe(expected[i++]) }, null, done); }); + + it('should end once one observable completes and its buffer is empty', function (){ + var e1 = hot('---a--b--c--|'); + var e2 = hot('------d----e----f--------|'); + var e3 = hot('--------h----i----j-------------'); // doesn't complete + var expected = '--------x----y----(z|)'; // e1 complete and buffer empty + var values = { + x: ['a','d','h'], + y: ['b','e','i'], + z: ['c','f','j'] + }; + expectObservable(Observable.zip(e1,e2,e3)).toBe(expected, values); + }); + + + it('should end once one observable nexts and zips value from completed other observable whose buffer is empty', function (){ + var e1 = hot('---a--b--c--|'); + var e2 = hot('------d----e----f|'); + var e3 = hot('--------h----i----j-------------'); // doesn't complete + var expected = '--------x----y----(z|)'; // e2 buffer empty and signaled complete + var values = { + x: ['a','d','h'], + y: ['b','e','i'], + z: ['c','f','j'] + }; + expectObservable(Observable.zip(e1,e2,e3)).toBe(expected, values); + }); + + describe('with iterables', function (){ + it('should zip them with values', function () { + var myIterator = { + count: 0, + next: function (){ + return { value: this.count++, done: false }; + } + }; + myIterator[Symbol.iterator] = function(){ return this; }; + + var e1 = hot('---a---b---c---d---|'); + var expected = '---w---x---y---z---|'; + + var values = { + w: ['a', 0], + x: ['b', 1], + y: ['c', 2], + z: ['d', 3] + }; + + expectObservable(Observable.zip(e1, myIterator)).toBe(expected, values); + }); + + it('should only call `next` as needed', function (){ + var nextCalled = 0; + var myIterator = { + count: 0, + next: function (){ + nextCalled++; + return { value: this.count++, done: false }; + } + }; + myIterator[Symbol.iterator] = function(){ return this; }; + + Observable.zip(Observable.of(1,2,3), myIterator) + .subscribe(); + + // since zip will call `next()` in advance, total calls when + // zipped with 3 other values should be 4. + expect(nextCalled).toBe(4); + }); + }); }); \ No newline at end of file diff --git a/src/operators/zip-support.ts b/src/operators/zip-support.ts index 8ec2b19fcd..dd16cd78a7 100644 --- a/src/operators/zip-support.ts +++ b/src/operators/zip-support.ts @@ -3,13 +3,15 @@ import Observer from '../Observer'; import Scheduler from '../Scheduler'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; - +import Subscription from '../Subscription'; import ArrayObservable from '../observables/ArrayObservable'; - import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import OuterSubscriber from '../OuterSubscriber'; import subscribeToResult from '../util/subscribeToResult'; +import $$iterator from '../util/Symbol_iterator'; + +const isArray = Array.isArray; export class ZipOperator implements Operator { @@ -24,13 +26,12 @@ export class ZipOperator implements Operator { } } -export class ZipSubscriber extends OuterSubscriber { - - values: any; - active: number = 0; - observables: Observable[] = []; - project: (...values: Array) => R; - buffers: any[][] = []; +export class ZipSubscriber extends Subscriber { + private index = 0; + private values: any; + private project: (...values: Array) => R; + private iterators = []; + private active = 0; constructor(destination: Subscriber, project?: (...values: Array) => R, @@ -40,49 +41,77 @@ export class ZipSubscriber extends OuterSubscriber { this.values = values; } - _next(observable) { - this.buffers.push([]); - this.observables.push(observable); + _next(value) { + const iterators = this.iterators; + const index = this.index++; + if(isArray(value)) { + iterators.push(new StaticArrayIterator(value)); + } else if (typeof value[$$iterator] === 'function') { + iterators.push(new StaticIterator(value[$$iterator]())); + } else { + iterators.push(new ZipBufferIterator(this.destination, this, value, index)); + } } _complete() { const values = this.values; - const observables = this.observables; - - let index = -1; - const len = observables.length; - + const iterators = this.iterators; + const len = iterators.length; this.active = len; - - while(++index < len) { - const observable = observables[index]; - this.add(subscribeToResult(this, observable, observable, index)); + for(let i = 0; i < len; i++) { + let iterator = iterators[i]; + if(iterator.stillUnsubscribed) { + iterator.subscribe(iterator, i); + } else { + this.active--; // not an observable + } } } - - - notifyNext(value: R, observable: T, index: number, observableIndex: number) { - const buffers = this.buffers; - buffers[observableIndex].push(value); + + notifyInactive() { + this.active--; + if(this.active === 0) { + this.destination.complete(); + } + } + + checkIterators() { + const iterators = this.iterators; + const len = iterators.length; + const destination = this.destination; - const len = buffers.length; - for (let i = 0; i < len; i++) { - if(buffers[i].length === 0) { + // abort if not all of them have values + for(let i = 0; i < len; i++) { + let iterator = iterators[i]; + if(typeof iterator.hasValue === 'function' && !iterator.hasValue()) { return; } } + let shouldComplete = false; const args = []; - const destination = this.destination; - const project = this.project; - for(let i = 0; i < len; i++) { - args.push(buffers[i].shift()); + let iterator = iterators[i]; + let result = iterator.next(); + + // check to see if it's completed now that you've gotten + // the next value. + if(iterator.hasCompleted()) { + shouldComplete = true; + } + + if(result.done) { + destination.complete(); + return; + } + + args.push(result.value); } + const project = this.project; if(project) { let result = tryCatch(project).apply(this, args); - if(result === errorObject){ + if(result === errorObject) { destination.error(errorObject.e); } else { destination.next(result); @@ -90,11 +119,115 @@ export class ZipSubscriber extends OuterSubscriber { } else { destination.next(args); } + + if(shouldComplete) { + destination.complete(); + } } +} + +interface LookAheadIterator extends Iterator { + hasValue(): boolean; + hasCompleted(): boolean; +} +class StaticIterator implements LookAheadIterator { + private nextResult: IteratorResult; + + constructor(private iterator: Iterator) { + this.nextResult = iterator.next(); + } + + hasValue() { + return true; + } + + next(): IteratorResult { + const result = this.nextResult; + this.nextResult = this.iterator.next(); + return result; + } + + hasCompleted() { + const nextResult = this.nextResult; + return nextResult && nextResult.done; + } +} + +class StaticArrayIterator implements LookAheadIterator { + private index = 0; + private length = 0; + + constructor(private array: T[]) { + this.length = array.length; + } + + [$$iterator]() { + return this; + } + + next(value?: any): IteratorResult { + const i = this.index++; + const array = this.array; + return i < this.length ? { value: array[i], done: false } : { done: true }; + } + + hasValue() { + return this.array.length > this.index; + } + + hasCompleted() { + return this.array.length === this.index; + } +} + +class ZipBufferIterator extends OuterSubscriber implements LookAheadIterator { + stillUnsubscribed = true; + buffer: T[] = []; + isComplete = false; + + constructor(destination: Observer, private parent: ZipSubscriber, private observable: Observable, private index: number) { + super(destination); + } + + [$$iterator]() { + return this; + } + + // NOTE: there is actually a name collision here with Subscriber.next and Iterator.next + // this is legit because `next()` will never be called by a subscription in this case. + next(): IteratorResult { + const buffer = this.buffer; + if(buffer.length === 0 && this.isComplete) { + return { done: true }; + } else { + return { value: buffer.shift(), done: false }; + } + } + + hasValue() { + return this.buffer.length > 0; + } + + hasCompleted() { + return this.buffer.length === 0 && this.isComplete; + } + notifyComplete() { - if((this.active -= 1) === 0) { + if(this.buffer.length > 0) { + this.isComplete = true; + this.parent.notifyInactive(); + } else { this.destination.complete(); } } + + notifyNext(innerValue, outerValue, innerIndex, outerIndex) { + this.buffer.push(innerValue); + this.parent.checkIterators(); + } + + subscribe(value: any, index: number) { + this.add(subscribeToResult(this, this.observable, this, index)); + } } \ No newline at end of file