Skip to content

Commit 9e963aa

Browse files
committed
feat(auditTime): add higher-order lettable version of auditTime
1 parent e2daefe commit 9e963aa

File tree

3 files changed

+55
-59
lines changed

3 files changed

+55
-59
lines changed

src/operator/auditTime.ts

Lines changed: 3 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import { async } from '../scheduler/async';
2-
import { Operator } from '../Operator';
32
import { IScheduler } from '../Scheduler';
4-
import { Subscriber } from '../Subscriber';
53
import { Observable } from '../Observable';
6-
import { Subscription, TeardownLogic } from '../Subscription';
4+
import { auditTime as higherOrder } from '../operators';
75

86
/**
97
* Ignores source values for `duration` milliseconds, then emits the most recent
@@ -48,59 +46,5 @@ import { Subscription, TeardownLogic } from '../Subscription';
4846
* @owner Observable
4947
*/
5048
export function auditTime<T>(this: Observable<T>, duration: number, scheduler: IScheduler = async): Observable<T> {
51-
return this.lift(new AuditTimeOperator(duration, scheduler));
52-
}
53-
54-
class AuditTimeOperator<T> implements Operator<T, T> {
55-
constructor(private duration: number,
56-
private scheduler: IScheduler) {
57-
}
58-
59-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
60-
return source.subscribe(new AuditTimeSubscriber(subscriber, this.duration, this.scheduler));
61-
}
62-
}
63-
64-
/**
65-
* We need this JSDoc comment for affecting ESDoc.
66-
* @ignore
67-
* @extends {Ignored}
68-
*/
69-
class AuditTimeSubscriber<T> extends Subscriber<T> {
70-
71-
private value: T;
72-
private hasValue: boolean = false;
73-
private throttled: Subscription;
74-
75-
constructor(destination: Subscriber<T>,
76-
private duration: number,
77-
private scheduler: IScheduler) {
78-
super(destination);
79-
}
80-
81-
protected _next(value: T): void {
82-
this.value = value;
83-
this.hasValue = true;
84-
if (!this.throttled) {
85-
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
86-
}
87-
}
88-
89-
clearThrottle(): void {
90-
const { value, hasValue, throttled } = this;
91-
if (throttled) {
92-
this.remove(throttled);
93-
this.throttled = null;
94-
throttled.unsubscribe();
95-
}
96-
if (hasValue) {
97-
this.value = null;
98-
this.hasValue = false;
99-
this.destination.next(value);
100-
}
101-
}
102-
}
103-
104-
function dispatchNext<T>(subscriber: AuditTimeSubscriber<T>): void {
105-
subscriber.clearThrottle();
106-
}
49+
return higherOrder(duration, scheduler)(this);
50+
}

src/operators/auditTime.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { async } from '../scheduler/async';
2+
import { IScheduler } from '../Scheduler';
3+
import { audit } from './audit';
4+
import { timer } from '../observable/timer';
5+
import { MonoTypeOperatorFunction } from '../interfaces';
6+
7+
/**
8+
* Ignores source values for `duration` milliseconds, then emits the most recent
9+
* value from the source Observable, then repeats this process.
10+
*
11+
* <span class="informal">When it sees a source values, it ignores that plus
12+
* the next ones for `duration` milliseconds, and then it emits the most recent
13+
* value from the source.</span>
14+
*
15+
* <img src="./img/auditTime.png" width="100%">
16+
*
17+
* `auditTime` is similar to `throttleTime`, but emits the last value from the
18+
* silenced time window, instead of the first value. `auditTime` emits the most
19+
* recent value from the source Observable on the output Observable as soon as
20+
* its internal timer becomes disabled, and ignores source values while the
21+
* timer is enabled. Initially, the timer is disabled. As soon as the first
22+
* source value arrives, the timer is enabled. After `duration` milliseconds (or
23+
* the time unit determined internally by the optional `scheduler`) has passed,
24+
* the timer is disabled, then the most recent source value is emitted on the
25+
* output Observable, and this process repeats for the next source value.
26+
* Optionally takes a {@link IScheduler} for managing timers.
27+
*
28+
* @example <caption>Emit clicks at a rate of at most one click per second</caption>
29+
* var clicks = Rx.Observable.fromEvent(document, 'click');
30+
* var result = clicks.auditTime(1000);
31+
* result.subscribe(x => console.log(x));
32+
*
33+
* @see {@link audit}
34+
* @see {@link debounceTime}
35+
* @see {@link delay}
36+
* @see {@link sampleTime}
37+
* @see {@link throttleTime}
38+
*
39+
* @param {number} duration Time to wait before emitting the most recent source
40+
* value, measured in milliseconds or the time unit determined internally
41+
* by the optional `scheduler`.
42+
* @param {Scheduler} [scheduler=async] The {@link IScheduler} to use for
43+
* managing the timers that handle the rate-limiting behavior.
44+
* @return {Observable<T>} An Observable that performs rate-limiting of
45+
* emissions from the source Observable.
46+
* @method auditTime
47+
* @owner Observable
48+
*/
49+
export function auditTime<T>(duration: number, scheduler: IScheduler = async): MonoTypeOperatorFunction<T> {
50+
return audit(() => timer(duration, scheduler));
51+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export { audit } from './audit';
2+
export { auditTime } from './auditTime';
23
export { catchError } from './catchError';
34
export { concat } from './concat';
45
export { concatAll } from './concatAll';

0 commit comments

Comments
 (0)