From 7e7f26f27d57f337728dbc2c60e2b153fe359305 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 Dec 2013 14:40:53 +0100 Subject: [PATCH] Operator BO.forEachFuture --- .../rx/observables/BlockingObservable.java | 107 ++++++++ .../java/rx/observers/LatchedObserver.java | 251 ++++++++++++++++++ .../rx/operators/OperationForEachFuture.java | 135 ++++++++++ .../main/java/rx/util/functions/Actions.java | 83 ++++++ .../operators/OperationForEachFutureTest.java | 169 ++++++++++++ 5 files changed, 745 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/observers/LatchedObserver.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperationForEachFuture.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationForEachFutureTest.java diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 8fe13c9e88..41fd693aa4 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -18,18 +18,23 @@ import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicReference; import rx.Observable; import rx.Observer; +import rx.Scheduler; import rx.Subscription; +import rx.operators.OperationForEachFuture; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; import rx.operators.OperationToFuture; import rx.operators.OperationToIterator; import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; +import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Actions; import rx.util.functions.Func1; /** @@ -93,6 +98,108 @@ private Subscription protectivelyWrapAndSubscribe(Observer observer) return subscription.wrap(o.subscribe(new SafeObserver(subscription, observer))); } + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + *

+ * Important note: The returned task blocks indefinitely unless + * the run() method is called or the task is scheduled on an Executor. + * @param onNext the action to call with each emitted element + * @return the Future representing the entire for-each operation + * @see #forEachFuture(rx.util.functions.Action1, rx.Scheduler) + */ + public FutureTask forEachFuture( + Action1 onNext) { + return OperationForEachFuture.forEachFuture(o, onNext); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + *

+ * Important note: The returned task blocks indefinitely unless + * the run() method is called or the task is scheduled on an Executor. + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @return the Future representing the entire for-each operation + * @see #forEachFuture(rx.util.functions.Action1, rx.util.functions.Action1, rx.Scheduler) + */ + public FutureTask forEachFuture( + Action1 onNext, + Action1 onError) { + return OperationForEachFuture.forEachFuture(o, onNext, onError); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + *

+ * Important note: The returned task blocks indefinitely unless + * the run() method is called or the task is scheduled on an Executor. + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param onCompleted the action to call when the source completes + * @return the Future representing the entire for-each operation + * @see #forEachFuture(rx.util.functions.Action1, rx.util.functions.Action1, rx.util.functions.Action0, rx.Scheduler) + */ + public FutureTask forEachFuture( + Action1 onNext, + Action1 onError, + Action0 onCompleted) { + return OperationForEachFuture.forEachFuture(o, onNext, onError, onCompleted); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future, scheduled on the given scheduler. + * @param onNext the action to call with each emitted element + * @param scheduler the scheduler where the task will await the termination of the for-each + * @return the Future representing the entire for-each operation + */ + public FutureTask forEachFuture( + Action1 onNext, + Scheduler scheduler) { + FutureTask task = OperationForEachFuture.forEachFuture(o, onNext); + scheduler.schedule(Actions.fromRunnable(task)); + return task; + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future, scheduled on the given scheduler. + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param scheduler the scheduler where the task will await the termination of the for-each + * @return the Future representing the entire for-each operation + */ + public FutureTask forEachFuture( + Action1 onNext, + Action1 onError, + Scheduler scheduler) { + FutureTask task = OperationForEachFuture.forEachFuture(o, onNext, onError); + scheduler.schedule(Actions.fromRunnable(task)); + return task; + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future, scheduled on the given scheduler. + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param onCompleted the action to call when the source completes + * @param scheduler the scheduler where the task will await the termination of the for-each + * @return the Future representing the entire for-each operation + */ + public FutureTask forEachFuture( + Action1 onNext, + Action1 onError, + Action0 onCompleted, + Scheduler scheduler) { + FutureTask task = OperationForEachFuture.forEachFuture(o, onNext, onError, onCompleted); + scheduler.schedule(Actions.fromRunnable(task)); + return task; + } + /** * Invoke a method on each item emitted by the {@link Observable}; block until the Observable * completes. diff --git a/rxjava-core/src/main/java/rx/observers/LatchedObserver.java b/rxjava-core/src/main/java/rx/observers/LatchedObserver.java new file mode 100644 index 0000000000..eed97ee449 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observers/LatchedObserver.java @@ -0,0 +1,251 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observers; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import rx.joins.ObserverBase; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Actions; + +/** + * An observer implementation that calls a CountDownLatch in case + * a terminal state has been reached. + */ +public abstract class LatchedObserver extends ObserverBase { + /** The CountDownLatch to count-down on a terminal state. */ + protected final CountDownLatch latch; + /** Contains the error. */ + protected volatile Throwable error; + /** + * Consturcts a LatchedObserver instance. + * @param latch the CountDownLatch to use + */ + public LatchedObserver(CountDownLatch latch) { + this.latch = latch; + } + /** + * Block and await the latch. + * @throws InterruptedException if the wait is interrupted + */ + public void await() throws InterruptedException { + latch.await(); + } + /** + * Block and await the latch for a given amount of time. + * @see CountDownLatch#await(long, java.util.concurrent.TimeUnit) + */ + public boolean await(long time, TimeUnit unit) throws InterruptedException { + return latch.await(time, unit); + } + /** + * Returns the observed error or null if there was none. + *

+ * Should be generally called after the await() returns. + * @return the observed error + */ + public Throwable getThrowable() { + return error; + } + + /** + * Create a LatchedObserver with the given callback function(s). + */ + public static LatchedObserver create(Action1 onNext) { + return create(onNext, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given callback function(s). + */ + public static LatchedObserver create(Action1 onNext, Action1 onError) { + return create(onNext, onError, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given callback function(s). + */ + public static LatchedObserver create(Action1 onNext, Action1 onError, Action0 onCompleted) { + return create(onNext, onError, onCompleted, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given callback function(s) and a shared latch. + */ + public static LatchedObserver create(Action1 onNext, CountDownLatch latch) { + return new LatchedObserverImpl(onNext, Actions.emptyThrowable(), Actions.empty(), latch); + } + + /** + * Create a LatchedObserver with the given callback function(s) and a shared latch. + */ + public static LatchedObserver create(Action1 onNext, Action1 onError, CountDownLatch latch) { + return new LatchedObserverImpl(onNext, onError, Actions.empty(), latch); + } + + /** + * Create a LatchedObserver with the given callback function(s) and a shared latch. + */ + public static LatchedObserver create(Action1 onNext, Action1 onError, Action0 onCompleted, CountDownLatch latch) { + return new LatchedObserverImpl(onNext, onError, onCompleted, latch); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s). + */ + public static LatchedObserver createIndexed(Action2 onNext) { + return createIndexed(onNext, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s). + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError) { + return createIndexed(onNext, onError, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s). + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError, Action0 onCompleted) { + return createIndexed(onNext, onError, onCompleted, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s) and a shared latch. + */ + public static LatchedObserver createIndexed(Action2 onNext, CountDownLatch latch) { + return new LatchedObserverIndexedImpl(onNext, Actions.emptyThrowable(), Actions.empty(), latch); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s) and a shared latch. + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError, CountDownLatch latch) { + return new LatchedObserverIndexedImpl(onNext, onError, Actions.empty(), latch); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s) and a shared latch. + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError, Action0 onCompleted, CountDownLatch latch) { + return new LatchedObserverIndexedImpl(onNext, onError, onCompleted, latch); + } + + /** + * A latched observer which calls an action for each observed value + * and checks if a cancellation token is not unsubscribed. + * @param the observed value type + */ + private static final class LatchedObserverImpl extends LatchedObserver { + final Action1 onNext; + final Action1 onError; + final Action0 onCompleted; + + public LatchedObserverImpl(Action1 onNext, + Action1 onError, + Action0 onCompleted, + CountDownLatch latch) { + super(latch); + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + protected void onNextCore(T args) { + try { + onNext.call(args); + } catch (Throwable t) { + fail(t); + } + } + + @Override + protected void onErrorCore(Throwable e) { + try { + error = e; + onError.call(e); + } finally { + latch.countDown(); + } + } + + @Override + protected void onCompletedCore() { + try { + onCompleted.call(); + } finally { + latch.countDown(); + } + } + } + /** + * A latched observer which calls an action for each observed value + * and checks if a cancellation token is not unsubscribed. + * @param the observed value type + */ + private static final class LatchedObserverIndexedImpl extends LatchedObserver { + final Action2 onNext; + final Action1 onError; + final Action0 onCompleted; + int index; + + public LatchedObserverIndexedImpl(Action2 onNext, + Action1 onError, + Action0 onCompleted, + CountDownLatch latch) { + super(latch); + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + protected void onNextCore(T args) { + if (index == Integer.MAX_VALUE) { + fail(new ArithmeticException("index overflow")); + return; + } + try { + onNext.call(args, index++); + } catch (Throwable t) { + fail(t); + } + } + + @Override + protected void onErrorCore(Throwable e) { + try { + error = e; + onError.call(e); + } finally { + latch.countDown(); + } + } + + @Override + protected void onCompletedCore() { + try { + onCompleted.call(); + } finally { + latch.countDown(); + } + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationForEachFuture.java b/rxjava-core/src/main/java/rx/operators/OperationForEachFuture.java new file mode 100644 index 0000000000..c0ef47ee26 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationForEachFuture.java @@ -0,0 +1,135 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import rx.Observable; +import rx.Subscription; +import rx.observers.LatchedObserver; +import rx.util.Exceptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Actions; + +/** + * Convert the observation of a source observable to a big Future call. + *

+ * Remark: the cancellation token version's behavior is in doubt, so left out. + */ +public final class OperationForEachFuture { + /** Utility class. */ + private OperationForEachFuture() { throw new IllegalStateException("No instances!"); } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + * @param the element type of the Observable + * @param source the source Observable + * @param onNext the action to call with each emitted element + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext) { + return forEachFuture(source, onNext, Actions.emptyThrowable(), Actions.empty()); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + * @param the element type of the Observable + * @param source the source Observable + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError) { + return forEachFuture(source, onNext, onError, Actions.empty()); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + * @param the element type of the Observable + * @param source the source Observable + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param onCompleted the action to call when the source completes + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError, + Action0 onCompleted) { + + LatchedObserver lo = LatchedObserver.create(onNext, onError, onCompleted); + + Subscription s = source.subscribe(lo); + + FutureTaskCancel task = new FutureTaskCancel(s, new RunAwait(lo)); + + return task; + } + /** + * A future task that unsubscribes the given subscription when cancelled. + * @param the return value type + */ + private static final class FutureTaskCancel extends FutureTask { + final Subscription cancel; + + public FutureTaskCancel(Subscription cancel, Callable callable) { + super(callable); + this.cancel = cancel; + } + + public FutureTaskCancel(Subscription cancel, Runnable runnable, T result) { + super(runnable, result); + this.cancel = cancel; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancel.unsubscribe(); + return super.cancel(mayInterruptIfRunning); + } + + } + + /** Await the completion of a latched observer and throw its exception if any. */ + private static final class RunAwait implements Callable { + final LatchedObserver observer; + + public RunAwait(LatchedObserver observer) { + this.observer = observer; + } + + @Override + public Void call() throws Exception { + observer.await(); + Throwable t = observer.getThrowable(); + if (t != null) { + throw Exceptions.propagate(t); + } + return null; + } + } +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java index 7a33d451ce..480a05d447 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Actions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -67,6 +67,89 @@ public void call() { } }; } + /** + * Return an action which takes a Throwable and does nothing. + *

(To avoid casting from the generic empty1().) + * @return the action + */ + public static Action1 emptyThrowable() { + return EMPTY_THROWABLE; + } + /** + * An action that takes a Throwable and does nothing. + */ + private static final Action1 EMPTY_THROWABLE = new EmptyThrowable(); + /** An empty throwable class. */ + private static final class EmptyThrowable implements Action1 { + public void call(Throwable t1) { + } + } + /** + * Return an Action0 instance which does nothing. + * @return an Action0 instance which does nothing + */ + public static Action0 empty() { + return EMPTY; + } + /** A single empty instance. */ + private static final Action0 EMPTY = new EmptyAction(); + /** An empty action class. */ + private static final class EmptyAction implements Action0 { + @Override + public void call() { + } + } + + /** + * Converts a runnable instance into an Action0 instance. + * @param run the Runnable to run when the Action0 is called + * @return the Action0 wrapping the Runnable + */ + public static Action0 fromRunnable(Runnable run) { + if (run == null) { + throw new NullPointerException("run"); + } + return new ActionWrappingRunnable(run); + } + /** An Action0 which wraps and calls a Runnable. */ + private static final class ActionWrappingRunnable implements Action0 { + final Runnable run; + + public ActionWrappingRunnable(Runnable run) { + this.run = run; + } + + @Override + public void call() { + run.run(); + } + + } + /** + * Converts an Action0 instance into a Runnable instance. + * @param action the Action0 to call when the Runnable is run + * @return the Runnable wrapping the Action0 + */ + public static Runnable toRunnable(Action0 action) { + if (action == null) { + throw new NullPointerException("action"); + } + return new RunnableAction(action); + } + /** An Action0 which wraps and calls a Runnable. */ + private static final class RunnableAction implements Runnable { + final Action0 action; + + public RunnableAction(Action0 action) { + this.action = action; + } + + @Override + public void run() { + action.call(); + } + + } /** * Convert an action to a function which calls * the action returns Void (null). diff --git a/rxjava-core/src/test/java/rx/operators/OperationForEachFutureTest.java b/rxjava-core/src/test/java/rx/operators/OperationForEachFutureTest.java new file mode 100644 index 0000000000..9bcb75de66 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationForEachFutureTest.java @@ -0,0 +1,169 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import org.junit.Test; +import rx.Observable; +import rx.schedulers.Schedulers; +import rx.util.functions.Action1; + +public class OperationForEachFutureTest { + @Test + public void testSimple() { + final ExecutorService exec = Executors.newCachedThreadPool(); + + try { + Observable source = Observable.from(1, 2, 3) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = source.toBlockingObservable().forEachFuture(add); + + exec.execute(task); + + try { + Void value = task.get(1000, TimeUnit.MILLISECONDS); + + assertEquals(null, value); + + assertEquals(6, sum.get()); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + fail("Exception: " + ex); + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + } finally { + exec.shutdown(); + } + } + private static final class CustomException extends RuntimeException { } + @Test + public void testSimpleThrowing() { + + final ExecutorService exec = Executors.newCachedThreadPool(); + + try { + Observable source = Observable.error(new CustomException()) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = source.toBlockingObservable().forEachFuture(add); + + exec.execute(task); + + try { + task.get(1000, TimeUnit.MILLISECONDS); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + if (!(ex.getCause() instanceof CustomException)) { + fail("Got different exception: " + ex.getCause()); + } + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + + assertEquals(0, sum.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void testSimpleScheduled() { + Observable source = Observable.from(1, 2, 3) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = source.toBlockingObservable().forEachFuture(add, Schedulers.newThread()); + + try { + Void value = task.get(1000, TimeUnit.MILLISECONDS); + + assertEquals(null, value); + + assertEquals(6, sum.get()); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + fail("Exception: " + ex); + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + } + @Test + public void testSimpleScheduledThrowing() { + + Observable source = Observable.error(new CustomException()) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = source.toBlockingObservable().forEachFuture(add, Schedulers.newThread()); + + try { + task.get(1000, TimeUnit.MILLISECONDS); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + if (!(ex.getCause() instanceof CustomException)) { + fail("Got different exception: " + ex.getCause()); + } + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + + assertEquals(0, sum.get()); + } +}