diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index d218dd6a78..f0df749a51 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -283,6 +283,48 @@ describe('Observable.prototype.publishReplay', () => { done(); }); + it('should emit replayed values and resubscribe to the source when ' + + 'reconnected without source completion', () => { + const results1 = []; + const results2 = []; + let subscriptions = 0; + + const source = new Observable((observer: Rx.Observer) => { + subscriptions++; + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + // observer.complete(); + }); + + const connectable = source.publishReplay(2); + const subscription1 = connectable.subscribe((x: number) => { + results1.push(x); + }); + + expect(results1).to.deep.equal([]); + expect(results2).to.deep.equal([]); + + connectable.connect().unsubscribe(); + subscription1.unsubscribe(); + + expect(results1).to.deep.equal([1, 2, 3, 4]); + expect(results2).to.deep.equal([]); + expect(subscriptions).to.equal(1); + + const subscription2 = connectable.subscribe((x: number) => { + results2.push(x); + }); + + connectable.connect().unsubscribe(); + subscription2.unsubscribe(); + + expect(results1).to.deep.equal([1, 2, 3, 4]); + expect(results2).to.deep.equal([3, 4, 1, 2, 3, 4]); + expect(subscriptions).to.equal(2); + }); + it('should emit replayed values plus completed when subscribed after completed', (done: MochaDone) => { const results1 = []; const results2 = []; @@ -358,4 +400,4 @@ describe('Observable.prototype.publishReplay', () => { published.connect(); }); -}); \ No newline at end of file +}); diff --git a/src/observable/ConnectableObservable.ts b/src/observable/ConnectableObservable.ts index 6ddb9f7f23..13b3cced54 100644 --- a/src/observable/ConnectableObservable.ts +++ b/src/observable/ConnectableObservable.ts @@ -33,7 +33,9 @@ export class ConnectableObservable extends Observable { connect(): Subscription { let connection = this._connection; if (!connection) { - connection = this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this)); + connection = this._connection = new Subscription(); + connection.add(this.source + .subscribe(new ConnectableSubscriber(this.getSubject(), this))); if (connection.isUnsubscribed) { this._connection = null; connection = Subscription.EMPTY; @@ -66,9 +68,13 @@ class ConnectableSubscriber extends SubjectSubscriber { const { connectable } = this; if (connectable) { this.connectable = null; + const connection = ( connectable)._connection; ( connectable)._refCount = 0; ( connectable)._subject = null; ( connectable)._connection = null; + if (connection) { + connection.unsubscribe(); + } } } } @@ -122,10 +128,35 @@ class RefCountSubscriber extends Subscriber { return; } + /// + // Compare the local RefCountSubscriber's connection Subscription to the + // connection Subscription on the shared ConnectableObservable. In cases + // where the ConnectableObservable source synchronously emits values, and + // the RefCountSubscriber's dowstream Observers synchronously unsubscribe, + // execution continues to here before the RefCountOperator has a chance to + // supply the RefCountSubscriber with the shared connection Subscription. + // For example: + // ``` + // Observable.range(0, 10) + // .publish() + // .refCount() + // .take(5) + // .subscribe(); + // ``` + // In order to account for this case, RefCountSubscriber should only dispose + // the ConnectableObservable's shared connection Subscription if the + // connection Subscription exists, *and* either: + // a. RefCountSubscriber doesn't have a reference to the shared connection + // Subscription yet, or, + // b. RefCountSubscriber's connection Subscription reference is identical + // to the shared connection Subscription + /// const { connection } = this; - if (connection) { - this.connection = null; - connection.unsubscribe(); + const sharedConnection = ( connectable)._connection; + this.connection = null; + + if (sharedConnection && (!connection || sharedConnection === connection)) { + sharedConnection.unsubscribe(); } } }