Skip to content

Commit

Permalink
Add onDropped callback for onBackpressureLatest - #7458
Browse files Browse the repository at this point in the history
  • Loading branch information
Desislav-Petrov committed Mar 18, 2023
1 parent 329e8c9 commit 9e908da
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 6 deletions.
40 changes: 39 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12755,7 +12755,45 @@ public final Flowable<T> onBackpressureDrop(@NonNull Consumer<? super T> onDrop)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Flowable<T> onBackpressureLatest() {
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, null));
}

/**
* Drops all but the latest item emitted by the current {@code Flowable} if the downstream is not ready to receive
* new items (indicated by a lack of {@link Subscription#request(long)} calls from it) and emits this latest
* item when the downstream becomes ready.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.latest.v3.png" alt="">
* <p>
* Its behavior is logically equivalent to {@code blockingLatest()} with the exception that
* the downstream is not blocking while requesting more values.
* <p>
* Note that if the current {@code Flowable} does support backpressure, this operator ignores that capability
* and doesn't propagate any backpressure requests from downstream.
* <p>
* Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of {@code onNext} events.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
* manner (i.e., not applying backpressure to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureLatest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @throws NullPointerException if {@code onDropped} is {@code null}
* @return the new {@code Flowable} instance
* @since 1.1.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Flowable<T> onBackpressureLatest(@NonNull Consumer<T> onDropped) {
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, onDropped));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,48 @@
package io.reactivex.rxjava3.internal.operators.flowable;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.Subscriber;

public final class FlowableOnBackpressureLatest<T> extends AbstractFlowableWithUpstream<T, T> {

public FlowableOnBackpressureLatest(Flowable<T> source) {
final Consumer<? super T> onDropped;

public FlowableOnBackpressureLatest(Flowable<T> source, Consumer<? super T> onDropped) {
super(source);
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new BackpressureLatestSubscriber<>(s));
source.subscribe(new BackpressureLatestSubscriber<>(s, onDropped));
}

static final class BackpressureLatestSubscriber<T> extends AbstractBackpressureThrottlingSubscriber<T, T> {

private static final long serialVersionUID = 163080509307634843L;

BackpressureLatestSubscriber(Subscriber<? super T> downstream) {
final Consumer<? super T> onDropped;

BackpressureLatestSubscriber(Subscriber<? super T> downstream,
Consumer<? super T> onDropped) {
super(downstream);
this.onDropped = onDropped;
}

@Override
public void onNext(T t) {
current.lazySet(t);
T oldValue = current.getAndSet(t);
if (onDropped != null && oldValue != null) {
try {
onDropped.accept(oldValue);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
downstream.onError(ex);
}
}
drain();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.TimeUnit;

import org.junit.*;
import org.mockito.InOrder;
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.*;
Expand All @@ -27,6 +28,8 @@
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;

import static org.mockito.Mockito.inOrder;

public class FlowableOnBackpressureLatestTest extends RxJavaTest {
@Test
public void simple() {
Expand Down Expand Up @@ -62,6 +65,68 @@ public void simpleBackpressure() {
ts.assertNotComplete();
}

@Test
public void simpleBackpressureWithOnDroppedCallback() {
PublishProcessor<Integer> source = PublishProcessor.create();
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);

Observer<Object> dropCallbackObserver = TestHelper.mockObserver();

source.onBackpressureLatest(dropCallbackObserver::onNext)
.subscribe(ts);

ts.assertNoValues();

source.onNext(1);
source.onNext(2);
source.onNext(3);

ts.request(1);

ts.assertValues(3);

source.onNext(4);
source.onNext(5);

ts.request(2);

ts.assertValues(3,5);

InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
dropCallbackOrder.verify(dropCallbackObserver).onNext(1);
dropCallbackOrder.verify(dropCallbackObserver).onNext(2);
dropCallbackOrder.verify(dropCallbackObserver).onNext(4);
dropCallbackOrder.verifyNoMoreInteractions();
}

@Test
public void simpleBackpressureWithOnDroppedCallbackEx() {
PublishProcessor<Integer> source = PublishProcessor.create();
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);

source.onBackpressureLatest(e -> {
if (e == 3) {
throw new TestException("forced");
}
})
.subscribe(ts);

ts.assertNoValues();

source.onNext(1);
source.onNext(2);

ts.request(1);

ts.assertValues(2);

source.onNext(3);
source.onNext(4);

ts.assertError(TestException.class);
ts.assertValues(2);
}

@Test
public void synchronousDrop() {
PublishProcessor<Integer> source = PublishProcessor.create();
Expand Down Expand Up @@ -105,7 +170,7 @@ public void synchronousDrop() {
}

@Test
public void asynchronousDrop() throws InterruptedException {
public void asynchronousDrop() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>(1L) {
final Random rnd = new Random();
@Override
Expand Down

0 comments on commit 9e908da

Please sign in to comment.