Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding onDropped callback to throttleLast as a part of #7458 #7488

Merged
merged 1 commit into from
Dec 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14672,7 +14672,7 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, boolean emi
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false));
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null));
}

/**
Expand Down Expand Up @@ -14713,7 +14713,51 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Sc
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast));
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
}

/**
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
* and optionally emit the very last upstream item when the upstream completes.
* <p>
* <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.emitlast.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* <p>History: 2.0.5 - experimental
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @param scheduler
* the {@code Scheduler} to use when sampling
* @param emitLast
* if {@code true} and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if {@code false}, an unsampled last item is ignored.
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
* @since 2.1
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
}

/**
Expand Down Expand Up @@ -17211,6 +17255,45 @@ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit u
return sample(intervalDuration, unit, scheduler);
}

/**
* Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
* <p>
* This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas
* {@code throttleFirst} does not tick, it just tracks the passage of time.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.s.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param intervalDuration
* duration of windows within which the last item emitted by the current {@code Flowable} will be
* emitted
* @param unit
* the unit of time of {@code intervalDuration}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
* event
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #sample(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
return sample(intervalDuration, unit, scheduler, false, onDropped);
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
Expand Down
72 changes: 70 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12128,7 +12128,40 @@ public final Observable<T> sample(long period, @NonNull TimeUnit unit, boolean e
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false));
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, null));
}

/**
* Returns an {@code Observable} that emits the most recently emitted item (if any) emitted by the current {@code Observable}
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @param scheduler
* the {@code Scheduler} to use when sampling
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
Desislav-Petrov marked this conversation as resolved.
Show resolved Hide resolved
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, onDropped));
}

/**
Expand Down Expand Up @@ -12165,7 +12198,7 @@ public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast));
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
}

/**
Expand Down Expand Up @@ -14233,6 +14266,41 @@ public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit
return sample(intervalDuration, unit);
}

/**
* Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas
* {@code throttleFirst} does not tick, it just tracks passage of time.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.s.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param intervalDuration
* duration of windows within which the last item emitted by the current {@code Observable} will be
* emitted
* @param unit
* the unit of time of {@code intervalDuration}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
* event
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #sample(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
Desislav-Petrov marked this conversation as resolved.
Show resolved Hide resolved
return sample(intervalDuration, unit, scheduler, onDropped);
}

/**
* Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
Expand All @@ -29,24 +31,25 @@ public final class FlowableSampleTimed<T> extends AbstractFlowableWithUpstream<T
final long period;
final TimeUnit unit;
final Scheduler scheduler;

final boolean emitLast;
final Consumer<T> onDropped;

public FlowableSampleTimed(Flowable<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
public FlowableSampleTimed(Flowable<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast, Consumer<T> onDropped) {
super(source);
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
this.emitLast = emitLast;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
SerializedSubscriber<T> serial = new SerializedSubscriber<>(s);
if (emitLast) {
source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler));
source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler, onDropped));
} else {
source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler));
source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler, onDropped));
}
}

Expand All @@ -58,18 +61,20 @@ abstract static class SampleTimedSubscriber<T> extends AtomicReference<T> implem
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;

final AtomicLong requested = new AtomicLong();

final SequentialDisposable timer = new SequentialDisposable();

Subscription upstream;

SampleTimedSubscriber(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
SampleTimedSubscriber(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
this.downstream = actual;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
Expand All @@ -84,7 +89,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
lazySet(t);
T oldValue = getAndSet(t);
if (oldValue != null && onDropped != null) {
try {
onDropped.accept(oldValue);
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
cancelTimer();
upstream.cancel();
downstream.onError(throwable);
}
}
}

@Override
Expand Down Expand Up @@ -137,8 +152,8 @@ static final class SampleTimedNoLast<T> extends SampleTimedSubscriber<T> {

private static final long serialVersionUID = -7139995637533111443L;

SampleTimedNoLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
super(actual, period, unit, scheduler);
SampleTimedNoLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
super(actual, period, unit, scheduler, onDropped);
}

@Override
Expand All @@ -158,8 +173,8 @@ static final class SampleTimedEmitLast<T> extends SampleTimedSubscriber<T> {

final AtomicInteger wip;

SampleTimedEmitLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
super(actual, period, unit, scheduler);
SampleTimedEmitLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
super(actual, period, unit, scheduler, onDropped);
this.wip = new AtomicInteger(1);
}

Expand Down
Loading