Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Weak version of mpsc::Sender #4595

Merged
merged 13 commits into from
Jul 27, 2022
80 changes: 80 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,50 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}

/// A Sender that does not influence RAII semantics, i.e. if all [`Sender`]
/// instances of a channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
b-naber marked this conversation as resolved.
Show resolved Hide resolved
///
/// In order to send messages, the `WeakSender` needs to be upgraded using
/// [`WeakSender::upgrade`], which returns `Option<Sender>`, `None` if all
b-naber marked this conversation as resolved.
Show resolved Hide resolved
/// `Sender`s were already dropped, otherwise `Some` (at which point it does
/// influence RAII semantics again).
///
/// [`Sender`]: Sender
/// [`WeakSender::upgrade`]: WeakSender::upgrade
///
/// #Examples
///
/// ```rust
/// use tokio;
b-naber marked this conversation as resolved.
Show resolved Hide resolved
/// use tokio::sync::mpsc::channel;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = channel(15);
/// let _ = tx.send(1).await;
/// let tx_weak = tx.downgrade();
///
/// let _ = tokio::spawn(async move {
/// for i in 0..2 {
/// if i == 0 {
/// assert_eq!(rx.recv().await.unwrap(), 1);
/// } else if i == 1 {
/// // only WeakSender instance remains -> channel is dropped
/// assert!(rx.recv().await.is_none());
/// }
/// }
/// })
/// .await;
///
/// assert!(tx_weak.upgrade().is_none());
/// }
b-naber marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```
b-naber marked this conversation as resolved.
Show resolved Hide resolved
pub struct WeakSender<T> {
chan: chan::Tx<T, Semaphore>,
}

/// Permits to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
Expand Down Expand Up @@ -991,6 +1035,19 @@ impl<T> Sender<T> {
pub fn capacity(&self) -> usize {
self.chan.semaphore().0.available_permits()
}

/// Converts the `Sender` to a [`WeakSender`] that does not count
/// towards RAII semantics, i.e. if all `Sender` instances of the
/// channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
pub fn downgrade(self) -> WeakSender<T> {
// Note: If this is the last `Sender` instance we want to close the
// channel when downgrading, so it's important to move into `self` here.
b-naber marked this conversation as resolved.
Show resolved Hide resolved

WeakSender {
chan: self.chan.downgrade(),
}
}
}

impl<T> Clone for Sender<T> {
Expand All @@ -1009,6 +1066,29 @@ impl<T> fmt::Debug for Sender<T> {
}
}

impl<T> Clone for WeakSender<T> {
fn clone(&self) -> Self {
WeakSender {
chan: self.chan.clone(),
}
}
}
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

impl<T> WeakSender<T> {
/// Tries to convert a WeakSender into a [`Sender`]. This will return `Some`
/// if there are other `Sender` instances alive and the channel wasn't
/// previously dropped, otherwise `None` is returned.
pub fn upgrade(self) -> Option<Sender<T>> {
self.chan.upgrade().map(Sender::new)
}
}

impl<T> fmt::Debug for WeakSender<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("WeakSender").finish()
}
}

// ===== impl Permit =====

impl<T> Permit<'_, T> {
Expand Down
39 changes: 35 additions & 4 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use crate::sync::notify::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Channel sender.
pub(crate) struct Tx<T, S> {
Expand Down Expand Up @@ -129,6 +130,39 @@ impl<T, S> Tx<T, S> {
Tx { inner: chan }
}

pub(super) fn downgrade(self) -> Self {
if self.inner.tx_count.fetch_sub(1, AcqRel) == 1 {
// Close the list, which sends a `Close` message
self.inner.tx.close();

// Notify the receiver
self.wake_rx();
}

self
}

// Returns a boolean that indicates whether the channel is closed.
pub(super) fn upgrade(self) -> Option<Self> {
b-naber marked this conversation as resolved.
Show resolved Hide resolved
let mut tx_count = self.inner.tx_count.load(Acquire);

loop {
if tx_count == 0 {
// channel is closed
return None;
}

match self
.inner
.tx_count
.compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
{
Ok(_) => return Some(self),
Err(prev_count) => tx_count = prev_count,
}
}
}

pub(super) fn semaphore(&self) -> &S {
&self.inner.semaphore
}
Expand Down Expand Up @@ -378,9 +412,6 @@ impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;

impl Semaphore for AtomicUsize {
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
pub(super) mod block;

mod bounded;
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender};
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender, WeakSender};

mod chan;

Expand Down
Loading