Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(multicast,share,refCount,shareReplay): enable synchronous firehose #5834

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,15 @@ export declare class ConnectableObservable<T> extends Observable<T> {
protected _connection: Subscription | null;
protected _refCount: number;
protected _subject: Subject<T> | null;
protected _waiting: boolean;
source: Observable<T>;
protected subjectFactory: () => Subject<T>;
constructor(source: Observable<T>, subjectFactory: () => Subject<T>);
protected _subscribe(subscriber: Subscriber<T>): Subscription;
protected _teardown(): void;
connect(): Subscription;
protected getSubject(): Subject<T>;
prepare(): Subscription;
refCount(): Observable<T>;
}

Expand Down
3 changes: 1 addition & 2 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ describe('multicast operator', () => {
});
});

// TODO: fix firehose unsubscription
// AFAICT, it's not possible for multicast observables to support ASAP
// unsubscription from synchronous firehose sources. The problem is that the
// chaining of the closed 'signal' is broken by the subject. For example,
Expand All @@ -718,7 +717,7 @@ describe('multicast operator', () => {
// That breaks the chaining of closed - i.e. even if the unsubscribe is
// called on the subject, closing it, the SafeSubscriber's closed property
// won't reflect that.
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
3 changes: 1 addition & 2 deletions spec/operators/refCount-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ describe('refCount', () => {
expect(arr[1]).to.equal('the number two');
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
3 changes: 1 addition & 2 deletions spec/operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,7 @@ describe('share operator', () => {
expectObservable(e1.pipe(share())).toBe(expected);
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
5 changes: 2 additions & 3 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ describe('shareReplay operator', () => {
expectObservable(result).toBe(expected);
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand All @@ -312,7 +311,7 @@ describe('shareReplay operator', () => {
});

synchronousObservable.pipe(
shareReplay(),
shareReplay({ refCount: true }),
take(3),
).subscribe(() => { /* noop */ });

Expand Down
47 changes: 30 additions & 17 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class ConnectableObservable<T> extends Observable<T> {
protected _subject: Subject<T> | null = null;
protected _refCount: number = 0;
protected _connection: Subscription | null = null;
protected _waiting: boolean = false;

constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
super();
Expand All @@ -37,28 +38,40 @@ export class ConnectableObservable<T> extends Observable<T> {
_connection?.unsubscribe();
}

connect(): Subscription {
/** @deprecated This is an internal implementation detail, do not use. */
prepare(): Subscription {
let connection = this._connection;
if (!connection) {
connection = this._connection = new Subscription();
this._waiting = true;
}
return connection;
}

connect(): Subscription {
let connection = this._connection;
if (!connection || this._waiting) {
if (!connection) {
connection = this._connection = new Subscription();
} else {
this._waiting = false;
}
const subject = this.getSubject();
connection.add(
this.source.subscribe(
new OperatorSubscriber(
subject as any,
undefined,
(err) => {
this._teardown();
subject.error(err);
},
() => {
this._teardown();
subject.complete();
},
() => this._teardown()
)
)
const subs = new OperatorSubscriber(
subject as any,
undefined,
(err) => {
this._teardown();
subject.error(err);
},
() => {
this._teardown();
subject.complete();
},
() => this._teardown()
);
connection.add(subs);
this.source.subscribe(subs);

if (connection.closed) {
this._connection = null;
Expand Down
6 changes: 5 additions & 1 deletion src/internal/operators/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConnectableObservable } from '../observable/ConnectableObservable';
import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
import { hasLift, operate } from '../util/lift';
import { isFunction } from '../util/isFunction';
import { SafeSubscriber } from '../Subscriber';

/* tslint:disable:max-line-length */
export function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
Expand Down Expand Up @@ -50,7 +51,10 @@ export function multicast<T, R>(
// that to the resulting subscription. The act of subscribing with `this`,
// the primary destination subscriber, will automatically add the subscription
// to the result.
selector(subject).subscribe(subscriber).add(source.subscribe(subject));
const subscription = selector(subject).subscribe(subscriber);
const subjectSubscriber = new SafeSubscriber(subject);
subscription.add(subjectSubscriber);
source.subscribe(subjectSubscriber);
});
}

Expand Down
33 changes: 4 additions & 29 deletions src/internal/operators/refCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,46 +70,21 @@ export function refCount<T>(): MonoTypeOperatorFunction<T> {
return;
}

///
// Compare the local RefCountSubscriber's connection Subscription to the
// connection Subscription on the shared ConnectableObservable. In cases
// where the ConnectableObservable source synchronously emits values, and
// the RefCountSubscriber's downstream Observers synchronously unsubscribe,
// execution continues to here before the RefCountOperator has a chance to
// supply the RefCountSubscriber with the shared connection Subscription.
// For example:
// ```
// range(0, 10).pipe(
// publish(),
// refCount(),
// take(5),
// )
// .subscribe();
// ```
// In order to account for this case, RefCountSubscriber should only dispose
// the ConnectableObservable's shared connection Subscription if the
// connection Subscription exists, *and* either:
// a. RefCountSubscriber doesn't have a reference to the shared connection
// Subscription yet, or,
// b. RefCountSubscriber's connection Subscription reference is identical
// to the shared connection Subscription
///

const sharedConnection = (source as any)._connection;
const conn = connection;
connection = null;

if (sharedConnection && (!conn || sharedConnection === conn)) {
if (sharedConnection && sharedConnection === conn) {
sharedConnection.unsubscribe();
}

subscriber.unsubscribe();
});

source.subscribe(refCounter);

if (!refCounter.closed) {
connection = (source as ConnectableObservable<T>).connect();
const connectable = source as ConnectableObservable<T>;
connection = connectable.prepare();
connectable.connect();
}
});
}
43 changes: 22 additions & 21 deletions src/internal/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Observable } from '../Observable';
import { ReplaySubject } from '../ReplaySubject';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';
import { SafeSubscriber, Subscriber } from '../Subscriber';
import { operate } from '../util/lift';

export interface ShareReplayConfig {
Expand Down Expand Up @@ -150,45 +150,46 @@ function shareReplayOperator<T>({
}: ShareReplayConfig) {
let subject: ReplaySubject<T> | undefined;
let refCount = 0;
let subscription: Subscription | undefined;
let outerSubscriber: Subscriber<T> | undefined;

return (source: Observable<T>, subscriber: Subscriber<T>) => {
refCount++;
let innerSub: Subscription;
const innerSub = new Subscription();
subscriber.add(() => {
refCount--;
innerSub.unsubscribe();
if (useRefCount && refCount === 0 && outerSubscriber) {
outerSubscriber.unsubscribe();
outerSubscriber = undefined;
subject = undefined;
}
});

if (!subject) {
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
innerSub = subject.subscribe(subscriber);
subscription = source.subscribe({
innerSub.add(subject.subscribe(subscriber));
outerSubscriber = new SafeSubscriber<T>({
next(value) { subject!.next(value); },
error(err) {
const dest = subject;
subscription = undefined;
outerSubscriber = undefined;
subject = undefined;
dest!.error(err);
},
complete() {
subscription = undefined;
outerSubscriber = undefined;
subject!.complete();
},
});
})
source.subscribe(outerSubscriber);
// The following condition is needed because source can complete synchronously
// upon subscription. When that happens `subscription` is first set to `undefined`
// and right after is set to the "closed subscription" returned by `subscribe`
if (subscription.closed) {
subscription = undefined;
if (outerSubscriber.closed) {
outerSubscriber = undefined;
}
} else {
innerSub = subject.subscribe(subscriber);
innerSub.add(subject.subscribe(subscriber));
}

subscriber.add(() => {
refCount--;
innerSub.unsubscribe();
if (useRefCount && refCount === 0 && subscription) {
subscription.unsubscribe();
subscription = undefined;
subject = undefined;
}
});
};
}