Skip to content

Commit 4e79c22

Browse files
Add onDropped callback for throttleFirst - addresses #7481
1 parent 159ae3b commit 4e79c22

File tree

7 files changed

+203
-17
lines changed

7 files changed

+203
-17
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+42-1
Original file line numberDiff line numberDiff line change
@@ -17096,7 +17096,48 @@ public final Flowable<T> throttleFirst(long windowDuration, @NonNull TimeUnit un
1709617096
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1709717097
Objects.requireNonNull(unit, "unit is null");
1709817098
Objects.requireNonNull(scheduler, "scheduler is null");
17099-
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
17099+
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
17100+
}
17101+
17102+
/**
17103+
* Returns a {@code Flowable} that emits only the first item emitted by the current {@code Flowable} during sequential
17104+
* time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
17105+
* <p>
17106+
* This differs from {@link #throttleLast} in that this only tracks the passage of time whereas
17107+
* {@link #throttleLast} ticks at scheduled intervals.
17108+
* <p>
17109+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.s.v3.png" alt="">
17110+
* <dl>
17111+
* <dt><b>Backpressure:</b></dt>
17112+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17113+
* <dt><b>Scheduler:</b></dt>
17114+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
17115+
* </dl>
17116+
*
17117+
* @param skipDuration
17118+
* time to wait before emitting another item after emitting the last item
17119+
* @param unit
17120+
* the unit of time of {@code skipDuration}
17121+
* @param scheduler
17122+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
17123+
* event
17124+
* @param onDropped
17125+
* called when an item doesn't get delivered to the down stream
17126+
*
17127+
* @return the new {@code Flowable} instance
17128+
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
17129+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
17130+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17131+
*/
17132+
@CheckReturnValue
17133+
@NonNull
17134+
@BackpressureSupport(BackpressureKind.ERROR)
17135+
@SchedulerSupport(SchedulerSupport.CUSTOM)
17136+
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
17137+
Objects.requireNonNull(unit, "unit is null");
17138+
Objects.requireNonNull(scheduler, "scheduler is null");
17139+
Objects.requireNonNull(onDropped, "onDropped is null");
17140+
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
1710017141
}
1710117142

1710217143
/**

src/main/java/io/reactivex/rxjava3/core/Observable.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -14163,7 +14163,44 @@ public final Observable<T> throttleFirst(long windowDuration, @NonNull TimeUnit
1416314163
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1416414164
Objects.requireNonNull(unit, "unit is null");
1416514165
Objects.requireNonNull(scheduler, "scheduler is null");
14166-
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
14166+
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
14167+
}
14168+
14169+
/**
14170+
* Returns an {@code Observable} that emits only the first item emitted by the current {@code Observable} during sequential
14171+
* time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
14172+
* <p>
14173+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas
14174+
* {@code throttleLast} ticks at scheduled intervals.
14175+
* <p>
14176+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.s.v3.png" alt="">
14177+
* <dl>
14178+
* <dt><b>Scheduler:</b></dt>
14179+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
14180+
* </dl>
14181+
*
14182+
* @param skipDuration
14183+
* time to wait before emitting another item after emitting the last item
14184+
* @param unit
14185+
* the unit of time of {@code skipDuration}
14186+
* @param scheduler
14187+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
14188+
* event
14189+
* @param onDropped
14190+
* called when an item doesn't get delivered to the down stream
14191+
*
14192+
* @return the new {@code Observable} instance
14193+
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
14194+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14195+
*/
14196+
@CheckReturnValue
14197+
@SchedulerSupport(SchedulerSupport.CUSTOM)
14198+
@NonNull
14199+
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
14200+
Objects.requireNonNull(unit, "unit is null");
14201+
Objects.requireNonNull(scheduler, "scheduler is null");
14202+
Objects.requireNonNull(onDropped, "onDropped is null");
14203+
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
1416714204
}
1416814205

