Skip to content

Commit 3b0d948

Browse files
Add onDropped callback for throttleFirst - addresses #7458 (#7482)
1 parent 159ae3b commit 3b0d948

File tree

7 files changed

+284
-17
lines changed

7 files changed

+284
-17
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+42-1
Original file line numberDiff line numberDiff line change
@@ -17096,7 +17096,48 @@ public final Flowable<T> throttleFirst(long windowDuration, @NonNull TimeUnit un
1709617096
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1709717097
Objects.requireNonNull(unit, "unit is null");
1709817098
Objects.requireNonNull(scheduler, "scheduler is null");
17099-
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
17099+
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
17100+
}
17101+
17102+
/**
17103+
* Returns a {@code Flowable} that emits only the first item emitted by the current {@code Flowable} during sequential
17104+
* time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
17105+
* <p>
17106+
* This differs from {@link #throttleLast} in that this only tracks the passage of time whereas
17107+
* {@link #throttleLast} ticks at scheduled intervals.
17108+
* <p>
17109+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.s.v3.png" alt="">
17110+
* <dl>
17111+
* <dt><b>Backpressure:</b></dt>
17112+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17113+
* <dt><b>Scheduler:</b></dt>
17114+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
17115+
* </dl>
17116+
*
17117+
* @param skipDuration
17118+
* time to wait before emitting another item after emitting the last item
17119+
* @param unit
17120+
* the unit of time of {@code skipDuration}
17121+
* @param scheduler
17122+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
17123+
* event
17124+
* @param onDropped
17125+
* called when an item doesn't get delivered to the downstream
17126+
*
17127+
* @return the new {@code Flowable} instance
17128+
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
17129+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
17130+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17131+
*/
17132+
@CheckReturnValue
17133+
@NonNull
17134+
@BackpressureSupport(BackpressureKind.ERROR)
17135+
@SchedulerSupport(SchedulerSupport.CUSTOM)
17136+
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
17137+
Objects.requireNonNull(unit, "unit is null");
17138+
Objects.requireNonNull(scheduler, "scheduler is null");
17139+
Objects.requireNonNull(onDropped, "onDropped is null");
17140+
return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
1710017141
}
1710117142

1710217143
/**

src/main/java/io/reactivex/rxjava3/core/Observable.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -14163,7 +14163,44 @@ public final Observable<T> throttleFirst(long windowDuration, @NonNull TimeUnit
1416314163
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1416414164
Objects.requireNonNull(unit, "unit is null");
1416514165
Objects.requireNonNull(scheduler, "scheduler is null");
14166-
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
14166+
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
14167+
}
14168+
14169+
/**
14170+
* Returns an {@code Observable} that emits only the first item emitted by the current {@code Observable} during sequential
14171+
* time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
14172+
* <p>
14173+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas
14174+
* {@code throttleLast} ticks at scheduled intervals.
14175+
* <p>
14176+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.s.v3.png" alt="">
14177+
* <dl>
14178+
* <dt><b>Scheduler:</b></dt>
14179+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
14180+
* </dl>
14181+
*
14182+
* @param skipDuration
14183+
* time to wait before emitting another item after emitting the last item
14184+
* @param unit
14185+
* the unit of time of {@code skipDuration}
14186+
* @param scheduler
14187+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
14188+
* event
14189+
* @param onDropped
14190+
* called when an item doesn't get delivered to the downstream
14191+
*
14192+
* @return the new {@code Observable} instance
14193+
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
14194+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14195+
*/
14196+
@CheckReturnValue
14197+
@SchedulerSupport(SchedulerSupport.CUSTOM)
14198+
@NonNull
14199+
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
14200+
Objects.requireNonNull(unit, "unit is null");
14201+
Objects.requireNonNull(scheduler, "scheduler is null");
14202+
Objects.requireNonNull(onDropped, "onDropped is null");
14203+
return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
1416714204
}
1416814205

