From 4cd2024b47d5fbe0a261952852305ab790a94ff3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 Dec 2019 12:13:05 +0100 Subject: [PATCH] 3.x: [Java 8] Add fromOpt/Stage, mapOptional, toCompletionStage to M/S/C --- .../reactivex/rxjava3/core/Completable.java | 70 ++++++- .../java/io/reactivex/rxjava3/core/Maybe.java | 162 +++++++++++++++- .../io/reactivex/rxjava3/core/Single.java | 91 ++++++++- .../jdk8/CompletableFromCompletionStage.java | 78 ++++++++ .../jdk8/CompletionStageConsumer.java | 99 ++++++++++ .../jdk8/MaybeFromCompletionStage.java | 81 ++++++++ .../internal/jdk8/MaybeMapOptional.java | 110 +++++++++++ .../jdk8/SingleFromCompletionStage.java | 81 ++++++++ .../internal/jdk8/SingleMapOptional.java | 105 +++++++++++ .../CompletableFromCompletionStageTest.java | 78 ++++++++ .../CompletableToCompletionStageTest.java | 154 ++++++++++++++++ .../jdk8/MaybeFromCompletionStageTest.java | 71 +++++++ .../internal/jdk8/MaybeFromOptionalTest.java | 38 ++++ .../internal/jdk8/MaybeMapOptionalTest.java | 89 +++++++++ .../jdk8/MaybeToCompletionStageTest.java | 174 ++++++++++++++++++ .../jdk8/SingleFromCompletionStageTest.java | 71 +++++++ .../internal/jdk8/SingleMapOptionalTest.java | 76 ++++++++ .../jdk8/SingleToCompletionStageTest.java | 153 +++++++++++++++ .../ParamValidationCheckerTest.java | 3 + 19 files changed, 1779 insertions(+), 5 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStage.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletionStageConsumer.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStage.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptional.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStage.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptional.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStageTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableToCompletionStageTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStageTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromOptionalTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptionalTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeToCompletionStageTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStageTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptionalTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleToCompletionStageTest.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index f25320d66b..d9991a913e 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -12,7 +12,7 @@ */ package io.reactivex.rxjava3.core; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.reactivestreams.Publisher; @@ -23,6 +23,7 @@ import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.*; import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.observers.*; import io.reactivex.rxjava3.internal.operators.completable.*; import io.reactivex.rxjava3.internal.operators.maybe.*; @@ -2753,4 +2754,71 @@ public final TestObserver test(boolean dispose) { subscribe(to); return to; } + + // ------------------------------------------------------------------------- + // JDK 8 Support + // ------------------------------------------------------------------------- + + /** + * Signals completion (or error) when the {@link CompletionStage} terminates. + *

+ * + *

+ * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. + * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * around {@code fromCompletionStage}: + *


+     * Maybe.defer(() -> Completable.fromCompletionStage(createCompletionStage()));
+     * 
+ *

+ * Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage} + * itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}. + *

