Skip to content

Commit

Permalink
2.x: Dedicated {Single|Maybe}.flatMap{Publisher|Observable} & andThen…
Browse files Browse the repository at this point in the history
…(Observable|Publisher) implementations (#6024)

* 2.x: Dedicated {0..1}.flatMap{Publisher|Obs} & andThen implementations

* Fix local variable name.
  • Loading branch information
akarnokd authored May 27, 2018
1 parent 07586f4 commit 43ceedf
Show file tree
Hide file tree
Showing 15 changed files with 1,086 additions and 18 deletions.
7 changes: 3 additions & 4 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.observers.*;
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
Expand Down Expand Up @@ -872,7 +871,7 @@ public final Completable ambWith(CompletableSource other) {
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Observable<T> andThen(ObservableSource<T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new ObservableDelaySubscriptionOther<T, Object>(next, toObservable()));
return RxJavaPlugins.onAssembly(new CompletableAndThenObservable<T>(this, next));
}

/**
Expand All @@ -897,7 +896,7 @@ public final <T> Observable<T> andThen(ObservableSource<T> next) {
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Flowable<T> andThen(Publisher<T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new FlowableDelaySubscriptionOther<T, Object>(next, toFlowable()));
return RxJavaPlugins.onAssembly(new CompletableAndThenPublisher<T>(this, next));
}

/**
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.reactivex.internal.observers.BlockingMultiObserver;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.util.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -2961,7 +2962,8 @@ public final <U> Observable<U> flattenAsObservable(final Function<? super T, ? e
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return toObservable().flatMap(mapper);
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapObservable<T, R>(this, mapper));
}

/**
Expand All @@ -2987,7 +2989,8 @@ public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends O
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return toFlowable().flatMap(mapper);
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapPublisher<T, R>(this, mapper));
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -2535,7 +2536,8 @@ public final <U> Observable<U> flattenAsObservable(final Function<? super T, ? e
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return toObservable().flatMap(mapper);
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapObservable<T, R>(this, mapper));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.mixed;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* After Completable completes, it relays the signals
* of the ObservableSource to the downstream observer.
*
* @param <R> the result type of the ObservableSource and this operator
* @since 2.1.15
*/
public final class CompletableAndThenObservable<R> extends Observable<R> {

final CompletableSource source;

final ObservableSource<? extends R> other;

public CompletableAndThenObservable(CompletableSource source,
ObservableSource<? extends R> other) {
this.source = source;
this.other = other;
}

@Override
protected void subscribeActual(Observer<? super R> s) {
AndThenObservableObserver<R> parent = new AndThenObservableObserver<R>(s, other);
s.onSubscribe(parent);
source.subscribe(parent);
}

static final class AndThenObservableObserver<R>
extends AtomicReference<Disposable>
implements Observer<R>, CompletableObserver, Disposable {

private static final long serialVersionUID = -8948264376121066672L;

final Observer<? super R> downstream;

ObservableSource<? extends R> other;

AndThenObservableObserver(Observer<? super R> downstream, ObservableSource<? extends R> other) {
this.other = other;
this.downstream = downstream;
}

@Override
public void onNext(R t) {
downstream.onNext(t);
}

@Override
public void onError(Throwable t) {
downstream.onError(t);
}

@Override
public void onComplete() {
ObservableSource<? extends R> o = other;
if (o == null) {
downstream.onComplete();
} else {
other = null;
o.subscribe(this);
}
}


@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.replace(this, d);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.mixed;

import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

/**
* After Completable completes, it relays the signals
* of the Publisher to the downstream subscriber.
*
* @param <R> the result type of the Publisher and this operator
* @since 2.1.15
*/
public final class CompletableAndThenPublisher<R> extends Flowable<R> {

final CompletableSource source;

final Publisher<? extends R> other;

public CompletableAndThenPublisher(CompletableSource source,
Publisher<? extends R> other) {
this.source = source;
this.other = other;
}

@Override
protected void subscribeActual(Subscriber<? super R> s) {
source.subscribe(new AndThenPublisherSubscriber<R>(s, other));
}

static final class AndThenPublisherSubscriber<R>
extends AtomicReference<Subscription>
implements FlowableSubscriber<R>, CompletableObserver, Subscription {

private static final long serialVersionUID = -8948264376121066672L;

final Subscriber<? super R> downstream;

Publisher<? extends R> other;

Disposable upstream;

final AtomicLong requested;

AndThenPublisherSubscriber(Subscriber<? super R> downstream, Publisher<? extends R> other) {
this.downstream = downstream;
this.other = other;
this.requested = new AtomicLong();
}

@Override
public void onNext(R t) {
downstream.onNext(t);
}

@Override
public void onError(Throwable t) {
downstream.onError(t);
}

@Override
public void onComplete() {
Publisher<? extends R> p = other;
if (p == null) {
downstream.onComplete();
} else {
other = null;
p.subscribe(this);
}
}

@Override
public void request(long n) {
SubscriptionHelper.deferredRequest(this, requested, n);
}

@Override
public void cancel() {
upstream.dispose();
SubscriptionHelper.cancel(this);
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}

@Override
public void onSubscribe(Subscription s) {
SubscriptionHelper.deferredSetOnce(this, requested, s);
}
}
}
Loading

0 comments on commit 43ceedf

Please sign in to comment.