Skip to content

Commit

Permalink
3.x: Fix experimental, signatures of throttleLast (#7490)
Browse files Browse the repository at this point in the history
* 3.x: Fix experimental, signatures.

* Fix Observable.sample overload

* Fix validator signature override
  • Loading branch information
akarnokd authored Dec 4, 2022
1 parent 1104c09 commit 5b3510b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 36 deletions.
12 changes: 8 additions & 4 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14729,7 +14729,6 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Sc
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* <p>History: 2.0.5 - experimental
* @param period
* the sampling rate
* @param unit
Expand All @@ -14747,13 +14746,14 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Sc
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
* @since 2.1
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<T> onDropped) {
@Experimental
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
Expand Down Expand Up @@ -17172,11 +17172,13 @@ public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Expand Down Expand Up @@ -17285,12 +17287,14 @@ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit u
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #sample(long, TimeUnit, Scheduler)
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
@Experimental
public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
return sample(intervalDuration, unit, scheduler, false, onDropped);
}

Expand Down
46 changes: 28 additions & 18 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12133,49 +12133,51 @@ public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull

/**
* Returns an {@code Observable} that emits the most recently emitted item (if any) emitted by the current {@code Observable}
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
* and optionally emit the very last upstream item when the upstream completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.v3.png" alt="">
* <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.emitlast.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* <p>History: 2.0.5 - experimental
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @param scheduler
* the {@code Scheduler} to use when sampling
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @param emitLast
* if {@code true} and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if {@code false}, an unsampled last item is ignored.
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
* @since 2.1
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, onDropped));
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
}

/**
* Returns an {@code Observable} that emits the most recently emitted item (if any) emitted by the current {@code Observable}
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
* and optionally emit the very last upstream item when the upstream completes.
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
* <p>
* <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.emitlast.png" alt="">
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* <p>History: 2.0.5 - experimental
* @param period
* the sampling rate
* @param unit
Expand All @@ -12186,19 +12188,23 @@ public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull
* if {@code true} and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if {@code false}, an unsampled last item is ignored.
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
* @since 2.1
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
@Experimental
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
}

/**
Expand Down Expand Up @@ -14225,10 +14231,12 @@ public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit un
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
@Experimental
public final Observable<T> throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Expand Down Expand Up @@ -14293,12 +14301,14 @@ public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #sample(long, TimeUnit, Scheduler)
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
return sample(intervalDuration, unit, scheduler, onDropped);
@Experimental
public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
return sample(intervalDuration, unit, scheduler, false, onDropped);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public final class FlowableSampleTimed<T> extends AbstractFlowableWithUpstream<T
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

public FlowableSampleTimed(Flowable<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast, Consumer<T> onDropped) {
public FlowableSampleTimed(Flowable<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast, Consumer<? super T> onDropped) {
super(source);
this.period = period;
this.unit = unit;
Expand All @@ -61,15 +61,15 @@ abstract static class SampleTimedSubscriber<T> extends AtomicReference<T> implem
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

final AtomicLong requested = new AtomicLong();

final SequentialDisposable timer = new SequentialDisposable();

Subscription upstream;

SampleTimedSubscriber(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
SampleTimedSubscriber(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
this.downstream = actual;
this.period = period;
this.unit = unit;
Expand Down Expand Up @@ -152,7 +152,7 @@ static final class SampleTimedNoLast<T> extends SampleTimedSubscriber<T> {

private static final long serialVersionUID = -7139995637533111443L;

SampleTimedNoLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
SampleTimedNoLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
super(actual, period, unit, scheduler, onDropped);
}

Expand All @@ -173,7 +173,7 @@ static final class SampleTimedEmitLast<T> extends SampleTimedSubscriber<T> {

final AtomicInteger wip;

SampleTimedEmitLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
SampleTimedEmitLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
super(actual, period, unit, scheduler, onDropped);
this.wip = new AtomicInteger(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class ObservableSampleTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;
final boolean emitLast;

public ObservableSampleTimed(ObservableSource<T> source,
long period,
TimeUnit unit,
Scheduler scheduler,
boolean emitLast,
Consumer<T> onDropped) {
Consumer<? super T> onDropped) {
super(source);
this.period = period;
this.unit = unit;
Expand All @@ -63,13 +62,13 @@ abstract static class SampleTimedObserver<T> extends AtomicReference<T> implemen
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

final AtomicReference<Disposable> timer = new AtomicReference<>();

Disposable upstream;

SampleTimedObserver(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
SampleTimedObserver(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
this.downstream = actual;
this.period = period;
this.unit = unit;
Expand Down Expand Up @@ -144,7 +143,7 @@ static final class SampleTimedNoLast<T> extends SampleTimedObserver<T> {

private static final long serialVersionUID = -7139995637533111443L;

SampleTimedNoLast(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
SampleTimedNoLast(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
super(actual, period, unit, scheduler, onDropped);
}

Expand All @@ -165,7 +164,7 @@ static final class SampleTimedEmitLast<T> extends SampleTimedObserver<T> {

final AtomicInteger wip;

SampleTimedEmitLast(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
SampleTimedEmitLast(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
super(actual, period, unit, scheduler, onDropped);
this.wip = new AtomicInteger(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ public void checkParallelFlowable() {
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class));
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Boolean.TYPE));
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class));
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class));
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE));
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Consumer.class));

// negative time is considered as zero time
addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class));
Expand Down

0 comments on commit 5b3510b

Please sign in to comment.