Skip to content

[2.x] UnicastSubject fail fast support #5217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 115 additions & 30 deletions src/main/java/io/reactivex/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.Nullable;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -53,6 +54,9 @@ public final class UnicastSubject<T> extends Subject<T> {
/** The optional callback when the Subject gets cancelled or terminates. */
final AtomicReference<Runnable> onTerminate;

/** deliver onNext events before error event */
final boolean delayError;

/** Indicates the single observer has cancelled. */
volatile boolean disposed;

Expand All @@ -79,7 +83,7 @@ public final class UnicastSubject<T> extends Subject<T> {
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create() {
return new UnicastSubject<T>(bufferSize());
return new UnicastSubject<T>(bufferSize(), true);
}

/**
Expand All @@ -90,7 +94,7 @@ public static <T> UnicastSubject<T> create() {
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create(int capacityHint) {
return new UnicastSubject<T>(capacityHint);
return new UnicastSubject<T>(capacityHint, true);
}

/**
Expand All @@ -102,37 +106,91 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
*
* @param <T> the value type
* @param capacityHint the hint to size the internal unbounded buffer
* @param onCancelled the non null callback
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
* @return an UnicastSubject instance
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate) {
return new UnicastSubject<T>(capacityHint, onTerminate, true);
}

/**
* Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and
* a callback for the case when the single Subscriber cancels its subscription.
*
* <p>The callback, if not null, is called exactly once and
* non-overlapped with any active replay.
*
* @param <T> the value type
* @param capacityHint the hint to size the internal unbounded buffer
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
* @param delayError deliver pending onNext events before onError
* @return an UnicastSubject instance
* @since 2.0.8 - experimental
*/
@CheckReturnValue
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onCancelled) {
return new UnicastSubject<T>(capacityHint, onCancelled);
@Experimental
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) {
return new UnicastSubject<T>(capacityHint, onTerminate, delayError);
}

/**
* Creates an UnicastSubject with the given capacity hint.
* Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.
*
* <p>The callback, if not null, is called exactly once and
* non-overlapped with any active replay.
*
* @param <T> the value type
* @param delayError deliver pending onNext events before onError
* @return an UnicastSubject instance
* @since 2.0.8 - experimental
*/
@CheckReturnValue
@Experimental
public static <T> UnicastSubject<T> create(boolean delayError) {
return new UnicastSubject<T>(bufferSize(), delayError);
}


