Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add len and is_empty methods to mpsc senders #7103

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,6 @@ impl<T> Receiver<T> {
/// tx.send(0).await.unwrap();
/// assert!(!rx.is_empty());
/// }
///
/// ```
pub fn is_empty(&self) -> bool {
self.chan.is_empty()
Expand Down Expand Up @@ -1060,6 +1059,46 @@ impl<T> Sender<T> {
self.chan.is_closed()
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(10);
/// assert!(rx.is_empty());
///
/// tx.send(0).await.unwrap();
/// assert!(!rx.is_empty());
tqwewe marked this conversation as resolved.
Show resolved Hide resolved
/// }
/// ```
pub fn is_empty(&self) -> bool {
self.chan.semaphore().bound - self.chan.semaphore().semaphore.available_permits() == 0
}

/// Returns the number of messages in the channel.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(10);
/// assert_eq!(0, rx.len());
///
/// tx.send(0).await.unwrap();
/// assert_eq!(1, rx.len());
/// }
/// ```
pub fn len(&self) -> usize {
self.chan.semaphore().bound - self.chan.semaphore().semaphore.available_permits()
}
Comment on lines +1098 to +1100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this seems incorrect to me. This will include permits in the count, but to match the receiver length method, permits should not be counted.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm true, I've updated the test locally and it does seem to be affected by reserved permits. I am not well versed in the internals of tokio, but it seems like the length may not be able to be derived from the bounded sender as is due to this reserved permits issue? I'd like to be wrong, but if not then perhaps it would require an atomic counter added to keep track of reserved permits vs messages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I commented on this issue previously: #6314 (comment)


/// Waits for channel capacity. Once capacity to send one message is
/// available, it is reserved for the caller.
///
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub(crate) trait Semaphore {

fn add_permits(&self, n: usize);

fn available_permits(&self) -> usize;

fn close(&self);

fn is_closed(&self) -> bool;
Expand Down Expand Up @@ -193,6 +195,14 @@ impl<T, S> Tx<T, S> {
}

impl<T, S: Semaphore> Tx<T, S> {
pub(super) fn is_empty(&self) -> bool {
self.inner.semaphore.available_permits() == 0
}

pub(super) fn len(&self) -> usize {
self.inner.semaphore.available_permits()
}

pub(crate) fn is_closed(&self) -> bool {
self.inner.semaphore.is_closed()
}
Expand Down Expand Up @@ -576,6 +586,10 @@ impl Semaphore for bounded::Semaphore {
self.semaphore.release(n)
}

fn available_permits(&self) -> usize {
self.semaphore.available_permits()
}

fn is_idle(&self) -> bool {
self.semaphore.available_permits() == self.bound
}
Expand Down Expand Up @@ -610,6 +624,10 @@ impl Semaphore for unbounded::Semaphore {
}
}

fn available_permits(&self) -> usize {
self.0.load(Acquire) >> 1
tqwewe marked this conversation as resolved.
Show resolved Hide resolved
}

fn is_idle(&self) -> bool {
self.0.load(Acquire) >> 1 == 0
}
Expand Down
41 changes: 40 additions & 1 deletion tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ impl<T> UnboundedReceiver<T> {
/// tx.send(0).unwrap();
/// assert!(!rx.is_empty());
/// }
///
/// ```
pub fn is_empty(&self) -> bool {
self.chan.is_empty()
Expand Down Expand Up @@ -577,6 +576,46 @@ impl<T> UnboundedSender<T> {
}
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel();
/// assert!(tx.is_empty());
///
/// tx.send(0).unwrap();
/// assert!(!tx.is_empty());
/// }
/// ```
pub fn is_empty(&self) -> bool {
self.chan.is_empty()
}

/// Returns the number of messages in the channel.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel();
/// assert_eq!(0, tx.len());
///
/// tx.send(0).unwrap();
/// assert_eq!(1, tx.len());
/// }
/// ```
pub fn len(&self) -> usize {
self.chan.len()
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
Expand Down
58 changes: 58 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,64 @@ async fn test_tx_capacity() {
assert_eq!(tx.max_capacity(), 10);
}

#[tokio::test]
async fn test_bounded_tx_len() {
let (tx, mut rx) = mpsc::channel::<()>(10);

// initially len should be 0
assert_eq!(tx.len(), 0);
assert!(tx.is_empty());

// queue one message, and len should be 1
tx.send(()).await.unwrap();
assert_eq!(tx.len(), 1);
assert!(!tx.is_empty());

// queue a second message, and len should be 2
tx.send(()).await.unwrap();
assert_eq!(tx.len(), 2);
assert!(!tx.is_empty());

// consume a message, and len should be 1
let _ = rx.recv().await;
assert_eq!(tx.len(), 1);
assert!(!tx.is_empty());

// consume a final message, and len should be 0
let _ = rx.recv().await;
assert_eq!(tx.len(), 0);
assert!(tx.is_empty());
}

#[tokio::test]
async fn test_unbounded_tx_len() {
let (tx, mut rx) = mpsc::unbounded_channel();

// initially len should be 0
assert_eq!(tx.len(), 0);
assert!(tx.is_empty());

// queue one message, and len should be 1
tx.send(()).unwrap();
assert_eq!(tx.len(), 1);
assert!(!tx.is_empty());

// queue a second message, and len should be 2
tx.send(()).unwrap();
assert_eq!(tx.len(), 2);
assert!(!tx.is_empty());

// consume a message, and len should be 1
let _ = rx.recv().await;
assert_eq!(tx.len(), 1);
assert!(!tx.is_empty());

// consume a final message, and len should be 0
let _ = rx.recv().await;
assert_eq!(tx.len(), 0);
assert!(tx.is_empty());
}

#[tokio::test]
async fn test_rx_is_closed_when_calling_close_with_sender() {
// is_closed should return true after calling close but still has a sender
Expand Down
Loading