-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add is_closed
, is_empty
and len
to mpsc::Receiver
and mpsc::UnboundedReceiver
#6348
Conversation
tokio/src/sync/mpsc/bounded.rs
Outdated
/// assert!(rx.is_closed()); | ||
/// } | ||
/// ``` | ||
pub fn is_closed(&mut self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not 100% happy with this name, since in the end it also checks for outstanding messages in channel buffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I was going through this one more time and I realized that you may be right here. Currently, it's possible to have a situation where Sender::is_closed
returns true but Receiver::is_closed()
returns false. That's probably not desirable. It probably makes more sense to change it to match the existing method on the sender.
Of course, another option is to rename this method to reflect its actual behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, it's possible to have a situation where Sender::is_closed returns true but Receiver::is_closed() returns false. That's probably not desirable. It probably makes more sense to change it to match the existing method on the sender.
Yeah, I totally agree with this. I think the is_closed
should return true if close
was called or all the senders are dropped, regardless of having any messages available in the channel.
I believe it would be a good idea to provide an additional function, maybe is_empty
or has_messages
, that checks if there are any messages available in the channel, this way the user would be able to check both conditions with rx.is_closed() && rx.is_empty()
.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems reasonable to me. Adding an is_empty
method could make even more sense together with adding a len
method (which would close #6314). After all, clippy emits a warning if you add len
without is_empty
.
But if you only want to add is_empty
, that's also okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added the is_empty
and len
functions.
The len
implementation only checks for the tail_position
. I am not sure if it is necessary to check the ready
bits for valid values in each position, since that would be relatively costly because of the linked list implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, regarding the ready bits, I guess the main question here is concurrency-related. For example, imagine if you have the following two threads running in parallel:
sender.send("message1").await;
assert!(receiver.len() != 0); // or similar with `is_empty`
sender.send("message2").await;
Then, depending on how you implement len
, it's possible that the assertion could fail. This would happen if message1 ends up being stored after message2 in the channel queue, but when the assert runs, message2 is has not yet been fully sent.
We once had a similar bug with try_recv
. See #1997. Basically, this assert could fail:
sender.send("message1").await;
assert!(receiver.try_recv().is_some());
sender.send("message2").await;
The assert would sometimes fail when message2 ends up before message1 in the queue. Here, try_recv
can't return message1 because we guarantee that messages are returned in order. It also can't return message2 because the call to send("message2")
is still running. So it returns None
instead, even though we know that a message has been successfully sent. When we fixed it in #4113, we did so by having try_recv
sleep until send("message2")
finished when there are fully sent messages later in the queue.
To test this, you can add these loom tests:
// tokio/src/sync/tests/loom_mpsc.rs
#[test]
fn len_nonzero_after_send() {
loom::model(|| {
let (send, recv) = mpsc::unbounded_channel();
let send2 = send.clone();
let join = thread::spawn(move || {
block_on(send2.send("message2")).unwrap();
});
block_on(send.send("message1")).unwrap();
assert!(recv.len() != 0);
join.join().unwrap();
});
}
#[test]
fn nonempty_after_send() {
loom::model(|| {
let (send, recv) = mpsc::unbounded_channel();
let send2 = send.clone();
let join = thread::spawn(move || {
block_on(send2.send("message2")).unwrap();
});
block_on(send.send("message1")).unwrap();
assert!(!recv.is_empty());
join.join().unwrap();
});
}
I don't know whether these tests will fail, but if they do, please look at try_recv
to see how it determines whether it should sleep or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both tests failed. I am looking on how to fix it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added both loom tests and fixed the is_empty
implementation.
About len_nonzero_after_send
test, I think the len
implementation was already correct, but I ran the wrong test initially.
At first I had written the assertion assert!(recv.len() == 2)
, which should fail since the join()
is after the assert!
.
With the assert!(recv.len() != 0);
assertion, the test passes
changes the Receiver is_closed function to have the same behaviour from the Sender. adds is_empty to the receiver to check if the channel is empty
is_closed
to mpsc::Receiver
and mpsc::UnboundedReceiver
is_closed
, is_empty
and len
to mpsc::Receiver
and mpsc::UnboundedReceiver
tokio/src/sync/mpsc/list.rs
Outdated
unsafe { | ||
let tail_block = &mut *tail; | ||
|
||
if tail_block.is_closed() { | ||
tail_position - self.index - 1 | ||
} else { | ||
tail_position - self.index | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reduce the scope of this unsafe block. You only need it for the &mut *tail
operation as far as I can tell.
let tail_block = unsafe { &mut *tail };
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a lot of tests! That's awesome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks correct, but I think we can simplify it. Please see my suggestions below.
tokio/src/sync/mpsc/list.rs
Outdated
let tail_block = &mut *tail; | ||
tail_block.is_closed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The is_closed
method takes &self
. Please either change this to &*tail
or change is_closed
to &mut self
depending on which one is correct.
Same in is_empty
. In fact, it seems like you could factor out the is_empty
logic into a separate function in list.rs
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, thanks for the feedback! The code is much better now
pub(crate) fn len(&self, tx: &Tx<T>) -> usize { | ||
// When all the senders are dropped, there will be a last block in the tail position, | ||
// but it will be closed | ||
let tail_position = tx.tail_position.load(Acquire); | ||
tail_position - self.index - (tx.is_closed() as usize) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have a test for whether the length is correct in all of the following cases?
- There are still senders, but
Receiver::close
has been called. - There are no more senders, but
Receiver::close
has not been called. - There are no more senders, and
Receiver::close
has been called before the last sender dropped. - There are no more senders, and
Receiver::close
has been called after the last sender dropped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the scenarios were missing, but I have added them now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
Fixes: #4638
Fixes: #6314
Motivation
Taken from #4638
Solution
Adds a new
is_closed
function to bothmpsc::Receivers
that check if the channel is closed and has no remaining messages in the internal buffer.Since there are two possible ways to close a channel and they lead to different internal states, it was necessary to add two checks, one for each internal state.