From ebad70de2750c7fc72a217a6011c9d2a452ab198 Mon Sep 17 00:00:00 2001 From: Artem Zinnatullin Date: Sat, 5 Dec 2015 22:15:51 +0300 Subject: [PATCH] Add Single.doOnUnsubscribe() --- src/main/java/rx/Single.java | 22 +++++++++++ src/test/java/rx/SingleTest.java | 63 ++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 190e1630b3..b577f9a812 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -21,6 +21,7 @@ import rx.annotations.Experimental; import rx.exceptions.Exceptions; import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; @@ -34,6 +35,7 @@ import rx.internal.operators.OnSubscribeToObservableFuture; import rx.internal.operators.OperatorDelay; import rx.internal.operators.OperatorDoOnEach; +import rx.internal.operators.OperatorDoOnUnsubscribe; import rx.internal.operators.OperatorMap; import rx.internal.operators.OperatorObserveOn; import rx.internal.operators.OperatorOnErrorReturn; @@ -1998,4 +2000,24 @@ public void call(SingleSubscriber singleSubscriber) { } }); } + + /** + * Modifies the source {@link Single} so that it invokes the given action when it is unsubscribed from + * its subscribers. + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnUnsubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param action + * the action that gets called when this {@link Single} is unsubscribed. + * @return the source {@link Single} modified so as to call this Action when appropriate. + * @see ReactiveX operators documentation: Do + */ + @Experimental + public final Single doOnUnsubscribe(final Action0 action) { + return lift(new OperatorDoOnUnsubscribe(action)); + } } diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 30fe99e92f..0fa8750709 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -40,6 +40,7 @@ import org.mockito.stubbing.Answer; import rx.Single.OnSubscribe; import rx.exceptions.CompositeException; +import rx.functions.Action; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; @@ -828,4 +829,66 @@ public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryRet verify(singleFactory).call(); } + + @Test + public void doOnUnsubscribeShouldInvokeActionAfterSuccess() { + Action0 action = mock(Action0.class); + + Single single = Single + .just("test") + .doOnUnsubscribe(action); + + verifyZeroInteractions(action); + + TestSubscriber testSubscriber = new TestSubscriber(); + single.subscribe(testSubscriber); + + testSubscriber.assertValue("test"); + testSubscriber.assertCompleted(); + + verify(action).call(); + } + + @Test + public void doOnUnsubscribeShouldInvokeActionAfterError() { + Action0 action = mock(Action0.class); + + Single single = Single + .error(new RuntimeException("test")) + .doOnUnsubscribe(action); + + verifyZeroInteractions(action); + + TestSubscriber testSubscriber = new TestSubscriber(); + single.subscribe(testSubscriber); + + testSubscriber.assertError(RuntimeException.class); + assertEquals("test", testSubscriber.getOnErrorEvents().get(0).getMessage()); + + verify(action).call(); + } + + @Test + public void doOnUnsubscribeShouldInvokeActionAfterExplicitUnsubscription() { + Action0 action = mock(Action0.class); + + Single single = Single + .create(new OnSubscribe() { + @Override + public void call(SingleSubscriber singleSubscriber) { + // Broken Single that never ends itself (simulates long computation in one thread). + } + }) + .doOnUnsubscribe(action); + + TestSubscriber testSubscriber = new TestSubscriber(); + Subscription subscription = single.subscribe(testSubscriber); + + verifyZeroInteractions(action); + + subscription.unsubscribe(); + verify(action).call(); + testSubscriber.assertNoValues(); + testSubscriber.assertNoTerminalEvent(); + } }