diff --git a/src/main/java/io/reactivex/rxjava3/exceptions/MissingBackpressureException.java b/src/main/java/io/reactivex/rxjava3/exceptions/MissingBackpressureException.java
index a817e6fb93..f0a173ba59 100644
--- a/src/main/java/io/reactivex/rxjava3/exceptions/MissingBackpressureException.java
+++ b/src/main/java/io/reactivex/rxjava3/exceptions/MissingBackpressureException.java
@@ -20,6 +20,15 @@ public final class MissingBackpressureException extends RuntimeException {
private static final long serialVersionUID = 8517344746016032542L;
+ /**
+ * The default error message.
+ *
+ * This can happen if the downstream doesn't call {@link org.reactivestreams.Subscription#request(long)}
+ * in time or at all.
+ * @since 3.1.6
+ */
+ public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";
+
/**
* Constructs a MissingBackpressureException without message or cause.
*/
@@ -35,4 +44,13 @@ public MissingBackpressureException(String message) {
super(message);
}
+ /**
+ * Constructs a new {@code MissingBackpressureException} with the
+ * default message {@value #DEFAULT_MESSAGE}.
+ * @return the new {@code MissingBackpressureException} instance.
+ * @since 3.1.6
+ */
+ public static MissingBackpressureException createDefault() {
+ return new MissingBackpressureException(DEFAULT_MESSAGE);
+ }
}
diff --git a/src/main/java/io/reactivex/rxjava3/exceptions/QueueOverflowException.java b/src/main/java/io/reactivex/rxjava3/exceptions/QueueOverflowException.java
new file mode 100644
index 0000000000..bdd8a25e6f
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/exceptions/QueueOverflowException.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.exceptions;
+
+/**
+ * Indicates an overflow happened because the upstream disregarded backpressure completely or
+ * {@link org.reactivestreams.Subscriber#onNext(Object)} was called concurrently from multiple threads
+ * without synchronization. Rarely, it is an indication of bugs inside an operator.
+ * @since 3.1.6
+ */
+public final class QueueOverflowException extends RuntimeException {
+
+ private static final long serialVersionUID = 8517344746016032542L;
+
+ /**
+ * The message for queue overflows.
+ *
+ * This can happen if the upstream disregards backpressure completely or calls
+ * {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
+ * without synchronization. Rarely, it is an indication of bugs inside an operator.
+ */
+ private static final String DEFAULT_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in an operator";
+
+ /**
+ * Constructs a QueueOverflowException with the default message.
+ */
+ public QueueOverflowException() {
+ this(DEFAULT_MESSAGE);
+ }
+
+ /**
+ * Constructs a QueueOverflowException with the given message but no cause.
+ * @param message the error message
+ */
+ public QueueOverflowException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java
index 35292bb17c..48ea8b6e5f 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java
@@ -174,7 +174,7 @@ public void onNext(T t) {
if (sourceMode != QueueFuseable.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
- onError(new MissingBackpressureException("Queue full?!"));
+ onError(new QueueOverflowException());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java
index d83ce2d398..00a53f3ad7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java
@@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
public void onNext(CompletableSource t) {
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
- onError(new MissingBackpressureException());
+ onError(new QueueOverflowException());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
index a25032e33b..36436d1370 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
@@ -140,7 +140,7 @@ public void onNext(T t) {
if (!queue.offer(t)) {
SubscriptionHelper.cancel(this);
- onError(new MissingBackpressureException("Queue full?!"));
+ onError(new QueueOverflowException());
} else {
signalConsumer();
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java
index bf0f0ff163..5d1b6e4c33 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java
@@ -19,7 +19,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.*;
@@ -152,7 +152,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
- onError(new IllegalStateException("Queue full?!"));
+ onError(new QueueOverflowException());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEager.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEager.java
index af33ec9281..dc423d89fc 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEager.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEager.java
@@ -200,7 +200,7 @@ public void innerNext(InnerQueuedSubscriber inner, R value) {
drain();
} else {
inner.cancel();
- innerError(inner, new MissingBackpressureException());
+ innerError(inner, MissingBackpressureException.createDefault());
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java
index c53ebe0867..707d10a19e 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java
@@ -19,7 +19,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
@@ -151,7 +151,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
- onError(new IllegalStateException("Queue full?!"));
+ onError(new QueueOverflowException());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCreate.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCreate.java
index 673fc4bc22..1e87c4b7e6 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCreate.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCreate.java
@@ -442,7 +442,7 @@ static final class ErrorAsyncEmitter extends NoOverflowBaseAsyncEmitter {
@Override
void onOverflow() {
- onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
+ onError(new MissingBackpressureException("create: " + MissingBackpressureException.DEFAULT_MESSAGE));
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounce.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounce.java
index b1066c0133..51838a1864 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounce.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounce.java
@@ -148,7 +148,7 @@ void emit(long idx, T value) {
BackpressureHelper.produced(this, 1);
} else {
cancel();
- downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
+ downstream.onError(MissingBackpressureException.createDefault());
}
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java
index 35bfaec397..30b398ab7b 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java
@@ -159,7 +159,7 @@ void emit(long idx, T t, DebounceEmitter emitter) {
emitter.dispose();
} else {
cancel();
- downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
+ downstream.onError(MissingBackpressureException.createDefault());
}
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java
index 78c9a9e542..c250d3b165 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java
@@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
q = getMainQueue();
}
if (!q.offer(value)) {
- onError(new MissingBackpressureException("Scalar queue full?!"));
+ onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
@@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
} else {
SimpleQueue q = getMainQueue();
if (!q.offer(value)) {
- onError(new MissingBackpressureException("Scalar queue full?!"));
+ onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
@@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber inner) {
inner.queue = q;
}
if (!q.offer(value)) {
- onError(new MissingBackpressureException("Inner queue full?!"));
+ onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
@@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber inner) {
inner.queue = q;
}
if (!q.offer(value)) {
- onError(new MissingBackpressureException("Inner queue full?!"));
+ onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java
index 019a132329..0b9164e0cd 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java
@@ -180,7 +180,7 @@ public void onNext(T t) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
- onError(new MissingBackpressureException("Queue is full?!"));
+ onError(new QueueOverflowException());
return;
}
drain();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java
index 344b029018..50e641ffc1 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java
@@ -168,7 +168,7 @@ public void onNext(T t) {
if (emittedGroups != get()) {
downstream.onNext(group);
} else {
- MissingBackpressureException mbe = new MissingBackpressureException(groupHangWarning(emittedGroups));
+ MissingBackpressureException mbe = groupHangWarning(emittedGroups);
mbe.initCause(ex);
onError(mbe);
return;
@@ -194,13 +194,13 @@ public void onNext(T t) {
}
} else {
upstream.cancel();
- onError(new MissingBackpressureException(groupHangWarning(emittedGroups)));
+ onError(groupHangWarning(emittedGroups));
}
}
}
- static String groupHangWarning(long n) {
- return "Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.";
+ static MissingBackpressureException groupHangWarning(long n) {
+ return new MissingBackpressureException("Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.");
}
@Override
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoin.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoin.java
index 3021b9ddc5..c9619c48b7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoin.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoin.java
@@ -276,7 +276,7 @@ void drain() {
a.onNext(w);
BackpressureHelper.produced(requested, 1);
} else {
- fail(new MissingBackpressureException("Could not emit value due to lack of requests"), a, q);
+ fail(MissingBackpressureException.createDefault(), a, q);
return;
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInterval.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInterval.java
index 536e36c2ea..98fbe6dee1 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInterval.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInterval.java
@@ -93,7 +93,7 @@ public void run() {
downstream.onNext(count++);
BackpressureHelper.produced(this, 1);
} else {
- downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
+ downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableIntervalRange.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableIntervalRange.java
index 7c556acdac..cfa605882d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableIntervalRange.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableIntervalRange.java
@@ -114,7 +114,7 @@ public void run() {
decrementAndGet();
}
} else {
- downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
+ downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableJoin.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableJoin.java
index 2c269cee4d..9cda3d457d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableJoin.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableJoin.java
@@ -260,7 +260,7 @@ void drain() {
e++;
} else {
- ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
+ ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
@@ -321,7 +321,7 @@ else if (mode == RIGHT_VALUE) {
e++;
} else {
- ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
+ ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java
index 05dd11b816..576debbf2b 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java
@@ -113,7 +113,7 @@ public final void onNext(T t) {
if (!queue.offer(t)) {
upstream.cancel();
- error = new MissingBackpressureException("Queue is full?!");
+ error = new QueueOverflowException();
done = true;
}
trySchedule();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java
index 9b0b8a7b20..29a207fba5 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java
@@ -140,7 +140,7 @@ public void onNext(T t) {
}
} else if (callError) {
upstream.cancel();
- onError(new MissingBackpressureException());
+ onError(MissingBackpressureException.createDefault());
} else {
drain();
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureError.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureError.java
index 03e668b2ba..acaf165069 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureError.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureError.java
@@ -66,7 +66,7 @@ public void onNext(T t) {
BackpressureHelper.produced(this, 1);
} else {
upstream.cancel();
- onError(new MissingBackpressureException("could not emit value due to lack of requests"));
+ onError(MissingBackpressureException.createDefault());
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java
index 87f43b762b..b05bc8b748 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java
@@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
// we expect upstream to honor backpressure requests
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
- onError(new MissingBackpressureException("Prefetch queue is full?!"));
+ onError(new QueueOverflowException());
return;
}
// since many things can happen concurrently, we have a common dispatch
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java
index c97b8b6bbf..08d0e40058 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java
@@ -212,7 +212,7 @@ public void onNext(T t) {
}
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
upstream.get().cancel();
- onError(new MissingBackpressureException());
+ onError(MissingBackpressureException.createDefault());
return;
}
drain();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSamplePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSamplePublisher.java
index 778f3faf21..19474d2cc0 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSamplePublisher.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSamplePublisher.java
@@ -129,7 +129,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
- downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
+ downstream.onError(MissingBackpressureException.createDefault());
}
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java
index e17ad04a66..67a0412c02 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java
@@ -125,7 +125,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
- downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
+ downstream.onError(MissingBackpressureException.createDefault());
}
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSequenceEqual.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSequenceEqual.java
index 0ba38698d6..8dadbe0161 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSequenceEqual.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSequenceEqual.java
@@ -300,7 +300,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
- onError(new MissingBackpressureException());
+ onError(MissingBackpressureException.createDefault());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java
index 1c820570dc..4d4521c655 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java
@@ -381,7 +381,7 @@ public void onNext(R t) {
SwitchMapSubscriber p = parent;
if (index == p.unique) {
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
- onError(new MissingBackpressureException("Queue full?!"));
+ onError(new QueueOverflowException());
return;
}
p.drain();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java
index c12399e1d1..7ddcbf792a 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java
@@ -96,7 +96,7 @@ public void onNext(T t) {
} else {
done = true;
cancel();
- downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
+ downstream.onError(MissingBackpressureException.createDefault());
return;
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimer.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimer.java
index dd33daea0f..db45fe2e98 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimer.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimer.java
@@ -78,7 +78,7 @@ public void run() {
downstream.onComplete();
} else {
lazySet(EmptyDisposable.INSTANCE);
- downstream.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
+ downstream.onError(MissingBackpressureException.createDefault());
}
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java
index 1dcfd4e529..2aad69dcb5 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java
@@ -248,7 +248,7 @@ void drain() {
} else {
SubscriptionHelper.cancel(upstream);
boundarySubscriber.dispose();
- errors.tryAddThrowableOrReport(new MissingBackpressureException("Could not deliver a window due to lack of requests"));
+ errors.tryAddThrowableOrReport(MissingBackpressureException.createDefault());
done = true;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java
index 394315cd9b..d6bffae2e1 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java
@@ -272,7 +272,7 @@ void drain() {
upstream.cancel();
startSubscriber.cancel();
resources.dispose();
- error.tryAddThrowableOrReport(new MissingBackpressureException(FlowableWindowTimed.missingBackpressureMessage(emitted)));
+ error.tryAddThrowableOrReport(FlowableWindowTimed.missingBackpressureMessage(emitted));
upstreamDone = true;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java
index 23ef916fef..39e7de59ad 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java
@@ -213,7 +213,7 @@ void createFirstWindow() {
upstream.request(Long.MAX_VALUE);
} else {
upstream.cancel();
- downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted)));
+ downstream.onError(missingBackpressureMessage(emitted));
cleanupResources();
upstreamCancelled = true;
@@ -282,7 +282,7 @@ else if (!isEmpty) {
cleanupResources();
upstreamCancelled = true;
- downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted)));
+ downstream.onError(missingBackpressureMessage(emitted));
} else {
emitted++;
@@ -386,7 +386,7 @@ void createFirstWindow() {
upstream.request(Long.MAX_VALUE);
} else {
upstream.cancel();
- downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted)));
+ downstream.onError(missingBackpressureMessage(emitted));
cleanupResources();
upstreamCancelled = true;
@@ -499,7 +499,7 @@ UnicastProcessor createNewWindow(UnicastProcessor window) {
cleanupResources();
upstreamCancelled = true;
- downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted)));
+ downstream.onError(missingBackpressureMessage(emitted));
} else {
this.emitted = ++emitted;
@@ -584,7 +584,7 @@ void createFirstWindow() {
upstream.request(Long.MAX_VALUE);
} else {
upstream.cancel();
- downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted)));
+ downstream.onError(missingBackpressureMessage(emitted));
cleanupResources();
upstreamCancelled = true;
@@ -654,7 +654,7 @@ void drain() {
}
} else {
upstream.cancel();
- Throwable ex = new MissingBackpressureException(missingBackpressureMessage(emitted));
+ Throwable ex = missingBackpressureMessage(emitted);
for (UnicastProcessor window : windows) {
window.onError(ex);
}
@@ -717,8 +717,8 @@ public void run() {
}
}
- static String missingBackpressureMessage(long index) {
- return "Unable to emit the next window (#" + index + ") due to lack of requests. Please make sure the downstream is ready to consume windows.";
+ static MissingBackpressureException missingBackpressureMessage(long index) {
+ return new MissingBackpressureException("Unable to emit the next window (#" + index + ") due to lack of requests. Please make sure the downstream is ready to consume windows.");
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java
index bd701de40c..03e4c568d6 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java
@@ -18,7 +18,7 @@
import org.reactivestreams.Subscription;
import io.reactivex.rxjava3.core.FlowableSubscriber;
-import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
+import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.operators.QueueFuseable;
@@ -99,7 +99,7 @@ public final void onNext(T t) {
if (t != null) {
if (!queue.offer(t)) {
upstream.cancel();
- onError(new MissingBackpressureException("queue full?!"));
+ onError(new QueueOverflowException());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java
index 67c2ed4852..eb57bccea5 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java
@@ -204,7 +204,7 @@ public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
upstream.cancel();
- onError(new MissingBackpressureException("Queue is full?"));
+ onError(new QueueOverflowException());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java
index 3152aaeab8..2852fda8c7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java
@@ -18,7 +18,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
+import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.operators.SimplePlainQueue;
@@ -153,7 +153,7 @@ public void onNext(JoinInnerSubscriber inner, T value) {
if (!q.offer(value)) {
cancelAll();
- Throwable mbe = new MissingBackpressureException("Queue full?!");
+ Throwable mbe = new QueueOverflowException();
if (errors.compareAndSet(null, mbe)) {
downstream.onError(mbe);
} else {
@@ -170,7 +170,7 @@ public void onNext(JoinInnerSubscriber inner, T value) {
if (!q.offer(value)) {
cancelAll();
- onError(new MissingBackpressureException("Queue full?!"));
+ onError(new QueueOverflowException());
return;
}
@@ -333,7 +333,7 @@ void onNext(JoinInnerSubscriber inner, T value) {
if (!q.offer(value)) {
inner.cancel();
- errors.tryAddThrowableOrReport(new MissingBackpressureException("Queue full?!"));
+ errors.tryAddThrowableOrReport(new QueueOverflowException());
done.decrementAndGet();
drainLoop();
return;
@@ -347,7 +347,7 @@ void onNext(JoinInnerSubscriber inner, T value) {
if (!q.offer(value)) {
inner.cancel();
- errors.tryAddThrowableOrReport(new MissingBackpressureException("Queue full?!"));
+ errors.tryAddThrowableOrReport(new QueueOverflowException());
done.decrementAndGet();
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java
index e9c92a0c24..22f822db34 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java
@@ -19,7 +19,7 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
-import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
+import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.schedulers.SchedulerMultiWorkerSupport;
import io.reactivex.rxjava3.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
@@ -148,7 +148,7 @@ public final void onNext(T t) {
}
if (!queue.offer(t)) {
upstream.cancel();
- onError(new MissingBackpressureException("Queue is full?!"));
+ onError(new QueueOverflowException());
return;
}
schedule();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/subscribers/QueueDrainSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/subscribers/QueueDrainSubscriber.java
index 145882e418..8fb7f55295 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/subscribers/QueueDrainSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/subscribers/QueueDrainSubscriber.java
@@ -84,7 +84,7 @@ protected final void fastPathEmitMax(U value, boolean delayError, Disposable dis
}
} else {
dispose.dispose();
- s.onError(new MissingBackpressureException("Could not emit buffer due to lack of requests"));
+ s.onError(MissingBackpressureException.createDefault());
return;
}
} else {
@@ -118,7 +118,7 @@ protected final void fastPathOrderedEmitMax(U value, boolean delayError, Disposa
} else {
cancelled = true;
dispose.dispose();
- s.onError(new MissingBackpressureException("Could not emit buffer due to lack of requests"));
+ s.onError(MissingBackpressureException.createDefault());
return;
}
} else {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java b/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
index a9a2ab4901..fa0c500892 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
@@ -78,7 +78,7 @@ public static void drainMaxLoop(SimplePlainQueue q, Subscriber super
if (dispose != null) {
dispose.dispose();
}
- a.onError(new MissingBackpressureException("Could not emit value due to lack of requests."));
+ a.onError(MissingBackpressureException.createDefault());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
index c1115bb478..2949253b31 100644
--- a/src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
+++ b/src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
@@ -402,10 +402,13 @@ static boolean isBug(Throwable error) {
return true;
}
// the sender didn't honor the request amount
- // it's either due to an operator bug or concurrent onNext
if (error instanceof MissingBackpressureException) {
return true;
}
+ // it's either due to an operator bug or concurrent onNext
+ if (error instanceof QueueOverflowException) {
+ return true;
+ }
// general protocol violations
// it's either due to an operator bug or concurrent onNext
if (error instanceof IllegalStateException) {
diff --git a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
index eac40a8a75..2e5117ca56 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
@@ -590,7 +590,7 @@ public boolean test(Object o) {
return false;
}
cancel();
- downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
+ downstream.onError(MissingBackpressureException.createDefault());
return true;
}
diff --git a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
index 766e75145d..14a7a55ee2 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
@@ -295,7 +295,7 @@ public void onNext(@NonNull T t) {
ExceptionHelper.nullCheck(t, "onNext called with a null value.");
if (!queue.offer(t)) {
SubscriptionHelper.cancel(upstream);
- onError(new MissingBackpressureException());
+ onError(MissingBackpressureException.createDefault());
return;
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
index db6787880f..73507844d2 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
@@ -362,7 +362,7 @@ public void onNext(T t) {
BackpressureHelper.producedCancel(this, 1);
} else {
cancel();
- downstream.onError(new MissingBackpressureException("Could not emit value due to lack of requests"));
+ downstream.onError(MissingBackpressureException.createDefault());
}
}
diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java
index 07ae9c0f27..975ef5c149 100644
--- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java
+++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java
@@ -24,7 +24,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
+import io.reactivex.rxjava3.exceptions.QueueOverflowException;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
@@ -475,7 +475,7 @@ public Integer apply(Integer v) {
int vc = ts.values().size();
assertTrue("10 < " + vc, vc <= 10);
- ts.assertError(MissingBackpressureException.class);
+ ts.assertError(QueueOverflowException.class);
}
@Test
diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java
index 31e326a3a2..c865d35a52 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java
@@ -274,7 +274,7 @@ protected void subscribeActual(Subscriber super Integer> s) {
}
.flatMapStream(v -> Stream.of(1, 2), 1)
.test(0)
- .assertFailure(MissingBackpressureException.class);
+ .assertFailure(QueueOverflowException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class);
});
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatTest.java
index f13cb91558..11bc1a9b06 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatTest.java
@@ -52,9 +52,9 @@ public void subscribe(Subscriber super Completable> s) {
}), 1
)
.test()
- .assertFailure(MissingBackpressureException.class);
+ .assertFailure(QueueOverflowException.class);
- TestHelper.assertError(errors, 0, MissingBackpressureException.class);
+ TestHelper.assertError(errors, 0, QueueOverflowException.class);
} finally {
RxJavaPlugins.reset();
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java
index a815f738fa..e5ad7806d3 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java
@@ -152,7 +152,7 @@ public void emptyThrowsNoSuch() {
it.next();
}
- @Test(expected = MissingBackpressureException.class)
+ @Test(expected = QueueOverflowException.class)
public void overflowQueue() {
Iterator it = new Flowable() {
@Override
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java
index 0f7c772566..027157ce02 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java
@@ -624,7 +624,7 @@ protected void subscribeActual(Subscriber super Integer> s) {
}
.concatMap(Functions.justFunction(Flowable.just(2)), 8, ImmediateThinScheduler.INSTANCE)
.test(0L)
- .assertFailure(IllegalStateException.class);
+ .assertFailure(QueueOverflowException.class);
}
@Test
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java
index 986dddad01..5a0634ffb6 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java
@@ -1291,7 +1291,7 @@ protected void subscribeActual(Subscriber super Integer> s) {
}
.concatMap(Functions.justFunction(Flowable.just(2)), 8)
.test(0L)
- .assertFailure(IllegalStateException.class);
+ .assertFailure(QueueOverflowException.class);
}
@Test
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java
index b7f574b725..be94c5e57c 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java
@@ -1391,7 +1391,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
}
.flatMap(v -> Flowable.just(v), 1)
.test(0L)
- .assertFailure(MissingBackpressureException.class);
+ .assertFailure(QueueOverflowException.class);
}
@Test
@@ -1413,7 +1413,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
}
})
.test()
- .assertFailure(MissingBackpressureException.class, 1);
+ .assertFailure(QueueOverflowException.class, 1);
}
@Test
@@ -1430,7 +1430,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
}
}, false, 1, 1)
.test(0L)
- .assertFailure(MissingBackpressureException.class);
+ .assertFailure(QueueOverflowException.class);
}
@Test
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterableTest.java
index 27e4ca128d..2cb9392e19 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterableTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterableTest.java
@@ -814,7 +814,7 @@ protected void subscribeActual(Subscriber super Integer> s) {
}
.flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1)
.test(0L)
- .assertFailure(MissingBackpressureException.class);
+ .assertFailure(QueueOverflowException.class);
}
@Test
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSourceTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSourceTest.java
index 03275dd2e9..6506c010ea 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSourceTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSourceTest.java
@@ -137,7 +137,7 @@ public void normalError() {
ts.assertError(MissingBackpressureException.class);
ts.assertNotComplete();
- Assert.assertEquals("create: could not emit value due to lack of requests", ts.errors().get(0).getMessage());
+ Assert.assertEquals("create: " + MissingBackpressureException.DEFAULT_MESSAGE, ts.errors().get(0).getMessage());
}
@Test
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java
index a4839b4616..e4e3d6f3fe 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java
@@ -579,13 +579,13 @@ public void onNext(Integer t) {
assertEquals(1, errors.size());
System.out.println("Errors: " + errors);
Throwable t = errors.get(0);
- if (t instanceof MissingBackpressureException) {
+ if (t instanceof QueueOverflowException) {
// success, we expect this
} else {
- if (t.getCause() instanceof MissingBackpressureException) {
+ if (t.getCause() instanceof QueueOverflowException) {
// this is also okay
} else {
- fail("Expecting MissingBackpressureException");
+ fail("Expecting QueueOverflowException");
}
}
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java
index e27d5dffcd..5511fd0123 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java
@@ -189,7 +189,7 @@ public void overflowMissingBackpressureException() {
ts.assertError(MissingBackpressureException.class);
ts.assertNotComplete();
- Assert.assertEquals("Could not emit value due to lack of requests",
+ Assert.assertEquals(MissingBackpressureException.DEFAULT_MESSAGE,
ts.errors().get(0).getMessage());
Assert.assertFalse("Source has subscribers?", pp.hasSubscribers());
}
@@ -212,7 +212,7 @@ public void overflowMissingBackpressureExceptionDelayed() {
ts.assertError(MissingBackpressureException.class);
ts.assertNotComplete();
- Assert.assertEquals("Could not emit value due to lack of requests", ts.errors().get(0).getMessage());
+ Assert.assertEquals(MissingBackpressureException.DEFAULT_MESSAGE, ts.errors().get(0).getMessage());
Assert.assertFalse("Source has subscribers?", pp.hasSubscribers());
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java
index ab94d5b2e6..c3355e9a38 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java
@@ -905,9 +905,9 @@ public void subscribe(FlowableEmitter