-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs #1955
Conversation
This is complicated stuff, maybe an (internal) helper class should be created to help with future producer-backpressure-consumer management. |
reimplemented Buffer and Block strategies with it.
I hope the new |
Wow, this is some non-trivial stuff. Thanks for tackling this. It's going to take me a bit to grok it. |
Anyone else have the time and interest to also do a code review on the concurrency code in this? I'd appreciate more eyes than my own. |
|
||
private AtomicBoolean saturated = new AtomicBoolean(false); | ||
// if child unsubscribes it should unsubscribe the parent, but not the other way around |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this use case why would we ever be unsubscribing early? We can emit an onError
, but I don't see the need to decouple the subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertCapacity
calls unsubscribe and would disrupt downstream.
I think this code is good to merge. I've asked a variety of clarifying questions that I want to review with you before I merge and release. |
@@ -0,0 +1,241 @@ | |||
/* | |||
* Copyright 2011 David Karnok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong copyright, should be Netflix
Re-trigger travis. |
} | ||
@Override | ||
public Object poll() { | ||
return queue.poll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to add if (capacity != null) capacity.incrementAndGet();
here.
Could you change CRLFs to LFs in BackpressureDrainManager? |
* @return true if a terminal state has been reached | ||
*/ | ||
public final boolean isTerminated() { | ||
return terminated; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isTerminated
, terminate
, terminate(Throwable)
are not used. Could you explain why adding them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really a complex class. So I prefer to keep it as simple as possible.
We can't know. The consumer way want to terminate after a certain item has been accepted.
This is a base class, some operators may need to do something after termination and before calling drain.
This is an internal base class to help implement backpressure over an abstract queue. The unavoidable complexity is in |
@Override | ||
public Object poll() { | ||
Object value = queue.poll(); | ||
if (capacity != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry that I missed a case. If we poll null
, we should not increment capacity
.
LGTM now. |
@benjchristensen your review would be welcome. |
OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs
This is quite a complex operator with lots of cases.
Properties:
3.a) If more was requested but nothing is available or nothing was requested and something is available: quit and let either the onNext or request do the subsequent drain.
3.b) If elements and termination was produced but not requested: quit and let the request do the drain
3.c) If termination was requested and no elements produced: loop , emit terminal event and quit.
In table form: