-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync: refactored PollSender<T>
to fix a subtly broken Sink<T>
implementation
#4214
Conversation
…ntation Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
tokio-util/src/sync/mpsc.rs
Outdated
/// will panic. | ||
pub fn start_send(&mut self, value: T) -> Result<(), PollSendError<T>> { | ||
let (result, next_state) = match self.take_state() { | ||
State::Idle(_) | State::Acquiring => panic!("`start_send` called without first calling `poll_ready`"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make the choice to have start_send
succeed in this case, attempting to use the send
method instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only viable approach I currently see to make this method more resilient is that we could attempt a send via Sender::try_send
. We'd then be able to change the language in the documentation to something more like:
If
poll_reserve
is called prior to callingstart_send
, and returnsPoll::Ready(Ok(()))
, then the call tostart_send
is guaranteed to succeed. If it is not called prior, then a send is attempted but may or may not succeed.
The biggest issue then would be how to document the error case: PollSendError<T>
is meant to only ever indicate that the channel is closed, but in this case, try_send
might fail due to the channel being closed or full. It feels weird to differentiate errors based on the condition of whether or not poll_reserve
was called since there is supposed to be a formal contract of calling poll_reserve
before start_send
.
Long story short, I'm willing to make such a change, just curious what your thoughts are on how to best document the behavior difference depending on if poll_reserve
was called first or not.
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
pub fn clone_inner(&self) -> Option<Sender<T>> { | ||
self.sender.as_ref().map(|sender| (&**sender).clone()) | ||
/// The underlying channel that this sender was wrapping may still be open. | ||
pub fn is_closed(&self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't want to check whether the underlying channel is closed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is_closed
only cares about whether or not this PollSender
is closed, if a user can expect to be able to execute more sends in the future.
Admittedly, there is the corner case here of when we close but already have a sender slot permit, because a call to poll_reserve
would still show as ready, and start_send
could be used to execute that send.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the current state of this method is great.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think most of the feedback has revolved around the lengths the PR goes to in order to attempt to preserve an acquired sending slot, so let me ask: should we actually bother trying to hold an acquired sending slot?
If we get rid of that functionality, things get much simpler. I don't have a strong opinion on keeping that behavior, and I'm not sure that it provides more flexibility compared to simply avoiding closing the sender before you're done actually sending.
tokio-util/src/sync/mpsc.rs
Outdated
/// If a slot was previously reserved by calling `poll_reserve`, then a final call can be made | ||
/// to `start_send` in order to consume the reserved slot. After that, no further sends will be | ||
/// possible. If you do not intend to send another item, you can release the reserved slot back | ||
/// to the underlying sender by calling [`abort_send`]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a footgun.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the "footgun" aspect you're referring to specifically about how PollSender
can keep a Receiver
open (when PollSender
is in the ReadyToSend
state) even when all other Sender
references are gone? I was mostly trying to follow the previous behavior where an in-flight "operation" could be finished cleanly even after closing the PollSender
.
If you think we should abandon that behavior, I'm happy to make close
be more forceful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regardless of the behavior of this method, I think that Sink::poll_close
should properly close it.
As for in-flight operations, it's true that we have a similar API in Tokio, but it's on the receiver and necessary there because of races, but that race isn't present for senders, so its not as necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But what is the proper way then? Making any subsequent call to poll_ready
/start_send
/poll_flush
not do anything?
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
In v0.7 of `tokio-util`, the `PollSender` API was changed to have the semantics that most users expect (see tokio-rs/tokio#4214). The `poll_send_done` method was replaced with a `poll_reserve` method, and the implementation rewritten to drive a `Sender::reserve_owned` future that's consumed in `send_item`. Now, `PollSender` essentially implements exactly the same code that was written by hand in the consensus service. We can simplify the consensus service significantly by upgrading the `tokio-util` dependency to 0.7 and replacing the hand-written version with `PollSender`. There should be no functional change as a result of this refactor.
In v0.7 of `tokio-util`, the `PollSender` API was changed to have the semantics that most users expect (see tokio-rs/tokio#4214). The `poll_send_done` method was replaced with a `poll_reserve` method, and the implementation rewritten to drive a `Sender::reserve_owned` future that's consumed in `send_item`. Now, `PollSender` essentially implements exactly the same code that was written by hand in the consensus service. We can simplify the consensus service significantly by upgrading the `tokio-util` dependency to 0.7 and replacing the hand-written version with `PollSender`. There should be no functional change as a result of this refactor.
Motivation
The existing
PollSender<T>
implementation uses a simplified design that treatspoll_send_done
as bothpoll_ready
andpoll_flush
. However, it only ever actually does any flushing work i.e. driving the future which sends into the underlying channel. When used inpoll_ready
, it simply returns that the sender is always ready.This leads to a subtle bug where callers could inadvertently use something like
sink.send_all(..).await
which leads to a panic. Combinators likesend_all
believe they can continue to callstart_send
so long aspoll_ready
returnstrue
when called directly before. However, in the current design, if the underlying channel was full, the next call tostart_send
-- afterpoll_ready
claimed the sender was ready -- could hit a scenario where the pending send was not yet complete, which would leaveself.is_sending
astrue
, causing the next call tostart_send
to hit the check for ifself.is_sending
istrue
, thus leading to a panic.Solution
This PR explores a refactored design where instead of driving a channel send future, we drive a future for reserving a permit for sending to the channel. This moves the ordering of logic from
poll_send_done
/start_send
/poll_send_done
to simplypoll_reserve
/start_send
.This allows
poll_ready
to actually be representative of whether or not an item can be sent into the channel. Additionally, through the state machine approach, we can eliminate most clones of theSender
itself by recapturing the underlying sender from the permit.This does change a few methods and some naming so it would be a breaking change, although I think it's worth it for the ability to correctly provide the
Sink<T>
implementation.Signed-off-by: Toby Lawrence toby@nuclearfurnace.com