diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 79825d622b..5d6d77c15f 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1250,6 +1250,29 @@ public final static Observable from(T[] array) { return from(Arrays.asList(array)); } + /** + * Returns an Observable that invokes passed function and emits its result for each new Observer that subscribes. + *

+ * Allows you to defer execution of passed function until Observer subscribes to the Observable. + * It makes passed function "lazy". + * Result of the function invocation will be emitted by the Observable. + *

+ *
Scheduler:
+ *
{@code fromCallable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * function which execution should be deferred, it will be invoked when Observer will subscribe to the Observable + * @param + * the type of the item emitted by the Observable + * @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function + * @see #defer(Func0) + */ + @Experimental + public static Observable fromCallable(Callable func) { + return create(new OnSubscribeFromCallable(func)); + } + /** * Returns an Observable that emits a sequential number every specified interval of time. *

diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromCallable.java b/src/main/java/rx/internal/operators/OnSubscribeFromCallable.java new file mode 100644 index 0000000000..35eb62f04e --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeFromCallable.java @@ -0,0 +1,38 @@ +package rx.internal.operators; + +import rx.Observable; +import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.internal.producers.SingleDelayedProducer; + +import java.util.concurrent.Callable; + +/** + * Do not invoke the function until an Observer subscribes; Invokes function on each + * subscription. + *

+ * Pass {@code fromCallable} a function, and {@code fromCallable} will call this function to emit result of invocation + * afresh each time a new Observer subscribes. + */ +public final class OnSubscribeFromCallable implements Observable.OnSubscribe { + + private final Callable resultFactory; + + public OnSubscribeFromCallable(Callable resultFactory) { + this.resultFactory = resultFactory; + } + + @Override + public void call(Subscriber subscriber) { + final SingleDelayedProducer singleDelayedProducer = new SingleDelayedProducer(subscriber); + + subscriber.setProducer(singleDelayedProducer); + + try { + singleDelayedProducer.setValue(resultFactory.call()); + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + subscriber.onError(t); + } + } +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeFromCallableTest.java b/src/test/java/rx/internal/operators/OnSubscribeFromCallableTest.java new file mode 100644 index 0000000000..a4da6e3208 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeFromCallableTest.java @@ -0,0 +1,140 @@ +package rx.internal.operators; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import rx.Observable; +import rx.Observer; +import rx.Subscription; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import static org.mockito.Mockito.*; +import static rx.schedulers.Schedulers.computation; + +public class OnSubscribeFromCallableTest { + + @SuppressWarnings("unchecked") + @Test + public void shouldNotInvokeFuncUntilSubscription() throws Exception { + Callable func = mock(Callable.class); + + when(func.call()).thenReturn(new Object()); + + Observable fromCallableObservable = Observable.fromCallable(func); + + verifyZeroInteractions(func); + + fromCallableObservable.subscribe(); + + verify(func).call(); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnNextAndOnCompleted() throws Exception { + Callable func = mock(Callable.class); + + when(func.call()).thenReturn("test_value"); + + Observable fromCallableObservable = Observable.fromCallable(func); + + Observer observer = mock(Observer.class); + + fromCallableObservable.subscribe(observer); + + verify(observer).onNext("test_value"); + verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnError() throws Exception { + Callable func = mock(Callable.class); + + Throwable throwable = new IllegalStateException("Test exception"); + when(func.call()).thenThrow(throwable); + + Observable fromCallableObservable = Observable.fromCallable(func); + + Observer observer = mock(Observer.class); + + fromCallableObservable.subscribe(observer); + + verify(observer, never()).onNext(anyObject()); + verify(observer, never()).onCompleted(); + verify(observer).onError(throwable); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception { + Callable func = mock(Callable.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.call()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Observable fromCallableObservable = Observable.fromCallable(func); + + Observer observer = mock(Observer.class); + + Subscription subscription = fromCallableObservable + .subscribeOn(computation()) + .subscribe(observer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + subscription.unsubscribe(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).call(); + + // Observer must not be notified at all + verifyZeroInteractions(observer); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAllowToThrowCheckedException() { + final Exception checkedException = new Exception("test exception"); + + Observable fromCallableObservable = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw checkedException; + } + }); + + Observer observer = mock(Observer.class); + + fromCallableObservable.subscribe(observer); + + verify(observer).onError(checkedException); + verifyNoMoreInteractions(observer); + } +} \ No newline at end of file