diff --git a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java index 1bdbcd8272..828ee41a0e 100644 --- a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java @@ -16,7 +16,7 @@ package rx.observers; import java.util.Arrays; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Subscriber; import rx.exceptions.CompositeException; @@ -60,7 +60,11 @@ public class SafeSubscriber extends Subscriber { private final Subscriber actual; - private final AtomicBoolean isFinished = new AtomicBoolean(false); + /** Terminal state indication if not zero. */ + volatile int done; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater DONE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SafeSubscriber.class, "done"); public SafeSubscriber(Subscriber actual) { super(actual); @@ -69,7 +73,7 @@ public SafeSubscriber(Subscriber actual) { @Override public void onCompleted() { - if (isFinished.compareAndSet(false, true)) { + if (DONE_UPDATER.getAndSet(this, 1) == 0) { try { actual.onCompleted(); } catch (Throwable e) { @@ -90,7 +94,7 @@ public void onError(Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwIfFatal(e); - if (isFinished.compareAndSet(false, true)) { + if (DONE_UPDATER.getAndSet(this, 1) == 0) { _onError(e); } } @@ -98,7 +102,7 @@ public void onError(Throwable e) { @Override public void onNext(T args) { try { - if (!isFinished.get()) { + if (done == 0) { actual.onNext(args); } } catch (Throwable e) {