Skip to content

Commit

Permalink
fix(throttleTime): fix and rename throttleTime operator
Browse files Browse the repository at this point in the history
Rename throttle to throttleTime. Fix this operator to correctly
take in the first 'next' value but
ignore subsequent 'next' values for a specified time period.

Related to issue #666.
  • Loading branch information
staltz committed Nov 11, 2015
1 parent f81b566 commit 5bcd14c
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export interface CoreOperators<T> {
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
throttle?: (delay: number, scheduler?: Scheduler) => Observable<T>;
throttleTime?: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout?: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith?: <R>(due: number | Date, withObservable: Observable<R>, scheduler?: Scheduler) => Observable<T> | Observable<R>;
toArray?: () => Observable<T[]>;
Expand Down
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export class Observable<T> implements CoreOperators<T> {
switchMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take: (count: number) => Observable<T>;
takeUntil: (notifier: Observable<any>) => Observable<T>;
throttle: (delay: number, scheduler?: Scheduler) => Observable<T>;
throttleTime: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <R>(due: number | Date, withObservable: Observable<R>, scheduler?: Scheduler) => Observable<T> | Observable<R>;
toArray: () => Observable<T[]>;
Expand Down
4 changes: 2 additions & 2 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ observableProto.take = take;
import {takeUntil} from './operators/takeUntil';
observableProto.takeUntil = takeUntil;

import {throttle} from './operators/throttle';
observableProto.throttle = throttle;
import {throttleTime} from './operators/throttleTime';
observableProto.throttleTime = throttleTime;

import {timeInterval} from './operators/extended/timeInterval';
observableProto.timeInterval = timeInterval;
Expand Down
4 changes: 2 additions & 2 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ observableProto.take = take;
import {takeUntil} from './operators/takeUntil';
observableProto.takeUntil = takeUntil;

import {throttle} from './operators/throttle';
observableProto.throttle = throttle;
import {throttleTime} from './operators/throttleTime';
observableProto.throttleTime = throttleTime;

import {timeout} from './operators/timeout';
observableProto.timeout = timeout;
Expand Down
23 changes: 10 additions & 13 deletions src/operators/throttle.ts → src/operators/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {nextTick} from '../schedulers/nextTick';

export function throttle<T>(delay: number, scheduler: Scheduler = nextTick) {
return this.lift(new ThrottleOperator(delay, scheduler));
export function throttleTime<T>(delay: number, scheduler: Scheduler = nextTick) {
return this.lift(new ThrottleTimeOperator(delay, scheduler));
}

class ThrottleOperator<T, R> implements Operator<T, R> {
class ThrottleTimeOperator<T, R> implements Operator<T, R> {
constructor(private delay: number, private scheduler: Scheduler) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new ThrottleSubscriber(subscriber, this.delay, this.scheduler);
return new ThrottleTimeSubscriber(subscriber, this.delay, this.scheduler);
}
}

class ThrottleSubscriber<T> extends Subscriber<T> {
class ThrottleTimeSubscriber<T> extends Subscriber<T> {
private throttled: Subscription<any>;

constructor(destination: Subscriber<T>,
Expand All @@ -28,24 +28,21 @@ class ThrottleSubscriber<T> extends Subscriber<T> {

_next(value: T) {
if (!this.throttled) {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { value, subscriber: this }));
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { subscriber: this }));
this.destination.next(value);
}
}

throttledNext(value: T) {
this.clearThrottle();
this.destination.next(value);
}

clearThrottle() {
const throttled = this.throttled;
if (throttled) {
throttled.unsubscribe();
this.remove(throttled);
this.throttled = null;
}
}
}

function dispatchNext<T>({ value, subscriber }) {
subscriber.throttledNext(value);
function dispatchNext<T>({ subscriber }) {
subscriber.clearThrottle();
}

0 comments on commit 5bcd14c

Please sign in to comment.