Skip to content

Commit ce76e4e

Browse files
committed
feat(combineLatest): supports promises, iterables, lowercase-o observables and Observables
- refactor combineLatest to standalone from zip - moves tests for combineAll to own file - moves test regarding immediate scheduling to static combineLatest tests - adds support for iterables, promises, Observables and lowercase-o observables - adds scheduling capability to static combineLatest method
1 parent 4c16aa6 commit ce76e4e

File tree

7 files changed

+159
-178
lines changed

7 files changed

+159
-178
lines changed

spec/observables/combineLatest-spec.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
/* globals describe, it, expect, expectObservable, hot */
1+
/* globals describe, it, expect, hot, cold, expectObservable */
22
var Rx = require('../../dist/cjs/Rx');
33
var Observable = Rx.Observable;
4+
var immediateScheduler = Rx.Scheduler.immediate;
45

56
describe('Observable.combineLatest', function () {
67
it('should combineLatest the provided observables', function () {
@@ -14,4 +15,17 @@ describe('Observable.combineLatest', function () {
1415

1516
expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'});
1617
});
18+
19+
it("should combine an immediately-scheduled source with an immediately-scheduled second", function (done) {
20+
var a = Observable.of(1, 2, 3, immediateScheduler);
21+
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
22+
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
23+
var i = 0;
24+
Observable.combineLatest(a, b, immediateScheduler).subscribe(function (vals) {
25+
expect(vals).toDeepEqual(r[i++]);
26+
}, null, function() {
27+
expect(i).toEqual(r.length);
28+
done();
29+
});
30+
});
1731
});

spec/operators/combineAll-spec.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/* globals describe, it, expect, hot, cold, expectObservable */
2+
var Rx = require('../../dist/cjs/Rx');
3+
var Observable = Rx.Observable;
4+
var immediateScheduler = Rx.Scheduler.immediate;
5+
6+
describe('Observable.prototype.combineAll()', function(){
7+
it("should combine two observables", function (done) {
8+
var a = Observable.of(1, 2, 3);
9+
var b = Observable.of(4, 5, 6, 7, 8);
10+
var expected = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
11+
Observable.of(a, b).combineAll().subscribe(function (vals) {
12+
expect(vals).toEqual(expected.shift());
13+
}, null, function() {
14+
expect(expected.length).toBe(0);
15+
done();
16+
});
17+
});
18+
19+
it("should combine two immediately-scheduled observables", function (done) {
20+
var a = Observable.of(1, 2, 3, immediateScheduler);
21+
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
22+
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
23+
var i = 0;
24+
Observable.of(a, b, immediateScheduler).combineAll().subscribe(function (vals) {
25+
expect(vals).toDeepEqual(r[i++]);
26+
}, null, function() {
27+
expect(i).toEqual(r.length);
28+
done();
29+
});
30+
});
31+
});

spec/operators/combineLatest-spec.js

Lines changed: 3 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,14 @@ var Observable = Rx.Observable;
44
var immediateScheduler = Rx.Scheduler.immediate;
55

66
describe('Observable.prototype.combineLatest', function () {
7-
it("should combine two observables", function (done) {
8-
var a = Observable.of(1, 2, 3);
9-
var b = Observable.of(4, 5, 6, 7, 8);
10-
var r = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
11-
var i = 0;
12-
Observable.of(a, b).combineAll().subscribe(function (vals) {
13-
expect(vals).toDeepEqual(r[i++]);
14-
}, null, function() {
15-
expect(i).toEqual(r.length);
16-
done();
17-
});
18-
});
19-
207
it("should combine a source with a second", function (done) {
218
var a = Observable.of(1, 2, 3);
229
var b = Observable.of(4, 5, 6, 7, 8);
23-
var r = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
24-
var i = 0;
25-
a.combineLatest(b).subscribe(function (vals) {
26-
expect(vals).toDeepEqual(r[i++]);
27-
}, null, function() {
28-
expect(i).toEqual(r.length);
29-
done();
30-
});
31-
});
32-
33-
it("should combine two immediately-scheduled observables", function (done) {
34-
var a = Observable.of(1, 2, 3, immediateScheduler);
35-
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
36-
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
37-
var i = 0;
38-
Observable.of(a, b, immediateScheduler).combineAll().subscribe(function (vals) {
39-
expect(vals).toDeepEqual(r[i++]);
40-
}, null, function() {
41-
expect(i).toEqual(r.length);
42-
done();
43-
});
44-
});
45-
46-
it("should combine an immediately-scheduled source with an immediately-scheduled second", function (done) {
47-
var a = Observable.of(1, 2, 3, immediateScheduler);
48-
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
49-
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
50-
var i = 0;
10+
var expected = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
5111
a.combineLatest(b).subscribe(function (vals) {
52-
expect(vals).toDeepEqual(r[i++]);
12+
expect(vals).toEqual(expected.shift());
5313
}, null, function() {
54-
expect(i).toEqual(r.length);
14+
expect(expected.length).toEqual(0);
5515
done();
5616
});
5717
});

src/operators/combineLatest-static.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
import Observable from '../Observable';
22
import ArrayObservable from '../observables/ArrayObservable';
33
import { CombineLatestOperator } from './combineLatest-support';
4+
import Scheduler from '../Scheduler';
45

5-
export default function combineLatest<T, R>(...observables: (Observable<any> | ((...values: Array<any>) => R))[]): Observable<R> {
6-
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
7-
if (typeof project === "function") {
8-
observables.pop();
6+
export default function combineLatest<T, R>(...observables: (Observable<any> | ((...values: Array<any>) => R) | Scheduler)[]): Observable<R> {
7+
let project, scheduler;
8+
9+
if(typeof (<any>observables[observables.length - 1]).schedule === 'function') {
10+
scheduler = observables.pop();
911
}
10-
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
12+
13+
if (typeof observables[observables.length - 1] === 'function') {
14+
project = observables.pop();
15+
}
16+
17+
return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project));
1118
}

src/operators/combineLatest-support.ts

Lines changed: 54 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,11 @@ import Subscriber from '../Subscriber';
55

66
import ArrayObservable from '../observables/ArrayObservable';
77
import EmptyObservable from '../observables/EmptyObservable';
8-
import {ZipSubscriber, ZipInnerSubscriber} from './zip-support';
98

109
import tryCatch from '../util/tryCatch';
1110
import {errorObject} from '../util/errorObject';
12-
13-
export function combineLatest<T, R>(...observables: (Observable<any> | ((...values: Array<any>) => R))[]): Observable<R> {
14-
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
15-
if (typeof project === "function") {
16-
observables.pop();
17-
}
18-
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
19-
}
20-
21-
export function combineLatestProto<R>(...observables: (Observable<any>|((...values: any[]) => R))[]): Observable<R> {
22-
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
23-
if (typeof project === "function") {
24-
observables.pop();
25-
}
26-
observables.unshift(this);
27-
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
28-
}
11+
import OuterSubscriber from '../OuterSubscriber';
12+
import subscribeToResult from '../util/subscribeToResult';
2913

3014
export class CombineLatestOperator<T, R> implements Operator<T, R> {
3115

@@ -40,51 +24,68 @@ export class CombineLatestOperator<T, R> implements Operator<T, R> {
4024
}
4125
}
4226

43-
export class CombineLatestSubscriber<T, R> extends ZipSubscriber<T, R> {
44-
45-
project: (...values: Array<any>) => R;
46-
limit: number = 0;
47-
48-
constructor(destination: Subscriber<R>, project?: (...values: Array<any>) => R) {
49-
super(destination, project, []);
27+
export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
28+
private active: number = 0;
29+
private values: any[] = [];
30+
private observables: any[] = [];
31+
private toRespond: number[] = [];
32+
33+
constructor(destination: Subscriber<R>, private project?: (...values: Array<any>) => R) {
34+
super(destination);
5035
}
5136

52-
_subscribeInner(observable, values, index, total) {
53-
return observable._subscribe(new CombineLatestInnerSubscriber(this.destination, this, values, index, total));
37+
_next(observable: any) {
38+
const toRespond = this.toRespond;
39+
toRespond.push(toRespond.length);
40+
this.observables.push(observable);
5441
}
55-
56-
_innerComplete(innerSubscriber) {
57-
if((this.active -= 1) === 0) {
42+
43+
_complete() {
44+
const observables = this.observables;
45+
const len = observables.length;
46+
if(len === 0) {
5847
this.destination.complete();
48+
} else {
49+
this.active = len;
50+
for(let i = 0; i < len; i++) {
51+
let observable = observables[i];
52+
this.add(subscribeToResult(this, observable, observable, i));
53+
}
5954
}
6055
}
61-
}
62-
63-
export class CombineLatestInnerSubscriber<T, R> extends ZipInnerSubscriber<T, R> {
6456

65-
constructor(destination: Observer<T>, parent: ZipSubscriber<T, R>, values: any, index : number, total : number) {
66-
super(destination, parent, values, index, total);
57+
notifyComplete(innerSubscriber) {
58+
if((this.active -= 1) === 0) {
59+
this.destination.complete();
60+
}
6761
}
68-
69-
_next(x) {
70-
71-
const index = this.index;
72-
const total = this.total;
73-
const parent = this.parent;
62+
63+
notifyNext(value: R, observable: any, innerIndex: number, outerIndex: number) {
7464
const values = this.values;
75-
const valueBox = values[index];
76-
let limit;
77-
78-
if(valueBox) {
79-
valueBox[0] = x;
80-
limit = parent.limit;
81-
} else {
82-
limit = parent.limit += 1;
83-
values[index] = [x];
65+
values[outerIndex] = value;
66+
const toRespond = this.toRespond;
67+
68+
if(toRespond.length > 0) {
69+
const found = toRespond.indexOf(outerIndex);
70+
if(found !== -1) {
71+
toRespond.splice(found, 1);
72+
}
8473
}
85-
86-
if(limit >= total) {
87-
this._projectNext(values, parent.project);
74+
75+
if(toRespond.length === 0) {
76+
const project = this.project;
77+
const destination = this.destination;
78+
79+
if(project) {
80+
let result = tryCatch(project).apply(this, values);
81+
if(result === errorObject) {
82+
destination.error(errorObject.e);
83+
} else {
84+
destination.next(result);
85+
}
86+
} else {
87+
destination.next(values);
88+
}
8889
}
8990
}
9091
}

src/operators/combineLatest.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import ArrayObservable from '../observables/ArrayObservable';
33
import { CombineLatestOperator } from './combineLatest-support';
44

55
export default function combineLatest<R>(...observables: (Observable<any>|((...values: any[]) => R))[]): Observable<R> {
6-
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
7-
if (typeof project === "function") {
8-
observables.pop();
9-
}
106
observables.unshift(this);
7+
let project;
8+
if (typeof observables[observables.length - 1] === "function") {
9+
project = observables.pop();
10+
}
1111
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
1212
}

0 commit comments

Comments
 (0)