Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
b-naber committed Jul 27, 2022
1 parent fa8bdf9 commit 59a6dfe
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 37 deletions.
4 changes: 0 additions & 4 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub struct Sender<T> {
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
/// }
///
/// ```
pub struct WeakSender<T> {
chan: Arc<chan::Chan<T, Semaphore>>,
Expand Down Expand Up @@ -1033,9 +1032,6 @@ impl<T> Sender<T> {
/// channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
pub fn downgrade(&self) -> WeakSender<T> {
// Note: If this is the last `Sender` instance we want to close the
// channel when downgrading, so it's important to move into `self` here.

WeakSender {
chan: self.chan.downgrade(),
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(crate) trait Semaphore {
fn is_closed(&self) -> bool;
}

pub(crate) struct Chan<T, S> {
pub(super) struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,

Expand Down
52 changes: 20 additions & 32 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
#[cfg(not(all(target_arch = "wasm32", not(target_os = "wasi"))))]
use tokio::test as maybe_tokio_test;

use tokio::join;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio::sync::mpsc::{self, channel};
use tokio::sync::oneshot;
use tokio::time;
use tokio_test::*;

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::sync::Arc;
use std::time::Duration;

#[cfg(not(target_arch = "wasm32"))]
mod support {
Expand Down Expand Up @@ -838,47 +835,36 @@ impl Drop for Msg {
// `Sender` was dropped while more than one `WeakSender` remains, we want to
// ensure that no messages are kept in the channel, which were sent after
// the receiver was dropped.
#[tokio::test(start_paused = true)]
#[tokio::test]
async fn test_msgs_dropped_on_rx_drop() {
fn ms(millis: u64) -> Duration {
Duration::from_millis(millis)
}

let (tx, mut rx) = mpsc::channel(3);

let rx_handle = tokio::spawn(async move {
let _ = rx.recv().await.unwrap();
let _ = rx.recv().await.unwrap();
time::sleep(ms(1)).await;
drop(rx);
let _ = tx.send(Msg {}).await.unwrap();
let _ = tx.send(Msg {}).await.unwrap();

time::advance(ms(1)).await;
});
// This msg will be pending and should be dropped when `rx` is dropped
let sent_fut = tx.send(Msg {});

let tx_handle = tokio::spawn(async move {
let _ = tx.send(Msg {}).await.unwrap();
let _ = tx.send(Msg {}).await.unwrap();
let _ = rx.recv().await.unwrap();
let _ = rx.recv().await.unwrap();

// This msg will be pending and should be dropped when `rx` is dropped
let _ = tx.send(Msg {}).await.unwrap();
time::advance(ms(1)).await;
let _ = sent_fut.await.unwrap();

// This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
time::sleep(ms(1)).await;
assert!(tx.send(Msg {}).await.is_err());
drop(rx);

// Ensure that third message isn't put onto the channel anymore
assert_eq!(NUM_DROPPED.load(Acquire), 4);
});
assert_eq!(NUM_DROPPED.load(Acquire), 3);

let (_, _) = join!(rx_handle, tx_handle);
// This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
assert!(tx.send(Msg {}).await.is_err());

assert_eq!(NUM_DROPPED.load(Acquire), 4);
}

// Tests that a `WeakSender` is upgradeable when other `Sender`s exist.
#[tokio::test]
async fn downgrade_upgrade_sender_success() {
let (tx, _rx) = mpsc::channel::<i32>(1);
let weak_tx = tx.clone().downgrade();
let weak_tx = tx.downgrade();
assert!(weak_tx.upgrade().is_some());
}

Expand All @@ -897,6 +883,7 @@ async fn downgrade_upgrade_sender_failure() {
async fn downgrade_drop_upgrade() {
let (tx, _rx) = mpsc::channel::<i32>(1);

// the cloned `Tx` is dropped right away
let weak_tx = tx.clone().downgrade();
drop(tx);
assert!(weak_tx.upgrade().is_none());
Expand All @@ -907,7 +894,7 @@ async fn downgrade_drop_upgrade() {
#[tokio::test]
async fn downgrade_get_permit_upgrade_no_senders() {
let (tx, _rx) = mpsc::channel::<i32>(1);
let weak_tx = tx.clone().downgrade();
let weak_tx = tx.downgrade();
let _permit = tx.reserve_owned().await.unwrap();
assert!(weak_tx.upgrade().is_some());
}
Expand All @@ -920,12 +907,13 @@ async fn downgrade_upgrade_get_permit_no_senders() {
let tx2 = tx.clone();
let _permit = tx.reserve_owned().await.unwrap();
let weak_tx = tx2.downgrade();
drop(tx2);
assert!(weak_tx.upgrade().is_some());
}

// Tests that `Clone` of `WeakSender` doesn't decrement `tx_count`.
// Tests that `downgrade` does not change the `tx_count` of the channel.
#[tokio::test]
async fn test_weak_sender_clone() {
async fn test_tx_count_weak_sender() {
let (tx, _rx) = mpsc::channel::<i32>(1);
let tx_weak = tx.downgrade();
let tx_weak2 = tx.downgrade();
Expand Down

0 comments on commit 59a6dfe

Please sign in to comment.