-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs #1955
Merged
Merged
Changes from 2 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
c38a780
Fixed race & late termination condition.
f6fd5de
Factored out the backpressure management into an experimental class and
a82eff9
Fixed potential request value overflow.
akarnokd c653ae3
Fixed file comment, larger timeout for a test
akarnokd a3b454d
Fixed line delimiters.
akarnokd 34bf40a
Added capacity increase on poll.
akarnokd 047cc28
Added value null check
akarnokd File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,6 @@ | |
*/ | ||
package rx.internal.operators; | ||
|
||
import java.util.Queue; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
@@ -25,11 +24,10 @@ | |
import rx.Subscriber; | ||
import rx.exceptions.MissingBackpressureException; | ||
import rx.functions.Action0; | ||
import rx.internal.util.BackpressureDrainManager; | ||
|
||
public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> { | ||
|
||
private final NotificationLite<T> on = NotificationLite.instance(); | ||
|
||
private final Long capacity; | ||
private final Action0 onOverflow; | ||
|
||
|
@@ -52,122 +50,110 @@ public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) { | |
|
||
@Override | ||
public Subscriber<? super T> call(final Subscriber<? super T> child) { | ||
// TODO get a different queue implementation | ||
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>(); | ||
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity); | ||
final AtomicLong wip = new AtomicLong(); | ||
final AtomicLong requested = new AtomicLong(); | ||
|
||
child.setProducer(new Producer() { | ||
|
||
@Override | ||
public void request(long n) { | ||
if (requested.getAndAdd(n) == 0) { | ||
pollQueue(wip, requested, capacity, queue, child); | ||
} | ||
} | ||
|
||
}); | ||
// don't pass through subscriber as we are async and doing queue draining | ||
// a parent being unsubscribed should not affect the children | ||
Subscriber<T> parent = new Subscriber<T>() { | ||
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow); | ||
|
||
private AtomicBoolean saturated = new AtomicBoolean(false); | ||
// if child unsubscribes it should unsubscribe the parent, but not the other way around | ||
child.add(parent); | ||
child.setProducer(parent.manager()); | ||
|
||
@Override | ||
public void onStart() { | ||
request(Long.MAX_VALUE); | ||
} | ||
return parent; | ||
} | ||
private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback { | ||
// TODO get a different queue implementation | ||
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>(); | ||
private final Long baseCapacity; | ||
private final AtomicLong capacity; | ||
private final Subscriber<? super T> child; | ||
private final AtomicBoolean saturated = new AtomicBoolean(false); | ||
private final BackpressureDrainManager manager; | ||
private final NotificationLite<T> on = NotificationLite.instance(); | ||
private final Action0 onOverflow; | ||
|
||
public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow) { | ||
this.child = child; | ||
this.baseCapacity = capacity; | ||
this.capacity = capacity != null ? new AtomicLong(capacity) : null; | ||
this.onOverflow = onOverflow; | ||
this.manager = new BackpressureDrainManager(this); | ||
} | ||
@Override | ||
public void onStart() { | ||
request(Long.MAX_VALUE); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
if (!saturated.get()) { | ||
queue.offer(on.completed()); | ||
pollQueue(wip, requested, capacity, queue, child); | ||
} | ||
@Override | ||
public void onCompleted() { | ||
if (!saturated.get()) { | ||
manager.terminateAndDrain(); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
if (!saturated.get()) { | ||
queue.offer(on.error(e)); | ||
pollQueue(wip, requested, capacity, queue, child); | ||
} | ||
@Override | ||
public void onError(Throwable e) { | ||
if (!saturated.get()) { | ||
manager.terminateAndDrain(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
if (!assertCapacity()) { | ||
return; | ||
} | ||
queue.offer(on.next(t)); | ||
pollQueue(wip, requested, capacity, queue, child); | ||
@Override | ||
public void onNext(T t) { | ||
if (!assertCapacity()) { | ||
return; | ||
} | ||
queue.offer(on.next(t)); | ||
manager.drain(); | ||
} | ||
|
||
private boolean assertCapacity() { | ||
if (capacity == null) { | ||
return true; | ||
} | ||
|
||
long currCapacity; | ||
do { | ||
currCapacity = capacity.get(); | ||
if (currCapacity <= 0) { | ||
if (saturated.compareAndSet(false, true)) { | ||
unsubscribe(); | ||
child.onError(new MissingBackpressureException( | ||
"Overflowed buffer of " | ||
+ OperatorOnBackpressureBuffer.this.capacity)); | ||
if (onOverflow != null) { | ||
onOverflow.call(); | ||
} | ||
} | ||
return false; | ||
} | ||
// ensure no other thread stole our slot, or retry | ||
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1)); | ||
return true; | ||
@Override | ||
public boolean accept(Object value) { | ||
return on.accept(child, value); | ||
} | ||
@Override | ||
public void complete(Throwable exception) { | ||
if (exception != null) { | ||
child.onError(exception); | ||
} else { | ||
child.onCompleted(); | ||
} | ||
}; | ||
|
||
// if child unsubscribes it should unsubscribe the parent, but not the other way around | ||
child.add(parent); | ||
} | ||
@Override | ||
public Object peek() { | ||
return queue.peek(); | ||
} | ||
@Override | ||
public Object poll() { | ||
return queue.poll(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to add |
||
} | ||
|
||
return parent; | ||
} | ||
private boolean assertCapacity() { | ||
if (capacity == null) { | ||
return true; | ||
} | ||
|
||
private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) { | ||
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue? | ||
if (requested.get() > 0) { | ||
// only one draining at a time | ||
try { | ||
/* | ||
* This needs to protect against concurrent execution because `request` and `on*` events can come concurrently. | ||
*/ | ||
if (wip.getAndIncrement() == 0) { | ||
while (true) { | ||
if (requested.getAndDecrement() != 0) { | ||
Object o = queue.poll(); | ||
if (o == null) { | ||
// nothing in queue | ||
requested.incrementAndGet(); | ||
return; | ||
} | ||
if (capacity != null) { // it's bounded | ||
capacity.incrementAndGet(); | ||
} | ||
on.accept(child, o); | ||
} else { | ||
// we hit the end ... so increment back to 0 again | ||
requested.incrementAndGet(); | ||
return; | ||
long currCapacity; | ||
do { | ||
currCapacity = capacity.get(); | ||
if (currCapacity <= 0) { | ||
if (saturated.compareAndSet(false, true)) { | ||
unsubscribe(); | ||
child.onError(new MissingBackpressureException( | ||
"Overflowed buffer of " | ||
+ baseCapacity)); | ||
if (onOverflow != null) { | ||
onOverflow.call(); | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
} finally { | ||
wip.decrementAndGet(); | ||
} | ||
// ensure no other thread stole our slot, or retry | ||
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1)); | ||
return true; | ||
} | ||
protected Producer manager() { | ||
return manager; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this use case why would we ever be unsubscribing early? We can emit an
onError
, but I don't see the need to decouple the subscription.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertCapacity
calls unsubscribe and would disrupt downstream.