Skip to content

Commit

Permalink
fix(ConnectableObservable): fix race conditions in ConnectableObserva…
Browse files Browse the repository at this point in the history
…ble and refCount.
  • Loading branch information
trxcllnt committed May 12, 2016
1 parent 993a2c3 commit d1412bc
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 110 deletions.
71 changes: 57 additions & 14 deletions spec/operators/refCount-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ describe('ConnectableObservable.prototype.refCount', () => {
});

it('should count references', () => {
const source = Observable.never().publish().refCount();
const connectable = Observable.never().publish();
const refCounted = connectable.refCount();

const sub1 = source.subscribe({ next: function () { //noop
const sub1 = refCounted.subscribe({ next: function () { //noop
} });
const sub2 = source.subscribe({ next: function () { //noop
const sub2 = refCounted.subscribe({ next: function () { //noop
} });
const sub3 = source.subscribe({ next: function () { //noop
const sub3 = refCounted.subscribe({ next: function () { //noop
} });

expect((<any>source).refCount).to.equal(3);
expect((<any>connectable)._refCount).to.equal(3);

sub1.unsubscribe();
sub2.unsubscribe();
Expand All @@ -37,30 +38,72 @@ describe('ConnectableObservable.prototype.refCount', () => {

it('should unsub from the source when all other subscriptions are unsubbed', (done: MochaDone) => {
let unsubscribeCalled = false;
const source = new Observable((observer: Rx.Observer<boolean>) => {
const connectable = new Observable((observer: Rx.Observer<boolean>) => {
observer.next(true);

return () => {
unsubscribeCalled = true;
};
}).publish().refCount();
}).publish();
const refCounted = connectable.refCount();

const sub1 = source.subscribe(() => {
const sub1 = refCounted.subscribe(() => {
//noop
});
const sub2 = source.subscribe(() => {
const sub2 = refCounted.subscribe(() => {
//noop
});
const sub3 = source.subscribe((x: any) => {
expect((<any>source).refCount).to.equal(1);
const sub3 = refCounted.subscribe((x: any) => {
expect((<any>connectable)._refCount).to.equal(1);
});

sub1.unsubscribe();
sub2.unsubscribe();
sub3.unsubscribe();

expect((<any>source).refCount).to.equal(0);
expect((<any>connectable)._refCount).to.equal(0);
expect(unsubscribeCalled).to.be.true;
done();
});
});

it('should not unsubscribe when a subscriber synchronously unsubscribes if ' +
'other subscribers are present', () => {
let unsubscribeCalled = false;
const connectable = new Observable((observer: Rx.Observer<boolean>) => {
observer.next(true);
return () => {
unsubscribeCalled = true;
};
}).publishReplay(1);

const refCounted = connectable.refCount();

refCounted.subscribe();
refCounted.subscribe().unsubscribe();

expect((<any>connectable)._refCount).to.equal(1);
expect(unsubscribeCalled).to.be.false;
});

it('should not unsubscribe when a subscriber synchronously unsubscribes if ' +
'other subscribers are present and the source is a Subject', () => {

const arr = [];
const subject = new Rx.Subject();
const connectable = subject.publishReplay(1);
const refCounted = connectable.refCount();

refCounted.subscribe((val) => {
arr.push(val);
});

subject.next('the number one');

refCounted.first().subscribe().unsubscribe();

subject.next('the number two');

expect((<any>connectable)._refCount).to.equal(1);
expect(arr[0]).to.equal('the number one');
expect(arr[1]).to.equal('the number two');
});
});
168 changes: 72 additions & 96 deletions src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import {Subject} from '../Subject';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
Expand All @@ -8,8 +10,9 @@ import {Subscription} from '../Subscription';
*/
export class ConnectableObservable<T> extends Observable<T> {

protected subject: Subject<T>;
protected subscription: Subscription;
protected _subject: Subject<T>;
protected _refCount: number = 0;
protected _connection: Subscription;

constructor(protected source: Observable<T>,
protected subjectFactory: () => Subject<T>) {
Expand All @@ -20,133 +23,106 @@ export class ConnectableObservable<T> extends Observable<T> {
return this.getSubject().subscribe(subscriber);
}

protected getSubject() {
const subject = this.subject;
if (subject && !subject.isUnsubscribed) {
return subject;
}
return (this.subject = this.subjectFactory());
protected getSubject(): Subject<T> {
return this._subject || (this._subject = this.subjectFactory());
}

connect(): Subscription {
const source = this.source;
let subscription = this.subscription;
if (subscription && !subscription.isUnsubscribed) {
return subscription;
let connection = this._connection;
if (!connection) {
connection = this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this));
if (connection.isUnsubscribed) {
this._connection = null;
connection = Subscription.EMPTY;
} else {
this._connection = connection;
}
}
subscription = source.subscribe(this.getSubject());
subscription.add(new ConnectableSubscription(this));
return (this.subscription = subscription);
return connection;
}

refCount(): Observable<T> {
return new RefCountObservable(this);
}

/**
* This method is opened for `ConnectableSubscription`.
* Not to call from others.
*/
_closeSubscription(): void {
this.subject = null;
this.subscription = null;
return this.lift(new RefCountOperator<T>(this));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class ConnectableSubscription extends Subscription {
constructor(protected connectable: ConnectableObservable<any>) {
super();
class ConnectableSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>,
private connectable: ConnectableObservable<T>) {
super(destination);
}
protected _error(err: any): void {
this._unsubscribe();
super._error(err);
}
protected _complete(): void {
this._unsubscribe();
super._complete();
}

protected _unsubscribe() {
const connectable = this.connectable;
connectable._closeSubscription();
this.connectable = null;
const { connectable } = this;
if (connectable) {
this.connectable = null;
(<any> connectable)._refCount = 0;
(<any> connectable)._subject = null;
(<any> connectable)._connection = null;
}
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RefCountObservable<T> extends Observable<T> {
connection: Subscription;

constructor(protected connectable: ConnectableObservable<T>,
public refCount: number = 0) {
super();
class RefCountOperator<T> implements Operator<T, T> {
constructor(private connectable: ConnectableObservable<T>) {
}
call(subscriber: Subscriber<T>, source: any): any {

protected _subscribe(subscriber: Subscriber<T>) {
const connectable = this.connectable;
const refCountSubscriber: RefCountSubscriber<T> = new RefCountSubscriber(subscriber, this);
const subscription = connectable.subscribe(refCountSubscriber);
if (!subscription.isUnsubscribed && ++this.refCount === 1) {
refCountSubscriber.connection = this.connection = connectable.connect();
const { connectable } = this;
(<any> connectable)._refCount++;

const refCounter = new RefCountSubscriber(subscriber, connectable);
const subscription = source._subscribe(refCounter);

if (!refCounter.isUnsubscribed) {
(<any> refCounter).connection = connectable.connect();
}

return subscription;
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RefCountSubscriber<T> extends Subscriber<T> {
connection: Subscription;

constructor(public destination: Subscriber<T>,
private refCountObservable: RefCountObservable<T>) {
super(null);
this.connection = refCountObservable.connection;
destination.add(this);
}
private connection: Subscription;

protected _next(value: T) {
this.destination.next(value);
constructor(destination: Subscriber<T>,
private connectable: ConnectableObservable<T>) {
super(destination);
}

protected _error(err: any) {
this._resetConnectable();
this.destination.error(err);
}
protected _unsubscribe() {

protected _complete() {
this._resetConnectable();
this.destination.complete();
}
const { connectable } = this;
if (!connectable) {
this.connection = null;
return;
}

private _resetConnectable() {
const observable = this.refCountObservable;
const obsConnection = observable.connection;
const subConnection = this.connection;
if (subConnection && subConnection === obsConnection) {
observable.refCount = 0;
obsConnection.unsubscribe();
observable.connection = null;
this.unsubscribe();
this.connectable = null;
const refCount = (<any> connectable)._refCount;
if (refCount <= 0) {
this.connection = null;
return;
}
}

protected _unsubscribe() {
const observable = this.refCountObservable;
if (observable.refCount === 0) {
(<any> connectable)._refCount = refCount - 1;
if (refCount > 1) {
this.connection = null;
return;
}
if (--observable.refCount === 0) {
const obsConnection = observable.connection;
const subConnection = this.connection;
if (subConnection && subConnection === obsConnection) {
obsConnection.unsubscribe();
observable.connection = null;
}

const { connection } = this;
if (connection) {
this.connection = null;
connection.unsubscribe();
}
}
}

0 comments on commit d1412bc

Please sign in to comment.