-
Notifications
You must be signed in to change notification settings - Fork 636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rewrite the mpsc channels #984
Conversation
897ad27
to
bf4460e
Compare
Previously, the bounded mpsc channels wouldn't provide backpressure in many cases because they provided a guaranteed slot for every sender. This change refactors mpsc channels to never allow more than buffer + 1 messages in-flight at a time. Senders will attempt to increment the number of in-flight messages in poll-ready. If successful, the count is incremented and a message is sent. If the buffer was full, the sender adds itself to a queue to be awoken. When a message is received, the receiver attempts to awaken a Sender and grant it "sending rights". If no such channel was found, it decreases the number of in-flight messages currently present in the buffer. There's one odd choice in this change, which was to pick buffer + 1 rather than buffer as the max number of elements. In practice, many of the tests and a handful of real-world code relied on the ability to have an initial element available for the first sender, and were attempting to send into channels with a buffer size of zero. These use-cases were all broken by this change without the change to use buffer + 1. Despite the buffer + 1 mitigation, this is still a breaking change since it modifies the maximum number of elements allowed into a channel. 0.2 has been out for a short enough time that it's probably possible to transition without breaking existing code, but this approach should be taken with caution. Luckily, 0.2 organized futures-channel into a separate crate, so it could be released as an 0.3 version of futures-channel, futures-sink, futures-util, and the futures facade while still maintaining futures-io and futures-core compatibility.
// Atomic, FIFO queue used to send wakers for blocked `Sender` tasks to the `Receiver`. | ||
sender_waker_queue: Queue<Arc<Mutex<SenderWaker>>>, | ||
// Lock for popping off of the sender_waker_queue | ||
sender_waker_pop_lock: Mutex<()>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't love that this is necessary, but it shouldn't be contended often-- it's only used in receiving and in the Drop
impl of Sender
. I considered having Sender::drop
wake up the receiver to queue the item instead, but that means it's doing two wakeups instead of one, which is almost definitely going to be slower than a mutex acquire.
This would address #403! :D |
I have not reviewed the code at all, but based off of the description, this sounds exactly like the strategy that was first attempted and ruled out as it was deemed flawed. I will attempt to explain the reasoning here. The problem is that this strategy assumes two things:
Unfortunately, neither of these assumptions hold true and violating either will result in reduced latency and / or deadlocks. For example, if using HTTP/2.0, you might use the protocol support to propagate back pressure signals from your channel to a remote peer. In this case, once the Another example is the tower load balancer. A load balancer instance is composed of many services (which, internally use channels). The load balancer will call Leaky abstractionThe behavior of requiring a fast turn around between requesting capacity and sending a message (or dropping the handle) is a leaky abstraction. The problem is, middleware like the tower balancer can't assume that each Future / Service that it is generic over might have special requirements. As far as I can tell, this would make the proposed channel unusable from such contexts. Channels and backpressureSince this question comes up often, I probably should take this opportunity to write up the history and the background behind the current design since everybody gets confused. If there is a better general purpose channel strategy, then great (though, IMO this is not it). There are fundamentally two dimensions in which channel usage need to be bounded:
This channel implementation only bounds the first item, and I will argue that it is incorrect to attempt to bound the second one as this responsibility lies elsewhere. Blocking channelsLets take a step back and look at how back pressure works with your synchronous, blocking channels. With blocking channels, there can be any number of For simplicity, lets assume that each thread only has one Imagine you have a channel that is bounded to 5 values and you clone the To solve this, the user of the blocking Limiting the call sitesThe original argument for changing the behavior is that Assuming that the pattern is: tx.clone().send(foo); This returns a future, which does no work by itself. Something has to spawn this future or push it into a vector somewhere. If nothing is limiting this logic at a higher level, there will be an unbounded number of spawn tasks or items pushed into storage somewhere. Even with the proposed change, unbounded buffering behavior remains. In my experience, every case of But everyone gets it wrongYes, it's true. It is a point of serious confusion. I would argue that this is due to using the functional combinators being really hard and less the behavior of the channel. When using combinators, one must employ an entirely functional programming style, threading all necessary state through the combinators. This is very very annoying and a common shortcut is to just clone into the combinators, resulting in this behavior. It all gets better with async / awaitFinally, await!(self.tx.send(my_val)) And, TL;DR
|
@carllerche Thanks for your detailed reply! I agree that this solution isn't perfect for some of the reasons you describe, but I'll try and outline why I think that we can use this PR as a stepping stone to something better than the status quo. Hopefully we can start to sort out more of these issues.
I agree that the backpressure implemented by this PR would not suffice for such a situation, but I don't believe that the current backpressure implementation works here either. If you want backpressure to propagate from the channel back to a remote peer, you must necessarily delay requesting a value until you can be sure that there is space available to receive and process the client's message. The current implementation doesn't allow for this type of fixed-buffer backpressure with a growable number of senders. Is there something about the way that this system currently works that makes it usable in the HTTP2 case where this PR's implementation wouldn't be usable? If you're not making use of channel backpressure to push back at a higher level, then is there a reason for not using the
Yes, getting backpressure right in this case is something that requires a higher level of coordination. In my intended use-case, I'm reading reading requests from an OS channel and spawning the Finally, WRT "everything is fixed under async/await", I agree that |
This is exactly what
I'm not sure what you mean. Both implementations would support this as this is the
tbh, I never used the combinators personally, so I never hit the pain. However, I spoke w/ @danburkert offline a bunch about this and he explained the issue to me. The solution is a pretty simple change to the current implementation: implement |
I was actually looking into this myself today, as the hyper test suite has been hitting the race condition daily with the change to a thread pool of executors. I thought maybe of just trying to patch the existing implementation, but eventually I came to think that probably we don't need yet another channel implementation in futures, when excellent work has been done on the libstd and crossbeam channels. Specifically, they both better handle all the edge cases around multiple or single produces send at the same time a consumer is dropped. The proposed changes I was thinking was to use either std or crossbeam's channel internally, and just add the task/waker stuff on top. |
That's basically what this is-- the underlying queue is basically the same as the one from std-- all the noise on top is for getting proper wakeups. It's annoyingly complicated to get right.
Perhaps you've misunderstood this PR-- that's what I implemented.
No, the current |
@seanmonstar I have no opinion w.r.t the implementation as long as it maintains the current behavior and equal or greater performance. IIRC, I originally attempted to add a layer on top of std channel, but it was not possible w/o a mutex guarding both ends. I could be wrong as it was a long time ago. @cramertj I understand what the PR is doing. I re-read the original snippet I quoted, and I think that I see what you mean. Again, this conflates two dimensions of growth and I do not believe that the channel backlog should be used to guard against a growing number of call sites. It seems like the crux of the problem is that the current implementation does not behave correctly when using combinators and I proposed a simple fix, what do you think about that propsal? |
I'm gonna try to estate things for myself, basically repeating some of @carllerche's earlier points. I think we can all agree with the following premise:
That is, when we look at the application as a whole, we want bounded "buffering", where buffering includes not just literal buffers, but also cases where a potential sender has generated a value to send but is prevented from queuing it. That last point is key: it means that we need to coordinate not just queuing of already-produced values, but also the production of values in the first place. The "one limit" strategyThis PR achieves our goal in the most "direct" way: by having the queue itself fully manage the global limit. To do this, it must hand out "tickets" for sending. Since the total number of tickets is bounded, so is the number of values ever produced to send (assuming senders get a ticket before producing their value). The downside to this strategy, as @carllerche explains (and I think that @cramertj agrees), is that there is a potentially arbitrary delay between acquiring and using a ticket. For the moment, let's focus on the potential for deadlock. As @carllerche says, reasonable abstractions elsewhere in the system may result in a given ticket for sending never being used or dropped, which can ultimately eat up the queue's capacity. @cramertj, do you agree with this potential? The "two limits" strategyThe alternative approach, represented by the current implementation, breaks down the global limit of in-flight messages into two parts, as @carllerche articulated:
The first limit is imposed by the channel itself, through its limited buffer. The second limit is imposed at a higher level in the application, where it is less likely to cause deadlock. And, in particular, this second limit will often "fall out" from other limits, e.g. on the number of active connections. Moreover, @carllerche argues, it is not clear how to have the channel alone impose the second limit without leading to potential for deadlock and other bad inter-task interference. DiscussionI think a core issue here is local vs global thinking. It's very tempting to think about backpressure in purely local terms: we want to avoid this particular receiver from getting too backlogged, and we do so by imposing a limit purely from its local perspective. The problem is that we can't safely impose this limit purely locally; it fundamentally involves some global coordination. The strategy that @carllerche advocates for is one that allows individual tasks to be written in a more-or-less "isolated" style, without having to think about how e.g. grabbing a ticket may cause problems for other unrelated tasks. Global coordination is instead handled at a much higher level. That means that backpressure isn't as simple as throwing in a bounded channel, but I think in the big picture it makes life easier overall (and in particular makes abstractions more independent). It's also worth noting that we can, and probably should, build some pure ticket abstractions, i.e. futures-aware semaphors. These can then be used in a more explicit way as one simple means of imposing global limits (where the potential for deadlock is more obvious). I wanted to clarify a few additional remarks: @cramertj says:
Hopefully the above clarifies, but to reiterate: the current implementation takes care of one of the two limits, and you are expected to separately coordinate the other (number of senders) more globally. @cramertj says:
The reason is that the bounded channel does impose an important limit: the one on the rate of sending for individual senders. It's built so that you can compose this limit with a global one on the number of senders to guarantee an overall total bound. |
@aturon a good summary, I would re-emphasize two additional points:
|
It doesn't solve bounding the number of callers, I agree. It solves a different problem: the one of callers finishing. If callers are unable to complete due to the channel being full, then it creates an increased number of callers which creates backpressure for higher levels of the system.
I'd expect operations like this to have timeouts or some other mechanism to prevent them from blocking indefinitely. If we allow them to block indefinitely, then we're explicitly allowing the system to gradually leak memory over time. The purpose of backpressure in my use-case is to specifically limit the amount of memory that a process can grow to use. |
To clarify, that's not the scenario @carllerche was describing. The worry was around a grabbing a ticket for sending, but never actually completing that send. This is why I brought up explicit semaphores: they make clear that you're juggling an abstract resource count and that while you hold a ticket you are potentially blocking other tasks. |
@aturon I'm explicitly trying to block other tasks by holding this ticket. |
@cramertj this implies to me that you want a higher level point of coordination. As @aturon mentioned, this semaphore can be implemented as a standalone type. Again, I go back to the synchronous analogy. In order to prevent the value from being materialized before channel capacity is guaranteed you would need additional coordination around the blocking channel. i.e. the behavior you want is not possible in the blocking world w/ your usual channels. |
Closing in favor of a more minimal set of changes, which I'll open shortly and link here. |
This fixes the "bounded senders are actually unbounded" problem. The problem is discussed here: rust-lang#984
Previously, the bounded mpsc channels wouldn't provide backpressure
in many cases because they provided a guaranteed slot for every
sender. This change refactors mpsc channels to never allow more than
buffer + 1 messages in-flight at a time.
Senders will attempt to increment the number of in-flight messages
in poll-ready. If successful, the count is incremented and a message
is sent. If the buffer was full, the sender adds itself to a queue
to be awoken. When a message is received, the receiver attempts to
awaken a Sender and grant it "sending rights". If no such channel
was found, it decreases the number of in-flight messages currently
present in the buffer. If a channel with "sending rights" is dropped,
it gives those sending rights to another waiting
Sender
or decreasesthe number of in-flight messages.
There's one odd choice in this change, which was to pick buffer + 1
rather than buffer as the max number of elements. In practice, many
of the tests and a handful of real-world code relied on the ability
to have an initial element available for the first sender, and were
attempting to send into channels with a buffer size of zero. These
use-cases were all broken by this change without the change to use
buffer + 1.
Despite the buffer + 1 mitigation, this is still a breaking change
since it modifies the maximum number of elements allowed into a
channel. 0.2 has been out for a short enough time that it's probably
possible to transition without breaking existing code, but this
approach should be taken with caution. Luckily, 0.2 organized
futures-channel into a separate crate, so it could be released as
an 0.3 version of futures-channel, futures-sink, futures-util, and
the futures facade while still maintaining futures-io and futures-core
compatibility.
Fix #800
Fix rustasync/team#11
cc @danburkert, @stjepang, @carllerche