-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Observable.lazy() as a companion for Observable.defer()
- Loading branch information
1 parent
689e73f
commit 7d4f825
Showing
3 changed files
with
201 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
src/main/java/rx/internal/operators/OnSubscribeDeferJust.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package rx.internal.operators; | ||
|
||
import rx.Observable; | ||
import rx.Subscriber; | ||
import rx.exceptions.Exceptions; | ||
import rx.internal.producers.SingleDelayedProducer; | ||
|
||
import java.util.concurrent.Callable; | ||
|
||
/** | ||
* Do not invoke the function until an Observer subscribes; Invokes function on each | ||
* subscription. | ||
* <p> | ||
* Pass {@code deferJust} a function, and {@code deferJust} will call this function to emit result of invocation | ||
* afresh each time a new Observer subscribes. | ||
*/ | ||
public final class OnSubscribeDeferJust<T> implements Observable.OnSubscribe<T> { | ||
|
||
private final Callable<? extends T> resultFactory; | ||
|
||
public OnSubscribeDeferJust(Callable<? extends T> resultFactory) { | ||
this.resultFactory = resultFactory; | ||
} | ||
|
||
@Override | ||
public void call(Subscriber<? super T> subscriber) { | ||
final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<T>(subscriber); | ||
|
||
subscriber.setProducer(singleDelayedProducer); | ||
|
||
try { | ||
singleDelayedProducer.setValue(resultFactory.call()); | ||
} catch (Throwable t) { | ||
Exceptions.throwIfFatal(t); | ||
subscriber.onError(t); | ||
} | ||
} | ||
} |
140 changes: 140 additions & 0 deletions
140
src/test/java/rx/internal/operators/OnSubscribeDeferJustTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package rx.internal.operators; | ||
|
||
import org.junit.Test; | ||
import org.mockito.invocation.InvocationOnMock; | ||
import org.mockito.stubbing.Answer; | ||
import rx.Observable; | ||
import rx.Observer; | ||
import rx.Subscription; | ||
|
||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.CountDownLatch; | ||
|
||
import static org.mockito.Mockito.*; | ||
import static rx.schedulers.Schedulers.computation; | ||
|
||
public class OnSubscribeDeferJustTest { | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void shouldNotInvokeFuncUntilSubscription() throws Exception { | ||
Callable<Object> func = mock(Callable.class); | ||
|
||
when(func.call()).thenReturn(new Object()); | ||
|
||
Observable<Object> deferJustObservable = Observable.deferJust(func); | ||
|
||
verifyZeroInteractions(func); | ||
|
||
deferJustObservable.subscribe(); | ||
|
||
verify(func).call(); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void shouldCallOnNextAndOnCompleted() throws Exception { | ||
Callable<String> func = mock(Callable.class); | ||
|
||
when(func.call()).thenReturn("test_value"); | ||
|
||
Observable<String> deferJustObservable = Observable.deferJust(func); | ||
|
||
Observer<String> observer = mock(Observer.class); | ||
|
||
deferJustObservable.subscribe(observer); | ||
|
||
verify(observer).onNext("test_value"); | ||
verify(observer).onCompleted(); | ||
verify(observer, never()).onError(any(Throwable.class)); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void shouldCallOnError() throws Exception { | ||
Callable<Object> func = mock(Callable.class); | ||
|
||
Throwable throwable = new IllegalStateException("Test exception"); | ||
when(func.call()).thenThrow(throwable); | ||
|
||
Observable<Object> deferJustObservable = Observable.deferJust(func); | ||
|
||
Observer<Object> observer = mock(Observer.class); | ||
|
||
deferJustObservable.subscribe(observer); | ||
|
||
verify(observer, never()).onNext(anyObject()); | ||
verify(observer, never()).onCompleted(); | ||
verify(observer).onError(throwable); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception { | ||
Callable<String> func = mock(Callable.class); | ||
|
||
final CountDownLatch funcLatch = new CountDownLatch(1); | ||
final CountDownLatch observerLatch = new CountDownLatch(1); | ||
|
||
when(func.call()).thenAnswer(new Answer<String>() { | ||
@Override | ||
public String answer(InvocationOnMock invocation) throws Throwable { | ||
observerLatch.countDown(); | ||
|
||
try { | ||
funcLatch.await(); | ||
} catch (InterruptedException e) { | ||
// It's okay, unsubscription causes Thread interruption | ||
|
||
// Restoring interruption status of the Thread | ||
Thread.currentThread().interrupt(); | ||
} | ||
|
||
return "should_not_be_delivered"; | ||
} | ||
}); | ||
|
||
Observable<String> deferJustObservable = Observable.deferJust(func); | ||
|
||
Observer<String> observer = mock(Observer.class); | ||
|
||
Subscription subscription = deferJustObservable | ||
.subscribeOn(computation()) | ||
.subscribe(observer); | ||
|
||
// Wait until func will be invoked | ||
observerLatch.await(); | ||
|
||
// Unsubscribing before emission | ||
subscription.unsubscribe(); | ||
|
||
// Emitting result | ||
funcLatch.countDown(); | ||
|
||
// func must be invoked | ||
verify(func).call(); | ||
|
||
// Observer must not be notified at all | ||
verifyZeroInteractions(observer); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void shouldAllowToThrowCheckedException() { | ||
final Exception checkedException = new Exception("test exception"); | ||
|
||
Observable<Object> deferJustObservable = Observable.deferJust(new Callable<Object>() { | ||
@Override | ||
public Object call() throws Exception { | ||
throw checkedException; | ||
} | ||
}); | ||
|
||
Observer<Object> observer = mock(Observer.class); | ||
|
||
deferJustObservable.subscribe(observer); | ||
|
||
verify(observer).onError(checkedException); | ||
verifyNoMoreInteractions(observer); | ||
} | ||
} |