From 17122f9564caadc4e58f01c7841bb3158ad16fe4 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Sun, 24 Jan 2016 21:40:19 -0800 Subject: [PATCH] feat(delayWhen): add delayWhen operator --- spec/operators/delayWhen-spec.js | 211 +++++++++++++++++++++++++++++++ src/CoreOperators.ts | 1 + src/Observable.ts | 1 + src/Rx.DOM.ts | 1 + src/Rx.KitchenSink.ts | 1 + src/Rx.ts | 1 + src/add/operator/delayWhen.ts | 6 + src/operator/delayWhen.ts | 144 +++++++++++++++++++++ 8 files changed, 366 insertions(+) create mode 100644 spec/operators/delayWhen-spec.js create mode 100644 src/add/operator/delayWhen.ts create mode 100644 src/operator/delayWhen.ts diff --git a/spec/operators/delayWhen-spec.js b/spec/operators/delayWhen-spec.js new file mode 100644 index 0000000000..0f317c3738 --- /dev/null +++ b/spec/operators/delayWhen-spec.js @@ -0,0 +1,211 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold, rxTestScheduler */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.delayWhen()', function () { + it.asDiagram('delay(durationSelector)')('should delay by duration selector', function () { + var e1 = hot('---a---b---c--|'); + var expected = '-----a------c----(b|)'; + var subs = '^ !'; + var selector = [cold( '--x--|'), + cold( '----------x-|'), + cold( '-x--|')]; + var selectorSubs = [' ^ ! ', + ' ^ !', + ' ^! ']; + + var idx = 0; + function durationSelector(x) { + return selector[idx++]; + } + + var result = e1.delayWhen(durationSelector); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector[0].subscriptions).toBe(selectorSubs[0]); + expectSubscriptions(selector[1].subscriptions).toBe(selectorSubs[1]); + expectSubscriptions(selector[2].subscriptions).toBe(selectorSubs[2]); + }); + + it('should delay by selector', function () { + var e1 = hot('--a--b--|'); + var expected = '---a--b-|'; + var subs = '^ !'; + var selector = cold( '-x--|'); + var selectorSubs = [' ^! ', + ' ^! ']; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should raise error if source raises error', function () { + var e1 = hot('--a--#'); + var expected = '---a-#'; + var subs = '^ !'; + var selector = cold( '-x--|'); + var selectorSubs = ' ^! '; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should raise error if selector raises error', function () { + var e1 = hot('--a--b--|'); + var expected = '---#'; + var subs = '^ !'; + var selector = cold( '-#'); + var selectorSubs = ' ^! '; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should delay by selector and completes after value emits', function () { + var e1 = hot('--a--b--|'); + var expected = '---------a--(b|)'; + var subs = '^ !'; + var selector = cold('-------x--|'); + var selectorSubs = [' ^ !', + ' ^ !']; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should delay by selector completes if selector does not emits', function () { + var e1 = hot('--a--b--|'); + var expected = '------a--(b|)'; + var subs = '^ !'; + var selector = cold( '----|'); + var selectorSubs = [' ^ !', + ' ^ !']; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should not emit if selector never emits', function () { + var e1 = hot('--a--b--|'); + var expected = '-'; + var subs = '^ '; + var selector = cold( '-'); + var selectorSubs = [' ^ ', + ' ^ ']; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should delay by first value from selector', function () { + var e1 = hot('--a--b--|'); + var expected = '------a--(b|)'; + var subs = '^ !'; + var selector = cold( '----x--y--|'); + var selectorSubs = [' ^ !', + ' ^ !']; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should delay by selector does not completes', function () { + var e1 = hot('--a--b--|'); + var expected = '------a--(b|)'; + var subs = '^ !'; + var selector = cold( '----x-----y---'); + var selectorSubs = [' ^ !', + ' ^ !']; + + var result = e1.delayWhen(function (x) { return selector; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + }); + + it('should raise error if selector throws', function () { + var e1 = hot('--a--b--|'); + var expected = '--#'; + var subs = '^ !'; + + var err = new Error('error'); + var result = e1.delayWhen(function (x) { throw err; }); + + expectObservable(result).toBe(expected, null, err); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should start subscription when subscription delay emits', function () { + var e1 = hot('-----a---b---|'); + var expected = ' -----a---b-|'; + var subs = ' ^ !'; + var selector = cold( '--x--|'); + var selectorSubs = [' ^ !', + ' ^ !']; + var subDelay = cold('--x--|'); + var subDelaySub = '^ !'; + + var result = e1.delayWhen(function (x) { return selector; }, subDelay); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + expectSubscriptions(subDelay.subscriptions).toBe(subDelaySub); + }); + + it('should start subscription when subscription delay completes without emit value', function () { + var e1 = hot('-----a---b---|'); + var expected = ' -----a---b-|'; + var subs = ' ^ !'; + var selector = cold( '--x--|'); + var selectorSubs = [' ^ !', + ' ^ !']; + var subDelay = cold('--|'); + var subDelaySub = '^ !'; + + var result = e1.delayWhen(function (x) { return selector; }, subDelay); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(selector.subscriptions).toBe(selectorSubs); + expectSubscriptions(subDelay.subscriptions).toBe(subDelaySub); + }); + + it('should raise error when subscription delay raises error', function () { + var e1 = hot('-----a---b---|'); + var expected = ' # '; + var selector = cold( '--x--|'); + var subDelay = cold('---#'); + var subDelaySub = '^ !'; + + var result = e1.delayWhen(function (x) { return selector; }, subDelay); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe([]); + expectSubscriptions(selector.subscriptions).toBe([]); + expectSubscriptions(subDelay.subscriptions).toBe(subDelaySub); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index aa5c92740c..0fae32bac9 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -24,6 +24,7 @@ export interface CoreOperators { debounceTime?: (dueTime: number, scheduler?: Scheduler) => Observable; defaultIfEmpty?: (defaultValue?: T | R) => Observable | Observable; delay?: (delay: number, scheduler?: Scheduler) => Observable; + delayWhen?: (delayDurationSelector: (value: T) => Observable, subscriptionDelay?: Observable) => Observable; distinctUntilChanged?: (compare?: (x: T, y: T) => boolean) => Observable; do?: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable; expand?: (project: (x: T, ix: number) => Observable, concurrent: number, scheduler: Scheduler) => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index f17fed4349..893583aba3 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -204,6 +204,7 @@ export class Observable implements CoreOperators { debounceTime: (dueTime: number, scheduler?: Scheduler) => Observable; defaultIfEmpty: (defaultValue?: T | R) => Observable | Observable; delay: (delay: number, scheduler?: Scheduler) => Observable; + delayWhen: (delayDurationSelector: (value: T) => Observable, subscriptionDelay?: Observable) => Observable; distinctUntilChanged: (compare?: (x: T, y: T) => boolean) => Observable; do: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable; expand: (project: (x: T, ix: number) => Observable, concurrent: number, scheduler: Scheduler) => Observable; diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index 412d1fae40..864d6cefed 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -49,6 +49,7 @@ import './add/operator/debounce'; import './add/operator/debounceTime'; import './add/operator/defaultIfEmpty'; import './add/operator/delay'; +import './add/operator/delayWhen'; import './add/operator/distinctUntilChanged'; import './add/operator/do'; import './add/operator/expand'; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 132f3380b0..c9d10dd49c 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -65,6 +65,7 @@ import './add/operator/debounce'; import './add/operator/debounceTime'; import './add/operator/defaultIfEmpty'; import './add/operator/delay'; +import './add/operator/delayWhen'; import './add/operator/distinct'; import './add/operator/distinctKey'; import './add/operator/distinctUntilChanged'; diff --git a/src/Rx.ts b/src/Rx.ts index d8c04944f6..b051da45ff 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -48,6 +48,7 @@ import './add/operator/debounce'; import './add/operator/debounceTime'; import './add/operator/defaultIfEmpty'; import './add/operator/delay'; +import './add/operator/delayWhen'; import './add/operator/distinctUntilChanged'; import './add/operator/do'; import './add/operator/expand'; diff --git a/src/add/operator/delayWhen.ts b/src/add/operator/delayWhen.ts new file mode 100644 index 0000000000..616b4d18f1 --- /dev/null +++ b/src/add/operator/delayWhen.ts @@ -0,0 +1,6 @@ +import {Observable} from '../../Observable'; +import {delayWhen} from '../../operator/delayWhen'; + +Observable.prototype.delayWhen = delayWhen; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/delayWhen.ts b/src/operator/delayWhen.ts new file mode 100644 index 0000000000..01b3615a8c --- /dev/null +++ b/src/operator/delayWhen.ts @@ -0,0 +1,144 @@ +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {Observable} from '../Observable'; +import {Subscription} from '../Subscription'; + +/** + * Returns an Observable that delays the emission of items from the source Observable + * by a subscription delay and a delay selector function for each element. + * @param {Function} selector function to retrieve a sequence indicating the delay for each given element. + * @param {Observable} sequence indicating the delay for the subscription to the source. + * @returns {Observable} an Observable that delays the emissions of the source Observable by the specified timeout or Date. + */ + +export function delayWhen(delayDurationSelector: (value: T) => Observable, + subscriptionDelay?: Observable): Observable { + if (subscriptionDelay) { + return new SubscriptionDelayObservable(this, subscriptionDelay) + .lift(new DelayWhenOperator(delayDurationSelector)); + } + return this.lift(new DelayWhenOperator(delayDurationSelector)); +} + +class DelayWhenOperator implements Operator { + constructor(private delayDurationSelector: (value: T) => Observable) { + } + + call(subscriber: Subscriber): Subscriber { + return new DelayWhenSubscriber(subscriber, this.delayDurationSelector); + } +} + +class DelayWhenSubscriber extends Subscriber { + private completed: boolean = false; + private delayNotifierSubscriptions: Array = []; + + constructor(destination: Subscriber, + private delayDurationSelector: (value: T) => Observable) { + super(destination); + } + + notifyNext(value: T, subscription: Subscription): void { + this.destination.next(value); + + const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription); + if (subscriptionIdx !== -1) { + this.delayNotifierSubscriptions.splice(subscriptionIdx, 1); + } + + this.tryComplete(); + } + + protected _next(value: T): void { + try { + const delayNotifier = this.delayDurationSelector(value); + if (delayNotifier) { + this._tryNext(delayNotifier, value); + } + } catch (err) { + this.destination.error(err); + } + } + + protected _complete(): void { + this.completed = true; + this.tryComplete(); + } + + private _tryNext(delayNotifier: Observable, value: T): void { + const notifierSubscription = new Subscription(); + notifierSubscription.add(delayNotifier.subscribe(new DelayNotifierSubscriber(this, value, notifierSubscription))); + this.add(notifierSubscription); + this.delayNotifierSubscriptions.push(notifierSubscription); + } + + private tryComplete(): void { + if (this.completed && this.delayNotifierSubscriptions.length === 0) { + this.destination.complete(); + } + } +} + +class DelayNotifierSubscriber extends Subscriber { + constructor(private parent: DelayWhenSubscriber, private value: T, private subscription: Subscription) { + super(); + } + + protected _next(unused: any) { + this.emitValue(); + } + + protected _error(err: any) { + this.parent.error(err); + } + + protected _complete() { + this.emitValue(); + } + + private emitValue(): void { + if (!this.isUnsubscribed) { + this.unsubscribe(); + this.parent.notifyNext(this.value, this.subscription); + } + } +} + +class SubscriptionDelayObservable extends Observable { + constructor(protected source: Observable, private subscriptionDelay: Observable) { + super(); + } + + protected _subscribe(subscriber: Subscriber) { + this.subscriptionDelay.subscribe(new SubscriptionDelaySubscriber(subscriber, this.source)); + } +} + +class SubscriptionDelaySubscriber extends Subscriber { + private sourceSubscribed: boolean = false; + + constructor(private parent: Subscriber, private source: Observable) { + super(); + } + + protected _next(unused: any) { + this.subscribeToSource(); + } + + protected _error(err: any) { + this.unsubscribe(); + this.parent.error(err); + } + + protected _complete() { + this.subscribeToSource(); + } + + private subscribeToSource(): void { + if (!this.sourceSubscribed) { + this.sourceSubscribed = true; + this.unsubscribe(); + this.source.subscribe(this.parent); + } + } +} \ No newline at end of file