diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index 2ab23028bd..7e28439ae2 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -2323,6 +2323,64 @@ public final Completable onErrorResumeWith(@NonNull CompletableSource fallback)
return onErrorResumeNext(Functions.justFunction(fallback));
}
+ /**
+ * Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
+ * {@code Completable} instead of signaling the error via {@code onError}.
+ *
+ *
+ *
+ * You can use this to prevent errors from propagating or to supply fallback data should errors be
+ * encountered.
+ *
+ *
Scheduler:
+ *
{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the item type to return on error
+ * @param itemSupplier
+ * a function that returns a single value that will be emitted as success value
+ * the current {@code Completable} signals an {@code onError} event
+ * @return the new {@link Maybe} instance
+ * @throws NullPointerException if {@code itemSupplier} is {@code null}
+ * @see ReactiveX operators documentation: Catch
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Maybe onErrorReturn(@NonNull Function super Throwable, ? extends T> itemSupplier) {
+ Objects.requireNonNull(itemSupplier, "itemSupplier is null");
+ return RxJavaPlugins.onAssembly(new CompletableOnErrorReturn<>(this, itemSupplier));
+ }
+
+ /**
+ * Ends the flow with the given success item when the current {@code Completable}
+ * fails instead of signaling the error via {@code onError}.
+ *
+ *
+ *
+ * You can use this to prevent errors from propagating or to supply fallback data should errors be
+ * encountered.
+ *
+ *
Scheduler:
+ *
{@code onErrorReturnItem} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the item type to return on error
+ * @param item
+ * the value that is emitted as {@code onSuccess} in case the current {@code Completable} signals an {@code onError}
+ * @return the new {@link Maybe} instance
+ * @throws NullPointerException if {@code item} is {@code null}
+ * @see ReactiveX operators documentation: Catch
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Maybe onErrorReturnItem(@NonNull T item) {
+ Objects.requireNonNull(item, "item is null");
+ return onErrorReturn(Functions.justFunction(item));
+ }
+
/**
* Nulls out references to the upstream producer and downstream {@link CompletableObserver} if
* the sequence is terminated or downstream calls {@code dispose()}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 96ff14c1e9..40902efa53 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -4467,7 +4467,7 @@ public final Maybe onErrorResumeNext(@NonNull Function super Throwable, ? e
* Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
* {@code Maybe} instead of signaling the error via {@code onError}.
*
- *
+ *
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
@@ -4494,7 +4494,7 @@ public final Maybe onErrorReturn(@NonNull Function super Throwable, ? exten
/**
* Ends the flow with the given success item when the current {@code Maybe} fails instead of signaling the error via {@code onError}.
*
- *
+ *
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
@@ -4504,7 +4504,7 @@ public final Maybe onErrorReturn(@NonNull Function super Throwable, ? exten
*
*
* @param item
- * the value that is emitted as {@code onSuccess} in case this {@code Maybe} signals an {@code onError}
+ * the value that is emitted as {@code onSuccess} in case the current {@code Maybe} signals an {@code onError}
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code item} is {@code null}
* @see ReactiveX operators documentation: Catch
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java
new file mode 100644
index 0000000000..c8fa43499f
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorReturn.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+
+import java.util.Objects;
+
+/**
+ * Returns a value generated via a function if the main source signals an onError.
+ * @param the value type
+ * @since 3.0.0
+ */
+public final class CompletableOnErrorReturn extends Maybe {
+
+ final CompletableSource source;
+
+ final Function super Throwable, ? extends T> valueSupplier;
+
+ public CompletableOnErrorReturn(CompletableSource source,
+ Function super Throwable, ? extends T> valueSupplier) {
+ this.source = source;
+ this.valueSupplier = valueSupplier;
+ }
+
+ @Override
+ protected void subscribeActual(MaybeObserver super T> observer) {
+ source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
+ }
+
+ static final class OnErrorReturnMaybeObserver implements CompletableObserver, Disposable {
+
+ final MaybeObserver super T> downstream;
+
+ final Function super Throwable, ? extends T> itemSupplier;
+
+ Disposable upstream;
+
+ OnErrorReturnMaybeObserver(MaybeObserver super T> actual,
+ Function super Throwable, ? extends T> itemSupplier) {
+ this.downstream = actual;
+ this.itemSupplier = itemSupplier;
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (DisposableHelper.validate(this.upstream, d)) {
+ this.upstream = d;
+
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ T v;
+
+ try {
+ v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ downstream.onError(new CompositeException(e, ex));
+ return;
+ }
+
+ downstream.onSuccess(v);
+ }
+
+ @Override
+ public void onComplete() {
+ downstream.onComplete();
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java
index 1a75120e84..190856d5c7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java
@@ -27,31 +27,31 @@
*/
public final class MaybeOnErrorReturn extends AbstractMaybeWithUpstream {
- final Function super Throwable, ? extends T> valueSupplier;
+ final Function super Throwable, ? extends T> itemSupplier;
public MaybeOnErrorReturn(MaybeSource source,
- Function super Throwable, ? extends T> valueSupplier) {
+ Function super Throwable, ? extends T> itemSupplier) {
super(source);
- this.valueSupplier = valueSupplier;
+ this.itemSupplier = itemSupplier;
}
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
+ source.subscribe(new OnErrorReturnMaybeObserver<>(observer, itemSupplier));
}
static final class OnErrorReturnMaybeObserver implements MaybeObserver, Disposable {
final MaybeObserver super T> downstream;
- final Function super Throwable, ? extends T> valueSupplier;
+ final Function super Throwable, ? extends T> itemSupplier;
Disposable upstream;
OnErrorReturnMaybeObserver(MaybeObserver super T> actual,
Function super Throwable, ? extends T> valueSupplier) {
this.downstream = actual;
- this.valueSupplier = valueSupplier;
+ this.itemSupplier = valueSupplier;
}
@Override
@@ -83,7 +83,7 @@ public void onError(Throwable e) {
T v;
try {
- v = Objects.requireNonNull(valueSupplier.apply(e), "The valueSupplier returned a null value");
+ v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java
index 4a1333b530..531f13b8cc 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java
@@ -15,10 +15,16 @@
import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.internal.functions.Functions;
+import io.reactivex.rxjava3.subjects.CompletableSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
public class CompletableOnErrorXTest extends RxJavaTest {
@@ -46,4 +52,55 @@ public CompletableSource apply(Throwable e) throws Exception {
assertEquals(0, call[0]);
}
+
+ @Test
+ public void onErrorReturnConst() {
+ Completable.error(new TestException())
+ .onErrorReturnItem(1)
+ .test()
+ .assertResult(1);
+ }
+
+ @Test
+ public void onErrorReturn() {
+ Completable.error(new TestException())
+ .onErrorReturn(Functions.justFunction(1))
+ .test()
+ .assertResult(1);
+ }
+
+ @Test
+ public void onErrorReturnFunctionThrows() {
+ TestHelper.assertCompositeExceptions(Completable.error(new TestException())
+ .onErrorReturn(new Function() {
+ @Override
+ public Object apply(Throwable v) throws Exception {
+ throw new IOException();
+ }
+ })
+ .to(TestHelper.testConsumer()), TestException.class, IOException.class);
+ }
+
+ @Test
+ public void onErrorReturnEmpty() {
+ Completable.complete()
+ .onErrorReturnItem(2)
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void onErrorReturnDispose() {
+ TestHelper.checkDisposed(CompletableSubject.create().onErrorReturnItem(1));
+ }
+
+ @Test
+ public void onErrorReturnDoubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function>() {
+ @Override
+ public MaybeSource