Skip to content

Commit

Permalink
Add Observable.fromCallable() as a companion for Observable.defer()
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-zinnatullin authored and Artem Zinnatullin committed Aug 27, 2015
1 parent 689e73f commit 0c8b250
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,29 @@ public final static <T> Observable<T> from(T[] array) {
return from(Arrays.asList(array));
}

/**
* Returns an Observable that invokes passed function and emits its result for each new Observer that subscribes.
* <p>
* Allows you to defer execution of passed function until Observer subscribes to the Observable.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the Observable.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* function which execution should be deferred, it will be invoked when Observer will subscribe to the Observable
* @param <T>
* the type of the item emitted by the Observable
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
* @see #defer(Func0)
*/
@Experimental
public static <T> Observable<T> fromCallable(Callable<? extends T> func) {
return create(new OnSubscribeFromCallable<T>(func));
}

/**
* Returns an Observable that emits a sequential number every specified interval of time.
* <p>
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeFromCallable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package rx.internal.operators;

import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.internal.producers.SingleDelayedProducer;

import java.util.concurrent.Callable;

/**
* Do not invoke the function until an Observer subscribes; Invokes function on each
* subscription.
* <p>
* Pass {@code fromCallable} a function, and {@code fromCallable} will call this function to emit result of invocation
* afresh each time a new Observer subscribes.
*/
public final class OnSubscribeFromCallable<T> implements Observable.OnSubscribe<T> {

private final Callable<? extends T> resultFactory;

public OnSubscribeFromCallable(Callable<? extends T> resultFactory) {
this.resultFactory = resultFactory;
}

@Override
public void call(Subscriber<? super T> subscriber) {
final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<T>(subscriber);

subscriber.setProducer(singleDelayedProducer);

try {
singleDelayedProducer.setValue(resultFactory.call());
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
subscriber.onError(t);
}
}
}
140 changes: 140 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeFromCallableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package rx.internal.operators;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.Observer;
import rx.Subscription;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

import static org.mockito.Mockito.*;
import static rx.schedulers.Schedulers.computation;

public class OnSubscribeFromCallableTest {

@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);

when(func.call()).thenReturn(new Object());

Observable<Object> fromCallableObservable = Observable.fromCallable(func);

verifyZeroInteractions(func);

fromCallableObservable.subscribe();

verify(func).call();
}

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);

when(func.call()).thenReturn("test_value");

Observable<String> fromCallableObservable = Observable.fromCallable(func);

Observer<String> observer = mock(Observer.class);

fromCallableObservable.subscribe(observer);

verify(observer).onNext("test_value");
verify(observer).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

@SuppressWarnings("unchecked")
@Test
public void shouldCallOnError() throws Exception {
Callable<Object> func = mock(Callable.class);

Throwable throwable = new IllegalStateException("Test exception");
when(func.call()).thenThrow(throwable);

Observable<Object> fromCallableObservable = Observable.fromCallable(func);

Observer<Object> observer = mock(Observer.class);

fromCallableObservable.subscribe(observer);

verify(observer, never()).onNext(anyObject());
verify(observer, never()).onCompleted();
verify(observer).onError(throwable);
}

@SuppressWarnings("unchecked")
@Test
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
Callable<String> func = mock(Callable.class);

final CountDownLatch funcLatch = new CountDownLatch(1);
final CountDownLatch observerLatch = new CountDownLatch(1);

when(func.call()).thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
observerLatch.countDown();

try {
funcLatch.await();
} catch (InterruptedException e) {
// It's okay, unsubscription causes Thread interruption

// Restoring interruption status of the Thread
Thread.currentThread().interrupt();
}

return "should_not_be_delivered";
}
});

Observable<String> fromCallableObservable = Observable.fromCallable(func);

Observer<String> observer = mock(Observer.class);

Subscription subscription = fromCallableObservable
.subscribeOn(computation())
.subscribe(observer);

// Wait until func will be invoked
observerLatch.await();

// Unsubscribing before emission
subscription.unsubscribe();

// Emitting result
funcLatch.countDown();

// func must be invoked
verify(func).call();

// Observer must not be notified at all
verifyZeroInteractions(observer);
}

@SuppressWarnings("unchecked")
@Test
public void shouldAllowToThrowCheckedException() {
final Exception checkedException = new Exception("test exception");

Observable<Object> fromCallableObservable = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw checkedException;
}
});

Observer<Object> observer = mock(Observer.class);

fromCallableObservable.subscribe(observer);

verify(observer).onError(checkedException);
verifyNoMoreInteractions(observer);
}
}

0 comments on commit 0c8b250

Please sign in to comment.