1416914206
/**

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.concurrent.atomic.AtomicLong;
1818

19+
import io.reactivex.rxjava3.exceptions.Exceptions;
20+
import io.reactivex.rxjava3.functions.Consumer;
1921
import org.reactivestreams.*;
2022

2123
import io.reactivex.rxjava3.core.*;
@@ -32,44 +34,53 @@ public final class FlowableThrottleFirstTimed<T> extends AbstractFlowableWithUps
3234
final long timeout;
3335
final TimeUnit unit;
3436
final Scheduler scheduler;
37+
final Consumer<? super T> onDropped;
3538

36-
public FlowableThrottleFirstTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
39+
public FlowableThrottleFirstTimed(Flowable<T> source,
40+
long timeout,
41+
TimeUnit unit,
42+
Scheduler scheduler,
43+
Consumer<? super T> onDropped) {
3744
super(source);
3845
this.timeout = timeout;
3946
this.unit = unit;
4047
this.scheduler = scheduler;
48+
this.onDropped = onDropped;
4149
}
4250

4351
@Override
4452
protected void subscribeActual(Subscriber<? super T> s) {
4553
source.subscribe(new DebounceTimedSubscriber<>(
4654
new SerializedSubscriber<>(s),
47-
timeout, unit, scheduler.createWorker()));
55+
timeout, unit, scheduler.createWorker(),
56+
onDropped));
4857
}
4958

5059
static final class DebounceTimedSubscriber<T>
5160
extends AtomicLong
5261
implements FlowableSubscriber<T>, Subscription, Runnable {
53-
5462
private static final long serialVersionUID = -9102637559663639004L;
63+
5564
final Subscriber<? super T> downstream;
5665
final long timeout;
5766
final TimeUnit unit;
5867
final Scheduler.Worker worker;
59-
68+
final Consumer<? super T> onDropped;
6069
Subscription upstream;
61-
6270
final SequentialDisposable timer = new SequentialDisposable();
63-
6471
volatile boolean gate;
65-
6672
boolean done;
6773

68-
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
74+
DebounceTimedSubscriber(Subscriber<? super T> actual,
75+
long timeout,
76+
TimeUnit unit,
77+
Worker worker,
78+
Consumer<? super T> onDropped) {
6979
this.downstream = actual;
7080
this.timeout = timeout;
7181
this.unit = unit;
7282
this.worker = worker;
83+
this.onDropped = onDropped;
7384
}
7485

7586
@Override
@@ -106,6 +117,16 @@ public void onNext(T t) {
106117
}
107118

108119
timer.replace(worker.schedule(this, timeout, unit));
120+
} else if (onDropped != null) {
121+
try {
122+
onDropped.accept(t);
123+
} catch (Throwable ex) {
124+
Exceptions.throwIfFatal(ex);
125+
downstream.onError(ex);
126+
worker.dispose();
127+
upstream.cancel();
128+
done = true;
129+
}
109130
}
110131
}
111132

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java

+30-7
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,36 @@
1919
import io.reactivex.rxjava3.core.*;
2020
import io.reactivex.rxjava3.core.Scheduler.Worker;
2121
import io.reactivex.rxjava3.disposables.Disposable;
22+
import io.reactivex.rxjava3.exceptions.Exceptions;
23+
import io.reactivex.rxjava3.functions.Consumer;
2224
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
2325
import io.reactivex.rxjava3.observers.SerializedObserver;
2426

2527
public final class ObservableThrottleFirstTimed<T> extends AbstractObservableWithUpstream<T, T> {
2628
final long timeout;
2729
final TimeUnit unit;
2830
final Scheduler scheduler;
29-
30-
public ObservableThrottleFirstTimed(ObservableSource<T> source,
31-
long timeout, TimeUnit unit, Scheduler scheduler) {
31+
final Consumer<? super T> onDropped;
32+
33+
public ObservableThrottleFirstTimed(
34+
ObservableSource<T> source,
35+
long timeout,
36+
TimeUnit unit,
37+
Scheduler scheduler,
38+
Consumer<? super T> onDropped) {
3239
super(source);
3340
this.timeout = timeout;
3441
this.unit = unit;
3542
this.scheduler = scheduler;
43+
this.onDropped = onDropped;
3644
}
3745

3846
@Override
3947
public void subscribeActual(Observer<? super T> t) {
4048
source.subscribe(new DebounceTimedObserver<>(
4149
new SerializedObserver<>(t),
42-
timeout, unit, scheduler.createWorker()));
50+
timeout, unit, scheduler.createWorker(),
51+
onDropped));
4352
}
4453

4554
static final class DebounceTimedObserver<T>
@@ -51,16 +60,21 @@ static final class DebounceTimedObserver<T>
5160
final long timeout;
5261
final TimeUnit unit;
5362
final Scheduler.Worker worker;
54-
63+
final Consumer<? super T> onDropped;
5564
Disposable upstream;
56-
5765
volatile boolean gate;
5866

59-
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
67+
DebounceTimedObserver(
68+
Observer<? super T> actual,
69+
long timeout,
70+
TimeUnit unit,
71+
Worker worker,
72+
Consumer<? super T> onDropped) {
6073
this.downstream = actual;
6174
this.timeout = timeout;
6275
this.unit = unit;
6376
this.worker = worker;
77+
this.onDropped = onDropped;
6478
}
6579

6680
@Override
@@ -83,6 +97,15 @@ public void onNext(T t) {
8397
d.dispose();
8498
}
8599
DisposableHelper.replace(this, worker.schedule(this, timeout, unit));
100+
} else if (onDropped != null) {
101+
try {
102+
onDropped.accept(t);
103+
} catch (Throwable ex) {
104+
Exceptions.throwIfFatal(ex);
105+
downstream.onError(ex);
106+
worker.dispose();
107+
upstream.dispose();
108+
}
86109
}
87110
}
88111

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java

+72
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import io.reactivex.rxjava3.functions.Action;
2223
import org.junit.*;
2324
import org.mockito.InOrder;
2425
import org.reactivestreams.*;
@@ -44,6 +45,77 @@ public void before() {
4445
subscriber = TestHelper.mockSubscriber();
4546
}
4647

48+
@Test
49+
public void throttlingWithDropCallbackCrashes() throws Throwable {
50+
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
51+
@Override
52+
public void subscribe(Subscriber<? super String> subscriber) {
53+
subscriber.onSubscribe(new BooleanSubscription());
54+
publishNext(subscriber, 100, "one"); // publish as it's first
55+
publishNext(subscriber, 300, "two"); // skip as it's last within the first 400
56+
publishNext(subscriber, 900, "three"); // publish
57+
publishNext(subscriber, 905, "four"); // skip
58+
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
59+
}
60+
});
61+
62+
Action whenDisposed = mock(Action.class);
63+
64+
Flowable<String> sampled = source
65+
.doOnCancel(whenDisposed)
66+
.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, e -> {
67+
if ("two".equals(e)) {
68+
throw new TestException("forced");
69+
}
70+
});
71+
sampled.subscribe(subscriber);
72+
73+
InOrder inOrder = inOrder(subscriber);
74+
75+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
76+
inOrder.verify(subscriber, times(1)).onNext("one");
77+
inOrder.verify(subscriber, times(1)).onError(any(TestException.class));
78+
inOrder.verify(subscriber, times(0)).onNext("two");
79+
inOrder.verify(subscriber, times(0)).onNext("three");
80+
inOrder.verify(subscriber, times(0)).onNext("four");
81+
inOrder.verify(subscriber, times(0)).onComplete();
82+
inOrder.verifyNoMoreInteractions();
83+
verify(whenDisposed).run();
84+
}
85+
86+
@Test
87+
public void throttlingWithDropCallback() {
88+
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
89+
@Override
90+
public void subscribe(Subscriber<? super String> subscriber) {
91+
subscriber.onSubscribe(new BooleanSubscription());
92+
publishNext(subscriber, 100, "one"); // publish as it's first
93+
publishNext(subscriber, 300, "two"); // skip as it's last within the first 400
94+
publishNext(subscriber, 900, "three"); // publish
95+
publishNext(subscriber, 905, "four"); // skip
96+
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
97+
}
98+
});
99+
100+
Observer<Object> dropCallbackObserver = TestHelper.mockObserver();
101+
Flowable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext);
102+
sampled.subscribe(subscriber);
103+
104+
InOrder inOrder = inOrder(subscriber);
105+
InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
106+
107+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
108+
inOrder.verify(subscriber, times(1)).onNext("one");
109+
inOrder.verify(subscriber, times(0)).onNext("two");
110+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("two");
111+
inOrder.verify(subscriber, times(1)).onNext("three");
112+
inOrder.verify(subscriber, times(0)).onNext("four");
113+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("four");
114+
inOrder.verify(subscriber, times(1)).onComplete();
115+
inOrder.verifyNoMoreInteractions();
116+
dropCallbackOrder.verifyNoMoreInteractions();
117+
}
118+
47119
@Test
48120
public void throttlingWithCompleted() {
49121
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {

0 commit comments

Comments
 (0)