From f968a791c1b48f3100e925d700e8a0ecd69cc7e5 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 27 Nov 2020 20:04:20 -0600 Subject: [PATCH] feat(connectable): Adds `connectable` creation method --- src/index.ts | 1 + src/internal/observable/connectable.ts | 47 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 src/internal/observable/connectable.ts diff --git a/src/index.ts b/src/index.ts index cc725f1e1c..6afbc59fb7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,6 +51,7 @@ export { bindCallback } from './internal/observable/bindCallback'; export { bindNodeCallback } from './internal/observable/bindNodeCallback'; export { combineLatest } from './internal/observable/combineLatest'; export { concat } from './internal/observable/concat'; +export { connectable } from './internal/observable/connectable'; export { defer } from './internal/observable/defer'; export { empty } from './internal/observable/empty'; export { forkJoin } from './internal/observable/forkJoin'; diff --git a/src/internal/observable/connectable.ts b/src/internal/observable/connectable.ts new file mode 100644 index 0000000000..d2ccdf9c11 --- /dev/null +++ b/src/internal/observable/connectable.ts @@ -0,0 +1,47 @@ +/** @prettier */ + +import { ObservableInput } from '../types'; +import { Subject } from '../Subject'; +import { Subscription } from '../Subscription'; +import { Observable } from '../Observable'; +import { defer } from './defer'; + +export type ConnectableObservableLike = Observable & { + /** + * (Idempotent) Calling this method will connect the underlying source observable to all subscribed consumers + * through an underlying {@link Subject}. + * @returns A subscription, that when unsubscribed, will "disconnect" the source from the connector subject, + * severing notifications to all consumers. + */ + connect(): Subscription; +}; + +/** + * Creates an observable that multicasts once `connect()` is called on it. + * + * @param source The observable source to make connectable. + * @param connector The subject to used to multicast the source observable to all subscribers. + * Defaults to a new {@link Subject}. + * @returns A "connectable" observable, that has a `connect()` method, that you must call to + * connect the source to all consumers through the subject provided as the connector. + */ +export function connectable(source: ObservableInput, connector: Subject = new Subject()): ConnectableObservableLike { + // The subscription representing the connection. + let connection: Subscription | null = null; + + const result: any = new Observable((subscriber) => { + return connector.subscribe(subscriber); + }); + + // Define the `connect` function. This is what users must call + // in order to "connect" the source to the subject that is + // multicasting it. + result.connect = () => { + if (!connection) { + connection = defer(() => source).subscribe(connector); + } + return connection; + }; + + return result; +}