From 65ba785f54dfb10a6a0584099d9224c7e58681c9 Mon Sep 17 00:00:00 2001 From: des Date: Wed, 30 Nov 2022 21:06:21 +0000 Subject: [PATCH] Adding onDropped callback to throttleLast as a part of #7458 --- .../io/reactivex/rxjava3/core/Flowable.java | 87 ++++++++++++++++++- .../io/reactivex/rxjava3/core/Observable.java | 72 ++++++++++++++- .../flowable/FlowableSampleTimed.java | 35 +++++--- .../observable/ObservableSampleTimed.java | 41 ++++++--- .../flowable/FlowableThrottleLastTests.java | 72 ++++++++++++++- .../ObservableThrottleLastTests.java | 74 +++++++++++++++- .../ParamValidationCheckerTest.java | 6 +- 7 files changed, 359 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index c391ef9500..4c4236f29f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -14672,7 +14672,7 @@ public final Flowable sample(long period, @NonNull TimeUnit unit, boolean emi public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false)); + return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null)); } /** @@ -14713,7 +14713,51 @@ public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Sc public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast)); + return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null)); + } + + /** + * Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable} + * within periodic time intervals, where the intervals are defined on a particular {@link Scheduler} + * and optionally emit the very last upstream item when the upstream completes. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + *

History: 2.0.5 - experimental + * @param period + * the sampling rate + * @param unit + * the {@link TimeUnit} in which {@code period} is defined + * @param scheduler + * the {@code Scheduler} to use when sampling + * @param emitLast + * if {@code true} and the upstream completes while there is still an unsampled item available, + * that item is emitted to downstream before completion + * if {@code false}, an unsampled last item is ignored. + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Sample + * @see RxJava wiki: Backpressure + * @see #throttleLast(long, TimeUnit, Scheduler) + * @since 2.1 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped)); } /** @@ -17211,6 +17255,45 @@ public final Flowable throttleLast(long intervalDuration, @NonNull TimeUnit u return sample(intervalDuration, unit, scheduler); } + /** + * Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential + * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}. + *

+ * This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas + * {@code throttleFirst} does not tick, it just tracks the passage of time. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param intervalDuration + * duration of windows within which the last item emitted by the current {@code Flowable} will be + * emitted + * @param unit + * the unit of time of {@code intervalDuration} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle timeout for each + * event + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Sample + * @see RxJava wiki: Backpressure + * @see #sample(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Flowable throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + return sample(intervalDuration, unit, scheduler, false, onDropped); + } + /** * Throttles items from the upstream {@code Flowable} by first emitting the next * item from upstream, then periodically emitting the latest item (if any) when diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 7492f6f636..f371fda259 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -12128,7 +12128,40 @@ public final Observable sample(long period, @NonNull TimeUnit unit, boolean e public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false)); + return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, null)); + } + + /** + * Returns an {@code Observable} that emits the most recently emitted item (if any) emitted by the current {@code Observable} + * within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}. + *

+ * + *

+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param period + * the sampling rate + * @param unit + * the {@link TimeUnit} in which {@code period} is defined + * @param scheduler + * the {@code Scheduler} to use when sampling + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Sample + * @see #throttleLast(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, onDropped)); } /** @@ -12165,7 +12198,7 @@ public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast)); + return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, null)); } /** @@ -14233,6 +14266,41 @@ public final Observable throttleLast(long intervalDuration, @NonNull TimeUnit return sample(intervalDuration, unit); } + /** + * Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential + * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}. + *

+ * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas + * {@code throttleFirst} does not tick, it just tracks passage of time. + *

+ * + *