/**
* Creates an UnicastSubject with the given capacity hint and delay error flag.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @since 2.0
* @param delayError deliver pending onNext events before onError
* @since 2.0.8 - experimental
*/
UnicastSubject(int capacityHint) {
UnicastSubject(int capacityHint, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
this.onTerminate = new AtomicReference<Runnable>();
this.delayError = delayError;
this.actual = new AtomicReference<Observer<? super T>>();
this.once = new AtomicBoolean();
this.wip = new UnicastQueueDisposable();
}

/**
* Creates an UnicastProcessor with the given capacity hint and callback
* for when the Processor is terminated normally or its single Subscriber cancels.
* Creates an UnicastSubject with the given capacity hint and callback
* for when the Subject is terminated normally or its single Subscriber cancels.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
* @since 2.0
*/
*
* */
UnicastSubject(int capacityHint, Runnable onTerminate) {
this(capacityHint, onTerminate, true);
}

/**
* Creates an UnicastSubject with the given capacity hint, delay error flag and callback
* for when the Subject is terminated normally or its single Subscriber cancels.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
* @param delayError deliver pending onNext events before onError
* @since 2.0.8 - experimental
*/
UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
this.delayError = delayError;
this.actual = new AtomicReference<Observer<? super T>>();
this.once = new AtomicBoolean();
this.wip = new UnicastQueueDisposable();
Expand Down Expand Up @@ -212,6 +270,8 @@ public void onComplete() {
void drainNormal(Observer<? super T> a) {
int missed = 1;
SimpleQueue<T> q = queue;
boolean failFast = !this.delayError;
boolean canBeError = true;
for (;;) {
for (;;) {

Expand All @@ -221,19 +281,23 @@ void drainNormal(Observer<? super T> a) {
return;
}

boolean d = done;
boolean d = this.done;
T v = queue.poll();
boolean empty = v == null;

if (d && empty) {
actual.lazySet(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
if (d) {
if (failFast && canBeError) {
if (failedFast(q, a)) {
return;
} else {
canBeError = false;
}
}

if (empty) {
errorOrComplete(a);
return;
}
return;
}

if (empty) {
Expand All @@ -254,6 +318,7 @@ void drainFused(Observer<? super T> a) {
int missed = 1;

final SpscLinkedArrayQueue<T> q = queue;
final boolean failFast = !delayError;

for (;;) {

Expand All @@ -262,20 +327,18 @@ void drainFused(Observer<? super T> a) {
q.clear();
return;
}

boolean d = done;

if (failFast && d) {
if (failedFast(q, a)) {
return;
}
}

a.onNext(null);

if (d) {
actual.lazySet(null);

Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
errorOrComplete(a);
return;
}

Expand All @@ -286,6 +349,28 @@ void drainFused(Observer<? super T> a) {
}
}

void errorOrComplete(Observer<? super T> a) {
actual.lazySet(null);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
}

boolean failedFast(final SimpleQueue<T> q, Observer<? super T> a) {
Throwable ex = error;
if (ex != null) {
actual.lazySet(null);
q.clear();
a.onError(ex);
return true;
} else {
return false;
}
}

void drain() {
if (wip.getAndIncrement() != 0) {
return;
Expand Down
85 changes: 85 additions & 0 deletions src/test/java/io/reactivex/subjects/UnicastSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.reactivex.observers.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import static org.mockito.Mockito.mock;

public class UnicastSubjectTest {

Expand Down Expand Up @@ -69,6 +70,90 @@ public void fusionOfflie() {
.assertResult(1);
}

@Test
public void failFast() {
UnicastSubject<Integer> ap = UnicastSubject.create(false);
ap.onNext(1);
ap.onError(new RuntimeException());
TestObserver<Integer> ts = TestObserver.create();
ap.subscribe(ts);

ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void threeArgsFactoryFailFast() {
Runnable noop = mock(Runnable.class);
UnicastSubject<Integer> ap = UnicastSubject.create(16, noop, false);
ap.onNext(1);
ap.onError(new RuntimeException());
TestObserver<Integer> ts = TestObserver.create();
ap.subscribe(ts);

ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void threeArgsFactoryDelayError() {
Runnable noop = mock(Runnable.class);
UnicastSubject<Integer> ap = UnicastSubject.create(16, noop, true);
ap.onNext(1);
ap.onError(new RuntimeException());
TestObserver<Integer> ts = TestObserver.create();
ap.subscribe(ts);

ts
.assertValueCount(1)
.assertError(RuntimeException.class);
}

@Test
public void fusionOfflineFailFast() {
UnicastSubject<Integer> ap = UnicastSubject.create(false);
ap.onNext(1);
ap.onError(new RuntimeException());
TestObserver<Integer> ts = ObserverFusion.newTest(QueueDisposable.ANY);
ap.subscribe(ts);

ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void fusionOfflineFailFastMultipleEvents() {
UnicastSubject<Integer> ap = UnicastSubject.create(false);
ap.onNext(1);
ap.onNext(2);
ap.onNext(3);
ap.onComplete();
TestObserver<Integer> ts = ObserverFusion.newTest(QueueDisposable.ANY);
ap.subscribe(ts);

ts
.assertValueCount(3)
.assertComplete();
}

@Test
public void failFastMultipleEvents() {
UnicastSubject<Integer> ap = UnicastSubject.create(false);
ap.onNext(1);
ap.onNext(2);
ap.onNext(3);
ap.onComplete();
TestObserver<Integer> ts = TestObserver.create();
ap.subscribe(ts);

ts
.assertValueCount(3)
.assertComplete();
}

@Test
public void onTerminateCalledWhenOnError() {
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
Expand Down