From 0202e63829c6b320a1a13da66929b808a2d8aa37 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 22 Jan 2014 09:15:35 +0100 Subject: [PATCH] Fixed testSingleSourceManyIterators --- .../main/java/rx/operators/OperationNext.java | 9 ++++++++- .../java/rx/operators/OperationNextTest.java | 19 +++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 45d1fff72b..d4d62b65b5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -48,7 +48,8 @@ public Iterator iterator() { } - private static class NextIterator implements Iterator { + // test needs to access the observer.waiting flag non-blockingly. + /* private */static final class NextIterator implements Iterator { private final NextObserver observer; private T next; @@ -60,6 +61,12 @@ private NextIterator(NextObserver observer) { this.observer = observer; } + // in tests, set the waiting flag without blocking for the next value to + // allow lockstepping instead of multi-threading + void setWaiting(boolean value) { + observer.waiting.set(value); + } + @Override public boolean hasNext() { if (error != null) { diff --git a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java index 5630507c92..92a267c307 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java @@ -296,24 +296,27 @@ public void run() { System.out.println("a: " + a + " b: " + b + " c: " + c); } - @Test(timeout = 8000) + @Test /* (timeout = 8000) */ public void testSingleSourceManyIterators() throws InterruptedException { - BlockingObservable source = Observable.interval(200, TimeUnit.MILLISECONDS).take(10).toBlockingObservable(); + PublishSubject ps = PublishSubject.create(); + BlockingObservable source = ps.take(10).toBlockingObservable(); Iterable iter = source.next(); for (int j = 0; j < 3; j++) { - Iterator it = iter.iterator(); + OperationNext.NextIterator it = (OperationNext.NextIterator)iter.iterator(); - for (int i = 0; i < 9; i++) { + for (long i = 0; i < 9; i++) { // hasNext has to set the waiting to true, otherwise, all onNext will be skipped + it.setWaiting(true); + ps.onNext(i); Assert.assertEquals(true, it.hasNext()); - Assert.assertEquals(Long.valueOf(i), it.next()); + Assert.assertEquals(j + "th iteration", Long.valueOf(i), it.next()); } + it.setWaiting(true); + ps.onNext(9L); - Thread.sleep(400); - - Assert.assertEquals(false, it.hasNext()); + Assert.assertEquals(j + "th iteration", false, it.hasNext()); } }