Skip to content

Commit 7a15304

Browse files
committed
fix(merge): fix issues with async in merge
- moves support types from merge-support to mergeAll-support - fixes strange issues with async - adds virtual time tests for merge and mergeAll
1 parent eae9209 commit 7a15304

File tree

7 files changed

+84
-174
lines changed

7 files changed

+84
-174
lines changed

spec/operators/merge-all-spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* globals describe, it, expect */
1+
/* globals describe, it, expect, expectObservable, hot, cold */
22
var Rx = require('../../dist/cjs/Rx');
33
var Observable = Rx.Observable;
44

spec/operators/merge-spec.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* globals describe, it, expect */
1+
/* globals describe, it, expect, hot, cold, expectObservable, rxTestScheduler */
22
var Rx = require('../../dist/cjs/Rx');
33
var Observable = Rx.Observable;
44
var immediateScheduler = Rx.Scheduler.immediate;
@@ -23,6 +23,14 @@ describe("Observable.prototype.merge", function () {
2323
expect(val).toBe(r[i++]);
2424
}, null, done);
2525
});
26+
27+
28+
it('should handle merging two hot observables', function (){
29+
var e1 = hot('--a-----b-----c----|');
30+
var e2 = hot('-----d-----e-----f---|');
31+
var expected = '--a--d--b--e--c--f---|';
32+
expectObservable(e1.merge(e2, rxTestScheduler)).toBe(expected);
33+
});
2634
});
2735

2836
describe('Observable.prototype.mergeAll', function () {

src/operators/merge-static.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Scheduler from '../Scheduler';
22
import Observable from '../Observable';
33
import ArrayObservable from '../observables/ArrayObservable';
4-
import { MergeOperator } from './merge-support';
4+
import { MergeAllOperator } from './mergeAll-support';
55
import immediate from '../schedulers/immediate';
66

77
export default function merge<R>(...observables: (Observable<any>|Scheduler|number)[]): Observable<R> {
@@ -20,6 +20,6 @@ export default function merge<R>(...observables: (Observable<any>|Scheduler|numb
2020
if(observables.length === 1) {
2121
return <Observable<R>>observables[0];
2222
}
23-
24-
return new ArrayObservable(observables, scheduler).lift(new MergeOperator(concurrent));
23+
24+
return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator(concurrent));
2525
}

src/operators/merge-support.ts

Lines changed: 0 additions & 109 deletions
This file was deleted.

src/operators/merge.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import Observable from '../Observable';
22
import mergeStatic from './merge-static';
3+
import Scheduler from '../Scheduler';
34

4-
export default function merge<R>(...observables: (Observable<any>|number)[]): Observable<R> {
5+
export default function merge<R>(...observables: (Observable<any>|Scheduler|number)[]): Observable<R> {
56
observables.unshift(this);
67
return mergeStatic.apply(this, observables);
78
}

src/operators/mergeAll-support.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import Observable from '../Observable';
2+
import Operator from '../Operator';
3+
import Subscriber from '../Subscriber';
4+
import Observer from '../Observer';
5+
import Subscription from '../Subscription';
6+
7+
export class MergeAllOperator<T, R> implements Operator<T, R> {
8+
constructor(private concurrent: number) {
9+
10+
}
11+
12+
call(observer: Observer<T>) {
13+
return new MergeAllSubscriber(observer, this.concurrent);
14+
}
15+
}
16+
17+
export class MergeAllSubscriber<T> extends Subscriber<T> {
18+
private hasCompleted: boolean = false;
19+
private buffer: Observable<any>[] = [];
20+
private active: number = 0;
21+
constructor(destination: Observer<T>, private concurrent:number) {
22+
super(destination);
23+
}
24+
25+
_next(observable: any) {
26+
if(this.active < this.concurrent) {
27+
if(observable._isScalar) {
28+
this.destination.next(observable.value);
29+
} else {
30+
const innerSub = new Subscription();
31+
this.add(innerSub);
32+
this.active++;
33+
innerSub.add(observable.subscribe(new MergeAllInnerSubscriber(this.destination, this, innerSub)));
34+
}
35+
} else {
36+
this.buffer.push(observable);
37+
}
38+
}
39+
40+
_complete() {
41+
this.hasCompleted = true;
42+
if(this.active === 0 && this.buffer.length === 0) {
43+
this.destination.complete();
44+
}
45+
}
46+
47+
notifyComplete(innerSub: Subscription<T>) {
48+
const buffer = this.buffer;
49+
this.remove(innerSub);
50+
this.active--;
51+
if(buffer.length > 0) {
52+
this._next(buffer.shift());
53+
} else if (this.active === 0 && this.hasCompleted) {
54+
this.destination.complete();
55+
}
56+
}
57+
}
58+
59+
export class MergeAllInnerSubscriber<T> extends Subscriber<T> {
60+
constructor(destination: Observer<T>, private parent: MergeAllSubscriber<T>,
61+
private innerSub: Subscription<T> ) {
62+
super(destination);
63+
}
64+
65+
_complete() {
66+
this.parent.notifyComplete(this.innerSub);
67+
}
68+
}

src/operators/mergeAll.ts

Lines changed: 1 addition & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,66 +3,8 @@ import Operator from '../Operator';
33
import Subscriber from '../Subscriber';
44
import Observer from '../Observer';
55
import Subscription from '../Subscription';
6+
import { MergeAllOperator } from './mergeAll-support';
67

78
export default function mergeAll<R>(concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
89
return this.lift(new MergeAllOperator(concurrent));
9-
}
10-
11-
class MergeAllOperator<T, R> implements Operator<T, R> {
12-
constructor(private concurrent: number) {
13-
14-
}
15-
16-
call(observer: Observer<T>) {
17-
return new MergeAllSubscriber(observer, this.concurrent);
18-
}
19-
}
20-
21-
class MergeAllSubscriber<T> extends Subscriber<T> {
22-
private hasCompleted: boolean = false;
23-
private buffer: Observable<any>[] = [];
24-
private active: number = 0;
25-
constructor(destination: Observer<T>, private concurrent:number) {
26-
super(destination);
27-
}
28-
29-
_next(value: any) {
30-
if(this.active < this.concurrent) {
31-
const innerSub = new Subscription();
32-
this.add(innerSub);
33-
this.active++;
34-
innerSub.add(value.subscribe(new MergeAllInnerSubscriber(this.destination, this, innerSub)));
35-
} else {
36-
this.buffer.push(value);
37-
}
38-
}
39-
40-
_complete() {
41-
this.hasCompleted = true;
42-
if(this.active === 0 && this.buffer.length === 0) {
43-
this.destination.complete();
44-
}
45-
}
46-
47-
notifyComplete(innerSub: Subscription<T>) {
48-
const buffer = this.buffer;
49-
this.remove(innerSub);
50-
this.active--;
51-
if(buffer.length > 0) {
52-
this._next(buffer.shift());
53-
} else if (this.active === 0 && this.hasCompleted) {
54-
this.destination.complete();
55-
}
56-
}
57-
}
58-
59-
class MergeAllInnerSubscriber<T> extends Subscriber<T> {
60-
constructor(destination: Observer<T>, private parent: MergeAllSubscriber<T>,
61-
private innerSub: Subscription<T> ) {
62-
super(destination);
63-
}
64-
65-
_complete() {
66-
this.parent.notifyComplete(this.innerSub);
67-
}
6810
}

0 commit comments

Comments
 (0)