Skip to content

Commit

Permalink
Leading/Trailing throttle and throttleTime configuration (#2465)
Browse files Browse the repository at this point in the history
* feat(throttle): add leading/trailing configurability

Adds the ability to configure the `throttle` operator via a
configuration object in the same manner as Lodash's throttle.

Currently this defaults to the existing behavior, which is `{ leading:
true, trailing: false }`. In an upcoming major version, I think we
should change this to align with other libraries that have similar
functionality. In particular Lodash, given it's popularity.

NOTE: While working on this I discovered that the leading value is
actually being emitted *after* the durationSelector is called and
subscribed to. This is likely a bug, but I'll file a separate issue.

related #1625

* feat(throttleTime): add leading/trailing configurability

resolves #1625

* chore(throttle/throttleTime-spec): fix diagram text
  • Loading branch information
benlesh authored May 9, 2017
1 parent 5a2266a commit bb0738f
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 26 deletions.
36 changes: 36 additions & 0 deletions spec/operators/throttle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,40 @@ describe('Observable.prototype.throttle', () => {
}
);
});

describe('throttle(fn, { leading: true, trailing: true })', () => {
asDiagram('throttle(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const expected = '-a---y----b---x-c---x-|';

const result = e1.throttle(() => e2, { leading: true, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});

describe('throttle(fn, { leading: false, trailing: true })', () => {
asDiagram('throttle(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const expected = '-----y--------x-----x-|';

const result = e1.throttle(() => e2, { leading: false, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
});
29 changes: 29 additions & 0 deletions spec/operators/throttleTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
declare const time: typeof marbleTestingSignature.time;

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;
Expand Down Expand Up @@ -141,4 +142,32 @@ describe('Observable.prototype.throttleTime', () => {
expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

describe('throttleTime(fn, { leading: true, trailing: true })', () => {
asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-a---y----b---x-c---x-|';

const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

describe('throttleTime(fn, { leading: false, trailing: true })', () => {
asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-----y--------x-----x-|';

const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
});
83 changes: 64 additions & 19 deletions src/operator/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';

export interface ThrottleConfig {
leading?: boolean;
trailing?: boolean;
}

export const defaultThrottleConfig: ThrottleConfig = {
leading: true,
trailing: false
};

/**
* Emits a value from the source Observable, then ignores subsequent source
* values for a duration determined by another Observable, then repeats this
Expand Down Expand Up @@ -40,75 +50,110 @@ import { subscribeToResult } from '../util/subscribeToResult';
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
* that receives a value from the source Observable, for computing the silencing
* duration for each source value, returned as an Observable or a Promise.
* @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults
* to `{ leading: true, trailing: false }`.
* @return {Observable<T>} An Observable that performs the throttle operation to
* limit the rate of emissions from the source.
* @method throttle
* @owner Observable
*/
export function throttle<T>(this: Observable<T>, durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T> {
return this.lift(new ThrottleOperator(durationSelector));
export function throttle<T>(this: Observable<T>,
durationSelector: (value: T) => SubscribableOrPromise<number>,
config: ThrottleConfig = defaultThrottleConfig): Observable<T> {
return this.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing));
}

class ThrottleOperator<T> implements Operator<T, T> {
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>,
private leading: boolean,
private trailing: boolean) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new ThrottleSubscriber(subscriber, this.durationSelector));
return source.subscribe(
new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing)
);
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* We need this JSDoc comment for affecting ESDoc
* @ignore
* @extends {Ignored}
*/
class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
private throttled: Subscription;
private _trailingValue: T;
private _hasTrailingValue = false;

constructor(protected destination: Subscriber<T>,
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
private durationSelector: (value: T) => SubscribableOrPromise<number>,
private _leading: boolean,
private _trailing: boolean) {
super(destination);
}

protected _next(value: T): void {
if (!this.throttled) {
this.tryDurationSelector(value);
if (this.throttled) {
if (this._trailing) {
this._hasTrailingValue = true;
this._trailingValue = value;
}
} else {
const duration = this.tryDurationSelector(value);
if (duration) {
this.add(this.throttled = subscribeToResult(this, duration));
}
if (this._leading) {
this.destination.next(value);
if (this._trailing) {
this._hasTrailingValue = true;
this._trailingValue = value;
}
}
}
}

private tryDurationSelector(value: T): void {
let duration: SubscribableOrPromise<number> = null;
private tryDurationSelector(value: T): SubscribableOrPromise<any> {
try {
duration = this.durationSelector(value);
return this.durationSelector(value);
} catch (err) {
this.destination.error(err);
return;
return null;
}
this.emitAndThrottle(value, duration);
}

private emitAndThrottle(value: T, duration: SubscribableOrPromise<number>) {
this.add(this.throttled = subscribeToResult(this, duration));
this.destination.next(value);
}

protected _unsubscribe() {
const throttled = this.throttled;
const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this;

this._trailingValue = null;
this._hasTrailingValue = false;

if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
}
}

private _sendTrailing() {
const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this;
if (throttled && _trailing && _hasTrailingValue) {
destination.next(_trailingValue);
this._trailingValue = null;
this._hasTrailingValue = false;
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this._sendTrailing();
this._unsubscribe();
}

notifyComplete(): void {
this._sendTrailing();
this._unsubscribe();
}
}
38 changes: 31 additions & 7 deletions src/operator/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IScheduler } from '../Scheduler';
import { Subscription, TeardownLogic } from '../Subscription';
import { async } from '../scheduler/async';
import { Observable } from '../Observable';
import { ThrottleConfig, defaultThrottleConfig } from './throttle';

/**
* Emits a value from the source Observable, then ignores subsequent source
Expand Down Expand Up @@ -44,17 +45,24 @@ import { Observable } from '../Observable';
* @method throttleTime
* @owner Observable
*/
export function throttleTime<T>(this: Observable<T>, duration: number, scheduler: IScheduler = async): Observable<T> {
return this.lift(new ThrottleTimeOperator(duration, scheduler));
export function throttleTime<T>(this: Observable<T>,
duration: number,
scheduler: IScheduler = async,
config: ThrottleConfig = defaultThrottleConfig): Observable<T> {
return this.lift(new ThrottleTimeOperator(duration, scheduler, config.leading, config.trailing));
}

class ThrottleTimeOperator<T> implements Operator<T, T> {
constructor(private duration: number,
private scheduler: IScheduler) {
private scheduler: IScheduler,
private leading: boolean,
private trailing: boolean) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler));
return source.subscribe(
new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing)
);
}
}

Expand All @@ -65,23 +73,39 @@ class ThrottleTimeOperator<T> implements Operator<T, T> {
*/
class ThrottleTimeSubscriber<T> extends Subscriber<T> {
private throttled: Subscription;
private _hasTrailingValue: boolean = false;
private _trailingValue: T = null;

constructor(destination: Subscriber<T>,
private duration: number,
private scheduler: IScheduler) {
private scheduler: IScheduler,
private leading: boolean,
private trailing: boolean) {
super(destination);
}

protected _next(value: T) {
if (!this.throttled) {
if (this.throttled) {
if (this.trailing) {
this._trailingValue = value;
this._hasTrailingValue = true;
}
} else {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }));
this.destination.next(value);
if (this.leading) {
this.destination.next(value);
}
}
}

clearThrottle() {
const throttled = this.throttled;
if (throttled) {
if (this.trailing && this._hasTrailingValue) {
this.destination.next(this._trailingValue);
this._trailingValue = null;
this._hasTrailingValue = false;
}
throttled.unsubscribe();
this.remove(throttled);
this.throttled = null;
Expand Down

0 comments on commit bb0738f

Please sign in to comment.