From 34c05fed2055183ab233ace9002ff3298abbb8ad Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Tue, 1 Dec 2015 14:14:16 -0500 Subject: [PATCH] feat(AsyncSubject): add AsyncSubject --- src/Rx.KitchenSink.ts | 2 ++ src/Rx.ts | 2 ++ src/subjects/AsyncSubject.ts | 53 ++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 src/subjects/AsyncSubject.ts diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 722e9aab08..65b50cfc08 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -128,6 +128,7 @@ import './operator/zipAll'; import {Subject} from './Subject'; import {Subscription} from './Subscription'; import {Subscriber} from './Subscriber'; +import {AsyncSubject} from './subject/AsyncSubject'; import {ReplaySubject} from './subject/ReplaySubject'; import {BehaviorSubject} from './subject/BehaviorSubject'; import {ConnectableObservable} from './observable/ConnectableObservable'; @@ -156,6 +157,7 @@ export { Observable, Subscriber, Subscription, + AsyncSubject, ReplaySubject, BehaviorSubject, ConnectableObservable, diff --git a/src/Rx.ts b/src/Rx.ts index 9aeb142570..58d03f4474 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -104,6 +104,7 @@ import './operator/zipAll'; import {Subject} from './Subject'; import {Subscription} from './Subscription'; import {Subscriber} from './Subscriber'; +import {AsyncSubject} from './subject/AsyncSubject'; import {ReplaySubject} from './subject/ReplaySubject'; import {BehaviorSubject} from './subject/BehaviorSubject'; import {ConnectableObservable} from './observable/ConnectableObservable'; @@ -129,6 +130,7 @@ export { Observable, Subscriber, Subscription, + AsyncSubject, ReplaySubject, BehaviorSubject, ConnectableObservable, diff --git a/src/subjects/AsyncSubject.ts b/src/subjects/AsyncSubject.ts new file mode 100644 index 0000000000..0911bc99ba --- /dev/null +++ b/src/subjects/AsyncSubject.ts @@ -0,0 +1,53 @@ +import {Subject} from '../Subject'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; + +export class AsyncSubject extends Subject { + _value: T = void 0; + _hasNext: boolean = false; + _isScalar: boolean = false; + + constructor () { + super(); + } + + _subscribe(subscriber: Subscriber): Subscription { + const subscription = super._subscribe(subscriber); + if (!subscription) { + return; + } else if (!subscription.isUnsubscribed && this._hasNext) { + subscriber.next(this._value); + subscriber.complete(); + } + return subscription; + } + + _next(value: T): void { + this._value = value; + this._hasNext = true; + } + + _complete(): void { + let index = -1; + const observers = this.observers; + const len = observers.length; + + // optimization -- block next, complete, and unsubscribe while dispatching + this.observers = void 0; // optimization + this.isUnsubscribed = true; + + if (this._hasNext) { + while (++index < len) { + let o = observers[index]; + o.next(this._value); + o.complete(); + } + } else { + while (++index < len) { + observers[index].complete(); + } + } + + this.isUnsubscribed = false; + } +}