Skip to content

Commit

Permalink
feat(zip): zip now supports never-ending iterables
Browse files Browse the repository at this point in the history
- adds support for never-ending iterables
- adds fast-track for arrays
- adds additional tests for zip
- fixes issues where completion does not happen at the appropriate moment

closes #397
  • Loading branch information
benlesh committed Sep 25, 2015
1 parent 84e3684 commit a5684ba
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 36 deletions.
72 changes: 71 additions & 1 deletion spec/observables/zip-spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

Expand All @@ -18,4 +18,74 @@ describe('Observable.zip', function(){
expect(x).toBe(expected[i++])
}, null, done);
});

it('should end once one observable completes and its buffer is empty', function (){
var e1 = hot('---a--b--c--|');
var e2 = hot('------d----e----f--------|');
var e3 = hot('--------h----i----j-------------'); // doesn't complete
var expected = '--------x----y----(z|)'; // e1 complete and buffer empty
var values = {
x: ['a','d','h'],
y: ['b','e','i'],
z: ['c','f','j']
};
expectObservable(Observable.zip(e1,e2,e3)).toBe(expected, values);
});


it('should end once one observable nexts and zips value from completed other observable whose buffer is empty', function (){
var e1 = hot('---a--b--c--|');
var e2 = hot('------d----e----f|');
var e3 = hot('--------h----i----j-------------'); // doesn't complete
var expected = '--------x----y----(z|)'; // e2 buffer empty and signaled complete
var values = {
x: ['a','d','h'],
y: ['b','e','i'],
z: ['c','f','j']
};
expectObservable(Observable.zip(e1,e2,e3)).toBe(expected, values);
});

describe('with iterables', function (){
it('should zip them with values', function () {
var myIterator = {
count: 0,
next: function (){
return { value: this.count++, done: false };
}
};
myIterator[Symbol.iterator] = function(){ return this; };

var e1 = hot('---a---b---c---d---|');
var expected = '---w---x---y---z---|';

var values = {
w: ['a', 0],
x: ['b', 1],
y: ['c', 2],
z: ['d', 3]
};

expectObservable(Observable.zip(e1, myIterator)).toBe(expected, values);
});

it('should only call `next` as needed', function (){
var nextCalled = 0;
var myIterator = {
count: 0,
next: function (){
nextCalled++;
return { value: this.count++, done: false };
}
};
myIterator[Symbol.iterator] = function(){ return this; };

Observable.zip(Observable.of(1,2,3), myIterator)
.subscribe();

// since zip will call `next()` in advance, total calls when
// zipped with 3 other values should be 4.
expect(nextCalled).toBe(4);
});
});
});
203 changes: 168 additions & 35 deletions src/operators/zip-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import Observer from '../Observer';
import Scheduler from '../Scheduler';
import Observable from '../Observable';
import Subscriber from '../Subscriber';

import Subscription from '../Subscription';
import ArrayObservable from '../observables/ArrayObservable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';
import $$iterator from '../util/Symbol_iterator';

const isArray = Array.isArray;

export class ZipOperator<T, R> implements Operator<T, R> {

Expand All @@ -24,13 +26,12 @@ export class ZipOperator<T, R> implements Operator<T, R> {
}
}

