Skip to content

Commit e82b8da

Browse files
committed
fix(fromObservable): support synchronous unsubscribe on completion
If the observable being converted to a stream will synchronously complete when subscribed to, then it would also immediately unsubscribe. The previous implementation of fromObservable was buggy in the sense that the synchronous unsubscribe would fail. This fixes fromObservable.
1 parent b3a0cf6 commit e82b8da

File tree

2 files changed

+37
-10
lines changed

2 files changed

+37
-10
lines changed

src/core.ts

+19-10
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,12 @@ export interface Listener<T> {
5656
complete: () => void;
5757
}
5858

59-
export type Observable<T> = {
60-
subscribe(listener: Listener<T>): { unsubscribe: () => void; }
59+
export interface Subscription {
60+
unsubscribe(): void;
61+
}
62+
63+
export interface Observable<T> {
64+
subscribe(listener: Listener<T>): Subscription;
6165
};
6266

6367
export type FromInput<T> = Promise<T> | Stream<T> | Array<T> | Observable<T>;
@@ -79,7 +83,7 @@ function and<T>(f1: (t: T) => boolean, f2: (t: T) => boolean): (t: T) => boolean
7983
};
8084
}
8185

82-
export class Subscription<T> {
86+
export class StreamSubscription<T> implements Subscription {
8387
constructor(private _stream: Stream<T>, private _listener: Listener<T>) {}
8488

8589
unsubscribe(): void {
@@ -89,21 +93,26 @@ export class Subscription<T> {
8993

9094
class ObservableProducer<T> implements InternalProducer<T> {
9195
public type = 'fromObservable';
92-
public ins: any;
96+
public ins: Observable<T>;
9397
public out: Stream<T>;
94-
private _subscription: { unsubscribe: () => void; };
98+
private active: boolean;
99+
private _sub: Subscription | undefined;
95100

96-
constructor(observable: any) {
101+
constructor(observable: Observable<T>) {
97102
this.ins = observable;
103+
this.active = false;
98104
}
99105

100106
_start(out: Stream<T>) {
101107
this.out = out;
102-
this._subscription = this.ins.subscribe(new ObservableListener(out));
108+
this.active = true;
109+
this._sub = this.ins.subscribe(new ObservableListener(out));
110+
if (!this.active) this._sub.unsubscribe();
103111
}
104112

105113
_stop() {
106-
this._subscription.unsubscribe();
114+
if (this._sub) this._sub.unsubscribe();
115+
this.active = false;
107116
}
108117
}
109118

@@ -1354,10 +1363,10 @@ export class Stream<T> implements InternalListener<T> {
13541363
* @param {Listener} listener
13551364
* @returns {Subscription}
13561365
*/
1357-
subscribe(listener: Listener<T>): Subscription<T> {
1366+
subscribe(listener: Listener<T>): Subscription {
13581367
this.addListener(listener);
13591368

1360-
return new Subscription<T>(this, listener);
1369+
return new StreamSubscription<T>(this, listener);
13611370
}
13621371

13631372
/**

tests/factory/fromObservable.ts

+18
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,22 @@ describe('xs.fromObservable', () => {
2323
},
2424
});
2525
});
26+
27+
it('should support synchronous unsubscribe on completion', (done: any) => {
28+
const stream = xs.fromObservable(xs.of(10, 20, 30));
29+
let expected = [10, 20, 30];
30+
31+
stream.addListener({
32+
next(x: number) {
33+
assert.strictEqual(x, expected.shift());
34+
},
35+
error(err: any) {
36+
done(err);
37+
},
38+
complete() {
39+
assert.strictEqual(expected.length, 0);
40+
done();
41+
},
42+
});
43+
});
2644
});

0 commit comments

Comments
 (0)