Skip to content

Commit

Permalink
docs
Browse files Browse the repository at this point in the history
  • Loading branch information
b-naber committed Jul 22, 2022
1 parent 631fc03 commit b6e6f67
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 26 deletions.
41 changes: 16 additions & 25 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,34 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}

/// A Sender that does not influence RAII semantics, i.e. if all [`Sender`]
/// instances of a channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
/// A sender that does not prevent the channel from being closed.
///
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
/// instances remain, the channel is closed.
///
/// In order to send messages, the `WeakSender` needs to be upgraded using
/// [`WeakSender::upgrade`], which returns `Option<Sender>`, `None` if all
/// `Sender`s were already dropped, otherwise `Some` (at which point it does
/// influence RAII semantics again).
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
///
/// [`Sender`]: Sender
/// [`WeakSender::upgrade`]: WeakSender::upgrade
///
/// #Examples
///
/// ```rust
/// use tokio;
/// ```
/// use tokio::sync::mpsc::channel;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = channel(15);
/// let _ = tx.send(1).await;
/// let tx_weak = tx.downgrade();
///
/// let _ = tokio::spawn(async move {
/// for i in 0..2 {
/// if i == 0 {
/// assert_eq!(rx.recv().await.unwrap(), 1);
/// } else if i == 1 {
/// // only WeakSender instance remains -> channel is dropped
/// assert!(rx.recv().await.is_none());
/// }
/// }
/// })
/// .await;
///
/// assert!(tx_weak.upgrade().is_none());
/// let (tx, rx) = channel(15);
/// let tx_weak = tx.clone().downgrade();
///
/// // Upgrading will succeed because `tx` still exists.
/// assert!(tx_weak.clone().upgrade().is_some());
///
/// // If we drop `tx`, then it will fail.
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
/// }
///
/// ```
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<T, S> Tx<T, S> {
self
}

// Returns a boolean that indicates whether the channel is closed.
// Returns the upgraded channel or None if the upgrade failed.
pub(super) fn upgrade(self) -> Option<Self> {
let mut tx_count = self.inner.tx_count.load(Acquire);

Expand Down

0 comments on commit b6e6f67

Please sign in to comment.