Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow calling UnboundedReceiver::try_next after None #2369

Merged
merged 1 commit into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,9 +1020,6 @@ impl<T> Receiver<T> {
/// It is not recommended to call this function from inside of a future,
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
/// This function will panic if called after `try_next` or `poll_next` has
/// returned `None`.
stepancheg marked this conversation as resolved.
Show resolved Hide resolved
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => {
Expand All @@ -1033,7 +1030,10 @@ impl<T> Receiver<T> {
}

fn next_message(&mut self) -> Poll<Option<T>> {
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
let inner = match self.inner.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
Expand Down Expand Up @@ -1173,9 +1173,6 @@ impl<T> UnboundedReceiver<T> {
/// * `Ok(Some(t))` when message is fetched
/// * `Ok(None)` when channel is closed and no messages left in the queue
/// * `Err(e)` when there are no messages available, but channel is not yet closed
///
/// This function will panic if called after `try_next` or `poll_next` has
/// returned `None`.
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => {
Expand All @@ -1186,7 +1183,10 @@ impl<T> UnboundedReceiver<T> {
}

fn next_message(&mut self) -> Poll<Option<T>> {
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
let inner = match self.inner.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
Expand Down
22 changes: 22 additions & 0 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,25 @@ fn stress_try_send_as_receiver_closes() {
bg.join()
.expect("background thread join");
}

#[test]
fn unbounded_try_next_after_none() {
let (tx, mut rx) = mpsc::unbounded::<String>();
// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}

#[test]
fn bounded_try_next_after_none() {
let (tx, mut rx) = mpsc::channel::<String>(17);
// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}