Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Weak version of mpsc::Sender #4595

Merged
merged 13 commits into from
Jul 27, 2022
68 changes: 68 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::loom::sync::Arc;
use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
Expand All @@ -22,6 +23,40 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}

/// A sender that does not prevent the channel from being closed.
///
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
/// instances remain, the channel is closed.
///
/// In order to send messages, the `WeakSender` needs to be upgraded using
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
///
/// [`Sender`]: Sender
/// [`WeakSender::upgrade`]: WeakSender::upgrade
///
/// #Examples
///
/// ```
/// use tokio::sync::mpsc::channel;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, _rx) = channel::<i32>(15);
/// let tx_weak = tx.downgrade();
///
/// // Upgrading will succeed because `tx` still exists.
/// assert!(tx_weak.upgrade().is_some());
///
/// // If we drop `tx`, then it will fail.
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
/// }
/// ```
pub struct WeakSender<T> {
chan: Arc<chan::Chan<T, Semaphore>>,
}

/// Permits to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
Expand Down Expand Up @@ -991,6 +1026,16 @@ impl<T> Sender<T> {
pub fn capacity(&self) -> usize {
self.chan.semaphore().0.available_permits()
}

/// Converts the `Sender` to a [`WeakSender`] that does not count
/// towards RAII semantics, i.e. if all `Sender` instances of the
/// channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
pub fn downgrade(&self) -> WeakSender<T> {
WeakSender {
chan: self.chan.downgrade(),
}
}
}

impl<T> Clone for Sender<T> {
Expand All @@ -1009,6 +1054,29 @@ impl<T> fmt::Debug for Sender<T> {
}
}

impl<T> Clone for WeakSender<T> {
fn clone(&self) -> Self {
WeakSender {
chan: self.chan.clone(),
}
}
}
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

impl<T> WeakSender<T> {
/// Tries to convert a WeakSender into a [`Sender`]. This will return `Some`
/// if there are other `Sender` instances alive and the channel wasn't
/// previously dropped, otherwise `None` is returned.
pub fn upgrade(&self) -> Option<Sender<T>> {
chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
}
}

impl<T> fmt::Debug for WeakSender<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("WeakSender").finish()
}
}

// ===== impl Permit =====

impl<T> Permit<'_, T> {
Expand Down
32 changes: 27 additions & 5 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use crate::sync::notify::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Channel sender.
pub(crate) struct Tx<T, S> {
Expand Down Expand Up @@ -46,7 +47,7 @@ pub(crate) trait Semaphore {
fn is_closed(&self) -> bool;
}

struct Chan<T, S> {
pub(super) struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,

Expand Down Expand Up @@ -129,6 +130,30 @@ impl<T, S> Tx<T, S> {
Tx { inner: chan }
}

pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
self.inner.clone()
}

// Returns the upgraded channel or None if the upgrade failed.
pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
let mut tx_count = chan.tx_count.load(Acquire);

loop {
if tx_count == 0 {
// channel is closed
return None;
}

match chan
.tx_count
.compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
{
Ok(_) => return Some(Tx { inner: chan }),
Err(prev_count) => tx_count = prev_count,
}
}
}

pub(super) fn semaphore(&self) -> &S {
&self.inner.semaphore
}
Expand Down Expand Up @@ -378,9 +403,6 @@ impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;

impl Semaphore for AtomicUsize {
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
pub(super) mod block;

mod bounded;
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender};
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender, WeakSender};

mod chan;

Expand Down
Loading