Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators: Throttle and Debounce #368

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
2ea065c
Created and wired an implementation for the throttle operation on Obs…
michaeldejong May 5, 2013
622d861
Ensure static star imports are used for test cases.
michaeldejong May 5, 2013
77ac15b
No longer using Notification<T> for scheduling throttled events.
michaeldejong May 5, 2013
02ee6fa
Cleaned up imports and removed unnecessary final keywords in the Oper…
michaeldejong May 5, 2013
2519ef8
Fixed a typo the UnitTest class of OperationThrottle.
michaeldejong May 5, 2013
87e766d
Merge branch 'operation-throttle' of git://github.com/michaeldejong/R…
benjchristensen Sep 10, 2013
4c0c4db
Operator: throttleWithTimeout
benjchristensen Sep 10, 2013
2e625a6
Operator: throttleFirst
benjchristensen Sep 10, 2013
b75b37c
Update javadoc for throttleLast
benjchristensen Sep 10, 2013
2a3ade2
Update javadoc for throttleWithTimeout
benjchristensen Sep 10, 2013
1c47b0c
Merge branch 'operation-throttle' of git://github.com/michaeldejong/R…
benjchristensen Sep 10, 2013
c95b810
Merge branch 'throttleWithTimeout' into throttle
benjchristensen Sep 10, 2013
57f9aa4
Merge branch 'throttleLast' into throttle
benjchristensen Sep 10, 2013
d0e50fe
Merge branch 'throttleFirst' into throttle
benjchristensen Sep 10, 2013
78cecdd
Operators: throttleWithTimeout, throttleFirst, throttleLast
benjchristensen Sep 10, 2013
5fabd58
Use 'debounce' as proper name for ThrottleWithTimeout which unfortuna…
benjchristensen Sep 10, 2013
5e7edd2
Javadoc with images
benjchristensen Sep 11, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 179 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package rx;

import static rx.util.functions.Functions.not;
import static rx.util.functions.Functions.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,6 +63,8 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationDebounce;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1809,6 +1810,182 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
return create(OperationInterval.interval(interval, unit, scheduler));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #throttleWithTimeout};
*/
public Observable<T> debounce(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #throttleWithTimeout};
*/
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduler has no effect here...

}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #debounce}
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #debounce}
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
}

/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
*
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduler does not appear in parameter list of this method...

* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
}

/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
*
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
}

/**
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit)}
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
return sample(intervalDuration, unit);
}

/**
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit, Scheduler)}
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
return sample(intervalDuration, unit, scheduler);
}

/**
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
* <p>
Expand Down
27 changes: 22 additions & 5 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

public class TestScheduler extends Scheduler {
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());

private static class TimedAction<T> {

private final long time;
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
private final T state;
private final TestScheduler scheduler;
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
this.time = time;
Expand All @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
this.scheduler = scheduler;
}

public void cancel() {
isCancelled.set(true);
}

@Override
public String toString() {
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
Expand Down Expand Up @@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
}
time = current.time;
queue.remove();
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);

// Only execute if the TimedAction has not yet been cancelled
if (!current.isCancelled.get()) {
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
}
}
time = targetTimeInNanos;
}
Expand All @@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
queue.add(timedAction);

return new Subscription() {
@Override
public void unsubscribe() {
timedAction.cancel();
}
};
}
}
Loading