Skip to content

Commit

Permalink
2.x: Add Flowable.switchMap{Maybe,Single}{DelayError} operators (#5873)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 2, 2018
1 parent 44fb7cd commit d3ed269
Show file tree
Hide file tree
Showing 5 changed files with 1,990 additions and 0 deletions.
140 changes: 140 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14648,6 +14648,146 @@ <R> Flowable<R> switchMap0(Function<? super T, ? extends Publisher<? extends R>>
return RxJavaPlugins.onAssembly(new FlowableSwitchMap<T, R>(this, mapper, bufferSize, delayError));
}

/**
* Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if
* available while failing immediately if this {@code Flowable} or any of the
* active inner {@code MaybeSource}s fail.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
* unbounded manner (i.e., without backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>This operator terminates with an {@code onError} if this {@code Flowable} or any of
* the inner {@code MaybeSource}s fail while they are active. When this happens concurrently, their
* individual {@code Throwable} errors may get combined and emitted as a single
* {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late
* (i.e., inactive or switched out) {@code onError} from this {@code Flowable} or from any of
* the inner {@code MaybeSource}s will be forwarded to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as
* {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}</dd>
* </dl>
* @param <R> the output value type
* @param mapper the function called with the current upstream event and should
* return a {@code MaybeSource} to replace the current active inner source
* and get subscribed to.
* @return the new Flowable instance
* @since 2.1.11 - experimental
* @see #switchMapMaybe(Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Flowable<R> switchMapMaybe(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybe<T, R>(this, mapper, false));
}

/**
* Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if
* available, delaying errors from this {@code Flowable} or the inner {@code MaybeSource}s until all terminate.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
* unbounded manner (i.e., without backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the output value type
* @param mapper the function called with the current upstream event and should
* return a {@code MaybeSource} to replace the current active inner source
* and get subscribed to.
* @return the new Flowable instance
* @since 2.1.11 - experimental
* @see #switchMapMaybe(Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Flowable<R> switchMapMaybeDelayError(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybe<T, R>(this, mapper, true));
}

/**
* Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one
* while failing immediately if this {@code Flowable} or any of the
* active inner {@code SingleSource}s fail.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
* unbounded manner (i.e., without backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>This operator terminates with an {@code onError} if this {@code Flowable} or any of
* the inner {@code SingleSource}s fail while they are active. When this happens concurrently, their
* individual {@code Throwable} errors may get combined and emitted as a single
* {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late
* (i.e., inactive or switched out) {@code onError} from this {@code Flowable} or from any of
* the inner {@code SingleSource}s will be forwarded to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as
* {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}</dd>
* </dl>
* @param <R> the output value type
* @param mapper the function called with the current upstream event and should
* return a {@code SingleSource} to replace the current active inner source
* and get subscribed to.
* @return the new Flowable instance
* @since 2.1.11 - experimental
* @see #switchMapSingle(Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Flowable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle<T, R>(this, mapper, false));
}

/**
* Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one,
* delaying errors from this {@code Flowable} or the inner {@code SingleSource}s until all terminate.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
* unbounded manner (i.e., without backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the output value type
* @param mapper the function called with the current upstream event and should
* return a {@code SingleSource} to replace the current active inner source
* and get subscribed to.
* @return the new Flowable instance
* @since 2.1.11 - experimental
* @see #switchMapSingle(Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle<T, R>(this, mapper, true));
}

/**
* Returns a Flowable that emits only the first {@code count} items emitted by the source Publisher. If the source emits fewer than
* {@code count} items then all of its items are emitted.
Expand Down
Loading

0 comments on commit d3ed269

Please sign in to comment.