Skip to content

Commit

Permalink
fix(ReplaySubject): observer now subscribed prior to running subscrip…
Browse files Browse the repository at this point in the history
…tion function (#2046)

Removed reliance on inherited `_subscribe` method from `Subject`. ReplaySubject has different behaviors during subscription for notifying new subscribers after a completion or error. After moving the `super._subscribe` call before the notification from saved `_events`, it was apparent that the inherited behavior from `Subject` was stopping the subscriber before it could be notified properly.

fixes #2044
  • Loading branch information
benlesh authored and jayphelps committed Oct 24, 2016
1 parent a5e9cfe commit fea08e9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
15 changes: 15 additions & 0 deletions spec/subjects/ReplaySubject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ describe('ReplaySubject', () => {
done();
});

it('should add the observer before running subscription code', () => {
const subject = new ReplaySubject<number>();
subject.next(1);
const results = [];

subject.subscribe((value) => {
results.push(value);
if (value < 3) {
subject.next(value + 1);
}
});

expect(results).to.deep.equal([1, 2, 3]);
});

it('should replay values upon subscription', (done: MochaDone) => {
const subject = new ReplaySubject();
const expects = [1, 2, 3];
Expand Down
23 changes: 21 additions & 2 deletions src/ReplaySubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { queue } from './scheduler/queue';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { ObserveOnSubscriber } from './operator/observeOn';

import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
import { SubjectSubscription } from './SubjectSubscription';
/**
* @class ReplaySubject<T>
*/
Expand All @@ -31,6 +32,18 @@ export class ReplaySubject<T> extends Subject<T> {
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const _events = this._trimBufferThenGetEvents();
const scheduler = this.scheduler;
let subscription: Subscription;

if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscription = Subscription.EMPTY;
} else if (this.isStopped) {
subscription = Subscription.EMPTY;
} else {
this.observers.push(subscriber);
subscription = new SubjectSubscription(this, subscriber);
}

if (scheduler) {
subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
Expand All @@ -41,7 +54,13 @@ export class ReplaySubject<T> extends Subject<T> {
subscriber.next(_events[i].value);
}

return super._subscribe(subscriber);
if (this.hasError) {
subscriber.error(this.thrownError);
} else if (this.isStopped) {
subscriber.complete();
}

return subscription;
}

_getNow(): number {
Expand Down

0 comments on commit fea08e9

Please sign in to comment.