Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: [Java 8] Add fromOpt/Stage, mapOptional, toCompletionStage to M/S/C #6783

Merged
merged 1 commit into from
Dec 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package io.reactivex.rxjava3.core;

import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;

import org.reactivestreams.Publisher;
Expand All @@ -23,6 +23,7 @@
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.operators.completable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
Expand Down Expand Up @@ -2753,4 +2754,71 @@ public final TestObserver<Void> test(boolean dispose) {
subscribe(to);
return to;
}

// -------------------------------------------------------------------------
// JDK 8 Support
// -------------------------------------------------------------------------

/**
* Signals completion (or error) when the {@link CompletionStage} terminates.
* <p>
* <img width="640" height="262" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCompletionStage.c.png" alt="">
* <p>
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
* If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
* <pre><code>
* Maybe.defer(() -&gt; Completable.fromCompletionStage(createCompletionStage()));
* </code></pre>
* <p>
* Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage}
* itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param stage the CompletionStage to convert to Maybe and signal its terminal value or error
* @return the new Completable instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static Completable fromCompletionStage(@NonNull CompletionStage<?> stage) {
Objects.requireNonNull(stage, "stage is null");
return RxJavaPlugins.onAssembly(new CompletableFromCompletionStage<>(stage));
}

/**
* Signals the given default item when the upstream completes or signals the upstream error via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.c.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <p>
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
* <pre><code>
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).toCompletionStage(Optional.empty());
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the type of the default item to signal upon completion
* @param defaultItem the item to signal if the upstream is empty
* @return the new CompletionStage instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <T> CompletionStage<T> toCompletionStage(@Nullable T defaultItem) {
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
}
}
162 changes: 160 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

package io.reactivex.rxjava3.core;

import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;

import org.reactivestreams.*;
Expand All @@ -25,6 +24,7 @@
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
Expand Down Expand Up @@ -4794,4 +4794,162 @@ public final TestObserver<T> test(boolean dispose) {
subscribe(to);
return to;
}

// -------------------------------------------------------------------------
// JDK 8 Support
// -------------------------------------------------------------------------

/**
* Converts the existing value of the provided optional into a {@link #just(Object)}
* or an empty optional into an {@link #empty()} {@code Maybe} instance.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromOptional.m.png" alt="">
* <p>
* Note that the operator takes an already instantiated optional reference and does not
* by any means create this original optional. If the optional is to be created per
* consumer upon subscription, use {@link #defer(Supplier)} around {@code fromOptional}:
* <pre><code>
* Maybe.defer(() -&gt; Maybe.fromOptional(createOptional()));
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromOptional} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the element type of the optional value
* @param optional the optional value to convert into a {@code Maybe}
* @return the new Maybe instance
* @see #just(Object)
* @see #empty()
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Maybe<@NonNull T> fromOptional(@NonNull Optional<T> optional) {
Objects.requireNonNull(optional, "optional is null");
return optional.map(Maybe::just).orElseGet(Maybe::empty);
}

/**
* Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation.
* <p>
* <img width="640" height="262" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCompletionStage.s.png" alt="">
* <p>
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
* If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
* <pre><code>
* Maybe.defer(() -&gt; Maybe.fromCompletionStage(createCompletionStage()));
* </code></pre>
* <p>
* If the {@code CompletionStage} completes with {@code null}, the resulting {@code Maybe} is completed via {@code onComplete}.
* <p>
* Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage}
* itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the element type of the CompletionStage
* @param stage the CompletionStage to convert to Maybe and signal its terminal value or error
* @return the new Maybe instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Maybe<@NonNull T> fromCompletionStage(@NonNull CompletionStage<T> stage) {
Objects.requireNonNull(stage, "stage is null");
return RxJavaPlugins.onAssembly(new MaybeFromCompletionStage<>(stage));
}

/**
* Maps the upstream success value into an {@link Optional} and emits the contained item if not empty.
* <p>
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mapOptional.m.png" alt="">
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mapOptional} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the non-null output type
* @param mapper the function that receives the upstream success iteem and should return a <em>non-empty</em> {@code Optional}
* to emit as the success output or an <em>empty</em> {@code Optional} to complete the {@code Maybe}
* @return the new Maybe instance
* @since 3.0.0
* @see #map(Function)
* @see #filter(Predicate)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <@NonNull R> Maybe<R> mapOptional(@NonNull Function<? super T, @NonNull Optional<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeMapOptional<>(this, mapper));
}

/**
* Signals the upstream success item (or a {@link NoSuchElementException} if the upstream is empty) via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="349" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.m.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <p>
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
* {@link #toCompletionStage(Object)} with {@code null} or turn the upstrea into a sequence of {@link Optional}s and
* default to {@link Optional#empty()}:
* <pre><code>
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).toCompletionStage(Optional.empty());
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new CompletionStage instance
* @since 3.0.0
* @see #toCompletionStage(Object)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> toCompletionStage() {
return subscribeWith(new CompletionStageConsumer<>(false, null));
}

/**
* Signals the upstream success item (or the default item if the upstream is empty) via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.mv.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <p>
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
* <pre><code>
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).toCompletionStage(Optional.empty());
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultItem the item to signal if the upstream is empty
* @return the new CompletionStage instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> toCompletionStage(@Nullable T defaultItem) {
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
}
}
91 changes: 89 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

package io.reactivex.rxjava3.core;

import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;

import org.reactivestreams.Publisher;
Expand All @@ -25,6 +24,7 @@
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.operators.completable.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
Expand Down Expand Up @@ -4181,4 +4181,91 @@ public final TestObserver<T> test(boolean dispose) {
private static <T> Single<T> toSingle(Flowable<T> source) {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(source, null));
}

// -------------------------------------------------------------------------
// JDK 8 Support
// -------------------------------------------------------------------------

/**
* Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation.
* <p>
* <img width="640" height="262" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCompletionStage.s.png" alt="">
* <p>
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
* If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
* <pre><code>
* Single.defer(() -&gt; Single.fromCompletionStage(createCompletionStage()));
* </code></pre>
* <p>
* If the {@code CompletionStage} completes with {@code null}, the resulting {@code Single} is terminated with
* a {@link NullPointerException}.
* <p>
* Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage}
* itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the element type of the CompletionStage
* @param stage the CompletionStage to convert to Single and signal its success value or error
* @return the new Single instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Single<@NonNull T> fromCompletionStage(@NonNull CompletionStage<T> stage) {
Objects.requireNonNull(stage, "stage is null");
return RxJavaPlugins.onAssembly(new SingleFromCompletionStage<>(stage));
}

/**
* Maps the upstream success value into an {@link Optional} and emits the contained item if not empty.
* <p>
* <img width="640" height="323" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mapOptional.s.png" alt="">
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mapOptional} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the non-null output type
* @param mapper the function that receives the upstream success iteem and should return a <em>non-empty</em> {@code Optional}
* to emit as the success output or an <em>empty</em> {@code Optional} to complete the {@code Maybe}
* @return the new Maybe instance
* @since 3.0.0
* @see #map(Function)
* @see #filter(Predicate)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <@NonNull R> Maybe<R> mapOptional(@NonNull Function<? super T, @NonNull Optional<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMapOptional<>(this, mapper));
}

/**
* Signals the upstream success item (or error) via a {@link CompletionStage}.
* <p>
* <img width="640" height="321" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toCompletionStage.s.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletionStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new CompletionStage instance
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> toCompletionStage() {
return subscribeWith(new CompletionStageConsumer<>(false, null));
}
}
Loading