-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(sampleTime): add higher-order lettable version of sampleTime
- Loading branch information
Showing
3 changed files
with
97 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import { Observable } from '../Observable'; | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { IScheduler } from '../Scheduler'; | ||
import { Action } from '../scheduler/Action'; | ||
import { async } from '../scheduler/async'; | ||
import { TeardownLogic } from '../Subscription'; | ||
|
||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits the most recently emitted value from the source Observable within | ||
* periodic time intervals. | ||
* | ||
* <span class="informal">Samples the source Observable at periodic time | ||
* intervals, emitting what it samples.</span> | ||
* | ||
* <img src="./img/sampleTime.png" width="100%"> | ||
* | ||
* `sampleTime` periodically looks at the source Observable and emits whichever | ||
* value it has most recently emitted since the previous sampling, unless the | ||
* source has not emitted anything since the previous sampling. The sampling | ||
* happens periodically in time every `period` milliseconds (or the time unit | ||
* defined by the optional `scheduler` argument). The sampling starts as soon as | ||
* the output Observable is subscribed. | ||
* | ||
* @example <caption>Every second, emit the most recent click at most once</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var result = clicks.sampleTime(1000); | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link auditTime} | ||
* @see {@link debounceTime} | ||
* @see {@link delay} | ||
* @see {@link sample} | ||
* @see {@link throttleTime} | ||
* | ||
* @param {number} period The sampling period expressed in milliseconds or the | ||
* time unit determined internally by the optional `scheduler`. | ||
* @param {Scheduler} [scheduler=async] The {@link IScheduler} to use for | ||
* managing the timers that handle the sampling. | ||
* @return {Observable<T>} An Observable that emits the results of sampling the | ||
* values emitted by the source Observable at the specified time interval. | ||
* @method sampleTime | ||
* @owner Observable | ||
*/ | ||
export function sampleTime<T>(period: number, scheduler: IScheduler = async): MonoTypeOperatorFunction<T> { | ||
return (source: Observable<T>) => source.lift(new SampleTimeOperator(period, scheduler)); | ||
} | ||
|
||
class SampleTimeOperator<T> implements Operator<T, T> { | ||
constructor(private period: number, | ||
private scheduler: IScheduler) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new SampleTimeSubscriber(subscriber, this.period, this.scheduler)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class SampleTimeSubscriber<T> extends Subscriber<T> { | ||
lastValue: T; | ||
hasValue: boolean = false; | ||
|
||
constructor(destination: Subscriber<T>, | ||
private period: number, | ||
private scheduler: IScheduler) { | ||
super(destination); | ||
this.add(scheduler.schedule(dispatchNotification, period, { subscriber: this, period })); | ||
} | ||
|
||
protected _next(value: T) { | ||
this.lastValue = value; | ||
this.hasValue = true; | ||
} | ||
|
||
notifyNext() { | ||
if (this.hasValue) { | ||
this.hasValue = false; | ||
this.destination.next(this.lastValue); | ||
} | ||
} | ||
} | ||
|
||
function dispatchNotification<T>(this: Action<any>, state: any) { | ||
let { subscriber, period } = state; | ||
subscriber.notifyNext(); | ||
this.schedule(state, period); | ||
} |