This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
zip.js
91 lines (76 loc) · 3.06 KB
/
zip.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
function falseFactory() { return false; }
function emptyArrayFactory() { return []; }
var ZipObservable = (function(__super__) {
inherits(ZipObservable, __super__);
function ZipObservable(sources, resultSelector) {
this._s = sources;
this._cb = resultSelector;
__super__.call(this);
}
ZipObservable.prototype.subscribeCore = function(observer) {
var n = this._s.length,
subscriptions = new Array(n),
done = arrayInitialize(n, falseFactory),
q = arrayInitialize(n, emptyArrayFactory);
for (var i = 0; i < n; i++) {
var source = this._s[i], sad = new SingleAssignmentDisposable();
subscriptions[i] = sad;
isPromise(source) && (source = observableFromPromise(source));
sad.setDisposable(source.subscribe(new ZipObserver(observer, i, this, q, done)));
}
return new NAryDisposable(subscriptions);
};
return ZipObservable;
}(ObservableBase));
var ZipObserver = (function (__super__) {
inherits(ZipObserver, __super__);
function ZipObserver(o, i, p, q, d) {
this._o = o;
this._i = i;
this._p = p;
this._q = q;
this._d = d;
__super__.call(this);
}
function notEmpty(x) { return x.length > 0; }
function shiftEach(x) { return x.shift(); }
function notTheSame(i) {
return function (x, j) {
return j !== i;
};
}
ZipObserver.prototype.next = function (x) {
this._q[this._i].push(x);
if (this._q.every(notEmpty)) {
var queuedValues = this._q.map(shiftEach);
var res = tryCatch(this._p._cb).apply(null, queuedValues);
if (res === errorObj) { return this._o.onError(res.e); }
this._o.onNext(res);
} else if (this._d.filter(notTheSame(this._i)).every(identity)) {
this._o.onCompleted();
}
};
ZipObserver.prototype.error = function (e) {
this._o.onError(e);
};
ZipObserver.prototype.completed = function () {
this._d[this._i] = true;
this._d.every(identity) && this._o.onCompleted();
};
return ZipObserver;
}(AbstractObserver));
/**
* Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences or an array have produced an element at a corresponding index.
* The last element in the arguments must be a function to invoke for each series of elements at corresponding indexes in the args.
* @returns {Observable} An observable sequence containing the result of combining elements of the args using the specified result selector function.
*/
observableProto.zip = function () {
if (arguments.length === 0) { throw new Error('invalid arguments'); }
var len = arguments.length, args = new Array(len);
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
Array.isArray(args[0]) && (args = args[0]);
var parent = this;
args.unshift(parent);
return new ZipObservable(args, resultSelector);
};