Rewrite channels to be internally upgrade-able #11578
Conversation
Great stuff!
```rust
use mpsc = sync::mpsc_queue;

static DISCONNECTED: int = int::min_value;
static FUDGE: int = 1024;
```
Why this magic number? It seems this is wrong.
Does this mean that this code will break when more than 1024 tasks happen to be in the race window?
Why not just check for < -2 or < DISCONNECTED / 2 instead of < DISCONNECTED + FUDGE?
Is having 3 implementations really needed? Are we sure that it isn't possible to design a single implementation that works well under all conditions? (i.e. use an MPSC queue with the first node inside the struct so that oneshots don't allocate)
```rust
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> BlockedTask {
    let task = self.to_wake.load(atomics::SeqCst);
    self.to_wake.store(0, atomics::SeqCst);
```
Why not use an atomic swap to swap in 0?
That's faster than this code, at least on x86 (a seqcst store is implemented as a swap anyway), and it might make it possible to eliminate the yield loop in abort_selection, at least if spurious wakeups are tolerable.
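A rough sketch of that suggestion, written against today's `std::sync::atomic` API rather than the era's `std::sync::atomics` module; the `Packet`/`to_wake` names mirror the patch, but storing the blocked task as a bare word here is only an assumption for illustration:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Minimal stand-in for the packet's `to_wake` slot: 0 means "no blocked
// task", anything else is the blocked task encoded as a word.
struct Packet {
    to_wake: AtomicUsize,
}

impl Packet {
    // The suggested shape: a single atomic swap both reads the blocked task
    // and clears the slot, instead of a separate load followed by a store.
    fn take_to_wake_word(&self) -> usize {
        let task = self.to_wake.swap(0, Ordering::SeqCst);
        assert!(task != 0, "take_to_wake called with no blocked task");
        task
    }
}

fn main() {
    let p = Packet { to_wake: AtomicUsize::new(0xdead) };
    assert_eq!(p.take_to_wake_word(), 0xdead);
}
```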
I wanted to express the intent that this is indeed not an atomic swap, but rather just a transfer of ownership. I could relax the memory orderings, but I'm also getting to the point where I don't trust myself enough to relax the orderings.
On 32-bit platforms, it seems that both `cnt` and `steals` can overflow, since they can get incremented 2^32 times before anyone decrements them, due to the non-blocking `try_recv`. This is definitely a problem for `cnt` due to the special values, and might be a problem for `steals` (which at least would need to become a `uint`). This should be fixable by checking whether `steals` is large enough after incrementing it (`int::max_value / 4` should be a safe threshold, not sure which is the strictest possible one) and, if so, factoring it into `cnt`.
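A sketch of the rebalancing idea being described here; the real patch's version is quoted further down, so this only restates it with today's atomics, an illustrative `MAX_STEALS` value, and the `DISCONNECTED` caveat flagged:

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

// Illustrative bound; it only needs to sit comfortably below the point where
// the counters could overflow on a 32-bit platform.
const MAX_STEALS: isize = 1 << 20;

struct Packet {
    cnt: AtomicIsize, // sender-visible count of queued messages
    steals: isize,    // receiver-local count of messages taken via try_recv
}

impl Packet {
    // After a successful non-blocking receive, cap `steals` by folding it
    // back into the shared counter so neither value creeps toward overflow.
    // (The real code must also avoid clobbering the DISCONNECTED sentinel
    // stored in `cnt`; that is exactly the bug pointed out further down.)
    fn note_steal(&mut self) {
        self.steals += 1;
        if self.steals > MAX_STEALS {
            let cnt = self.cnt.swap(0, Ordering::SeqCst);
            self.steals -= cnt;
        }
    }
}

fn main() {
    let mut p = Packet { cnt: AtomicIsize::new(5), steals: MAX_STEALS };
    p.note_steal();
    assert_eq!(p.steals, MAX_STEALS - 4);
}
```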
```rust
}
// maybe we're done, if we're not the last ones
// here, then we need to go try again.
if self.sender_drain.compare_and_swap(
```
Specifically, to address the issue described on the atomic above, I think you need to change this compare_and_swap to be a CAS loop with the following transitions (sketched below):
0 => unreachable!
1 => set to 0, and break the loop
n => set to 1, and continue the loop
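A minimal sketch of that loop, with the PR's `sender_drain` counter modeled as a modern `AtomicIsize`; the `finish_drain`/`drain_queue` names and the surrounding structure are assumptions for illustration, not the patch itself:

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

// `sender_drain` counts senders that still need the queue drained:
// 0 = nobody is draining, 1 = we are the last drainer, n > 1 = more senders
// queued data while we were draining.
fn finish_drain(sender_drain: &AtomicIsize, mut drain_queue: impl FnMut()) {
    loop {
        match sender_drain.load(Ordering::SeqCst) {
            0 => unreachable!("drain flag cleared while we held it"),
            1 => {
                // We appear to be the last drainer; try to hand the flag back.
                if sender_drain
                    .compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }
            n => {
                // More work arrived: collapse the count to 1, drain the queue
                // again, and go around the loop.
                if sender_drain
                    .compare_exchange(n, 1, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    drain_queue();
                }
            }
        }
    }
}

fn main() {
    let flag = AtomicIsize::new(3);
    let mut drains = 0;
    finish_drain(&flag, || drains += 1);
    assert_eq!(flag.load(Ordering::SeqCst), 0);
    assert_eq!(drains, 1);
}
```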
Or even better, do it like this:

```rust
if self.sender_drain.compare_and_swap(1, 0, atomics::SeqCst) == 1 {
    break
}
self.sender_drain.store(1, atomics::SeqCst);
```

This avoids the CAS loop and is much simpler to code.
No, it's the first thing I thought of.
I think we've had this discussion before. These values won't overflow because they're a count of items on the queue, not a count of all items ever sent on the channel.
Ugh, no, I see what you mean about overflowing; I need to think about that.
r? @brson, mutexes aren't landing soon, so I added try_send_deferred back in.
```rust
Some(data) => {
    self.steals += 1;
    if self.steals > MAX_STEALS {
        self.steals -= self.cnt.swap(0, atomics::SeqCst);
```
Is this code really correct when cnt == DISCONNECTED?
Doesn't this need a compare_and_swap to handle that?
Good catch, it is indeed not correct.
What about the FUDGE magic number? It seems it should be removed.
The fudge became a constant because I used it in more than one spot, and I don't believe that it should be removed. The situation it's protecting against is when N senders simultaneously see a disconnected state. They will all see this state by incrementing the shared count, but consequently only one will see the exact value of `DISCONNECTED`. They all need to realize that the port is disconnected, however. The fudge number is my guess that it's basically impossible for that number of threads to be pre-empted after they see `cnt < DISCONNECTED + FUDGE` and before they reset the count back to `DISCONNECTED`. Does that make sense? It does seem a little janky because I had to consider it in more than one place...
Why not set it to a much larger value? Or, IMHO better, test for `cnt < DISCONNECTED / 2` instead? The problem with 1024 is that, although it is very unlikely under normal conditions, there can be more than 1024 tasks, and they can all be in that race window simultaneously (BTW, setting a debugger breakpoint on that instruction that stops only the thread hitting it will guarantee that the race condition always happens).
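For reference, a compact sketch of the check being debated, again with today's atomics; `cnt`, `DISCONNECTED`, and `FUDGE` are the patch's names, while the `bump_and_check` send path wrapped around them is simplified and purely illustrative:

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

const DISCONNECTED: isize = isize::MIN;
const FUDGE: isize = 1024; // the slack value under discussion

struct Packet {
    cnt: AtomicIsize, // queued-message count, or the DISCONNECTED sentinel
}

impl Packet {
    // Sender-side bookkeeping: bump the count and detect a dead port.
    // Returns false if the port has hung up.
    fn bump_and_check(&self) -> bool {
        let prev = self.cnt.fetch_add(1, Ordering::SeqCst);
        // The patch's test: anything "close enough" to the sentinel means a
        // racing sender already observed the disconnect, even though only
        // one of them read exactly DISCONNECTED.
        if prev < DISCONNECTED + FUDGE {
            // Restore the sentinel so later senders see the disconnect too.
            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
            return false;
        }
        // The alternative suggested above, `prev < DISCONNECTED / 2`, keeps
        // the same shape but tolerates far more than FUDGE racing bumps.
        true
    }
}

fn main() {
    let live = Packet { cnt: AtomicIsize::new(0) };
    assert!(live.bump_and_check());

    let dead = Packet { cnt: AtomicIsize::new(DISCONNECTED) };
    assert!(!dead.bump_and_check());
    assert_eq!(dead.cnt.load(Ordering::SeqCst), DISCONNECTED);
}
```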
@brson, I have updated this now, and I'll highlight some areas I think you should look at.
```rust
}
}

fn abort_selection(&self) -> bool {
```
This function changed: I passed around `was_upgrade` so aborts know whether they were part of an upgrade or not. I did this so you know whether you're paired with a `start_selection` or not. I also added a loop to continue doing this while you're upgraded.
```rust
// tasks in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
```
You'll be interested in this.
The design at least is very elegant, though the details are trickier in places. We discussed that maybe the oneshot atomics should be conservatively switched to SeqCst.
Previously, using a concurrent queue always mandated that the "shared state" be stored internally to the queues in order to provide a safe interface. This isn't quite as flexible as one would like in some circumstances, so instead this commit moves the shared state out of the queues. The queues no longer have a "default useful safe" interface, but rather a "default safe" interface (minus the useful part), and they have to be shared manually through an Arc or some other means. This allows them to be a little more flexible at the cost of a usability hindrance. I plan on using this new flexibility to upgrade a channel to a shared channel seamlessly.
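To make the "share the queue manually" point concrete, here is a tiny illustration of the Arc-based sharing pattern; a mutex-protected `VecDeque` stands in for the real lock-free `sync::mpsc_queue::Queue`, whose actual API may differ, and the `Packet` layout is only an assumption:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Stand-in for the intrusive queue: in the real patch this is a lock-free
// MPSC queue that is only safe if callers arrange the sharing themselves.
// A Mutex'd VecDeque keeps this sketch safe and runnable.
struct Queue<T> {
    inner: Mutex<VecDeque<T>>,
}

// The channel's shared state now lives *next to* the queue rather than being
// baked into it, and the whole packet is shared through a single Arc.
struct Packet<T> {
    queue: Queue<T>,
    // ... counters, the to_wake slot, etc. would live here too ...
}

fn main() {
    let packet = Arc::new(Packet {
        queue: Queue { inner: Mutex::new(VecDeque::new()) },
    });

    // The sender and receiver ends each hold a clone of the same Arc.
    let tx = Arc::clone(&packet);
    let rx = Arc::clone(&packet);

    tx.queue.inner.lock().unwrap().push_back(42);
    assert_eq!(rx.queue.inner.lock().unwrap().pop_front(), Some(42));
}
```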
This, the Nth rewrite of channels, is not a rewrite of the core logic behind channels, but rather of their API and usage. In the past we had the distinction between oneshot, stream, and shared channels, but the most recent rewrite dropped oneshots in favor of streams and shared channels. The stream-vs-shared distinction has shown that it's not quite what we'd like either, so this moves the `std::comm` module in the direction of "one channel to rule them all". There now remains only one Chan and one Port. This new channel is actually a hybrid oneshot/stream/shared channel under the hood in order to optimize for the use cases in question.

Additionally, this reduces the cognitive burden of having to choose between a Chan or a SharedChan in an API. My simple benchmarks show no reduction in efficiency over the existing channels today, and a 3x improvement in the oneshot case. I sadly don't have a pre-last-rewrite compiler to test out the old old oneshots, but I would imagine that the performance is comparable, if slightly slower (due to atomic reference counting).

This commit also brings a bonus bugfix to channels: the pending queue of messages is now dropped when the Port disappears, rather than when both the Port and the Chan disappear.
The user-facing API-level change of this commit is that `SharedChan` is gone and `Chan` now has `clone`. The major parts of this patch are the internals, which have changed. Channels are now internally upgraded from oneshots to streams to shared channels depending on the use case. I've noticed a 3x improvement in the oneshot case and very little slowdown (if any) in the stream/shared case.

This patch is mostly a reorganization of the `std::comm` module, and the large increase in code is from either dispatching to one of 3 impls or the duplication between the stream/shared impl (because they're not entirely separate). The `comm` module is now divided into `oneshot`, `stream`, `shared`, and `select` modules. Each module contains the implementation for that flavor of channel (or the select implementation for select).

Some notable parts of this patch:

* Upgrades are done through a semi-ad-hoc scheme for oneshots and messages for streams
* Upgrades are processed ASAP and have some interesting interactions with select
* send_deferred is gone because I expect the mutex to land before this
* Some of stream/shared is straight-up duplicated, but I like having the distinction between the two modules
* Select got a little worse, but it's still "basically limping along"
* This lumps in the patch of deallocating the queue backlog on packet drop
* I'll rebase this on top of the "more errors from try_recv" patch once it lands (all the infrastructure is here already)

All in all, this shouldn't be merged until the new mutexes are merged (because send_deferred wasn't implemented).

Closes #11351
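For readers coming to this later: this one-channel design is essentially what later shipped as `std::sync::mpsc`, where a single cloneable sender upgrades the channel's internal flavor on demand. A small usage sketch with the modern names (the PR-era names were `Chan` and `Port`):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    // One channel type covers the old oneshot / stream / shared cases.
    let (tx, rx) = channel();

    // A single send from a single sender: the oneshot-style case.
    tx.send(0).unwrap();

    // Cloning the sender is what used to require a separate SharedChan;
    // the channel upgrades itself internally when this happens.
    for i in 1..4 {
        let tx = tx.clone();
        thread::spawn(move || tx.send(i).unwrap());
    }
    drop(tx);

    // The receiver's iterator ends once every sender has been dropped.
    let mut received: Vec<i32> = rx.iter().collect();
    received.sort();
    assert_eq!(received, vec![0, 1, 2, 3]);
}
```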
```rust
/// Immediately attempt to receive a value on a port, this function will
/// never block. Has the same semantics as `Port.try_recv`.
pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
    self.port.try_recv()
```
The deletion of `Handle.try_recv()` here is unfortunately a showstopper for one of my projects. Why was this removed?
The interface to `select()` is quite volatile right now. This no longer takes a mutable loan on the port, so you still have access to the original port.
Oh it doesn't? Interesting. That does help a lot.