Skip to content

Commit

Permalink
fix(throttleTime): fix and rename throttleTime operator
Browse files Browse the repository at this point in the history
Rename throttle to throttleTime. Fix this operator to correctly take in the first 'next' value but
ignore subsequent 'next' values for a specified time period.

Related to issue #666.
  • Loading branch information
staltz committed Nov 11, 2015
1 parent f81b566 commit 245a55b
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export interface CoreOperators<T> {
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
throttle?: (delay: number, scheduler?: Scheduler) => Observable<T>;
throttleTime?: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout?: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith?: <R>(due: number | Date, withObservable: Observable<R>, scheduler?: Scheduler) => Observable<T> | Observable<R>;
toArray?: () => Observable<T[]>;
Expand Down
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export class Observable<T> implements CoreOperators<T> {
switchMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take: (count: number) => Observable<T>;
takeUntil: (notifier: Observable<any>) => Observable<T>;
throttle: (delay: number, scheduler?: Scheduler) => Observable<T>;
throttleTime: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <R>(due: number | Date, withObservable: Observable<R>, scheduler?: Scheduler) => Observable<T> | Observable<R>;
toArray: () => Observable<T[]>;
Expand Down
4 changes: 2 additions & 2 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ observableProto.take = take;
import {takeUntil} from './operators/takeUntil';
observableProto.takeUntil = takeUntil;

import {throttle} from './operators/throttle';
observableProto.throttle = throttle;
import {throttleTime} from './operators/throttleTime';
observableProto.throttleTime = throttleTime;

import {timeInterval} from './operators/extended/timeInterval';
observableProto.timeInterval = timeInterval;
Expand Down
4 changes: 2 additions & 2 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ observableProto.take = take;
import {takeUntil} from './operators/takeUntil';
observableProto.takeUntil = takeUntil;

import {throttle} from './operators/throttle';
observableProto.throttle = throttle;
import {throttleTime} from './operators/throttleTime';
observableProto.throttleTime = throttleTime;

import {timeout} from './operators/timeout';
observableProto.timeout = timeout;
Expand Down
15 changes: 6 additions & 9 deletions src/operators/throttle.ts → src/operators/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {nextTick} from '../schedulers/nextTick';

export function throttle<T>(delay: number, scheduler: Scheduler = nextTick) {
export function throttleTime<T>(delay: number, scheduler: Scheduler = nextTick) {
return this.lift(new ThrottleOperator(delay, scheduler));
}

Expand All @@ -28,24 +28,21 @@ class ThrottleSubscriber<T> extends Subscriber<T> {

_next(value: T) {
if (!this.throttled) {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { value, subscriber: this }));
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { subscriber: this }));
this.destination.next(value);
}
}

throttledNext(value: T) {
this.clearThrottle();
this.destination.next(value);
}

clearThrottle() {
const throttled = this.throttled;
if (throttled) {
throttled.unsubscribe();
this.remove(throttled);
this.throttled = null;
}
}
}

function dispatchNext<T>({ value, subscriber }) {
subscriber.throttledNext(value);
function dispatchNext<T>({ subscriber }) {
subscriber.clearThrottle();
}

0 comments on commit 245a55b

Please sign in to comment.