-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Flowable.blockingSubscribe is unbounded and can lead to OOME #6026
Fix Flowable.blockingSubscribe is unbounded and can lead to OOME #6026
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #6026 +/- ##
===========================================
- Coverage 98.3% 98.27% -0.04%
- Complexity 6175 6193 +18
===========================================
Files 665 666 +1
Lines 44729 44801 +72
Branches 6205 6206 +1
===========================================
+ Hits 43973 44030 +57
- Misses 222 233 +11
- Partials 534 538 +4
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some small missing pieces and one logical error.
* </dl> | ||
* @param onNext the callback action for each source value | ||
* @param bufferSize the size of the buffer | ||
* @since 2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please designate as 2.1.15 - experimental
here and to the other methods.
*/ | ||
@BackpressureSupport(BackpressureKind.FULL) | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final void blockingSubscribe(Consumer<? super T> onNext, int bufferSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add @Experimental
annotation to these new methods.
final Action onComplete; | ||
final Consumer<? super Subscription> onSubscribe; | ||
|
||
private int bufferSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please no private instance fields.
final Consumer<? super Subscription> onSubscribe; | ||
|
||
private int bufferSize; | ||
private final AtomicInteger internalBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this atomic?
if (internalBuffer.getAndIncrement() < bufferSize) { | ||
onNext.accept(t); | ||
} | ||
if (internalBuffer.get() == bufferSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would you end the flow when the bufferSize number of elements have been received?
|
||
assertEquals(Arrays.asList(1, 2, 3, 100), list); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add an unit test that transfers 1 million items.
|
||
@Override | ||
public void onComplete() { | ||
if (get() != SubscriptionHelper.CANCELLED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are going to check the value of get()
may as well do a compareAndSet loop for concurrent cancellation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't. That level of precision it is not really worth it here.
onNext.accept(t); | ||
|
||
int c = consumed + 1; | ||
if (c == bufferSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use a low-watermark than requesting only when the entire buffer has been consumed. Define a final int limit
field, in the constructor set it as this.limit = bufferSize - (bufferSize >> 2);
, then compare and request as limit
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One algorithmic adjustment and good to go.
Also it would be great if you copied the unit tests that crash the callbacks so that the coverage on the new class gets very high. |
62a9816
to
74e7cde
Compare
I created a |
* @param bufferSize the number of elements to prefetch from the source Publisher | ||
* @param <T> the value type | ||
*/ | ||
@SuppressWarnings("ResultOfMethodCallIgnored") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed. It never appears anywhere else in RxJava. Please remove.
this.onError = onError; | ||
this.onComplete = onComplete; | ||
this.onSubscribe = onSubscribe; | ||
this.bufferSize = ((Functions.BoundedConsumer) onSubscribe).getBufferSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this cast needed? Can't you could simply pass the bufferSize into the constructor?
}, new Consumer<Throwable>() { | ||
@Override | ||
public void accept(Throwable throwable) throws Exception { | ||
throw new TestException("Inner"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Place a breakpoint here and debug the test. It should reveal why the code coverage doesn't seem to execute that code in BoundedSubscriber
.
@@ -1,11 +1,11 @@ | |||
/** | |||
* Copyright (c) 2016-present, RxJava Contributors. | |||
* | |||
* <p> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't change the license headers.
@@ -0,0 +1,388 @@ | |||
/** | |||
* Copyright (c) 2016-present, RxJava Contributors. | |||
* <p> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't change the license headers.
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import org.testng.annotations.Test; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use org.junit.Test
as the TestNG tests are not part of the code coverage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix headers and use org.junit.Test please.
db252ee
to
8bd5b13
Compare
Create and bound new
blockingSubscribe
overloads tobufferSize
.bufferSize
boundedConsumer
BoundedSubsciber
Close: #5988