1616
1717import org .reactivestreams .Publisher ;
1818
19- import io .reactivex .annotations .SchedulerSupport ;
19+ import io .reactivex .annotations .* ;
2020import io .reactivex .disposables .Disposable ;
2121import io .reactivex .exceptions .Exceptions ;
2222import io .reactivex .functions .*;
2323import io .reactivex .internal .functions .*;
24+ import io .reactivex .internal .fuseable .*;
2425import io .reactivex .internal .observers .*;
2526import io .reactivex .internal .operators .completable .*;
2627import io .reactivex .internal .operators .flowable .FlowableDelaySubscriptionOther ;
@@ -43,7 +44,7 @@ public abstract class Completable implements CompletableSource {
4344 * terminates (normally or with an error) and cancels all other Completables.
4445 * <dl>
4546 * <dt><b>Scheduler:</b></dt>
46- * <dd>{@code amb } does not operate by default on a particular {@link Scheduler}.</dd>
47+ * <dd>{@code ambArray } does not operate by default on a particular {@link Scheduler}.</dd>
4748 * </dl>
4849 * @param sources the array of source Completables
4950 * @return the new Completable instance
@@ -97,7 +98,7 @@ public static Completable complete() {
9798 * Returns a Completable which completes only when all sources complete, one after another.
9899 * <dl>
99100 * <dt><b>Scheduler:</b></dt>
100- * <dd>{@code concat } does not operate by default on a particular {@link Scheduler}.</dd>
101+ * <dd>{@code concatArray } does not operate by default on a particular {@link Scheduler}.</dd>
101102 * </dl>
102103 * @param sources the sources to concatenate
103104 * @return the Completable instance which completes only when all sources complete
@@ -143,6 +144,7 @@ public static Completable concat(Iterable<? extends CompletableSource> sources)
143144 * @throws NullPointerException if sources is null
144145 */
145146 @ SchedulerSupport (SchedulerSupport .NONE )
147+ @ BackpressureSupport (BackpressureKind .FULL )
146148 public static Completable concat (Publisher <? extends CompletableSource > sources ) {
147149 return concat (sources , 2 );
148150 }
@@ -159,6 +161,7 @@ public static Completable concat(Publisher<? extends CompletableSource> sources)
159161 * @throws NullPointerException if sources is null
160162 */
161163 @ SchedulerSupport (SchedulerSupport .NONE )
164+ @ BackpressureSupport (BackpressureKind .FULL )
162165 public static Completable concat (Publisher <? extends CompletableSource > sources , int prefetch ) {
163166 ObjectHelper .requireNonNull (sources , "sources is null" );
164167 ObjectHelper .verifyPositive (prefetch , "prefetch" );
@@ -359,6 +362,7 @@ public static <T> Completable fromObservable(final ObservableSource<T> observabl
359362 * @return the new Completable instance
360363 * @throws NullPointerException if publisher is null
361364 */
365+ @ BackpressureSupport (BackpressureKind .UNBOUNDED_IN )
362366 @ SchedulerSupport (SchedulerSupport .NONE )
363367 public static <T > Completable fromPublisher (final Publisher <T > publisher ) {
364368 ObjectHelper .requireNonNull (publisher , "publisher is null" );
@@ -388,7 +392,7 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
388392 * completes only when all source Completables complete or one of them emits an error.
389393 * <dl>
390394 * <dt><b>Scheduler:</b></dt>
391- * <dd>{@code merge } does not operate by default on a particular {@link Scheduler}.</dd>
395+ * <dd>{@code mergeArray } does not operate by default on a particular {@link Scheduler}.</dd>
392396 * </dl>
393397 * @param sources the iterable sequence of sources.
394398 * @return the new Completable instance
@@ -435,6 +439,7 @@ public static Completable merge(Iterable<? extends CompletableSource> sources) {
435439 * @throws NullPointerException if sources is null
436440 */
437441 @ SchedulerSupport (SchedulerSupport .NONE )
442+ @ BackpressureSupport (BackpressureKind .UNBOUNDED_IN )
438443 public static Completable merge (Publisher <? extends CompletableSource > sources ) {
439444 return merge0 (sources , Integer .MAX_VALUE , false );
440445 }
@@ -453,6 +458,7 @@ public static Completable merge(Publisher<? extends CompletableSource> sources)
453458 * @throws IllegalArgumentException if maxConcurrency is less than 1
454459 */
455460 @ SchedulerSupport (SchedulerSupport .NONE )
461+ @ BackpressureSupport (BackpressureKind .FULL )
456462 public static Completable merge (Publisher <? extends CompletableSource > sources , int maxConcurrency ) {
457463 return merge0 (sources , maxConcurrency , false );
458464 }
@@ -473,6 +479,7 @@ public static Completable merge(Publisher<? extends CompletableSource> sources,
473479 * @throws IllegalArgumentException if maxConcurrency is less than 1
474480 */
475481 @ SchedulerSupport (SchedulerSupport .NONE )
482+ @ BackpressureSupport (BackpressureKind .FULL )
476483 private static Completable merge0 (Publisher <? extends CompletableSource > sources , int maxConcurrency , boolean delayErrors ) {
477484 ObjectHelper .requireNonNull (sources , "sources is null" );
478485 ObjectHelper .verifyPositive (maxConcurrency , "maxConcurrency" );
@@ -485,7 +492,7 @@ private static Completable merge0(Publisher<? extends CompletableSource> sources
485492 * them terminate in a way or another.
486493 * <dl>
487494 * <dt><b>Scheduler:</b></dt>
488- * <dd>{@code mergeDelayError } does not operate by default on a particular {@link Scheduler}.</dd>
495+ * <dd>{@code mergeArrayDelayError } does not operate by default on a particular {@link Scheduler}.</dd>
489496 * </dl>
490497 * @param sources the array of Completables
491498 * @return the new Completable instance
@@ -529,6 +536,7 @@ public static Completable mergeDelayError(Iterable<? extends CompletableSource>
529536 * @throws NullPointerException if sources is null
530537 */
531538 @ SchedulerSupport (SchedulerSupport .NONE )
539+ @ BackpressureSupport (BackpressureKind .UNBOUNDED_IN )
532540 public static Completable mergeDelayError (Publisher <? extends CompletableSource > sources ) {
533541 return merge0 (sources , Integer .MAX_VALUE , true );
534542 }
@@ -548,6 +556,7 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>
548556 * @throws NullPointerException if sources is null
549557 */
550558 @ SchedulerSupport (SchedulerSupport .NONE )
559+ @ BackpressureSupport (BackpressureKind .FULL )
551560 public static Completable mergeDelayError (Publisher <? extends CompletableSource > sources , int maxConcurrency ) {
552561 return merge0 (sources , maxConcurrency , true );
553562 }
@@ -670,7 +679,7 @@ public static <R> Completable using(
670679 * if not already Completable.
671680 * <dl>
672681 * <dt><b>Scheduler:</b></dt>
673- * <dd>{@code amb } does not operate by default on a particular {@link Scheduler}.</dd>
682+ * <dd>{@code wrap } does not operate by default on a particular {@link Scheduler}.</dd>
674683 * </dl>
675684 * @param source the source to wrap
676685 * @return the source or its wrapper Completable
@@ -736,6 +745,7 @@ public final <T> Observable<T> andThen(ObservableSource<T> next) {
736745 * @return Flowable that composes this Completable and next
737746 * @throws NullPointerException if next is null
738747 */
748+ @ BackpressureSupport (BackpressureKind .FULL )
739749 @ SchedulerSupport (SchedulerSupport .NONE )
740750 public final <T > Flowable <T > andThen (Publisher <T > next ) {
741751 ObjectHelper .requireNonNull (next , "next is null" );
@@ -821,7 +831,7 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
821831 * the emitted exception if any.
822832 * <dl>
823833 * <dt><b>Scheduler:</b></dt>
824- * <dd>{@code doAfterTerminate } does not operate by default on a particular {@link Scheduler}.</dd>
834+ * <dd>{@code blockingGet } does not operate by default on a particular {@link Scheduler}.</dd>
825835 * </dl>
826836 * @return the throwable if this terminated with an error, null otherwise
827837 * @throws RuntimeException that wraps an InterruptedException if the wait is interrupted
@@ -836,6 +846,10 @@ public final Throwable blockingGet() {
836846 /**
837847 * Subscribes to this Completable instance and blocks until it terminates or the specified timeout
838848 * elapses, then returns null for normal termination or the emitted exception if any.
849+ * <dl>
850+ * <dt><b>Scheduler:</b></dt>
851+ * <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
852+ * </dl>
839853 * @param timeout the timeout value
840854 * @param unit the time unit
841855 * @return the throwable if this terminated with an error, null otherwise
@@ -1162,7 +1176,7 @@ public final Completable onErrorComplete() {
11621176 * true, it will emit an onComplete and swallow the throwable.
11631177 * <dl>
11641178 * <dt><b>Scheduler:</b></dt>
1165- * <dd>{@code doErrorComplete } does not operate by default on a particular {@link Scheduler}.</dd>
1179+ * <dd>{@code onErrorComplete } does not operate by default on a particular {@link Scheduler}.</dd>
11661180 * </dl>
11671181 * @param predicate the predicate to call when an Throwable is emitted which should return true
11681182 * if the Throwable should be swallowed and replaced with an onComplete.
@@ -1381,6 +1395,7 @@ public final <T> Observable<T> startWith(Observable<T> other) {
13811395 * @return the new Observable instance
13821396 * @throws NullPointerException if other is null
13831397 */
1398+ @ BackpressureSupport (BackpressureKind .FULL )
13841399 @ SchedulerSupport (SchedulerSupport .NONE )
13851400 public final <T > Flowable <T > startWith (Publisher <T > other ) {
13861401 ObjectHelper .requireNonNull (other , "other is null" );
@@ -1442,12 +1457,17 @@ public final void subscribe(CompletableObserver s) {
14421457 *
14431458 * composite.add(source.subscribeWith(new ResourceCompletableObserver()));
14441459 * </code></pre>
1460+ * <dl>
1461+ * <dt><b>Scheduler:</b></dt>
1462+ * <dd>{@code subscribeWith} does not operate by default on a particular {@link Scheduler}.</dd>
1463+ * </dl>
14451464 * @param <E> the type of the CompletableObserver to use and return
14461465 * @param observer the CompletableObserver (subclass) to use and return, not null
14471466 * @return the input {@code observer}
14481467 * @throws NullPointerException if {@code observer} is null
14491468 * @since 2.0
14501469 */
1470+ @ SchedulerSupport (SchedulerSupport .NONE )
14511471 public final <E extends CompletableObserver > E subscribeWith (E observer ) {
14521472 subscribe (observer );
14531473 return observer ;
@@ -1595,6 +1615,10 @@ public final Completable timeout(long timeout, TimeUnit unit, Scheduler schedule
15951615 * Returns a Completable that runs this Completable and optionally switches to the other Completable
15961616 * in case this Completable doesn't complete within the given time while "waiting" on
15971617 * the specified scheduler.
1618+ * <dl>
1619+ * <dt><b>Scheduler:</b></dt>
1620+ * <dd>you specify the {@link Scheduler} this operator runs on.</dd>
1621+ * </dl>
15981622 * @param timeout the timeout value
15991623 * @param unit the timeout unit
16001624 * @param scheduler the scheduler to use to wait for completion
@@ -1641,8 +1665,13 @@ public final <U> U to(Function<? super Completable, U> converter) {
16411665 * @param <T> the value type
16421666 * @return the new Observable created
16431667 */
1668+ @ SuppressWarnings ("unchecked" )
1669+ @ BackpressureSupport (BackpressureKind .FULL )
16441670 @ SchedulerSupport (SchedulerSupport .NONE )
16451671 public final <T > Flowable <T > toFlowable () {
1672+ if (this instanceof FuseToFlowable ) {
1673+ return ((FuseToFlowable <T >)this ).fuseToFlowable ();
1674+ }
16461675 return RxJavaPlugins .onAssembly (new CompletableToFlowable <T >(this ));
16471676 }
16481677
@@ -1652,13 +1681,18 @@ public final <T> Flowable<T> toFlowable() {
16521681 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toObservable.png" alt="">
16531682 * <dl>
16541683 * <dt><b>Scheduler:</b></dt>
1655- * <dd>{@code toCompletable } does not operate by default on a particular {@link Scheduler}.</dd>
1684+ * <dd>{@code toMaybe } does not operate by default on a particular {@link Scheduler}.</dd>
16561685 * </dl>
16571686 *
16581687 * @param <T> the value type
16591688 * @return an {@link Maybe} that emits a single item T or an error.
16601689 */
1690+ @ SuppressWarnings ("unchecked" )
1691+ @ SchedulerSupport (SchedulerSupport .NONE )
16611692 public final <T > Maybe <T > toMaybe () {
1693+ if (this instanceof FuseToMaybe ) {
1694+ return ((FuseToMaybe <T >)this ).fuseToMaybe ();
1695+ }
16621696 return RxJavaPlugins .onAssembly (new MaybeFromCompletable <T >(this ));
16631697 }
16641698
@@ -1672,8 +1706,12 @@ public final <T> Maybe<T> toMaybe() {
16721706 * @param <T> the value type
16731707 * @return the new Observable created
16741708 */
1709+ @ SuppressWarnings ("unchecked" )
16751710 @ SchedulerSupport (SchedulerSupport .NONE )
16761711 public final <T > Observable <T > toObservable () {
1712+ if (this instanceof FuseToObservable ) {
1713+ return ((FuseToObservable <T >)this ).fuseToObservable ();
1714+ }
16771715 return RxJavaPlugins .onAssembly (new CompletableToObservable <T >(this ));
16781716 }
16791717
@@ -1736,9 +1774,14 @@ public final Completable unsubscribeOn(final Scheduler scheduler) {
17361774 /**
17371775 * Creates a TestSubscriber and subscribes
17381776 * it to this Completable.
1777+ * <dl>
1778+ * <dt><b>Scheduler:</b></dt>
1779+ * <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
1780+ * </dl>
17391781 * @return the new TestSubscriber instance
17401782 * @since 2.0
17411783 */
1784+ @ SchedulerSupport (SchedulerSupport .NONE )
17421785 public final TestSubscriber <Void > test () {
17431786 TestSubscriber <Void > ts = new TestSubscriber <Void >();
17441787 subscribe (new SubscriberCompletableObserver <Void >(ts ));
@@ -1749,9 +1792,14 @@ public final TestSubscriber<Void> test() {
17491792 * Creates a TestSubscriber optionally in cancelled state, then subscribes it to this Completable.
17501793 * @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
17511794 * Completable.
1795+ * <dl>
1796+ * <dt><b>Scheduler:</b></dt>
1797+ * <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
1798+ * </dl>
17521799 * @return the new TestSubscriber instance
17531800 * @since 2.0
17541801 */
1802+ @ SchedulerSupport (SchedulerSupport .NONE )
17551803 public final TestSubscriber <Void > test (boolean cancelled ) {
17561804 TestSubscriber <Void > ts = new TestSubscriber <Void >();
17571805
0 commit comments