+ *
Scheduler:
+ *
{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param stage the CompletionStage to convert to Maybe and signal its terminal value or error + * @return the new Completable instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Completable fromCompletionStage(@NonNull CompletionStage stage) { + Objects.requireNonNull(stage, "stage is null"); + return RxJavaPlugins.onAssembly(new CompletableFromCompletionStage<>(stage)); + } + + /** + * Signals the given default item when the upstream completes or signals the upstream error via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ * {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use + * a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}: + *


+     * CompletionStage<Optional<T>> stage = source.map(Optional::of).toCompletionStage(Optional.empty());
+     * 
+ *
+ *
Scheduler:
+ *
{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the type of the default item to signal upon completion + * @param defaultItem the item to signal if the upstream is empty + * @return the new CompletionStage instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage toCompletionStage(@Nullable T defaultItem) { + return subscribeWith(new CompletionStageConsumer<>(true, defaultItem)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index e3f4641ad3..35484ebef4 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -13,8 +13,7 @@ package io.reactivex.rxjava3.core; -import java.util.NoSuchElementException; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.reactivestreams.*; @@ -25,6 +24,7 @@ import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.*; import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver; import io.reactivex.rxjava3.internal.operators.flowable.*; import io.reactivex.rxjava3.internal.operators.maybe.*; @@ -4794,4 +4794,162 @@ public final TestObserver test(boolean dispose) { subscribe(to); return to; } + + // ------------------------------------------------------------------------- + // JDK 8 Support + // ------------------------------------------------------------------------- + + /** + * Converts the existing value of the provided optional into a {@link #just(Object)} + * or an empty optional into an {@link #empty()} {@code Maybe} instance. + *

+ * + *

+ * Note that the operator takes an already instantiated optional reference and does not + * by any means create this original optional. If the optional is to be created per + * consumer upon subscription, use {@link #defer(Supplier)} around {@code fromOptional}: + *


+     * Maybe.defer(() -> Maybe.fromOptional(createOptional()));
+     * 
+ *
+ *
Scheduler:
+ *
{@code fromOptional} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the optional value + * @param optional the optional value to convert into a {@code Maybe} + * @return the new Maybe instance + * @see #just(Object) + * @see #empty() + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Maybe<@NonNull T> fromOptional(@NonNull Optional optional) { + Objects.requireNonNull(optional, "optional is null"); + return optional.map(Maybe::just).orElseGet(Maybe::empty); + } + + /** + * Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation. + *

+ * + *

+ * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. + * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * around {@code fromCompletionStage}: + *


+     * Maybe.defer(() -> Maybe.fromCompletionStage(createCompletionStage()));
+     * 
+ *

+ * If the {@code CompletionStage} completes with {@code null}, the resulting {@code Maybe} is completed via {@code onComplete}. + *

+ * Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage} + * itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}. + *

+ *
Scheduler:
+ *
{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the CompletionStage + * @param stage the CompletionStage to convert to Maybe and signal its terminal value or error + * @return the new Maybe instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Maybe<@NonNull T> fromCompletionStage(@NonNull CompletionStage stage) { + Objects.requireNonNull(stage, "stage is null"); + return RxJavaPlugins.onAssembly(new MaybeFromCompletionStage<>(stage)); + } + + /** + * Maps the upstream success value into an {@link Optional} and emits the contained item if not empty. + *

+ * + * + *

+ *
Scheduler:
+ *
{@code mapOptional} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the non-null output type + * @param mapper the function that receives the upstream success iteem and should return a non-empty {@code Optional} + * to emit as the success output or an empty {@code Optional} to complete the {@code Maybe} + * @return the new Maybe instance + * @since 3.0.0 + * @see #map(Function) + * @see #filter(Predicate) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Maybe mapOptional(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new MaybeMapOptional<>(this, mapper)); + } + + /** + * Signals the upstream success item (or a {@link NoSuchElementException} if the upstream is empty) via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ * {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use + * {@link #toCompletionStage(Object)} with {@code null} or turn the upstrea into a sequence of {@link Optional}s and + * default to {@link Optional#empty()}: + *


+     * CompletionStage<Optional<T>> stage = source.map(Optional::of).toCompletionStage(Optional.empty());
+     * 
+ *
+ *
Scheduler:
+ *
{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new CompletionStage instance + * @since 3.0.0 + * @see #toCompletionStage(Object) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage toCompletionStage() { + return subscribeWith(new CompletionStageConsumer<>(false, null)); + } + + /** + * Signals the upstream success item (or the default item if the upstream is empty) via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ * {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use + * a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}: + *


+     * CompletionStage<Optional<T>> stage = source.map(Optional::of).toCompletionStage(Optional.empty());
+     * 
+ *
+ *
Scheduler:
+ *
{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param defaultItem the item to signal if the upstream is empty + * @return the new CompletionStage instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage toCompletionStage(@Nullable T defaultItem) { + return subscribeWith(new CompletionStageConsumer<>(true, defaultItem)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 27fad6d53e..ae2b04fc7d 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -13,8 +13,7 @@ package io.reactivex.rxjava3.core; -import java.util.NoSuchElementException; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.reactivestreams.Publisher; @@ -25,6 +24,7 @@ import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.*; import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.observers.*; import io.reactivex.rxjava3.internal.operators.completable.*; import io.reactivex.rxjava3.internal.operators.flowable.*; @@ -4181,4 +4181,91 @@ public final TestObserver test(boolean dispose) { private static Single toSingle(Flowable source) { return RxJavaPlugins.onAssembly(new FlowableSingleSingle(source, null)); } + + // ------------------------------------------------------------------------- + // JDK 8 Support + // ------------------------------------------------------------------------- + + /** + * Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation. + *

+ * + *

+ * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. + * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * around {@code fromCompletionStage}: + *


+     * Single.defer(() -> Single.fromCompletionStage(createCompletionStage()));
+     * 
+ *

+ * If the {@code CompletionStage} completes with {@code null}, the resulting {@code Single} is terminated with + * a {@link NullPointerException}. + *

+ * Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage} + * itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}. + *

+ *
Scheduler:
+ *
{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the CompletionStage + * @param stage the CompletionStage to convert to Single and signal its success value or error + * @return the new Single instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Single<@NonNull T> fromCompletionStage(@NonNull CompletionStage stage) { + Objects.requireNonNull(stage, "stage is null"); + return RxJavaPlugins.onAssembly(new SingleFromCompletionStage<>(stage)); + } + + /** + * Maps the upstream success value into an {@link Optional} and emits the contained item if not empty. + *

+ * + * + *

+ *
Scheduler:
+ *
{@code mapOptional} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the non-null output type + * @param mapper the function that receives the upstream success iteem and should return a non-empty {@code Optional} + * to emit as the success output or an empty {@code Optional} to complete the {@code Maybe} + * @return the new Maybe instance + * @since 3.0.0 + * @see #map(Function) + * @see #filter(Predicate) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Maybe mapOptional(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new SingleMapOptional<>(this, mapper)); + } + + /** + * Signals the upstream success item (or error) via a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ *
Scheduler:
+ *
{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new CompletionStage instance + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage toCompletionStage() { + return subscribeWith(new CompletionStageConsumer<>(false, null)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStage.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStage.java new file mode 100644 index 0000000000..44f539bf4b --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStage.java @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.jdk8.FlowableFromCompletionStage.BiConsumerAtomicReference; + +/** + * Wrap a CompletionStage and signal its outcome. + * @param the element type of the CompletionsStage + * @since 3.0.0 + */ +public final class CompletableFromCompletionStage extends Completable { + + final CompletionStage stage; + + public CompletableFromCompletionStage(CompletionStage stage) { + this.stage = stage; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + // We need an indirection because one can't detach from a whenComplete + // and cancellation should not hold onto the stage. + BiConsumerAtomicReference whenReference = new BiConsumerAtomicReference<>(); + CompletionStageHandler handler = new CompletionStageHandler<>(observer, whenReference); + whenReference.lazySet(handler); + + observer.onSubscribe(handler); + stage.whenComplete(whenReference); + } + + static final class CompletionStageHandler + implements Disposable, BiConsumer { + + final CompletableObserver downstream; + + final BiConsumerAtomicReference whenReference; + + CompletionStageHandler(CompletableObserver downstream, BiConsumerAtomicReference whenReference) { + this.downstream = downstream; + this.whenReference = whenReference; + } + + @Override + public void accept(T item, Throwable error) { + if (error != null) { + downstream.onError(error); + } else { + downstream.onComplete(); + } + } + + @Override + public void dispose() { + whenReference.set(null); + } + + @Override + public boolean isDisposed() { + return whenReference.get() == null; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletionStageConsumer.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletionStageConsumer.java new file mode 100644 index 0000000000..d92e56041e --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/CompletionStageConsumer.java @@ -0,0 +1,99 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Class that extends CompletableFuture and converts multiple types of reactive consumers + * and their signals into completion signals. + * @param the element type + * @since 3.0.0 + */ +public final class CompletionStageConsumer extends CompletableFuture +implements MaybeObserver, SingleObserver, CompletableObserver { + + final AtomicReference upstream; + + final boolean hasDefault; + + final T defaultItem; + + public CompletionStageConsumer(boolean hasDefault, T defaultItem) { + this.hasDefault = hasDefault; + this.defaultItem = defaultItem; + this.upstream = new AtomicReference<>(); + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + DisposableHelper.setOnce(upstream, d); + } + + @Override + public void onSuccess(@NonNull T t) { + clear(); + complete(t); + } + + @Override + public void onError(Throwable t) { + clear(); + if (!completeExceptionally(t)) { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (hasDefault) { + complete(defaultItem); + } else { + completeExceptionally(new NoSuchElementException("The source was empty")); + } + } + + void cancelUpstream() { + DisposableHelper.dispose(upstream); + } + + void clear() { + upstream.lazySet(DisposableHelper.DISPOSED); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancelUpstream(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + public boolean complete(T value) { + cancelUpstream(); + return super.complete(value); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + cancelUpstream(); + return super.completeExceptionally(ex); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStage.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStage.java new file mode 100644 index 0000000000..d8523b8ead --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStage.java @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.jdk8.FlowableFromCompletionStage.BiConsumerAtomicReference; + +/** + * Wrap a CompletionStage and signal its outcome. + * @param the element type + * @since 3.0.0 + */ +public final class MaybeFromCompletionStage extends Maybe { + + final CompletionStage stage; + + public MaybeFromCompletionStage(CompletionStage stage) { + this.stage = stage; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + // We need an indirection because one can't detach from a whenComplete + // and cancellation should not hold onto the stage. + BiConsumerAtomicReference whenReference = new BiConsumerAtomicReference<>(); + CompletionStageHandler handler = new CompletionStageHandler<>(observer, whenReference); + whenReference.lazySet(handler); + + observer.onSubscribe(handler); + stage.whenComplete(whenReference); + } + + static final class CompletionStageHandler + implements Disposable, BiConsumer { + + final MaybeObserver downstream; + + final BiConsumerAtomicReference whenReference; + + CompletionStageHandler(MaybeObserver downstream, BiConsumerAtomicReference whenReference) { + this.downstream = downstream; + this.whenReference = whenReference; + } + + @Override + public void accept(T item, Throwable error) { + if (error != null) { + downstream.onError(error); + } + else if (item != null) { + downstream.onSuccess(item); + } else { + downstream.onComplete(); + } + } + + @Override + public void dispose() { + whenReference.set(null); + } + + @Override + public boolean isDisposed() { + return whenReference.get() == null; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptional.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptional.java new file mode 100644 index 0000000000..8330d5249a --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptional.java @@ -0,0 +1,110 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +/** + * Maps the success value to an {@link Optional} and emits its non-empty value or completes. + * + * @param the upstream success value type + * @param the result value type + * @since 3.0.0 + */ +public final class MaybeMapOptional extends Maybe { + + final Maybe source; + + final Function> mapper; + + public MaybeMapOptional(Maybe source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new MapOptionalMaybeObserver<>(observer, mapper)); + } + + static final class MapOptionalMaybeObserver implements MaybeObserver, Disposable { + + final MaybeObserver downstream; + + final Function> mapper; + + Disposable upstream; + + MapOptionalMaybeObserver(MaybeObserver downstream, Function> mapper) { + this.downstream = downstream; + this.mapper = mapper; + } + + @Override + public void dispose() { + Disposable d = this.upstream; + this.upstream = DisposableHelper.DISPOSED; + d.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + Optional v; + + try { + v = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null item"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + if (v.isPresent()) { + downstream.onSuccess(v.get()); + } else { + downstream.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStage.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStage.java new file mode 100644 index 0000000000..f1be64375e --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStage.java @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.jdk8.FlowableFromCompletionStage.BiConsumerAtomicReference; + +/** + * Wrap a CompletionStage and signal its outcome. + * @param the element type + * @since 3.0.0 + */ +public final class SingleFromCompletionStage extends Single { + + final CompletionStage stage; + + public SingleFromCompletionStage(CompletionStage stage) { + this.stage = stage; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + // We need an indirection because one can't detach from a whenComplete + // and cancellation should not hold onto the stage. + BiConsumerAtomicReference whenReference = new BiConsumerAtomicReference<>(); + CompletionStageHandler handler = new CompletionStageHandler<>(observer, whenReference); + whenReference.lazySet(handler); + + observer.onSubscribe(handler); + stage.whenComplete(whenReference); + } + + static final class CompletionStageHandler + implements Disposable, BiConsumer { + + final SingleObserver downstream; + + final BiConsumerAtomicReference whenReference; + + CompletionStageHandler(SingleObserver downstream, BiConsumerAtomicReference whenReference) { + this.downstream = downstream; + this.whenReference = whenReference; + } + + @Override + public void accept(T item, Throwable error) { + if (error != null) { + downstream.onError(error); + } + else if (item != null) { + downstream.onSuccess(item); + } else { + downstream.onError(new NullPointerException("The CompletionStage terminated with null.")); + } + } + + @Override + public void dispose() { + whenReference.set(null); + } + + @Override + public boolean isDisposed() { + return whenReference.get() == null; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptional.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptional.java new file mode 100644 index 0000000000..9ee9a6f803 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptional.java @@ -0,0 +1,105 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +/** + * Maps the success value to an {@link Optional} and emits its non-empty value or completes. + * + * @param the upstream success value type + * @param the result value type + * @since 3.0.0 + */ +public final class SingleMapOptional extends Maybe { + + final Single source; + + final Function> mapper; + + public SingleMapOptional(Single source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new MapOptionalSingleObserver<>(observer, mapper)); + } + + static final class MapOptionalSingleObserver implements SingleObserver, Disposable { + + final MaybeObserver downstream; + + final Function> mapper; + + Disposable upstream; + + MapOptionalSingleObserver(MaybeObserver downstream, Function> mapper) { + this.downstream = downstream; + this.mapper = mapper; + } + + @Override + public void dispose() { + Disposable d = this.upstream; + this.upstream = DisposableHelper.DISPOSED; + d.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + Optional v; + + try { + v = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null item"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + if (v.isPresent()) { + downstream.onSuccess(v.get()); + } else { + downstream.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStageTest.java new file mode 100644 index 0000000000..3519cc5454 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableFromCompletionStageTest.java @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class CompletableFromCompletionStageTest extends RxJavaTest { + + @Test + public void syncSuccess() { + Completable.fromCompletionStage(CompletableFuture.completedFuture(1)) + .test() + .assertResult(); + } + + @Test + public void syncSuccessNull() { + Completable.fromCompletionStage(CompletableFuture.completedFuture(null)) + .test() + .assertResult(); + } + + @Test + public void syncFailure() { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(new TestException()); + + Completable.fromCompletionStage(cf) + .test() + .assertFailure(TestException.class); + } + + @Test + public void syncNull() { + Completable.fromCompletionStage(CompletableFuture.completedFuture(null)) + .test() + .assertResult(); + } + + @Test + public void dispose() { + CompletableFuture cf = new CompletableFuture<>(); + + TestObserver to = Completable.fromCompletionStage(cf) + .test(); + + to.assertEmpty(); + + to.dispose(); + + cf.complete(1); + + to.assertEmpty(); + } + + @Test + public void dispose2() { + TestHelper.checkDisposed(Completable.fromCompletionStage(new CompletableFuture<>())); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableToCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableToCompletionStageTest.java new file mode 100644 index 0000000000..91fdfb9cde --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/CompletableToCompletionStageTest.java @@ -0,0 +1,154 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.subjects.CompletableSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class CompletableToCompletionStageTest extends RxJavaTest { + + @Test + public void complete() throws Exception { + Object v = Completable.complete() + .toCompletionStage(null) + .toCompletableFuture() + .get(); + + assertNull(v); + } + + @Test + public void completableFutureCancels() throws Exception { + CompletableSubject source = CompletableSubject.create(); + + CompletableFuture cf = source + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasObservers()); + } + + @Test + public void completableManualCompleteCancels() throws Exception { + CompletableSubject source = CompletableSubject.create(); + + CompletableFuture cf = source + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasObservers()); + + assertEquals(1, cf.get()); + } + + @Test + public void completableManualCompleteExceptionallyCancels() throws Exception { + CompletableSubject source = CompletableSubject.create(); + + CompletableFuture cf = source + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasObservers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void error() throws Exception { + CompletableFuture cf = Completable.error(new TestException()) + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void sourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Object v = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + observer.onSubscribe(Disposable.empty()); + observer.onComplete(); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .toCompletionStage(null) + .toCompletableFuture() + .get(); + + assertNull(v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void doubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Object v = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + observer.onSubscribe(Disposable.empty()); + observer.onSubscribe(Disposable.empty()); + observer.onComplete(); + } + } + .toCompletionStage(null) + .toCompletableFuture() + .get(); + + assertNull(v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStageTest.java new file mode 100644 index 0000000000..90ad2bc626 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromCompletionStageTest.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeFromCompletionStageTest extends RxJavaTest { + + @Test + public void syncSuccess() { + Maybe.fromCompletionStage(CompletableFuture.completedFuture(1)) + .test() + .assertResult(1); + } + + @Test + public void syncFailure() { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(new TestException()); + + Maybe.fromCompletionStage(cf) + .test() + .assertFailure(TestException.class); + } + + @Test + public void syncNull() { + Maybe.fromCompletionStage(CompletableFuture.completedFuture(null)) + .test() + .assertResult(); + } + + @Test + public void dispose() { + CompletableFuture cf = new CompletableFuture<>(); + + TestObserver to = Maybe.fromCompletionStage(cf) + .test(); + + to.assertEmpty(); + + to.dispose(); + + cf.complete(1); + + to.assertEmpty(); + } + + @Test + public void dispose2() { + TestHelper.checkDisposed(Maybe.fromCompletionStage(new CompletableFuture<>())); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromOptionalTest.java new file mode 100644 index 0000000000..4e09fea0b1 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFromOptionalTest.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Optional; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; + +public class MaybeFromOptionalTest extends RxJavaTest { + + @Test + public void hasValue() { + Maybe.fromOptional(Optional.of(1)) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Maybe.fromOptional(Optional.empty()) + .test() + .assertResult(); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptionalTest.java new file mode 100644 index 0000000000..e8b9f2aac3 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeMapOptionalTest.java @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.Optional; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeMapOptionalTest extends RxJavaTest { + + @Test + public void successSuccess() { + Maybe.just(1) + .mapOptional(Optional::of) + .test() + .assertResult(1); + } + + @Test + public void successEmpty() { + Maybe.just(1) + .mapOptional(v -> Optional.empty()) + .test() + .assertResult(); + } + + @Test + public void empty() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Maybe.empty() + .mapOptional(f) + .test() + .assertResult(); + + verify(f, never()).apply(any()); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Maybe.error(new TestException()) + .mapOptional(f) + .test() + .assertFailure(TestException.class); + + verify(f, never()).apply(any()); + } + + @Test + public void mapperCrash() { + Maybe.just(1) + .mapOptional(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.never().mapOptional(Optional::of)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(m -> m.mapOptional(Optional::of)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeToCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeToCompletionStageTest.java new file mode 100644 index 0000000000..c6c83e68a3 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeToCompletionStageTest.java @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeToCompletionStageTest extends RxJavaTest { + + @Test + public void just() throws Exception { + Integer v = Maybe.just(1) + .toCompletionStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void empty() throws Exception { + Integer v = Maybe.empty() + .toCompletionStage(2) + .toCompletableFuture() + .get(); + + assertEquals((Integer)2, v); + } + + @Test + public void emptyError() throws Exception { + CompletableFuture cf = Maybe.empty() + .toCompletionStage() + .toCompletableFuture(); + + TestHelper.assertError(cf, NoSuchElementException.class); + } + + @Test + public void completableFutureCancels() throws Exception { + MaybeSubject source = MaybeSubject.create(); + + CompletableFuture cf = source + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasObservers()); + } + + @Test + public void completableManualCompleteCancels() throws Exception { + MaybeSubject source = MaybeSubject.create(); + + CompletableFuture cf = source + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasObservers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void completableManualCompleteExceptionallyCancels() throws Exception { + MaybeSubject source = MaybeSubject.create(); + + CompletableFuture cf = source + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasObservers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void error() throws Exception { + CompletableFuture cf = Maybe.error(new TestException()) + .toCompletionStage(null) + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void sourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposable.empty()); + observer.onSuccess(1); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .toCompletionStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void doubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposable.empty()); + observer.onSubscribe(Disposable.empty()); + observer.onSuccess(1); + } + } + .toCompletionStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStageTest.java new file mode 100644 index 0000000000..7adb3664be --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFromCompletionStageTest.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleFromCompletionStageTest extends RxJavaTest { + + @Test + public void syncSuccess() { + Single.fromCompletionStage(CompletableFuture.completedFuture(1)) + .test() + .assertResult(1); + } + + @Test + public void syncFailure() { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(new TestException()); + + Single.fromCompletionStage(cf) + .test() + .assertFailure(TestException.class); + } + + @Test + public void syncNull() { + Single.fromCompletionStage(CompletableFuture.completedFuture(null)) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void dispose() { + CompletableFuture cf = new CompletableFuture<>(); + + TestObserver to = Single.fromCompletionStage(cf) + .test(); + + to.assertEmpty(); + + to.dispose(); + + cf.complete(1); + + to.assertEmpty(); + } + + @Test + public void dispose2() { + TestHelper.checkDisposed(Single.fromCompletionStage(new CompletableFuture<>())); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptionalTest.java new file mode 100644 index 0000000000..b8ab17a17c --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleMapOptionalTest.java @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.Optional; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleMapOptionalTest extends RxJavaTest { + + @Test + public void successSuccess() { + Single.just(1) + .mapOptional(Optional::of) + .test() + .assertResult(1); + } + + @Test + public void successEmpty() { + Single.just(1) + .mapOptional(v -> Optional.empty()) + .test() + .assertResult(); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Single.error(new TestException()) + .mapOptional(f) + .test() + .assertFailure(TestException.class); + + verify(f, never()).apply(any()); + } + + @Test + public void mapperCrash() { + Single.just(1) + .mapOptional(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.never().mapOptional(Optional::of)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(m -> m.mapOptional(Optional::of)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleToCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleToCompletionStageTest.java new file mode 100644 index 0000000000..2560c68ed8 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleToCompletionStageTest.java @@ -0,0 +1,153 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleToCompletionStageTest extends RxJavaTest { + + @Test + public void just() throws Exception { + Integer v = Single.just(1) + .toCompletionStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void completableFutureCancels() throws Exception { + SingleSubject source = SingleSubject.create(); + + CompletableFuture cf = source + .toCompletionStage() + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasObservers()); + } + + @Test + public void completableManualCompleteCancels() throws Exception { + SingleSubject source = SingleSubject.create(); + + CompletableFuture cf = source + .toCompletionStage() + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasObservers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void completableManualCompleteExceptionallyCancels() throws Exception { + SingleSubject source = SingleSubject.create(); + + CompletableFuture cf = source + .toCompletionStage() + .toCompletableFuture(); + + assertTrue(source.hasObservers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasObservers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void error() throws Exception { + CompletableFuture cf = Single.error(new TestException()) + .toCompletionStage() + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void sourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + observer.onSubscribe(Disposable.empty()); + observer.onSuccess(1); + observer.onError(new TestException()); + } + } + .toCompletionStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void doubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + observer.onSubscribe(Disposable.empty()); + observer.onSubscribe(Disposable.empty()); + observer.onSuccess(1); + } + } + .toCompletionStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index 99f6783b4a..b796904cdb 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -509,6 +509,9 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "singleStage", Object.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "lastStage", Object.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "toCompletionStage", Object.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "toCompletionStage", Object.class)); + // ----------------------------------------------------------------------------------- ignores = new HashMap>();