1416914206
/**

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java

+27-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.concurrent.atomic.AtomicLong;
1818

19+
import io.reactivex.rxjava3.exceptions.Exceptions;
20+
import io.reactivex.rxjava3.functions.Consumer;
1921
import org.reactivestreams.*;
2022

2123
import io.reactivex.rxjava3.core.*;
@@ -32,44 +34,53 @@ public final class FlowableThrottleFirstTimed<T> extends AbstractFlowableWithUps
3234
final long timeout;
3335
final TimeUnit unit;
3436
final Scheduler scheduler;
37+
final Consumer<? super T> onDropped;
3538

36-
public FlowableThrottleFirstTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
39+
public FlowableThrottleFirstTimed(Flowable<T> source,
40+
long timeout,
41+
TimeUnit unit,
42+
Scheduler scheduler,
43+
Consumer<? super T> onDropped) {
3744
super(source);
3845
this.timeout = timeout;
3946
this.unit = unit;
4047
this.scheduler = scheduler;
48+
this.onDropped = onDropped;
4149
}
4250

4351
@Override
4452
protected void subscribeActual(Subscriber<? super T> s) {
4553
source.subscribe(new DebounceTimedSubscriber<>(
4654
new SerializedSubscriber<>(s),
47-
timeout, unit, scheduler.createWorker()));
55+
timeout, unit, scheduler.createWorker(),
56+
onDropped));
4857
}
4958

5059
static final class DebounceTimedSubscriber<T>
5160
extends AtomicLong
5261
implements FlowableSubscriber<T>, Subscription, Runnable {
53-
5462
private static final long serialVersionUID = -9102637559663639004L;
63+
5564
final Subscriber<? super T> downstream;
5665
final long timeout;
5766
final TimeUnit unit;
5867
final Scheduler.Worker worker;
59-
68+
final Consumer<? super T> onDropped;
6069
Subscription upstream;
61-
6270
final SequentialDisposable timer = new SequentialDisposable();
63-
6471
volatile boolean gate;
65-
6672
boolean done;
6773

68-
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
74+
DebounceTimedSubscriber(Subscriber<? super T> actual,
75+
long timeout,
76+
TimeUnit unit,
77+
Worker worker,
78+
Consumer<? super T> onDropped) {
6979
this.downstream = actual;
7080
this.timeout = timeout;
7181
this.unit = unit;
7282
this.worker = worker;
83+
this.onDropped = onDropped;
7384
}
7485

7586
@Override
@@ -106,6 +117,14 @@ public void onNext(T t) {
106117
}
107118

108119
timer.replace(worker.schedule(this, timeout, unit));
120+
} else if (onDropped != null) {
121+
try {
122+
onDropped.accept(t);
123+
} catch (Throwable ex) {
124+
Exceptions.throwIfFatal(ex);
125+
upstream.cancel();
126+
done = true;
127+
}
109128
}
110129
}
111130

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java

+28-7
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,36 @@
1919
import io.reactivex.rxjava3.core.*;
2020
import io.reactivex.rxjava3.core.Scheduler.Worker;
2121
import io.reactivex.rxjava3.disposables.Disposable;
22+
import io.reactivex.rxjava3.exceptions.Exceptions;
23+
import io.reactivex.rxjava3.functions.Consumer;
2224
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
2325
import io.reactivex.rxjava3.observers.SerializedObserver;
2426

2527
public final class ObservableThrottleFirstTimed<T> extends AbstractObservableWithUpstream<T, T> {
2628
final long timeout;
2729
final TimeUnit unit;
2830
final Scheduler scheduler;
29-
30-
public ObservableThrottleFirstTimed(ObservableSource<T> source,
31-
long timeout, TimeUnit unit, Scheduler scheduler) {
31+
final Consumer<? super T> onDropped;
32+
33+
public ObservableThrottleFirstTimed(
34+
ObservableSource<T> source,
35+
long timeout,
36+
TimeUnit unit,
37+
Scheduler scheduler,
38+
Consumer<? super T> onDropped) {
3239
super(source);
3340
this.timeout = timeout;
3441
this.unit = unit;
3542
this.scheduler = scheduler;
43+
this.onDropped = onDropped;
3644
}
3745

3846
@Override
3947
public void subscribeActual(Observer<? super T> t) {
4048
source.subscribe(new DebounceTimedObserver<>(
4149
new SerializedObserver<>(t),
42-
timeout, unit, scheduler.createWorker()));
50+
timeout, unit, scheduler.createWorker(),
51+
onDropped));
4352
}
4453

4554
static final class DebounceTimedObserver<T>
@@ -51,16 +60,21 @@ static final class DebounceTimedObserver<T>
5160
final long timeout;
5261
final TimeUnit unit;
5362
final Scheduler.Worker worker;
54-
63+
final Consumer<? super T> onDropped;
5564
Disposable upstream;
56-
5765
volatile boolean gate;
5866

59-
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
67+
DebounceTimedObserver(
68+
Observer<? super T> actual,
69+
long timeout,
70+
TimeUnit unit,
71+
Worker worker,
72+
Consumer<? super T> onDropped) {
6073
this.downstream = actual;
6174
this.timeout = timeout;
6275
this.unit = unit;
6376
this.worker = worker;
77+
this.onDropped = onDropped;
6478
}
6579

6680
@Override
@@ -83,6 +97,13 @@ public void onNext(T t) {
8397
d.dispose();
8498
}
8599
DisposableHelper.replace(this, worker.schedule(this, timeout, unit));
100+
} else if (onDropped != null) {
101+
try {
102+
onDropped.accept(t);
103+
} catch (Throwable ex) {
104+
Exceptions.throwIfFatal(ex);
105+
upstream.dispose();
106+
}
86107
}
87108
}
88109

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,39 @@ public void before() {
4444
subscriber = TestHelper.mockSubscriber();
4545
}
4646

47+
@Test
48+
public void throttlingWithDropCallback() {
49+
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
50+
@Override
51+
public void subscribe(Subscriber<? super String> subscriber) {
52+
subscriber.onSubscribe(new BooleanSubscription());
53+
publishNext(subscriber, 100, "one"); // publish as it's first
54+
publishNext(subscriber, 300, "two"); // skip as it's last within the first 400
55+
publishNext(subscriber, 900, "three"); // publish
56+
publishNext(subscriber, 905, "four"); // skip
57+
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
58+
}
59+
});
60+
61+
Observer<Object> dropCallbackObserver = TestHelper.mockObserver();
62+
Flowable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext);
63+
sampled.subscribe(subscriber);
64+
65+
InOrder inOrder = inOrder(subscriber);
66+
InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
67+
68+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
69+
inOrder.verify(subscriber, times(1)).onNext("one");
70+
inOrder.verify(subscriber, times(0)).onNext("two");
71+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("two");
72+
inOrder.verify(subscriber, times(1)).onNext("three");
73+
inOrder.verify(subscriber, times(0)).onNext("four");
74+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("four");
75+
inOrder.verify(subscriber, times(1)).onComplete();
76+
inOrder.verifyNoMoreInteractions();
77+
dropCallbackOrder.verifyNoMoreInteractions();
78+
}
79+
4780
@Test
4881
public void throttlingWithCompleted() {
4982
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,39 @@ public void before() {
4141
observer = TestHelper.mockObserver();
4242
}
4343

44+
@Test
45+
public void throttlingWithDropCallback() {
46+
Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
47+
@Override
48+
public void subscribe(Observer<? super String> innerObserver) {
49+
innerObserver.onSubscribe(Disposable.empty());
50+
publishNext(innerObserver, 100, "one"); // publish as it's first
51+
publishNext(innerObserver, 300, "two"); // skip as it's last within the first 400
52+
publishNext(innerObserver, 900, "three"); // publish
53+
publishNext(innerObserver, 905, "four"); // skip
54+
publishCompleted(innerObserver, 1000); // Should be published as soon as the timeout expires.
55+
}
56+
});
57+
58+
Observer<Object> dropCallbackObserver = TestHelper.mockObserver();
59+
Observable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext);
60+
sampled.subscribe(observer);
61+
62+
InOrder inOrder = inOrder(observer);
63+
InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
64+
65+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
66+
inOrder.verify(observer, times(1)).onNext("one");
67+
inOrder.verify(observer, times(0)).onNext("two");
68+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("two");
69+
inOrder.verify(observer, times(1)).onNext("three");
70+
inOrder.verify(observer, times(0)).onNext("four");
71+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("four");
72+
inOrder.verify(observer, times(1)).onComplete();
73+
inOrder.verifyNoMoreInteractions();
74+
dropCallbackOrder.verifyNoMoreInteractions();
75+
}
76+
4477
@Test
4578
public void throttlingWithCompleted() {
4679
Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {

src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public void checkParallelFlowable() {
220220
// negative time is considered as zero time
221221
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class));
222222
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class));
223+
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class));
223224

224225
// negative time is considered as zero time
225226
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class));
@@ -465,6 +466,7 @@ public void checkParallelFlowable() {
465466
// negative time is considered as zero time
466467
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class));
467468
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class));
469+
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class));
468470

469471
// negative time is considered as zero time
470472
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class));

0 commit comments

Comments
 (0)