From 3e8284b43d84a733b63bd0b82fd2dc31b622f73f Mon Sep 17 00:00:00 2001 From: "s.kryvets" Date: Sat, 26 Jan 2019 10:11:11 -0600 Subject: [PATCH] Add doOnTerminate to Single/Maybe for consistency (#6379) --- src/main/java/io/reactivex/Maybe.java | 27 ++++ src/main/java/io/reactivex/Single.java | 27 ++++ .../operators/maybe/MaybeDoOnTerminate.java | 90 +++++++++++++ .../operators/single/SingleDoOnTerminate.java | 78 +++++++++++ .../maybe/MaybeDoOnTerminateTest.java | 124 ++++++++++++++++++ .../single/SingleDoOnTerminateTest.java | 96 ++++++++++++++ 6 files changed, 442 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminate.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDoOnTerminate.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminateTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleDoOnTerminateTest.java diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 147b3f3125..d978d4a9dd 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2892,6 +2892,33 @@ public final Maybe doOnSubscribe(Consumer onSubscribe) { )); } + /** + * Returns a Maybe instance that calls the given onTerminate callback + * just before this Maybe completes normally or with an exception. + *

+ * + *

+ * This differs from {@code doAfterTerminate} in that this happens before the {@code onComplete} or + * {@code onError} notification. + *

+ *
Scheduler:
+ *
{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError} + * @return the new Maybe instance + * @see ReactiveX operators documentation: Do + * @see #doOnTerminate(Action) + * @since 2.2.7 - experimental + */ + @Experimental + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe doOnTerminate(final Action onTerminate) { + ObjectHelper.requireNonNull(onTerminate, "onTerminate is null"); + return RxJavaPlugins.onAssembly(new MaybeDoOnTerminate(this, onTerminate)); + } + /** * Calls the shared consumer with the success value sent via onSuccess for each * MaybeObserver that subscribes to the current Maybe. diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f41d69a80c..1071fa836b 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2495,6 +2495,33 @@ public final Single doOnSubscribe(final Consumer onSubscr return RxJavaPlugins.onAssembly(new SingleDoOnSubscribe(this, onSubscribe)); } + /** + * Returns a Single instance that calls the given onTerminate callback + * just before this Single completes normally or with an exception. + *

+ * + *

+ * This differs from {@code doAfterTerminate} in that this happens before the {@code onComplete} or + * {@code onError} notification. + *

+ *
Scheduler:
+ *
{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError} + * @return the new Single instance + * @see ReactiveX operators documentation: Do + * @see #doOnTerminate(Action) + * @since 2.2.7 - experimental + */ + @Experimental + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Single doOnTerminate(final Action onTerminate) { + ObjectHelper.requireNonNull(onTerminate, "onTerminate is null"); + return RxJavaPlugins.onAssembly(new SingleDoOnTerminate(this, onTerminate)); + } + /** * Calls the shared consumer with the success value sent via onSuccess for each * SingleObserver that subscribes to the current Single. diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminate.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminate.java new file mode 100644 index 0000000000..81d0d8af34 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminate.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.MaybeSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; + +public final class MaybeDoOnTerminate extends Maybe { + + final MaybeSource source; + + final Action onTerminate; + + public MaybeDoOnTerminate(MaybeSource source, Action onTerminate) { + this.source = source; + this.onTerminate = onTerminate; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new DoOnTerminate(observer)); + } + + final class DoOnTerminate implements MaybeObserver { + final MaybeObserver downstream; + + DoOnTerminate(MaybeObserver observer) { + this.downstream = observer; + } + + @Override + public void onSubscribe(Disposable d) { + downstream.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + try { + onTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + downstream.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + try { + onTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); + } + + downstream.onError(e); + } + + @Override + public void onComplete() { + try { + onTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + downstream.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnTerminate.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnTerminate.java new file mode 100644 index 0000000000..7497aff902 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnTerminate.java @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.Single; +import io.reactivex.SingleObserver; +import io.reactivex.SingleSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; + +public final class SingleDoOnTerminate extends Single { + + final SingleSource source; + + final Action onTerminate; + + public SingleDoOnTerminate(SingleSource source, Action onTerminate) { + this.source = source; + this.onTerminate = onTerminate; + } + + @Override + protected void subscribeActual(final SingleObserver observer) { + source.subscribe(new DoOnTerminate(observer)); + } + + final class DoOnTerminate implements SingleObserver { + + final SingleObserver downstream; + + DoOnTerminate(SingleObserver observer) { + this.downstream = observer; + } + + @Override + public void onSubscribe(Disposable d) { + downstream.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + try { + onTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + downstream.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + try { + onTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); + } + + downstream.onError(e); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminateTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminateTest.java new file mode 100644 index 0000000000..d442fcc135 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnTerminateTest.java @@ -0,0 +1,124 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Maybe; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertTrue; + +public class MaybeDoOnTerminateTest { + + @Test(expected = NullPointerException.class) + public void doOnTerminate() { + Maybe.just(1).doOnTerminate(null); + } + + @Test + public void doOnTerminateSuccess() { + final AtomicBoolean atomicBoolean = new AtomicBoolean(); + Maybe.just(1).doOnTerminate(new Action() { + @Override + public void run() { + atomicBoolean.set(true); + } + }) + .test() + .assertResult(1); + + assertTrue(atomicBoolean.get()); + } + + @Test + public void doOnTerminateError() { + final AtomicBoolean atomicBoolean = new AtomicBoolean(); + Maybe.error(new TestException()).doOnTerminate(new Action() { + @Override + public void run() { + atomicBoolean.set(true); + } + }) + .test() + .assertFailure(TestException.class); + + assertTrue(atomicBoolean.get()); + } + + @Test + public void doOnTerminateComplete() { + final AtomicBoolean atomicBoolean = new AtomicBoolean(); + Maybe.empty().doOnTerminate(new Action() { + @Override + public void run() { + atomicBoolean.set(true); + } + }) + .test() + .assertResult(); + + assertTrue(atomicBoolean.get()); + } + + @Test + public void doOnTerminateSuccessCrash() { + Maybe.just(1).doOnTerminate(new Action() { + @Override + public void run() { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doOnTerminateErrorCrash() { + TestObserver to = Maybe.error(new TestException("Outer")) + .doOnTerminate(new Action() { + @Override + public void run() { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void doOnTerminateCompleteCrash() { + Maybe.empty() + .doOnTerminate(new Action() { + @Override + public void run() { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTerminateTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTerminateTest.java new file mode 100644 index 0000000000..ba15f9f71b --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTerminateTest.java @@ -0,0 +1,96 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.Single; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; +import io.reactivex.observers.TestObserver; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertTrue; + +public class SingleDoOnTerminateTest { + + @Test(expected = NullPointerException.class) + public void doOnTerminate() { + Single.just(1).doOnTerminate(null); + } + + @Test + public void doOnTerminateSuccess() { + final AtomicBoolean atomicBoolean = new AtomicBoolean(); + + Single.just(1).doOnTerminate(new Action() { + @Override + public void run() throws Exception { + atomicBoolean.set(true); + } + }) + .test() + .assertResult(1); + + assertTrue(atomicBoolean.get()); + } + + @Test + public void doOnTerminateError() { + final AtomicBoolean atomicBoolean = new AtomicBoolean(); + Single.error(new TestException()).doOnTerminate(new Action() { + @Override + public void run() { + atomicBoolean.set(true); + } + }) + .test() + .assertFailure(TestException.class); + + assertTrue(atomicBoolean.get()); + } + + @Test + public void doOnTerminateSuccessCrash() { + Single.just(1).doOnTerminate(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doOnTerminateErrorCrash() { + TestObserver to = Single.error(new TestException("Outer")).doOnTerminate(new Action() { + @Override + public void run() { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } +}