Skip to content

Commit

Permalink
Merge pull request #1501 from benjchristensen/blocking-next
Browse files Browse the repository at this point in the history
blocking synchronous next
  • Loading branch information
benjchristensen committed Jul 24, 2014
2 parents cf229ec + 4233c2a commit 0af90b7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Notification;
Expand Down Expand Up @@ -46,11 +47,7 @@ public static <T> Iterable<T> next(final Observable<? extends T> items) {
@Override
public Iterator<T> iterator() {
NextObserver<T> nextObserver = new NextObserver<T>();
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);

items.materialize().subscribe(nextObserver);

return nextIterator;
return new NextIterator<T>(items, nextObserver);
}
};

Expand All @@ -59,28 +56,19 @@ public Iterator<T> iterator() {
// test needs to access the observer.waiting flag non-blockingly.
/* private */static final class NextIterator<T> implements Iterator<T> {

private final NextObserver<? extends T> observer;
private final NextObserver<T> observer;
private final Observable<? extends T> items;
private T next;
private boolean hasNext = true;
private boolean isNextConsumed = true;
private Throwable error = null;
private boolean started = false;

private NextIterator(NextObserver<? extends T> observer) {
private NextIterator(Observable<? extends T> items, NextObserver<T> observer) {
this.items = items;
this.observer = observer;
}


// in tests, set the waiting flag without blocking for the next value to
// allow lockstepping instead of multi-threading
/**
* In tests, set the waiting flag without blocking for the next value to
* allow lockstepping instead of multi-threading
* @param value use 1 to enter into the waiting state
*/
void setWaiting(int value) {
observer.setWaiting(value);
}

@Override
public boolean hasNext() {
if (error != null) {
Expand All @@ -102,6 +90,13 @@ public boolean hasNext() {

private boolean moveToNext() {
try {
if (!started) {
started = true;
// if not started, start now
observer.setWaiting(1);
items.materialize().subscribe(observer);
}

Notification<? extends T> nextNotification = observer.takeNext();
if (nextNotification.isOnNext()) {
isNextConsumed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import rx.internal.operators.BlockingOperatorNext;
import rx.observables.BlockingObservable;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

Expand Down Expand Up @@ -295,26 +296,27 @@ public void run() {

@Test /* (timeout = 8000) */
public void testSingleSourceManyIterators() throws InterruptedException {
PublishSubject<Long> ps = PublishSubject.create();
BlockingObservable<Long> source = ps.take(10).toBlocking();
Observable<Long> o = Observable.interval(10, TimeUnit.MILLISECONDS);
PublishSubject<Void> terminal = PublishSubject.create();
BlockingObservable<Long> source = o.takeUntil(terminal).toBlocking();

Iterable<Long> iter = source.next();

for (int j = 0; j < 3; j++) {
BlockingOperatorNext.NextIterator<Long> it = (BlockingOperatorNext.NextIterator<Long>)iter.iterator();

for (long i = 0; i < 9; i++) {
// hasNext has to set the waiting to true, otherwise, all onNext will be skipped
it.setWaiting(1);
ps.onNext(i);
for (long i = 0; i < 10; i++) {
Assert.assertEquals(true, it.hasNext());
Assert.assertEquals(j + "th iteration", Long.valueOf(i), it.next());
Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
}
it.setWaiting(1);
ps.onNext(9L);

Assert.assertEquals(j + "th iteration", false, it.hasNext());
terminal.onNext(null);
}

}

@Test
public void testSynchronousNext() {
assertEquals(1, BehaviorSubject.create(1).take(1).toBlocking().single().intValue());
assertEquals(2, BehaviorSubject.create(2).toBlocking().toIterable().iterator().next().intValue());
assertEquals(3, BehaviorSubject.create(3).toBlocking().next().iterator().next().intValue());
}
}

0 comments on commit 0af90b7

Please sign in to comment.