export class ZipSubscriber<T, R> extends OuterSubscriber<T, R> {

values: any;
active: number = 0;
observables: Observable<any>[] = [];
project: (...values: Array<any>) => R;
buffers: any[][] = [];
export class ZipSubscriber<T, R> extends Subscriber<T> {
private index = 0;
private values: any;
private project: (...values: Array<any>) => R;
private iterators = [];
private active = 0;

constructor(destination: Subscriber<R>,
project?: (...values: Array<any>) => R,
Expand All @@ -40,61 +41,193 @@ export class ZipSubscriber<T, R> extends OuterSubscriber<T, R> {
this.values = values;
}

_next(observable) {
this.buffers.push([]);
this.observables.push(observable);
_next(value) {
const iterators = this.iterators;
const index = this.index++;
if(isArray(value)) {
iterators.push(new StaticArrayIterator(value));
} else if (typeof value[$$iterator] === 'function') {
iterators.push(new StaticIterator(value[$$iterator]()));
} else {
iterators.push(new ZipBufferIterator(this.destination, this, value, index));
}
}

_complete() {
const values = this.values;
const observables = this.observables;

let index = -1;
const len = observables.length;

const iterators = this.iterators;
const len = iterators.length;
this.active = len;

while(++index < len) {
const observable = observables[index];
this.add(subscribeToResult(this, observable, observable, index));
for(let i = 0; i < len; i++) {
let iterator = iterators[i];
if(iterator.stillUnsubscribed) {
iterator.subscribe(iterator, i);
} else {
this.active--; // not an observable
}
}
}


notifyNext(value: R, observable: T, index: number, observableIndex: number) {
const buffers = this.buffers;
buffers[observableIndex].push(value);

notifyInactive() {
this.active--;
if(this.active === 0) {
this.destination.complete();
}
}

checkIterators() {
const iterators = this.iterators;
const len = iterators.length;
const destination = this.destination;

const len = buffers.length;
for (let i = 0; i < len; i++) {
if(buffers[i].length === 0) {
// abort if not all of them have values
for(let i = 0; i < len; i++) {
let iterator = iterators[i];
if(typeof iterator.hasValue === 'function' && !iterator.hasValue()) {
return;
}
}

let shouldComplete = false;
const args = [];
const destination = this.destination;
const project = this.project;

for(let i = 0; i < len; i++) {
args.push(buffers[i].shift());
let iterator = iterators[i];
let result = iterator.next();

// check to see if it's completed now that you've gotten
// the next value.
if(iterator.hasCompleted()) {
shouldComplete = true;
}

if(result.done) {
destination.complete();
return;
}

args.push(result.value);
}

const project = this.project;
if(project) {
let result = tryCatch(project).apply(this, args);
if(result === errorObject){
if(result === errorObject) {
destination.error(errorObject.e);
} else {
destination.next(result);
}
} else {
destination.next(args);
}

if(shouldComplete) {
destination.complete();
}
}
}

interface LookAheadIterator<T> extends Iterator<T> {
hasValue(): boolean;
hasCompleted(): boolean;
}

class StaticIterator<T> implements LookAheadIterator<T> {
private nextResult: IteratorResult<T>;

constructor(private iterator: Iterator<T>) {
this.nextResult = iterator.next();
}

hasValue() {
return true;
}

next(): IteratorResult<T> {
const result = this.nextResult;
this.nextResult = this.iterator.next();
return result;
}

hasCompleted() {
const nextResult = this.nextResult;
return nextResult && nextResult.done;
}
}

class StaticArrayIterator<T> implements LookAheadIterator<T> {
private index = 0;
private length = 0;

constructor(private array: T[]) {
this.length = array.length;
}

[$$iterator]() {
return this;
}

next(value?: any): IteratorResult<T> {
const i = this.index++;
const array = this.array;
return i < this.length ? { value: array[i], done: false } : { done: true };
}

hasValue() {
return this.array.length > this.index;
}

hasCompleted() {
return this.array.length === this.index;
}
}

class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAheadIterator<T> {
stillUnsubscribed = true;
buffer: T[] = [];
isComplete = false;

constructor(destination: Observer<T>, private parent: ZipSubscriber<T, R>, private observable: Observable<T>, private index: number) {
super(destination);
}

[$$iterator]() {
return this;
}

// NOTE: there is actually a name collision here with Subscriber.next and Iterator.next
// this is legit because `next()` will never be called by a subscription in this case.
next(): IteratorResult<T> {
const buffer = this.buffer;
if(buffer.length === 0 && this.isComplete) {
return { done: true };
} else {
return { value: buffer.shift(), done: false };
}
}

hasValue() {
return this.buffer.length > 0;
}

hasCompleted() {
return this.buffer.length === 0 && this.isComplete;
}

notifyComplete() {
if((this.active -= 1) === 0) {
if(this.buffer.length > 0) {
this.isComplete = true;
this.parent.notifyInactive();
} else {
this.destination.complete();
}
}

notifyNext(innerValue, outerValue, innerIndex, outerIndex) {
this.buffer.push(innerValue);
this.parent.checkIterators();
}

subscribe(value: any, index: number) {
this.add(subscribeToResult<any, any>(this, this.observable, this, index));
}
}

0 comments on commit a5684ba

Please sign in to comment.