tokio::sync::mpsc::bounded::Receiver<T>::is_empty() returns false when recv().await blocks #6594

Closed
saltatory opened this issue May 28, 2024 · 3 comments
Labels: A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), M-sync (Module: tokio/sync)

saltatory commented May 28, 2024

Version

└── tokio v1.37.0
    ├── num_cpus v1.16.0
    │   └── libc v0.2.154
    ├── pin-project-lite v0.2.14
    └── tokio-macros v2.2.0 (proc-macro)
        ├── proc-macro2 v1.0.82 (*)
        ├── quote v1.0.36 (*)
        └── syn v2.0.63 (*)

Platform

Linux Seiza 6.8.0-31-generic #31-Ubuntu SMP PREEMPT_DYNAMIC Sat Apr 20 00:40:06 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux

Issue

TL;DR: tokio::sync::mpsc::bounded::Receiver<T>::is_empty() returns false, but an immediately following tokio::sync::mpsc::bounded::Receiver<T>::recv() blocks.

I have a message-oriented IO library that communicates with io-uring. Before I recv() IO messages on my channel, I check that the io-uring submission queue is not full and that the Receiver is not empty. The is_empty() call returns false, but the recv().await that follows blocks.

saltatory (Author) commented

Quick update: the same logic refactored to use try_recv() works fine.

Darksonn (Contributor) commented

Can you please provide more information? We have a fair number of tests on this:

tokio/tokio/tests/sync_mpsc.rs

Lines 1092 to 1151 in 9e00b26

#[tokio::test]
async fn test_rx_is_empty_when_no_messages_were_sent() {
    let (_tx, rx) = mpsc::channel::<()>(10);
    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_not_empty_when_there_are_messages_in_the_buffer() {
    let (tx, rx) = mpsc::channel::<()>(10);
    assert!(tx.send(()).await.is_ok());
    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_not_empty_when_the_buffer_is_full() {
    let (tx, rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_not_empty_when_all_but_one_messages_are_consumed() {
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    for _ in 0..9 {
        assert!(rx.recv().await.is_some());
    }
    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_empty_when_all_messages_are_consumed() {
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    while rx.try_recv().is_ok() {}
    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_empty_all_senders_are_dropped_and_messages_consumed() {
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    drop(tx);
    for _ in 0..10 {
        assert!(rx.recv().await.is_some());
    }
    assert!(rx.is_empty())
}

If you're able to provide an example that displays the bug, then that would be very helpful.

Darksonn commented Jun 2, 2024

Closing as duplicate of #6602. Thank you for reporting this bug.

Darksonn closed this as completed Jun 2, 2024