Skip to content

Commit

Permalink
Fixed testOnErrorViaHasNext in issue ReactiveX#383
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Oct 27, 2013
1 parent b985cac commit 7e348ce
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void remove() {
private static class NextObserver<T> implements Observer<Notification<? extends T>> {
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
private final AtomicBoolean waiting = new AtomicBoolean(false);
private volatile boolean completed = false;

@Override
public void onCompleted() {
Expand Down Expand Up @@ -139,7 +140,11 @@ public void await() {
public boolean isCompleted(boolean rethrowExceptionIfExists) {
Notification<? extends T> lastItem = buf.peek();
if (lastItem == null) {
return false;
// Fixed issue #383 testOnErrorViaHasNext fails sometimes.
// If the buf is empty, there are two cases:
// 1. The next item has not been emitted yet.
// 2. The error or completed notification is removed in takeNext method.
return completed;
}

if (lastItem.isOnError()) {
Expand All @@ -157,10 +162,12 @@ public T takeNext() throws InterruptedException {
Notification<? extends T> next = buf.take();

if (next.isOnError()) {
completed = true;
throw Exceptions.propagate(next.getThrowable());
}

if (next.isOnCompleted()) {
completed = true;
throw new IllegalStateException("Observable is completed");
}

Expand Down

0 comments on commit 7e348ce

Please sign in to comment.