diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 4a1b8815e9..3c6f482ca4 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.*; @@ -53,6 +54,9 @@ public final class UnicastSubject extends Subject { /** The optional callback when the Subject gets cancelled or terminates. */ final AtomicReference onTerminate; + /** deliver onNext events before error event */ + final boolean delayError; + /** Indicates the single observer has cancelled. */ volatile boolean disposed; @@ -79,7 +83,7 @@ public final class UnicastSubject extends Subject { */ @CheckReturnValue public static UnicastSubject create() { - return new UnicastSubject(bufferSize()); + return new UnicastSubject(bufferSize(), true); } /** @@ -90,7 +94,7 @@ public static UnicastSubject create() { */ @CheckReturnValue public static UnicastSubject create(int capacityHint) { - return new UnicastSubject(capacityHint); + return new UnicastSubject(capacityHint, true); } /** @@ -102,37 +106,91 @@ public static UnicastSubject create(int capacityHint) { * * @param the value type * @param capacityHint the hint to size the internal unbounded buffer - * @param onCancelled the non null callback + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @return an UnicastSubject instance + */ + @CheckReturnValue + public static UnicastSubject create(int capacityHint, Runnable onTerminate) { + return new UnicastSubject(capacityHint, onTerminate, true); + } + + /** + * Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and + * a callback for the case when the single Subscriber cancels its subscription. + * + *

The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param the value type + * @param capacityHint the hint to size the internal unbounded buffer + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance + * @since 2.0.8 - experimental */ @CheckReturnValue - public static UnicastSubject create(int capacityHint, Runnable onCancelled) { - return new UnicastSubject(capacityHint, onCancelled); + @Experimental + public static UnicastSubject create(int capacityHint, Runnable onTerminate, boolean delayError) { + return new UnicastSubject(capacityHint, onTerminate, delayError); } /** - * Creates an UnicastSubject with the given capacity hint. + * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag. + * + *

The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param the value type + * @param delayError deliver pending onNext events before onError + * @return an UnicastSubject instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public static UnicastSubject create(boolean delayError) { + return new UnicastSubject(bufferSize(), delayError); + } + + + /** + * Creates an UnicastSubject with the given capacity hint and delay error flag. * @param capacityHint the capacity hint for the internal, unbounded queue - * @since 2.0 + * @param delayError deliver pending onNext events before onError + * @since 2.0.8 - experimental */ - UnicastSubject(int capacityHint) { + UnicastSubject(int capacityHint, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(); + this.delayError = delayError; this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueDisposable(); } /** - * Creates an UnicastProcessor with the given capacity hint and callback - * for when the Processor is terminated normally or its single Subscriber cancels. + * Creates an UnicastSubject with the given capacity hint and callback + * for when the Subject is terminated normally or its single Subscriber cancels. * @param capacityHint the capacity hint for the internal, unbounded queue - * @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @since 2.0 - */ + * + * */ UnicastSubject(int capacityHint, Runnable onTerminate) { + this(capacityHint, onTerminate, true); + } + + /** + * Creates an UnicastSubject with the given capacity hint, delay error flag and callback + * for when the Subject is terminated normally or its single Subscriber cancels. + * @param capacityHint the capacity hint for the internal, unbounded queue + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError + * @since 2.0.8 - experimental + */ + UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); + this.delayError = delayError; this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueDisposable(); @@ -212,6 +270,8 @@ public void onComplete() { void drainNormal(Observer a) { int missed = 1; SimpleQueue q = queue; + boolean failFast = !this.delayError; + boolean canBeError = true; for (;;) { for (;;) { @@ -221,19 +281,23 @@ void drainNormal(Observer a) { return; } - boolean d = done; + boolean d = this.done; T v = queue.poll(); boolean empty = v == null; - if (d && empty) { - actual.lazySet(null); - Throwable ex = error; - if (ex != null) { - a.onError(ex); - } else { - a.onComplete(); + if (d) { + if (failFast && canBeError) { + if (failedFast(q, a)) { + return; + } else { + canBeError = false; + } + } + + if (empty) { + errorOrComplete(a); + return; } - return; } if (empty) { @@ -254,6 +318,7 @@ void drainFused(Observer a) { int missed = 1; final SpscLinkedArrayQueue q = queue; + final boolean failFast = !delayError; for (;;) { @@ -262,20 +327,18 @@ void drainFused(Observer a) { q.clear(); return; } - boolean d = done; + if (failFast && d) { + if (failedFast(q, a)) { + return; + } + } + a.onNext(null); if (d) { - actual.lazySet(null); - - Throwable ex = error; - if (ex != null) { - a.onError(ex); - } else { - a.onComplete(); - } + errorOrComplete(a); return; } @@ -286,6 +349,28 @@ void drainFused(Observer a) { } } + void errorOrComplete(Observer a) { + actual.lazySet(null); + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + } + + boolean failedFast(final SimpleQueue q, Observer a) { + Throwable ex = error; + if (ex != null) { + actual.lazySet(null); + q.clear(); + a.onError(ex); + return true; + } else { + return false; + } + } + void drain() { if (wip.getAndIncrement() != 0) { return; diff --git a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java index 7cf8f8c374..1788638967 100644 --- a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java @@ -27,6 +27,7 @@ import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import static org.mockito.Mockito.mock; public class UnicastSubjectTest { @@ -69,6 +70,90 @@ public void fusionOfflie() { .assertResult(1); } + @Test + public void failFast() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void threeArgsFactoryFailFast() { + Runnable noop = mock(Runnable.class); + UnicastSubject ap = UnicastSubject.create(16, noop, false); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void threeArgsFactoryDelayError() { + Runnable noop = mock(Runnable.class); + UnicastSubject ap = UnicastSubject.create(16, noop, true); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(1) + .assertError(RuntimeException.class); + } + + @Test + public void fusionOfflineFailFast() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ANY); + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void fusionOfflineFailFastMultipleEvents() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onNext(2); + ap.onNext(3); + ap.onComplete(); + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ANY); + ap.subscribe(ts); + + ts + .assertValueCount(3) + .assertComplete(); + } + + @Test + public void failFastMultipleEvents() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onNext(2); + ap.onNext(3); + ap.onComplete(); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(3) + .assertComplete(); + } + @Test public void onTerminateCalledWhenOnError() { final AtomicBoolean didRunOnTerminate = new AtomicBoolean();