diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java index bc765d48b5..5416125366 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java @@ -13,16 +13,17 @@ package io.reactivex.internal.operators.mixed; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; +import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; @@ -56,7 +57,9 @@ public ObservableConcatMapCompletable(Observable source, @Override protected void subscribeActual(CompletableObserver s) { - source.subscribe(new ConcatMapCompletableObserver(s, mapper, errorMode, prefetch)); + if (!tryScalarSource(source, mapper, s)) { + source.subscribe(new ConcatMapCompletableObserver(s, mapper, errorMode, prefetch)); + } } static final class ConcatMapCompletableObserver @@ -77,7 +80,7 @@ static final class ConcatMapCompletableObserver final int prefetch; - final SimplePlainQueue queue; + SimpleQueue queue; Disposable upstream; @@ -96,20 +99,40 @@ static final class ConcatMapCompletableObserver this.prefetch = prefetch; this.errors = new AtomicThrowable(); this.inner = new ConcatMapInnerObserver(this); - this.queue = new SpscLinkedArrayQueue(prefetch); } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(upstream, s)) { this.upstream = s; + if (s instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable) s; + + int m = qd.requestFusion(QueueDisposable.ANY); + if (m == QueueDisposable.SYNC) { + queue = qd; + done = true; + downstream.onSubscribe(this); + drain(); + return; + } + if (m == QueueDisposable.ASYNC) { + queue = qd; + downstream.onSubscribe(this); + return; + } + } + queue = new SpscLinkedArrayQueue(prefetch); downstream.onSubscribe(this); } } @Override public void onNext(T t) { - queue.offer(t); + if (t != null) { + queue.offer(t); + } drain(); } @@ -187,6 +210,9 @@ void drain() { return; } + AtomicThrowable errors = this.errors; + ErrorMode errorMode = this.errorMode; + do { if (disposed) { queue.clear(); @@ -206,8 +232,24 @@ void drain() { } boolean d = done; - T v = queue.poll(); - boolean empty = v == null; + boolean empty = true; + CompletableSource cs = null; + try { + T v = queue.poll(); + if (v != null) { + cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource"); + empty = false; + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + disposed = true; + queue.clear(); + upstream.dispose(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } if (d && empty) { disposed = true; @@ -221,21 +263,6 @@ void drain() { } if (!empty) { - - CompletableSource cs; - - try { - cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - disposed = true; - queue.clear(); - upstream.dispose(); - errors.addThrowable(ex); - ex = errors.terminate(); - downstream.onError(ex); - return; - } active = true; cs.subscribe(inner); } @@ -274,4 +301,30 @@ void dispose() { } } } + + static boolean tryScalarSource(Observable source, Function mapper, CompletableObserver observer) { + if (source instanceof Callable) { + @SuppressWarnings("unchecked") + Callable call = (Callable) source; + CompletableSource cs = null; + try { + T item = call.call(); + if (item != null) { + cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource"); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); + return true; + } + + if (cs == null) { + EmptyDisposable.complete(observer); + } else { + cs.subscribe(observer); + } + return true; + } + return false; + } } diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 2a4ac1ee58..9e31194f0f 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -2827,4 +2827,99 @@ public static void checkInvalidParallelSubscribers(ParallelFlowable sourc tss[i].assertFailure(IllegalArgumentException.class); } } + + public static Observable rejectObservableFusion() { + return new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(new QueueDisposable() { + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public boolean offer(T value) { + throw new IllegalStateException(); + } + + @Override + public boolean offer(T v1, T v2) { + throw new IllegalStateException(); + } + + @Override + public T poll() throws Exception { + return null; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void clear() { + } + + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + }); + } + }; + } + + public static Flowable rejectFlowableFusion() { + return new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new QueueSubscription() { + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public boolean offer(T value) { + throw new IllegalStateException(); + } + + @Override + public boolean offer(T v1, T v2) { + throw new IllegalStateException(); + } + + @Override + public T poll() throws Exception { + return null; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void clear() { + } + + @Override + public void cancel() { + } + + @Override + public void request(long n) { + } + }); + } + }; + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java index 50c79bb01f..6f4ff458ab 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java @@ -24,6 +24,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.*; @@ -112,6 +113,19 @@ public CompletableSource apply(Integer v) throws Exception { .assertFailure(TestException.class); } + @Test + public void mapperCrashHidden() { + Observable.just(1).hide() + .concatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + @Test public void immediateError() { PublishSubject ps = PublishSubject.create(); @@ -359,4 +373,62 @@ public void doneButNotEmpty() { to.assertResult(); } + + @Test + public void asyncFused() { + final PublishSubject ps = PublishSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); + + final TestObserver to = ps.observeOn(ImmediateThinScheduler.INSTANCE) + .concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + ps.onNext(1); + ps.onComplete(); + + cs.onComplete(); + + to.assertResult(); + } + + @Test + public void fusionRejected() { + final CompletableSubject cs = CompletableSubject.create(); + + TestHelper.rejectObservableFusion() + .concatMapCompletable( + Functions.justFunction(cs) + ) + .test() + .assertEmpty(); + } + + @Test + public void emptyScalarSource() { + final CompletableSubject cs = CompletableSubject.create(); + + Observable.empty() + .concatMapCompletable(Functions.justFunction(cs)) + .test() + .assertResult(); + } + + @Test + public void justScalarSource() { + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = Observable.just(1) + .concatMapCompletable(Functions.justFunction(cs)) + .test(); + + to.assertEmpty(); + + assertTrue(cs.hasObservers()); + + cs.onComplete(); + + to.assertResult(); + } }