Skip to content

Commit

Permalink
Add static multicasters
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed May 14, 2020
1 parent ff314e6 commit 134bc87
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 128 deletions.
32 changes: 32 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,38 @@ describe('Observable.lift', () => {
});
});

// TODO: This test shows a limitation of the library as it is currently implemented
// This test should never pass, and it was probably not a great goal to have the one
// above this pass.
// See issue here: https://github.com/ReactiveX/rxjs/issues/5431
xit('should compose through more than one multicast and a refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
multicast(() => new Subject<number>()),
multicast(() => new Subject<number>()),
refCount(),
map(x => 10 * x),
);

// NOTE: This was a bad goal.
expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
done();
});
});

it('should compose through multicast with selector function', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
Expand Down
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
/* Observable */
export { Observable } from './internal/Observable';
export { ConnectableObservable } from './internal/observable/ConnectableObservable';
export {
ConnectableObservable,
multicastFrom,
publishFrom,
publishBehaviorFrom,
publishReplayFrom,
} from './internal/observable/ConnectableObservable';
export { GroupedObservable } from './internal/operators/groupBy';
export { Operator } from './internal/Operator';
export { observable } from './internal/symbol/observable';
Expand Down
225 changes: 122 additions & 103 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
import { Subject, SubjectSubscriber } from '../Subject';
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { TeardownLogic } from '../types';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { ObservableInput, TimestampProvider } from '../types';
import { refCount } from '../operators/refCount';
import { from } from './from';
import { ReplaySubject } from '../ReplaySubject';
import { BehaviorSubject } from '../BehaviorSubject';

/**
* @class ConnectableObservable<T>
* An observable that connects a single subscription to a source, through a subject,
* to multiple subscribers, when `connect()` is called on it.
*
* Subclassing is not recommended.
*/
export class ConnectableObservable<T> extends Observable<T> {

protected _subject: Subject<T> | undefined;
protected _refCount: number = 0;
protected _connection: Subscription | null | undefined;
/** @internal */
_isComplete = false;

constructor(public source: Observable<T>,
protected subjectFactory: () => Subject<T>) {
/**
* Creates a new ConnectableObservable.
* Do not use directly. Instead, use {@link multicastFrom}.
*
* @param source The source observable to subcribe to upon connection
* @param subjectFactory A factor function used to create the `Subject` that
* connects the source to all subscribers.
*/
constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
super();
}

Expand All @@ -27,6 +38,7 @@ export class ConnectableObservable<T> extends Observable<T> {
return this.getSubject().subscribe(subscriber);
}

/** @deprecated Implementation detail, do not use */
protected getSubject(): Subject<T> {
const subject = this._subject;
if (!subject || subject.isStopped) {
Expand All @@ -35,13 +47,16 @@ export class ConnectableObservable<T> extends Observable<T> {
return this._subject!;
}

/**
* Connects all current and future subscribers to this observable
* to the source observable
*/
connect(): Subscription {
let connection = this._connection;
if (!connection) {
this._isComplete = false;
connection = this._connection = new Subscription();
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
connection.add(this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
if (connection.closed) {
this._connection = null;
connection = Subscription.EMPTY;
Expand All @@ -50,29 +65,23 @@ export class ConnectableObservable<T> extends Observable<T> {
return connection;
}

/**
* Returns an Observable that will count the number of active subscriptions to
* the connectable observable, and:
*
* 1. Increments the active subscriptions count for each subscription to the resulting observable
* 2. When the active subscriptions count goes from 0 to 1, will "connect" the `ConnectableObservable` automatically.
* 3. Unsubscribing from the resulting observable will decrement the active subscriptions count.
* 4. If the active subscriptions count returns to zero, the "connection" will be terminated, and the
* subscription to the source observable will be unsubscribed.
*/
refCount(): Observable<T> {
return higherOrderRefCount()(this) as Observable<T>;
return refCount<T>()(this);
}
}

export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
const connectableProto = <any>ConnectableObservable.prototype;
return {
operator: { value: null as null },
_refCount: { value: 0, writable: true },
_subject: { value: null as null, writable: true },
_connection: { value: null as null, writable: true },
_subscribe: { value: connectableProto._subscribe },
_isComplete: { value: connectableProto._isComplete, writable: true },
getSubject: { value: connectableProto.getSubject },
connect: { value: connectableProto.connect },
refCount: { value: connectableProto.refCount }
};
})();

class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
constructor(destination: Subject<T>,
private connectable: ConnectableObservable<T>) {
constructor(destination: Subject<T>, private connectable: ConnectableObservable<T>) {
super(destination);
}
protected _error(err: any): void {
Expand All @@ -99,84 +108,94 @@ class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
}
}

class RefCountOperator<T> implements Operator<T, T> {
constructor(private connectable: ConnectableObservable<T>) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {

const { connectable } = this;
(<any> connectable)._refCount++;

const refCounter = new RefCountSubscriber(subscriber, connectable);
const subscription = source.subscribe(refCounter);

if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
}

return subscription;
}
/**
* Multicasts values from a single subscription to an observable source, through a subject.
*
* Requires "connection".
*
* This returns a {@link ConnectableObservable}, that connects a single observable subscription
* to many subscribers through a created subject.
*
* Subscribers to the returned observable will actually be subscribing to the subject provided by
* the second argument. When `connect()` is called on the returned `ConnectableObservable`, that
* subject is used to create a single subscription to the observable source provided as the `input`
* argument, and that {@link Subscription} is returned. If you `unsubscribe` from the subscription
* returned by `connect()`, all subscribers will be "disconnected" and stop recieving notifications.
*
* When the subscription to the shared source, provided via the `input` argument, is torn down,
* either by completion of the source, an error from the source, or "disconnection" via calling `unsubscribe`
* on the `Subscription` returned by `connect()`, one of two things will happen:
*
* 1. If you provided a factory function that creates a `Subject`, the subject state is "reset", and you
* may reconnect, which will call the subject factor and create a new Subject for use with the new connection.
* 2. If you provided a `Subject` directly, that subject instance will remain the subject new subscriptions
* will attempt to "connect" through, however, that `Subject` will likely be "closed", thus meaning that
* the returned `ConnectableObservable` cannot be retried or repeated.
*
* Note that multicasting in this manner is not generally recommended, but RxJS provides this functionality
* for the following generalized cases:
*
* 1. Multicasting synchronous emissions from an observable source.
* 2. Multicasting through a custom `Subject` in a "connectable" way.
* 3. Both 1 and 2.
*
* In most cases, if you want to share values from a single subscription to an observable to
* multiple subscribers, you really should be using the {@link share} operator.
*
* ### Example
*
* ```ts
* import { range, multicastFrom } from 'rxjs';
* import { finalize } from 'rxjs/operators';
*
* const source = range(0, 5).pipe(
* finalize(() => console.log('finalized'))
* );
*
* const published = multicastFrom(source, () => new Subject());
*
* published.subscribe(x => console.log('A', x));
* published.subscribe(x => console.log('B', x));
*
* // Nothing happens until you connect
* const subcription = publish.connect();
*
* // subscription.unsubscribe() will disconnect all subscribers.
* ```
* @param input The observable input to publish
* @param subjectFactoryOrSubject A Subject instance, or a function used to create the subject upon connection
*/
export function multicastFrom<T>(
input: ObservableInput<T>,
subjectFactoryOrSubject: Subject<T> | (() => Subject<T>)
): ConnectableObservable<T> {
const subjectFactory =
typeof subjectFactoryOrSubject === 'function' ? subjectFactoryOrSubject : () => subjectFactoryOrSubject;
return new ConnectableObservable<T>(from(input), subjectFactory);
}

class RefCountSubscriber<T> extends Subscriber<T> {

private connection: Subscription | null | undefined;

constructor(destination: Subscriber<T>,
private connectable: ConnectableObservable<T>) {
super(destination);
}

protected _unsubscribe() {

const { connectable } = this;
if (!connectable) {
this.connection = null;
return;
}

this.connectable = null!;
const refCount = (<any> connectable)._refCount;
if (refCount <= 0) {
this.connection = null;
return;
}

(<any> connectable)._refCount = refCount - 1;
if (refCount > 1) {
this.connection = null;
return;
}
/**
* Identical to {@link multicastFrom} called as `multicastFrom(input, new Subject())`
*/
export function publishFrom<T>(input: ObservableInput<T>) {
return multicastFrom(input, new Subject<T>());
}

///
// Compare the local RefCountSubscriber's connection Subscription to the
// connection Subscription on the shared ConnectableObservable. In cases
// where the ConnectableObservable source synchronously emits values, and
// the RefCountSubscriber's downstream Observers synchronously unsubscribe,
// execution continues to here before the RefCountOperator has a chance to
// supply the RefCountSubscriber with the shared connection Subscription.
// For example:
// ```
// range(0, 10).pipe(
// publish(),
// refCount(),
// take(5),
// ).subscribe();
// ```
// In order to account for this case, RefCountSubscriber should only dispose
// the ConnectableObservable's shared connection Subscription if the
// connection Subscription exists, *and* either:
// a. RefCountSubscriber doesn't have a reference to the shared connection
// Subscription yet, or,
// b. RefCountSubscriber's connection Subscription reference is identical
// to the shared connection Subscription
///
const { connection } = this;
const sharedConnection = (<any> connectable)._connection;
this.connection = null;
/**
* Identical to {@link multicastFrom} called as `multicastFrom(input, new ReplaySubject<T>(bufferSize, windowTime, timestampProvider);`
*/
export function publishReplayFrom<T>(
input: ObservableInput<T>,
bufferSize?: number,
windowTime?: number,
timestampProvider?: TimestampProvider
) {
return multicastFrom(input, new ReplaySubject<T>(bufferSize, windowTime, timestampProvider));
}

if (sharedConnection && (!connection || sharedConnection === connection)) {
sharedConnection.unsubscribe();
}
}
/**
* Identical to {@link multicastFrom} called as `multicastFrom(input, new BehaviorSubject<T>(initialValue);`
*/
export function publishBehaviorFrom<T>(input: ObservableInput<T>, initialValue: T) {
return multicastFrom(input, new BehaviorSubject<T>(initialValue));
}
Loading

0 comments on commit 134bc87

Please sign in to comment.