From d332a0ee1b1f060c72ea750f6f96b96003ea6306 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 23 Sep 2015 12:36:23 -0700 Subject: [PATCH] feat(zip): supports promises, iterables and lowercase-o observables - renames spec file appropriately - removes limit functionality from zip, as it is not used by combineLatest any longer - adds support for promises, iterables, Observables and lowercase-o observables --- .../{zip-all-spec.js => zipAll-spec.js} | 18 ++++++------- src/operators/zip-support.ts | 25 ++++++------------- 2 files changed, 16 insertions(+), 27 deletions(-) rename spec/operators/{zip-all-spec.js => zipAll-spec.js} (71%) diff --git a/spec/operators/zip-all-spec.js b/spec/operators/zipAll-spec.js similarity index 71% rename from spec/operators/zip-all-spec.js rename to spec/operators/zipAll-spec.js index f64dad615f..35ec139b8f 100644 --- a/spec/operators/zip-all-spec.js +++ b/spec/operators/zipAll-spec.js @@ -2,14 +2,14 @@ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; -describe('zipAll', function () { +describe('Observable.prototype.zipAll', function () { it('should take all observables from the source and zip them', function (done) { var expected = ['a1', 'b2', 'c3']; var i = 0; - Observable.fromArray([ - Observable.fromArray(['a', 'b', 'c']), - Observable.fromArray([1, 2, 3]) - ]) + Observable.of( + Observable.of('a','b','c'), + Observable.of(1,2,3) + ) .zipAll(function (a, b) { return a + b; }) @@ -21,10 +21,10 @@ describe('zipAll', function () { it('should zip until one child terminates', function (done) { var expected = ['a1', 'b2']; var i = 0; - Observable.fromArray([ - Observable.fromArray(['a', 'b']), - Observable.fromArray([1, 2, 3]) - ]) + Observable.of( + Observable.of('a','b','c'), + Observable.of(1,2) + ) .zipAll(function (a, b) { return a + b; }) diff --git a/src/operators/zip-support.ts b/src/operators/zip-support.ts index b93216a942..8ec2b19fcd 100644 --- a/src/operators/zip-support.ts +++ b/src/operators/zip-support.ts @@ -30,7 +30,6 @@ export class ZipSubscriber extends OuterSubscriber { active: number = 0; observables: Observable[] = []; project: (...values: Array) => R; - limit: number = Number.POSITIVE_INFINITY; buffers: any[][] = []; constructor(destination: Subscriber, @@ -61,51 +60,41 @@ export class ZipSubscriber extends OuterSubscriber { } } + notifyNext(value: R, observable: T, index: number, observableIndex: number) { const buffers = this.buffers; buffers[observableIndex].push(value); const len = buffers.length; for (let i = 0; i < len; i++) { - let buffer = buffers[i]; - if(buffer.length === 0) { + if(buffers[i].length === 0) { return; } } - const outbound = []; + const args = []; const destination = this.destination; const project = this.project; for(let i = 0; i < len; i++) { - outbound.push(buffers[i].shift()); + args.push(buffers[i].shift()); } if(project) { - let result = tryCatch(project)(outbound); + let result = tryCatch(project).apply(this, args); if(result === errorObject){ destination.error(errorObject.e); } else { destination.next(result); } } else { - destination.next(outbound); + destination.next(args); } } - notifyComplete(innerSubscriber) { + notifyComplete() { if((this.active -= 1) === 0) { this.destination.complete(); - } else { - this.limit = innerSubscriber.events; } } -} - -function arrayInitialize(length) { - var arr = Array(length); - for (let i = 0; i < length; i++) { - arr[i] = null; - } - return arr; } \ No newline at end of file