diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTake.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTake.java index 67c2a49c21..e15abaef84 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTake.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTake.java @@ -19,6 +19,7 @@ import io.reactivex.*; import io.reactivex.internal.subscriptions.*; +import io.reactivex.plugins.RxJavaPlugins; public final class FlowableTake extends AbstractFlowableWithUpstream { final long limit; @@ -75,6 +76,8 @@ public void onError(Throwable t) { done = true; subscription.cancel(); actual.onError(t); + } else { + RxJavaPlugins.onError(t); } } @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java index 471b0e818d..9150bab4ad 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java @@ -207,4 +207,19 @@ public void run() { ts.assertResult(1, 2, 3, 4, 5); } } + + @Test + public void errorAfterLimitReached() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.error(new TestException()) + .limit(0) + .test() + .assertResult(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeTest.java index c19b2108b1..345282bfbd 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeTest.java @@ -14,9 +14,10 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.util.Arrays; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -28,6 +29,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -495,4 +497,19 @@ public void run() { ts.assertResult(1, 2); } } + + @Test + public void errorAfterLimitReached() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.error(new TestException()) + .take(0) + .test() + .assertResult(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java index f966cd8647..f1f5851fd3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java @@ -14,9 +14,10 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.util.Arrays; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.*; @@ -24,10 +25,13 @@ import org.mockito.InOrder; import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; @@ -389,4 +393,19 @@ public ObservableSource apply(Observable o) throws Exception { } }); } + + @Test + public void errorAfterLimitReached() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.error(new TestException()) + .take(0) + .test() + .assertResult(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } }