Skip to content

Commit

Permalink
feat(connectable): Adds connectable creation method
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Dec 14, 2020
1 parent 9d53af0 commit f968a79
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export { bindCallback } from './internal/observable/bindCallback';
export { bindNodeCallback } from './internal/observable/bindNodeCallback';
export { combineLatest } from './internal/observable/combineLatest';
export { concat } from './internal/observable/concat';
export { connectable } from './internal/observable/connectable';
export { defer } from './internal/observable/defer';
export { empty } from './internal/observable/empty';
export { forkJoin } from './internal/observable/forkJoin';
Expand Down
47 changes: 47 additions & 0 deletions src/internal/observable/connectable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/** @prettier */

import { ObservableInput } from '../types';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { defer } from './defer';

export type ConnectableObservableLike<T> = Observable<T> & {
/**
* (Idempotent) Calling this method will connect the underlying source observable to all subscribed consumers
* through an underlying {@link Subject}.
* @returns A subscription, that when unsubscribed, will "disconnect" the source from the connector subject,
* severing notifications to all consumers.
*/
connect(): Subscription;
};

/**
* Creates an observable that multicasts once `connect()` is called on it.
*
* @param source The observable source to make connectable.
* @param connector The subject to used to multicast the source observable to all subscribers.
* Defaults to a new {@link Subject}.
* @returns A "connectable" observable, that has a `connect()` method, that you must call to
* connect the source to all consumers through the subject provided as the connector.
*/
export function connectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> {
// The subscription representing the connection.
let connection: Subscription | null = null;

const result: any = new Observable<T>((subscriber) => {
return connector.subscribe(subscriber);
});

// Define the `connect` function. This is what users must call
// in order to "connect" the source to the subject that is
// multicasting it.
result.connect = () => {
if (!connection) {
connection = defer(() => source).subscribe(connector);
}
return connection;
};

return result;
}

0 comments on commit f968a79

Please sign in to comment.