diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e8b4365d1a..3279d7e5e7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,6 +36,7 @@ import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; +import rx.operators.OperationNext; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; @@ -1710,6 +1711,17 @@ public Iterator iterator() { }; } + /** + * Samples the next value (blocking without buffering) from in an observable sequence. + * + * @param items the source observable sequence. + * @param the type of observable. + * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. + */ + public static Iterable next(Observable items) { + return OperationNext.next(items); + } + /** * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. * @@ -2892,6 +2904,15 @@ public Iterable toIterable() { return toIterable(this); } + /** + * Samples the next value (blocking without buffering) from in an observable sequence. + * + * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. + */ + public Iterable next() { + return next(this); + } + public static class UnitTest { @Mock diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java new file mode 100644 index 0000000000..a3d637ec12 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -0,0 +1,267 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.Exceptions; + +import java.util.Iterator; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + + +/** + * Samples the next value (blocking without buffering) from in an observable sequence. + */ +public final class OperationNext { + + public static Iterable next(final Observable items) { + + NextObserver nextObserver = new NextObserver(); + final NextIterator nextIterator = new NextIterator(nextObserver); + + items.materialize().subscribe(nextObserver); + + return new Iterable() { + @Override + public Iterator iterator() { + return nextIterator; + } + }; + + } + + private static class NextIterator implements Iterator { + + private final NextObserver observer; + + private NextIterator(NextObserver observer) { + this.observer = observer; + } + + @Override + public boolean hasNext() { + return !observer.isCompleted(); + } + + @Override + public T next() { + if (observer.isCompleted()) { + throw new IllegalStateException("Observable is completed"); + } + + observer.await(); + + try { + return observer.takeNext(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Exceptions.propagate(e); + } + + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read only iterator"); + } + } + + + private static class NextObserver implements Observer> { + private final BlockingQueue> buf = new ArrayBlockingQueue>(1); + private final AtomicBoolean waiting = new AtomicBoolean(false); + + @Override + public void onCompleted() { + // ignore + } + + @Override + public void onError(Exception e) { + // ignore + } + + @Override + public void onNext(Notification args) { + + if (waiting.getAndSet(false) || !args.isOnNext()) { + Notification toOffer = args; + while (!buf.offer(toOffer)) { + Notification concurrentItem = buf.poll(); + + // in case if we won race condition with onComplete/onError method + if (!concurrentItem.isOnNext()) { + toOffer = concurrentItem; + } + } + } + + } + + public void await() { + waiting.set(true); + } + + public boolean isCompleted() { + Notification lastItem = buf.peek(); + if (lastItem == null) { + return false; + } + + if (lastItem.isOnError()) { + throw Exceptions.propagate(lastItem.getException()); + } + + return lastItem.isOnCompleted(); + } + + public T takeNext() throws InterruptedException { + Notification next = buf.take(); + + if (next.isOnError()) { + throw Exceptions.propagate(next.getException()); + } + + if (next.isOnCompleted()) { + throw new IllegalStateException("Observable is completed"); + } + + return next.getValue(); + + } + + } + + + public static class UnitTest { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Test + public void testNext() throws Exception { + Subscription s = mock(Subscription.class); + final TestObservable obs = new TestObservable(s); + + Iterator it = next(obs).iterator(); + + assertTrue(it.hasNext()); + + Future next = nextAsync(it); + Thread.sleep(100); + obs.sendOnNext("one"); + assertEquals("one", next.get()); + + assertTrue(it.hasNext()); + + next = nextAsync(it); + Thread.sleep(100); + obs.sendOnNext("two"); + assertEquals("two", next.get()); + + assertTrue(it.hasNext()); + + obs.sendOnCompleted(); + + assertFalse(it.hasNext()); + + } + + @Test(expected = TestException.class) + public void testOnError() throws Throwable { + Subscription s = mock(Subscription.class); + final TestObservable obs = new TestObservable(s); + + Iterator it = next(obs).iterator(); + + assertTrue(it.hasNext()); + + Future next = nextAsync(it); + Thread.sleep(100); + obs.sendOnNext("one"); + assertEquals("one", next.get()); + + assertTrue(it.hasNext()); + + next = nextAsync(it); + Thread.sleep(100); + obs.sendOnError(new TestException()); + + try { + next.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + private Future nextAsync(final Iterator it) throws Exception { + + return executor.submit(new Callable() { + + @Override + public String call() throws Exception { + return it.next(); + } + }); + } + + private static class TestObservable extends Observable { + + Observer observer = null; + Subscription s; + + public TestObservable(Subscription s) { + this.s = s; + } + + /* used to simulate subscription */ + public void sendOnCompleted() { + observer.onCompleted(); + } + + /* used to simulate subscription */ + public void sendOnNext(String value) { + observer.onNext(value); + } + + /* used to simulate subscription */ + @SuppressWarnings("unused") + public void sendOnError(Exception e) { + observer.onError(e); + } + + @Override + public Subscription subscribe(final Observer observer) { + this.observer = observer; + return s; + } + + } + + private static class TestException extends RuntimeException { + + } + + + } + +}