Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add static multicasters #5432

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,38 @@ describe('Observable.lift', () => {
});
});

// TODO: This test shows a limitation of the library as it is currently implemented
// This test should never pass, and it was probably not a great goal to have the one
// above this pass.
// See issue here: https://github.com/ReactiveX/rxjs/issues/5431
xit('should compose through more than one multicast and a refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
multicast(() => new Subject<number>()),
multicast(() => new Subject<number>()),
refCount(),
map(x => 10 * x),
);

// NOTE: This was a bad goal.
expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
done();
});
});

it('should compose through multicast with selector function', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
Expand Down
10 changes: 9 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* Observable */
export { Observable } from './internal/Observable';
export { ConnectableObservable } from './internal/observable/ConnectableObservable';
export { GroupedObservable } from './internal/operators/groupBy';
export { Operator } from './internal/Operator';
export { observable } from './internal/symbol/observable';
Expand Down Expand Up @@ -74,6 +73,15 @@ export { using } from './internal/observable/using';
export { zip } from './internal/observable/zip';
export { scheduled } from './internal/scheduled/scheduled';

/* Connectable Observable creators */
export {
ConnectableObservable,
multicastFrom,
publishFrom,
publishBehaviorFrom,
publishReplayFrom,
} from './internal/observable/ConnectableObservable';

/* Constants */
export { EMPTY } from './internal/observable/empty';
export { NEVER } from './internal/observable/never';
Expand Down
225 changes: 122 additions & 103 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
import { Subject, SubjectSubscriber } from '../Subject';
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { TeardownLogic } from '../types';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { ObservableInput, TimestampProvider } from '../types';
import { refCount } from '../operators/refCount';
import { from } from './from';
import { ReplaySubject } from '../ReplaySubject';
import { BehaviorSubject } from '../BehaviorSubject';

/**
* @class ConnectableObservable<T>
* An observable that connects a single subscription to a source, through a subject,
* to multiple subscribers, when `connect()` is called on it.
*
* Subclassing is not recommended.
*/
export class ConnectableObservable<T> extends Observable<T> {

protected _subject: Subject<T> | undefined;
protected _refCount: number = 0;
protected _connection: Subscription | null | undefined;
/** @internal */
_isComplete = false;

constructor(public source: Observable<T>,
protected subjectFactory: () => Subject<T>) {
/**
* Creates a new ConnectableObservable.
* Do not use directly. Instead, use {@link multicastFrom}.
*
* @param source The source observable to subcribe to upon connection
* @param subjectFactory A factor function used to create the `Subject` that
* connects the source to all subscribers.
*/
constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
super();
}

Expand All @@ -27,6 +38,7 @@ export class ConnectableObservable<T> extends Observable<T> {
return this.getSubject().subscribe(subscriber);
}

/** @deprecated Implementation detail, do not use */
protected getSubject(): Subject<T> {
const subject = this._subject;
if (!subject || subject.isStopped) {
Expand All @@ -35,13 +47,16 @@ export class ConnectableObservable<T> extends Observable<T> {
return this._subject!;
}

/**
* Connects all current and future subscribers to this observable
* to the source observable
*/
connect(): Subscription {
let connection = this._connection;
if (!connection) {
this._isComplete = false;
connection = this._connection = new Subscription();
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
connection.add(this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
if (connection.closed) {
this._connection = null;
connection = Subscription.EMPTY;
Expand All @@ -50,29 +65,23 @@ export class ConnectableObservable<T> extends Observable<T> {
return connection;
}

/**
* Returns an Observable that will count the number of active subscriptions to
* the connectable observable, and:
*
* 1. Increments the active subscriptions count for each subscription to the resulting observable
* 2. When the active subscriptions count goes from 0 to 1, will "connect" the `ConnectableObservable` automatically.
* 3. Unsubscribing from the resulting observable will decrement the active subscriptions count.
* 4. If the active subscriptions count returns to zero, the "connection" will be terminated, and the
* subscription to the source observable will be unsubscribed.
*/
refCount(): Observable<T> {
return higherOrderRefCount()(this) as Observable<T>;
return refCount<T>()(this);
}
}

export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
const connectableProto = <any>ConnectableObservable.prototype;
return {
operator: { value: null as null },
_refCount: { value: 0, writable: true },
_subject: { value: null as null, writable: true },
_connection: { value: null as null, writable: true },
_subscribe: { value: connectableProto._subscribe },
_isComplete: { value: connectableProto._isComplete, writable: true },
getSubject: { value: connectableProto.getSubject },
connect: { value: connectableProto.connect },
refCount: { value: connectableProto.refCount }
};
})();

class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
constructor(destination: Subject<T>,
private connectable: ConnectableObservable<T>) {
constructor(destination: Subject<T>, private connectable: ConnectableObservable<T>) {
super(destination);
}
protected _error(err: any): void {
Expand All @@ -99,84 +108,94 @@ class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
}
}

class RefCountOperator<T> implements Operator<T, T> {
constructor(private connectable: ConnectableObservable<T>) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {

const { connectable } = this;
(<any> connectable)._refCount++;

const refCounter = new RefCountSubscriber(subscriber, connectable);
const subscription = source.subscribe(refCounter);

if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
}

return subscription;
}
/**
* Multicasts values from a single subscription to an observable source, through a subject.
*
* Requires "connection".
*
* This returns a {@link ConnectableObservable}, that connects a single observable subscription
* to many subscribers through a created subject.
*
* Subscribers to the returned observable will actually be subscribing to the subject provided by
* the second argument. When `connect()` is called on the returned `ConnectableObservable`, that
* subject is used to create a single subscription to the observable source provided as the `input`
* argument, and that {@link Subscription} is returned. If you `unsubscribe` from the subscription
* returned by `connect()`, all subscribers will be "disconnected" and stop recieving notifications.
*
* When the subscription to the shared source, provided via the `input` argument, is torn down,
* either by completion of the source, an error from the source, or "disconnection" via calling `unsubscribe`
* on the `Subscription` returned by `connect()`, one of two things will happen:
*
* 1. If you provided a factory function that creates a `Subject`, the subject state is "reset", and you
* may reconnect, which will call the subject factor and create a new Subject for use with the new connection.
* 2. If you provided a `Subject` directly, that subject instance will remain the subject new subscriptions
* will attempt to "connect" through, however, that `Subject` will likely be "closed", thus meaning that
* the returned `ConnectableObservable` cannot be retried or repeated.
*
* Note that multicasting in this manner is not generally recommended, but RxJS provides this functionality
* for the following generalized cases:
*
* 1. Multicasting synchronous emissions from an observable source.
* 2. Multicasting through a custom `Subject` in a "connectable" way.
* 3. Both 1 and 2.
*
* In most cases, if you want to share values from a single subscription to an observable to
* multiple subscribers, you really should be using the {@link share} operator.
*
* ### Example
*
* ```ts
* import { range, multicastFrom } from 'rxjs';
* import { finalize } from 'rxjs/operators';
*
* const source = range(0, 5).pipe(
* finalize(() => console.log('finalized'))
* );
*
* const published = multicastFrom(source, () => new Subject());
*
* published.subscribe(x => console.log('A', x));
* published.subscribe(x => console.log('B', x));
*
* // Nothing happens until you connect
* const subcription = publish.connect();
*
* // subscription.unsubscribe() will disconnect all subscribers.
* ```
* @param input The observable input to publish
* @param subjectFactoryOrSubject A Subject instance, or a function used to create the subject upon connection
*/
export function multicastFrom<T>(
input: ObservableInput<T>,
subjectFactoryOrSubject: Subject<T> | (() => Subject<T>)
): ConnectableObservable<T> {
const subjectFactory =
typeof subjectFactoryOrSubject === 'function' ? subjectFactoryOrSubject : () => subjectFactoryOrSubject;
return new ConnectableObservable<T>(from(input), subjectFactory);
}

class RefCountSubscriber<T> extends Subscriber<T> {

private connection: Subscription | null | undefined;

constructor(destination: Subscriber<T>,
private connectable: ConnectableObservable<T>) {
super(destination);
}

protected _unsubscribe() {

const { connectable } = this;
if (!connectable) {
this.connection = null;
return;
}

this.connectable = null!;
const refCount = (<any> connectable)._refCount;
if (refCount <= 0) {
this.connection = null;
return;
}

(<any> connectable)._refCount = refCount - 1;
if (refCount > 1) {
this.connection = null;
return;
}
/**
* Identical to {@link multicastFrom} called as `multicastFrom(input, new Subject())`
*/
export function publishFrom<T>(input: ObservableInput<T>) {
return multicastFrom(input, new Subject<T>());
}

///
// Compare the local RefCountSubscriber's connection Subscription to the
// connection Subscription on the shared ConnectableObservable. In cases
// where the ConnectableObservable source synchronously emits values, and
// the RefCountSubscriber's downstream Observers synchronously unsubscribe,
// execution continues to here before the RefCountOperator has a chance to
// supply the RefCountSubscriber with the shared connection Subscription.
// For example:
// ```
// range(0, 10).pipe(
// publish(),
// refCount(),
// take(5),
// ).subscribe();
// ```
// In order to account for this case, RefCountSubscriber should only dispose
// the ConnectableObservable's shared connection Subscription if the
// connection Subscription exists, *and* either:
// a. RefCountSubscriber doesn't have a reference to the shared connection
// Subscription yet, or,
// b. RefCountSubscriber's connection Subscription reference is identical
// to the shared connection Subscription
///
const { connection } = this;
const sharedConnection = (<any> connectable)._connection;
this.connection = null;
/**
* Identical to {@link multicastFrom} called as `multicastFrom(input, new ReplaySubject<T>(bufferSize, windowTime, timestampProvider);`
*/
export function publishReplayFrom<T>(
input: ObservableInput<T>,
bufferSize?: number,
windowTime?: number,
timestampProvider?: TimestampProvider
) {
return multicastFrom(input, new ReplaySubject<T>(bufferSize, windowTime, timestampProvider));
}

if (sharedConnection && (!connection || sharedConnection === connection)) {
sharedConnection.unsubscribe();
}
}
/**
* Identical to {@link multicastFrom} called as `multicastFrom(input, new BehaviorSubject<T>(initialValue);`
*/
export function publishBehaviorFrom<T>(input: ObservableInput<T>, initialValue: T) {
return multicastFrom(input, new BehaviorSubject<T>(initialValue));
}
Loading