+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param intervalDuration + * duration of windows within which the last item emitted by the current {@code Observable} will be + * emitted + * @param unit + * the unit of time of {@code intervalDuration} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle timeout for each + * event + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Sample + * @see #sample(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Observable throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + return sample(intervalDuration, unit, scheduler, onDropped); + } + /** * Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}. diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java index 67a0412c02..59dce50201 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java @@ -16,6 +16,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.*; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Consumer; import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; @@ -29,24 +31,25 @@ public final class FlowableSampleTimed extends AbstractFlowableWithUpstream onDropped; - public FlowableSampleTimed(Flowable source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { + public FlowableSampleTimed(Flowable source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast, Consumer onDropped) { super(source); this.period = period; this.unit = unit; this.scheduler = scheduler; this.emitLast = emitLast; + this.onDropped = onDropped; } @Override protected void subscribeActual(Subscriber s) { SerializedSubscriber serial = new SerializedSubscriber<>(s); if (emitLast) { - source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler)); + source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler, onDropped)); } else { - source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler)); + source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler, onDropped)); } } @@ -58,6 +61,7 @@ abstract static class SampleTimedSubscriber extends AtomicReference implem final long period; final TimeUnit unit; final Scheduler scheduler; + final Consumer onDropped; final AtomicLong requested = new AtomicLong(); @@ -65,11 +69,12 @@ abstract static class SampleTimedSubscriber extends AtomicReference implem Subscription upstream; - SampleTimedSubscriber(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler) { + SampleTimedSubscriber(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { this.downstream = actual; this.period = period; this.unit = unit; this.scheduler = scheduler; + this.onDropped = onDropped; } @Override @@ -84,7 +89,17 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - lazySet(t); + T oldValue = getAndSet(t); + if (oldValue != null && onDropped != null) { + try { + onDropped.accept(oldValue); + } catch (Throwable throwable) { + Exceptions.throwIfFatal(throwable); + cancelTimer(); + upstream.cancel(); + downstream.onError(throwable); + } + } } @Override @@ -137,8 +152,8 @@ static final class SampleTimedNoLast extends SampleTimedSubscriber { private static final long serialVersionUID = -7139995637533111443L; - SampleTimedNoLast(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler) { - super(actual, period, unit, scheduler); + SampleTimedNoLast(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { + super(actual, period, unit, scheduler, onDropped); } @Override @@ -158,8 +173,8 @@ static final class SampleTimedEmitLast extends SampleTimedSubscriber { final AtomicInteger wip; - SampleTimedEmitLast(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler) { - super(actual, period, unit, scheduler); + SampleTimedEmitLast(Subscriber actual, long period, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { + super(actual, period, unit, scheduler, onDropped); this.wip = new AtomicInteger(1); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java index f439339e15..b2025060b9 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java @@ -18,31 +18,40 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; import io.reactivex.rxjava3.observers.SerializedObserver; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; public final class ObservableSampleTimed extends AbstractObservableWithUpstream { final long period; final TimeUnit unit; final Scheduler scheduler; - + final Consumer onDropped; final boolean emitLast; - public ObservableSampleTimed(ObservableSource source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) { + public ObservableSampleTimed(ObservableSource source, + long period, + TimeUnit unit, + Scheduler scheduler, + boolean emitLast, + Consumer onDropped) { super(source); this.period = period; this.unit = unit; this.scheduler = scheduler; this.emitLast = emitLast; + this.onDropped = onDropped; } @Override public void subscribeActual(Observer t) { SerializedObserver serial = new SerializedObserver<>(t); if (emitLast) { - source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler)); + source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler, onDropped)); } else { - source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler)); + source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler, onDropped)); } } @@ -54,16 +63,18 @@ abstract static class SampleTimedObserver extends AtomicReference implemen final long period; final TimeUnit unit; final Scheduler scheduler; + final Consumer onDropped; final AtomicReference timer = new AtomicReference<>(); Disposable upstream; - SampleTimedObserver(Observer actual, long period, TimeUnit unit, Scheduler scheduler) { + SampleTimedObserver(Observer actual, long period, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { this.downstream = actual; this.period = period; this.unit = unit; this.scheduler = scheduler; + this.onDropped = onDropped; } @Override @@ -79,7 +90,17 @@ public void onSubscribe(Disposable d) { @Override public void onNext(T t) { - lazySet(t); + T oldValue = getAndSet(t); + if (oldValue != null && onDropped != null) { + try { + onDropped.accept(oldValue); + } catch (Throwable throwable) { + Exceptions.throwIfFatal(throwable); + cancelTimer(); + upstream.dispose(); + downstream.onError(throwable); + } + } } @Override @@ -123,8 +144,8 @@ static final class SampleTimedNoLast extends SampleTimedObserver { private static final long serialVersionUID = -7139995637533111443L; - SampleTimedNoLast(Observer actual, long period, TimeUnit unit, Scheduler scheduler) { - super(actual, period, unit, scheduler); + SampleTimedNoLast(Observer actual, long period, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { + super(actual, period, unit, scheduler, onDropped); } @Override @@ -144,8 +165,8 @@ static final class SampleTimedEmitLast extends SampleTimedObserver { final AtomicInteger wip; - SampleTimedEmitLast(Observer actual, long period, TimeUnit unit, Scheduler scheduler) { - super(actual, period, unit, scheduler); + SampleTimedEmitLast(Observer actual, long period, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { + super(actual, period, unit, scheduler, onDropped); this.wip = new AtomicInteger(1); } diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableThrottleLastTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableThrottleLastTests.java index 0d2dc343fc..c5c858c430 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableThrottleLastTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableThrottleLastTests.java @@ -13,10 +13,14 @@ package io.reactivex.rxjava3.flowable; -import static org.mockito.Mockito.inOrder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Action; import org.junit.Test; import org.mockito.InOrder; import org.reactivestreams.Subscriber; @@ -28,6 +32,72 @@ public class FlowableThrottleLastTests extends RxJavaTest { + @Test + public void throttleWithDroppedCallbackException() throws Throwable { + Subscriber subscriber = TestHelper.mockSubscriber(); + Action whenDisposed = mock(Action.class); + + TestScheduler s = new TestScheduler(); + PublishProcessor o = PublishProcessor.create(); + o.doOnCancel(whenDisposed) + .throttleLast(500, TimeUnit.MILLISECONDS, s, e-> { + if (e == 1) { + throw new TestException("forced"); + } + }) + .subscribe(subscriber); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(subscriber); + inOrder.verify(subscriber).onError(any(TestException.class)); + inOrder.verifyNoMoreInteractions(); + verify(whenDisposed).run(); + } + + @Test + public void throttleWithDroppedCallback() { + Subscriber subscriber = TestHelper.mockSubscriber(); + Observer dropCallbackObserver = TestHelper.mockObserver(); + + TestScheduler s = new TestScheduler(); + PublishProcessor o = PublishProcessor.create(); + o.throttleLast(500, TimeUnit.MILLISECONDS, s, dropCallbackObserver::onNext).subscribe(subscriber); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // skip + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // deliver + s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); + o.onComplete(); + + InOrder inOrder = inOrder(subscriber); + InOrder dropCallbackOrder = inOrder(dropCallbackObserver); + dropCallbackOrder.verify(dropCallbackObserver).onNext(1); + inOrder.verify(subscriber).onNext(2); + dropCallbackOrder.verify(dropCallbackObserver).onNext(3); + dropCallbackOrder.verify(dropCallbackObserver).onNext(4); + dropCallbackOrder.verify(dropCallbackObserver).onNext(5); + inOrder.verify(subscriber).onNext(6); + inOrder.verify(subscriber).onNext(7); + inOrder.verify(subscriber).onComplete(); + inOrder.verifyNoMoreInteractions(); + dropCallbackOrder.verifyNoMoreInteractions(); + } + @Test public void throttle() { Subscriber subscriber = TestHelper.mockSubscriber(); diff --git a/src/test/java/io/reactivex/rxjava3/observable/ObservableThrottleLastTests.java b/src/test/java/io/reactivex/rxjava3/observable/ObservableThrottleLastTests.java index 735bc4a312..d8107cc434 100644 --- a/src/test/java/io/reactivex/rxjava3/observable/ObservableThrottleLastTests.java +++ b/src/test/java/io/reactivex/rxjava3/observable/ObservableThrottleLastTests.java @@ -13,10 +13,10 @@ package io.reactivex.rxjava3.observable; -import static org.mockito.Mockito.inOrder; - import java.util.concurrent.TimeUnit; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Action; import org.junit.Test; import org.mockito.InOrder; @@ -25,8 +25,78 @@ import io.reactivex.rxjava3.subjects.PublishSubject; import io.reactivex.rxjava3.testsupport.TestHelper; +import static org.mockito.Mockito.*; + public class ObservableThrottleLastTests extends RxJavaTest { + @Test + public void throttleLastWithDropCallbackException() throws Throwable { + Observer observer = TestHelper.mockObserver(); + + Action whenDisposed = mock(Action.class); + + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.doOnDispose(whenDisposed) + .throttleLast(500, TimeUnit.MILLISECONDS, s, e -> { + if (e == 1) { + throw new TestException("Forced"); + } + }) + .subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // try to deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onError(any(TestException.class)); + inOrder.verifyNoMoreInteractions(); + verify(whenDisposed).run(); + } + + @Test + public void throttleLastWithDropCallback() { + Observer observer = TestHelper.mockObserver(); + + Observer dropCallbackObserver = TestHelper.mockObserver(); + + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleLast(500, TimeUnit.MILLISECONDS, s, dropCallbackObserver::onNext).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // skip + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // deliver + s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); + o.onComplete(); + + InOrder inOrder = inOrder(observer); + InOrder dropCallbackOrder = inOrder(dropCallbackObserver); + dropCallbackOrder.verify(dropCallbackObserver).onNext(1); + inOrder.verify(observer).onNext(2); + dropCallbackOrder.verify(dropCallbackObserver).onNext(3); + dropCallbackOrder.verify(dropCallbackObserver).onNext(4); + dropCallbackOrder.verify(dropCallbackObserver).onNext(5); + inOrder.verify(observer).onNext(6); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onComplete(); + inOrder.verifyNoMoreInteractions(); + dropCallbackOrder.verifyNoMoreInteractions(); + } + @Test public void throttle() { Observer observer = TestHelper.mockObserver(); diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index 4fa3dfb39a..21d8905ded 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -190,6 +190,7 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Boolean.TYPE)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class)); @@ -225,6 +226,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class)); @@ -435,6 +437,7 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Boolean.TYPE)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); // negative time is considered as zero time @@ -471,6 +474,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class)); @@ -1216,4 +1220,4 @@ public String toString() { return "NeverCompletable"; } } -} +} \ No newline at end of file