diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 4ba5c9268e..ca9554bebb 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -77,7 +77,7 @@ export declare const config: { useDeprecatedNextContext: boolean; }; -export declare function connectable(source: ObservableInput, connector?: Subject): ConnectableObservableLike; +export declare function connectable(source: ObservableInput, config?: ConnectableConfig): ConnectableObservableLike; export declare class ConnectableObservable extends Observable { protected _connection: Subscription | null; diff --git a/spec/observables/connectable-spec.ts b/spec/observables/connectable-spec.ts new file mode 100644 index 0000000000..d0f25af648 --- /dev/null +++ b/spec/observables/connectable-spec.ts @@ -0,0 +1,79 @@ +/** @prettier */ +import { expect } from 'chai'; +import { connectable, of, ReplaySubject } from 'rxjs'; +import { TestScheduler } from 'rxjs/testing'; +import { observableMatcher } from '../helpers/observableMatcher'; + +describe('connectable', () => { + let testScheduler: TestScheduler; + + beforeEach(() => { + testScheduler = new TestScheduler(observableMatcher); + }); + + it('should mirror a simple source Observable', () => { + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs = ' ^--------------!'; + const expected = ' --1-2---3-4--5-|'; + + const obs = connectable(source); + + expectObservable(obs).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + obs.connect(); + }); + }); + + it('should do nothing if connect is not called, despite subscriptions', () => { + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs: string[] = []; + const expected = ' -'; + + const obs = connectable(source); + + expectObservable(obs).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should support resetOnDisconnect = true', () => { + const values: number[] = []; + const source = of(1, 2, 3); + const obs = connectable(source, { + connector: () => new ReplaySubject(1), + resetOnDisconnect: true, + }); + + obs.subscribe((value) => values.push(value)); + const connection = obs.connect(); + expect(values).to.deep.equal([1, 2, 3]); + + connection.unsubscribe(); + + obs.subscribe((value) => values.push(value)); + obs.connect(); + expect(values).to.deep.equal([1, 2, 3, 1, 2, 3]); + }); + + it('should support resetOnDisconnect = false', () => { + const values: number[] = []; + const source = of(1, 2, 3); + const obs = connectable(source, { + connector: () => new ReplaySubject(1), + resetOnDisconnect: false, + }); + + obs.subscribe((value) => values.push(value)); + const connection = obs.connect(); + expect(values).to.deep.equal([1, 2, 3]); + + connection.unsubscribe(); + + obs.subscribe((value) => values.push(value)); + obs.connect(); + expect(values).to.deep.equal([1, 2, 3, 3]); + }); +}); diff --git a/src/internal/observable/connectable.ts b/src/internal/observable/connectable.ts index 48ba9f0eab..635dcc72b8 100644 --- a/src/internal/observable/connectable.ts +++ b/src/internal/observable/connectable.ts @@ -1,4 +1,4 @@ -import { ObservableInput } from '../types'; +import { ObservableInput, SubjectLike } from '../types'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; import { Observable } from '../Observable'; @@ -18,29 +18,58 @@ export interface ConnectableObservableLike extends Observable { connect(): Subscription; } +export interface ConnectableConfig { + /** + * A factory function used to create the Subject through which the source + * is multicast. By default this creates a {@link Subject}. + */ + connector: () => SubjectLike; + /** + * If true, the resulting observable will reset internal state upon disconnetion + * and return to a "cold" state. This allows the resulting observable to be + * reconnected. + * If false, upon disconnection, the connecting subject will remain the + * connecting subject, meaning the resulting observable will not go "cold" again, + * and subsequent repeats or resubscriptions will resubscribe to that same subject. + */ + resetOnDisconnect?: boolean; +} + +/** + * The default configuration for `connectable`. + */ +const DEFAULT_CONFIG: ConnectableConfig = { + connector: () => new Subject(), + resetOnDisconnect: true, +}; + /** * Creates an observable that multicasts once `connect()` is called on it. * * @param source The observable source to make connectable. - * @param connector The subject to used to multicast the source observable to all subscribers. - * Defaults to a new {@link Subject}. + * @param config The configuration object for `connectable`. * @returns A "connectable" observable, that has a `connect()` method, that you must call to * connect the source to all consumers through the subject provided as the connector. */ -export function connectable(source: ObservableInput, connector: Subject = new Subject()): ConnectableObservableLike { +export function connectable(source: ObservableInput, config: ConnectableConfig = DEFAULT_CONFIG): ConnectableObservableLike { // The subscription representing the connection. let connection: Subscription | null = null; + const { connector, resetOnDisconnect = true } = config; + let subject = connector(); const result: any = new Observable((subscriber) => { - return connector.subscribe(subscriber); + return subject.subscribe(subscriber); }); // Define the `connect` function. This is what users must call // in order to "connect" the source to the subject that is // multicasting it. result.connect = () => { - if (!connection) { - connection = defer(() => source).subscribe(connector); + if (!connection || connection.closed) { + connection = defer(() => source).subscribe(subject); + if (resetOnDisconnect) { + connection.add(() => (subject = connector())); + } } return connection; }; diff --git a/src/internal/operators/connect.ts b/src/internal/operators/connect.ts index edbb86b479..3c666d6932 100644 --- a/src/internal/operators/connect.ts +++ b/src/internal/operators/connect.ts @@ -92,7 +92,7 @@ const DEFAULT_CONFIG: ConnectConfig = { * and Observable, that when subscribed to, will utilize the multicast observable. * After this function is executed -- and its return value subscribed to -- the * the operator will subscribe to the source, and the connection will be made. - * @param param0 The configuration object for `connect`. + * @param config The configuration object for `connect`. */ export function connect>( selector: (shared: Observable) => O,