diff --git a/spec/subjects/ReplaySubject-spec.ts b/spec/subjects/ReplaySubject-spec.ts index 0b5a8c7f11..a415f98994 100644 --- a/spec/subjects/ReplaySubject-spec.ts +++ b/spec/subjects/ReplaySubject-spec.ts @@ -16,6 +16,21 @@ describe('ReplaySubject', () => { done(); }); + it('should add the observer before running subscription code', () => { + const subject = new ReplaySubject(); + subject.next(1); + const results = []; + + subject.subscribe((value) => { + results.push(value); + if (value < 3) { + subject.next(value + 1); + } + }); + + expect(results).to.deep.equal([1, 2, 3]); + }); + it('should replay values upon subscription', (done: MochaDone) => { const subject = new ReplaySubject(); const expects = [1, 2, 3]; diff --git a/src/ReplaySubject.ts b/src/ReplaySubject.ts index 72bd7691d5..0ab64ee603 100644 --- a/src/ReplaySubject.ts +++ b/src/ReplaySubject.ts @@ -4,7 +4,8 @@ import { queue } from './scheduler/queue'; import { Subscriber } from './Subscriber'; import { Subscription } from './Subscription'; import { ObserveOnSubscriber } from './operator/observeOn'; - +import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError'; +import { SubjectSubscription } from './SubjectSubscription'; /** * @class ReplaySubject */ @@ -31,6 +32,18 @@ export class ReplaySubject extends Subject { protected _subscribe(subscriber: Subscriber): Subscription { const _events = this._trimBufferThenGetEvents(); const scheduler = this.scheduler; + let subscription: Subscription; + + if (this.closed) { + throw new ObjectUnsubscribedError(); + } else if (this.hasError) { + subscription = Subscription.EMPTY; + } else if (this.isStopped) { + subscription = Subscription.EMPTY; + } else { + this.observers.push(subscriber); + subscription = new SubjectSubscription(this, subscriber); + } if (scheduler) { subscriber.add(subscriber = new ObserveOnSubscriber(subscriber, scheduler)); @@ -41,7 +54,13 @@ export class ReplaySubject extends Subject { subscriber.next(_events[i].value); } - return super._subscribe(subscriber); + if (this.hasError) { + subscriber.error(this.thrownError); + } else if (this.isStopped) { + subscriber.complete(); + } + + return subscription; } _getNow(): number {