Skip to content

Add doOnTerminate to Single/Maybe for consistency #6386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2892,6 +2892,33 @@ public final Maybe<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe) {
));
}

/**
* Returns a Maybe instance that calls the given onTerminate callback
* just before this Maybe completes normally or with an exception.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
* <p>
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
* {@code onError} notification.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError}
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doOnTerminate(Action)
* @since 2.2.7 - experimental
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @since 2.2.7 - experimental.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@Experimental
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @Experimental.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

public final Maybe<T> doOnTerminate(final Action onTerminate) {
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
return RxJavaPlugins.onAssembly(new MaybeDoOnTerminate<T>(this, onTerminate));
}

/**
* Calls the shared consumer with the success value sent via onSuccess for each
* MaybeObserver that subscribes to the current Maybe.
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2495,6 +2495,33 @@ public final Single<T> doOnSubscribe(final Consumer<? super Disposable> onSubscr
return RxJavaPlugins.onAssembly(new SingleDoOnSubscribe<T>(this, onSubscribe));
}

/**
* Returns a Single instance that calls the given onTerminate callback
* just before this Single completes normally or with an exception.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
* <p>
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
* {@code onError} notification.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError}
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doOnTerminate(Action)
* @since 2.2.7 - experimental
*/
@Experimental
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnTerminate(final Action onTerminate) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @Experimental.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
return RxJavaPlugins.onAssembly(new SingleDoOnTerminate<T>(this, onTerminate));
}

/**
* Calls the shared consumer with the success value sent via onSuccess for each
* SingleObserver that subscribes to the current Single.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;

public final class MaybeDoOnTerminate<T> extends Maybe<T> {

final MaybeSource<T> source;

final Action onTerminate;

public MaybeDoOnTerminate(MaybeSource<T> source, Action onTerminate) {
this.source = source;
this.onTerminate = onTerminate;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new DoOnTerminate(observer));
}

final class DoOnTerminate implements MaybeObserver<T> {
final MaybeObserver<? super T> downstream;

DoOnTerminate(MaybeObserver<? super T> observer) {
this.downstream = observer;
}

@Override
public void onSubscribe(Disposable d) {
downstream.onSubscribe(d);
}

@Override
public void onSuccess(T value) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onSuccess(value);
}

@Override
public void onError(Throwable e) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}

downstream.onError(e);
}

@Override
public void onComplete() {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onComplete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;

public final class SingleDoOnTerminate<T> extends Single<T> {

final SingleSource<T> source;

final Action onTerminate;

public SingleDoOnTerminate(SingleSource<T> source, Action onTerminate) {
this.source = source;
this.onTerminate = onTerminate;
}

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new DoOnTerminate(observer));
}

final class DoOnTerminate implements SingleObserver<T> {

final SingleObserver<? super T> downstream;

DoOnTerminate(SingleObserver<? super T> observer) {
this.downstream = observer;
}

@Override
public void onSubscribe(Disposable d) {
downstream.onSubscribe(d);
}

@Override
public void onSuccess(T value) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onSuccess(value);
}

@Override
public void onError(Throwable e) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}

downstream.onError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.TestHelper;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

public class MaybeDoOnTerminateTest {

@Test(expected = NullPointerException.class)
public void doOnTerminate() {
Maybe.just(1).doOnTerminate(null);
}

@Test
public void doOnTerminateSuccess() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();
Maybe.just(1).doOnTerminate(new Action() {
@Override
public void run() {
atomicBoolean.set(true);
}
})
.test()
.assertResult(1);

assertTrue(atomicBoolean.get());
}

@Test
public void doOnTerminateError() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();
Maybe.error(new TestException()).doOnTerminate(new Action() {
@Override
public void run() {
atomicBoolean.set(true);
}
})
.test()
.assertFailure(TestException.class);

assertTrue(atomicBoolean.get());
}

@Test
public void doOnTerminateComplete() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();
Maybe.empty().doOnTerminate(new Action() {
@Override
public void run() {
atomicBoolean.set(true);
}
})
.test()
.assertResult();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd, as you suggested I've tried to use .assertResult(1) which results in the test failure. I found similar pattern here, where assertResult() has no params since Maybe doesn't emit a value. Please correct me if I'm wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think about it a bit. You have an empty source and you want to verify it is still empty at the end.


assertTrue(atomicBoolean.get());
}

@Test
public void doOnTerminateSuccessCrash() {
Maybe.just(1).doOnTerminate(new Action() {
@Override
public void run() {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}

@Test
public void doOnTerminateErrorCrash() {
TestObserver<Object> to = Maybe.error(new TestException("Outer"))
.doOnTerminate(new Action() {
@Override
public void run() {
throw new TestException("Inner");
}
})
.test()
.assertFailure(CompositeException.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
TestHelper.assertError(errors, 0, TestException.class, "Outer");
TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

@Test
public void doOnTerminateCompleteCrash() {
Maybe.empty()
.doOnTerminate(new Action() {
@Override
public void run() {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}
}
Loading