Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator BO.forEachFuture #646

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions rxjava-core/src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.operators.OperationForEachFuture;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Actions;
import rx.util.functions.Func1;

/**
Expand Down Expand Up @@ -93,6 +98,108 @@ private Subscription protectivelyWrapAndSubscribe(Observer<? super T> observer)
return subscription.wrap(o.subscribe(new SafeObserver<T>(subscription, observer)));
}

/**
* Subscribes to the given source and calls the callback for each emitted item,
* and surfaces the completion or error through a Future.
* <p>
* <em>Important note:</em> The returned task blocks indefinitely unless
* the run() method is called or the task is scheduled on an Executor.
* @param onNext the action to call with each emitted element
* @return the Future representing the entire for-each operation
* @see #forEachFuture(rx.util.functions.Action1, rx.Scheduler)
*/
public FutureTask<Void> forEachFuture(
Action1<? super T> onNext) {
return OperationForEachFuture.forEachFuture(o, onNext);
}

/**
* Subscribes to the given source and calls the callback for each emitted item,
* and surfaces the completion or error through a Future.
* <p>
* <em>Important note:</em> The returned task blocks indefinitely unless
* the run() method is called or the task is scheduled on an Executor.
* @param onNext the action to call with each emitted element
* @param onError the action to call when an exception is emitted
* @return the Future representing the entire for-each operation
* @see #forEachFuture(rx.util.functions.Action1, rx.util.functions.Action1, rx.Scheduler)
*/
public FutureTask<Void> forEachFuture(
Action1<? super T> onNext,
Action1<? super Throwable> onError) {
return OperationForEachFuture.forEachFuture(o, onNext, onError);
}

/**
* Subscribes to the given source and calls the callback for each emitted item,
* and surfaces the completion or error through a Future.
* <p>
* <em>Important note:</em> The returned task blocks indefinitely unless
* the run() method is called or the task is scheduled on an Executor.
* @param onNext the action to call with each emitted element
* @param onError the action to call when an exception is emitted
* @param onCompleted the action to call when the source completes
* @return the Future representing the entire for-each operation
* @see #forEachFuture(rx.util.functions.Action1, rx.util.functions.Action1, rx.util.functions.Action0, rx.Scheduler)
*/
public FutureTask<Void> forEachFuture(
Action1<? super T> onNext,
Action1<? super Throwable> onError,
Action0 onCompleted) {
return OperationForEachFuture.forEachFuture(o, onNext, onError, onCompleted);
}

/**
* Subscribes to the given source and calls the callback for each emitted item,
* and surfaces the completion or error through a Future, scheduled on the given scheduler.
* @param onNext the action to call with each emitted element
* @param scheduler the scheduler where the task will await the termination of the for-each
* @return the Future representing the entire for-each operation
*/
public FutureTask<Void> forEachFuture(
Action1<? super T> onNext,
Scheduler scheduler) {
FutureTask<Void> task = OperationForEachFuture.forEachFuture(o, onNext);
scheduler.schedule(Actions.fromRunnable(task));
return task;
}

/**
* Subscribes to the given source and calls the callback for each emitted item,
* and surfaces the completion or error through a Future, scheduled on the given scheduler.
* @param onNext the action to call with each emitted element
* @param onError the action to call when an exception is emitted
* @param scheduler the scheduler where the task will await the termination of the for-each
* @return the Future representing the entire for-each operation
*/
public FutureTask<Void> forEachFuture(
Action1<? super T> onNext,
Action1<? super Throwable> onError,
Scheduler scheduler) {
FutureTask<Void> task = OperationForEachFuture.forEachFuture(o, onNext, onError);
scheduler.schedule(Actions.fromRunnable(task));
return task;
}

/**
* Subscribes to the given source and calls the callback for each emitted item,
* and surfaces the completion or error through a Future, scheduled on the given scheduler.
* @param onNext the action to call with each emitted element
* @param onError the action to call when an exception is emitted
* @param onCompleted the action to call when the source completes
* @param scheduler the scheduler where the task will await the termination of the for-each
* @return the Future representing the entire for-each operation
*/
public FutureTask<Void> forEachFuture(
Action1<? super T> onNext,
Action1<? super Throwable> onError,
Action0 onCompleted,
Scheduler scheduler) {
FutureTask<Void> task = OperationForEachFuture.forEachFuture(o, onNext, onError, onCompleted);
scheduler.schedule(Actions.fromRunnable(task));
return task;
}

/**
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
* completes.
Expand Down
251 changes: 251 additions & 0 deletions rxjava-core/src/main/java/rx/observers/LatchedObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.joins.ObserverBase;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
import rx.util.functions.Actions;

/**
* An observer implementation that calls a CountDownLatch in case
* a terminal state has been reached.
*/
public abstract class LatchedObserver<T> extends ObserverBase<T> {
/** The CountDownLatch to count-down on a terminal state. */
protected final CountDownLatch latch;
/** Contains the error. */
protected volatile Throwable error;
/**
* Consturcts a LatchedObserver instance.
* @param latch the CountDownLatch to use
*/
public LatchedObserver(CountDownLatch latch) {
this.latch = latch;
}
/**
* Block and await the latch.
* @throws InterruptedException if the wait is interrupted
*/
public void await() throws InterruptedException {
latch.await();
}
/**
* Block and await the latch for a given amount of time.
* @see CountDownLatch#await(long, java.util.concurrent.TimeUnit)
*/
public boolean await(long time, TimeUnit unit) throws InterruptedException {
return latch.await(time, unit);
}
/**
* Returns the observed error or null if there was none.
* <p>
* Should be generally called after the await() returns.
* @return the observed error
*/
public Throwable getThrowable() {
return error;
}

/**
* Create a LatchedObserver with the given callback function(s).
*/
public static <T> LatchedObserver<T> create(Action1<? super T> onNext) {
return create(onNext, new CountDownLatch(1));
}

/**
* Create a LatchedObserver with the given callback function(s).
*/
public static <T> LatchedObserver<T> create(Action1<? super T> onNext, Action1<? super Throwable> onError) {
return create(onNext, onError, new CountDownLatch(1));
}

/**
* Create a LatchedObserver with the given callback function(s).
*/
public static <T> LatchedObserver<T> create(Action1<? super T> onNext, Action1<? super Throwable> onError, Action0 onCompleted) {
return create(onNext, onError, onCompleted, new CountDownLatch(1));
}

/**
* Create a LatchedObserver with the given callback function(s) and a shared latch.
*/
public static <T> LatchedObserver<T> create(Action1<? super T> onNext, CountDownLatch latch) {
return new LatchedObserverImpl<T>(onNext, Actions.emptyThrowable(), Actions.empty(), latch);
}

/**
* Create a LatchedObserver with the given callback function(s) and a shared latch.
*/
public static <T> LatchedObserver<T> create(Action1<? super T> onNext, Action1<? super Throwable> onError, CountDownLatch latch) {
return new LatchedObserverImpl<T>(onNext, onError, Actions.empty(), latch);
}

/**
* Create a LatchedObserver with the given callback function(s) and a shared latch.
*/
public static <T> LatchedObserver<T> create(Action1<? super T> onNext, Action1<? super Throwable> onError, Action0 onCompleted, CountDownLatch latch) {
return new LatchedObserverImpl<T>(onNext, onError, onCompleted, latch);
}

/**
* Create a LatchedObserver with the given indexed callback function(s).
*/
public static <T> LatchedObserver<T> createIndexed(Action2<? super T, ? super Integer> onNext) {
return createIndexed(onNext, new CountDownLatch(1));
}

/**
* Create a LatchedObserver with the given indexed callback function(s).
*/
public static <T> LatchedObserver<T> createIndexed(Action2<? super T, ? super Integer> onNext, Action1<? super Throwable> onError) {
return createIndexed(onNext, onError, new CountDownLatch(1));
}

/**
* Create a LatchedObserver with the given indexed callback function(s).
*/
public static <T> LatchedObserver<T> createIndexed(Action2<? super T, ? super Integer> onNext, Action1<? super Throwable> onError, Action0 onCompleted) {
return createIndexed(onNext, onError, onCompleted, new CountDownLatch(1));
}

/**
* Create a LatchedObserver with the given indexed callback function(s) and a shared latch.
*/
public static <T> LatchedObserver<T> createIndexed(Action2<? super T, ? super Integer> onNext, CountDownLatch latch) {
return new LatchedObserverIndexedImpl<T>(onNext, Actions.emptyThrowable(), Actions.empty(), latch);
}

/**
* Create a LatchedObserver with the given indexed callback function(s) and a shared latch.
*/
public static <T> LatchedObserver<T> createIndexed(Action2<? super T, ? super Integer> onNext, Action1<? super Throwable> onError, CountDownLatch latch) {
return new LatchedObserverIndexedImpl<T>(onNext, onError, Actions.empty(), latch);
}

/**
* Create a LatchedObserver with the given indexed callback function(s) and a shared latch.
*/
public static <T> LatchedObserver<T> createIndexed(Action2<? super T, ? super Integer> onNext, Action1<? super Throwable> onError, Action0 onCompleted, CountDownLatch latch) {
return new LatchedObserverIndexedImpl<T>(onNext, onError, onCompleted, latch);
}

/**
* A latched observer which calls an action for each observed value
* and checks if a cancellation token is not unsubscribed.
* @param <T> the observed value type
*/
private static final class LatchedObserverImpl<T> extends LatchedObserver<T> {
final Action1<? super T> onNext;
final Action1<? super Throwable> onError;
final Action0 onCompleted;

public LatchedObserverImpl(Action1<? super T> onNext,
Action1<? super Throwable> onError,
Action0 onCompleted,
CountDownLatch latch) {
super(latch);
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

@Override
protected void onNextCore(T args) {
try {
onNext.call(args);
} catch (Throwable t) {
fail(t);
}
}

@Override
protected void onErrorCore(Throwable e) {
try {
error = e;
onError.call(e);
} finally {
latch.countDown();
}
}

@Override
protected void onCompletedCore() {
try {
onCompleted.call();
} finally {
latch.countDown();
}
}
}
/**
* A latched observer which calls an action for each observed value
* and checks if a cancellation token is not unsubscribed.
* @param <T> the observed value type
*/
private static final class LatchedObserverIndexedImpl<T> extends LatchedObserver<T> {
final Action2<? super T, ? super Integer> onNext;
final Action1<? super Throwable> onError;
final Action0 onCompleted;
int index;

public LatchedObserverIndexedImpl(Action2<? super T, ? super Integer> onNext,
Action1<? super Throwable> onError,
Action0 onCompleted,
CountDownLatch latch) {
super(latch);
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

@Override
protected void onNextCore(T args) {
if (index == Integer.MAX_VALUE) {
fail(new ArithmeticException("index overflow"));
return;
}
try {
onNext.call(args, index++);
} catch (Throwable t) {
fail(t);
}
}

@Override
protected void onErrorCore(Throwable e) {
try {
error = e;
onError.call(e);
} finally {
latch.countDown();
}
}

@Override
protected void onCompletedCore() {
try {
onCompleted.call();
} finally {
latch.countDown();
}
}
}
}
Loading