Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add fusion support to ObservableSwitchMap inner source #5919

Merged
merged 1 commit into from
Mar 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class ObservableSwitchMapCompletablePerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int count;

Observable<Integer> switchMapToObservableEmpty;
Observable<Integer> observableConvert;

Completable switchMapCompletableEmpty;
Completable observableDedicated;

Observable<Integer> observablePlain;

Expand All @@ -53,15 +53,15 @@ public Observable<? extends Integer> apply(Integer v)
}
});

switchMapToObservableEmpty = source.switchMap(new Function<Integer, Observable<? extends Integer>>() {
observableConvert = source.switchMap(new Function<Integer, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> apply(Integer v)
throws Exception {
return Completable.complete().toObservable();
}
});

switchMapCompletableEmpty = source.switchMapCompletable(new Function<Integer, Completable>() {
observableDedicated = source.switchMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v)
throws Exception {
Expand All @@ -76,12 +76,12 @@ public Object observablePlain(Blackhole bh) {
}

@Benchmark
public Object switchMapToObservableEmpty(Blackhole bh) {
return switchMapToObservableEmpty.subscribeWith(new PerfConsumer(bh));
public Object observableConvert(Blackhole bh) {
return observableConvert.subscribeWith(new PerfConsumer(bh));
}

@Benchmark
public Object switchMapCompletableEmpty(Blackhole bh) {
return switchMapCompletableEmpty.subscribeWith(new PerfConsumer(bh));
public Object observableDedicated(Blackhole bh) {
return observableDedicated.subscribeWith(new PerfConsumer(bh));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ public final void complete(T value) {
if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
return;
}
Observer<? super T> a = actual;
if (state == FUSED_EMPTY) {
this.value = value;
lazySet(FUSED_READY);
a.onNext(null);
} else {
lazySet(TERMINATED);
a.onNext(value);
}
Observer<? super T> a = actual;
a.onNext(value);
if (get() != DISPOSED) {
a.onComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.observers.BasicQueueDisposable;

/**
* Wraps a Completable and exposes it as an Observable.
Expand All @@ -34,8 +36,12 @@ protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new ObserverCompletableObserver(observer));
}

static final class ObserverCompletableObserver implements CompletableObserver {
private final Observer<?> observer;
static final class ObserverCompletableObserver extends BasicQueueDisposable<Void>
implements CompletableObserver {

final Observer<?> observer;

Disposable upstream;

ObserverCompletableObserver(Observer<?> observer) {
this.observer = observer;
Expand All @@ -53,7 +59,40 @@ public void onError(Throwable e) {

@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
if (DisposableHelper.validate(upstream, d)) {
this.upstream = d;
observer.onSubscribe(this);
}
}

@Override
public int requestFusion(int mode) {
return mode & ASYNC;
}

@Override
public Void poll() throws Exception {
return null; // always empty
}

@Override
public boolean isEmpty() {
return true;
}

@Override
public void clear() {
// always empty
}

@Override
public void dispose() {
upstream.dispose();
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.queue.*;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.AtomicThrowable;
import io.reactivex.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -181,6 +182,8 @@ void drain() {
}

final Observer<? super R> a = actual;
final AtomicReference<SwitchMapInnerObserver<T, R>> active = this.active;
final boolean delayErrors = this.delayErrors;

int missing = 1;

Expand Down Expand Up @@ -218,66 +221,85 @@ void drain() {
SwitchMapInnerObserver<T, R> inner = active.get();

if (inner != null) {
SpscLinkedArrayQueue<R> q = inner.queue;

if (inner.done) {
boolean empty = q.isEmpty();
if (delayErrors) {
if (empty) {
active.compareAndSet(inner, null);
continue;
SimpleQueue<R> q = inner.queue;

if (q != null) {
if (inner.done) {
boolean empty = q.isEmpty();
if (delayErrors) {
if (empty) {
active.compareAndSet(inner, null);
continue;
}
} else {
Throwable ex = errors.get();
if (ex != null) {
a.onError(errors.terminate());
return;
}
if (empty) {
active.compareAndSet(inner, null);
continue;
}
}
} else {
Throwable ex = errors.get();
if (ex != null) {
a.onError(errors.terminate());
}

boolean retry = false;

for (;;) {
if (cancelled) {
return;
}
if (empty) {
active.compareAndSet(inner, null);
continue;
if (inner != active.get()) {
retry = true;
break;
}
}
}

boolean retry = false;
if (!delayErrors) {
Throwable ex = errors.get();
if (ex != null) {
a.onError(errors.terminate());
return;
}
}

for (;;) {
if (cancelled) {
return;
}
if (inner != active.get()) {
retry = true;
break;
}
boolean d = inner.done;
R v;

if (!delayErrors) {
Throwable ex = errors.get();
if (ex != null) {
a.onError(errors.terminate());
return;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
errors.addThrowable(ex);
active.compareAndSet(inner, null);
if (!delayErrors) {
disposeInner();
s.dispose();
done = true;
} else {
inner.cancel();
}
v = null;
retry = true;
}
}
boolean empty = v == null;

boolean d = inner.done;
R v = q.poll();
boolean empty = v == null;
if (d && empty) {
active.compareAndSet(inner, null);
retry = true;
break;
}

if (d && empty) {
active.compareAndSet(inner, null);
retry = true;
break;
}
if (empty) {
break;
}

if (empty) {
break;
a.onNext(v);
}

a.onNext(v);
}

if (retry) {
continue;
if (retry) {
continue;
}
}
}

Expand Down Expand Up @@ -306,25 +328,49 @@ static final class SwitchMapInnerObserver<T, R> extends AtomicReference<Disposab
private static final long serialVersionUID = 3837284832786408377L;
final SwitchMapObserver<T, R> parent;
final long index;
final SpscLinkedArrayQueue<R> queue;

final int bufferSize;

volatile SimpleQueue<R> queue;

volatile boolean done;

SwitchMapInnerObserver(SwitchMapObserver<T, R> parent, long index, int bufferSize) {
this.parent = parent;
this.index = index;
this.queue = new SpscLinkedArrayQueue<R>(bufferSize);
this.bufferSize = bufferSize;
}

@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this, s);
if (DisposableHelper.setOnce(this, s)) {
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<R> qd = (QueueDisposable<R>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
if (m == QueueDisposable.SYNC) {
queue = qd;
done = true;
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
queue = qd;
return;
}
}

queue = new SpscLinkedArrayQueue<R>(bufferSize);
}
}

@Override
public void onNext(R t) {
if (index == parent.unique) {
queue.offer(t);
if (t != null) {
queue.offer(t);
}
parent.drain();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.observers.DeferredScalarDisposable;

/**
* Wraps a Single and exposes it as an Observable.
Expand Down Expand Up @@ -48,14 +49,14 @@ public static <T> SingleObserver<T> create(Observer<? super T> downstream) {
}

static final class SingleToObservableObserver<T>
implements SingleObserver<T>, Disposable {

final Observer<? super T> actual;
extends DeferredScalarDisposable<T>
implements SingleObserver<T> {

private static final long serialVersionUID = 3786543492451018833L;
Disposable d;

SingleToObservableObserver(Observer<? super T> actual) {
this.actual = actual;
super(actual);
}

@Override
Expand All @@ -69,23 +70,19 @@ public void onSubscribe(Disposable d) {

@Override
public void onSuccess(T value) {
actual.onNext(value);
actual.onComplete();
complete(value);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
error(e);
}

@Override
public void dispose() {
super.dispose();
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Loading