From ef5ee865bfee5f6b43eeda666206dc0b40dc9310 Mon Sep 17 00:00:00 2001 From: Curtis Maves Date: Wed, 28 Oct 2020 21:20:39 -0400 Subject: [PATCH 1/3] Return SendError from Sender::send on disconnect. --- src/sync/channel.rs | 16 +++++++++++----- src/sync/mod.rs | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 928cfc5de..81791ff9c 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -135,7 +135,7 @@ impl Sender { /// # Ok(()) /// # }) } /// ``` - pub async fn send(&self, msg: T) { + pub async fn send(&self, msg: T) -> Result<(), SendError> { struct SendFuture<'a, T> { channel: &'a Channel, msg: Option, @@ -145,7 +145,7 @@ impl Sender { impl Unpin for SendFuture<'_, T> {} impl Future for SendFuture<'_, T> { - type Output = (); + type Output = Result<(), SendError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { @@ -158,10 +158,9 @@ impl Sender { // Try sending the message. match self.channel.try_send(msg) { - Ok(()) => return Poll::Ready(()), + Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Disconnected(msg)) => { - self.msg = Some(msg); - return Poll::Pending; + return Poll::Ready(Err(SendError(msg))); } Err(TrySendError::Full(msg)) => { self.msg = Some(msg); @@ -1001,6 +1000,13 @@ pub enum TrySendError { Disconnected(T), } +/// An error returned from the `send` method when it is disconnected. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(PartialEq, Eq, Debug)] +pub struct SendError(pub T); + + impl Error for TrySendError {} impl Debug for TrySendError { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8b7fe3102..48bf1da3a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -185,7 +185,7 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; - pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError}; + pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, SendError, TrySendError}; pub use condvar::Condvar; mod barrier; From 06c1e9846b1639204dcb86b6baf4c18573224c4d Mon Sep 17 00:00:00 2001 From: Curtis Maves Date: Wed, 28 Oct 2020 21:51:20 -0400 Subject: [PATCH 2/3] Add unwraps to Results return from calls. --- tests/channel.rs | 46 +++++++++++++++++++++++----------------------- tests/stream.rs | 4 ++-- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/channel.rs b/tests/channel.rs index a218ea2ae..3af8798cf 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -26,10 +26,10 @@ fn smoke() { task::block_on(async { let (s, r) = channel(1); - s.send(7).await; + s.send(7).await.unwrap(); assert_eq!(r.recv().await.unwrap(), 7); - s.send(8).await; + s.send(8).await.unwrap(); assert_eq!(r.recv().await.unwrap(), 8); drop(s); @@ -39,7 +39,7 @@ fn smoke() { task::block_on(async { let (s, r) = channel(10); drop(r); - s.send(1).await; + s.send(1).await.unwrap(); }); } @@ -67,7 +67,7 @@ fn len_empty_full() { assert_eq!(r.is_empty(), true); assert_eq!(r.is_full(), false); - s.send(()).await; + s.send(()).await.unwrap(); assert_eq!(s.len(), 1); assert_eq!(s.is_empty(), false); @@ -76,7 +76,7 @@ fn len_empty_full() { assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), false); - s.send(()).await; + s.send(()).await.unwrap(); assert_eq!(s.len(), 2); assert_eq!(s.is_empty(), false); @@ -112,9 +112,9 @@ fn recv() { }); task::sleep(ms(1500)).await; - s.send(7).await; - s.send(8).await; - s.send(9).await; + s.send(7).await.unwrap(); + s.send(8).await.unwrap(); + s.send(9).await.unwrap(); }) } @@ -125,13 +125,13 @@ fn send() { let (s, r) = channel(1); spawn(async move { - s.send(7).await; + s.send(7).await.unwrap(); task::sleep(ms(1000)).await; - s.send(8).await; + s.send(8).await.unwrap(); task::sleep(ms(1000)).await; - s.send(9).await; + s.send(9).await.unwrap(); task::sleep(ms(1000)).await; - s.send(10).await; + s.send(10).await.unwrap(); }); task::sleep(ms(1500)).await; @@ -147,9 +147,9 @@ fn recv_after_disconnect() { task::block_on(async { let (s, r) = channel(100); - s.send(1).await; - s.send(2).await; - s.send(3).await; + s.send(1).await.unwrap(); + s.send(2).await.unwrap(); + s.send(3).await.unwrap(); drop(s); @@ -174,7 +174,7 @@ fn len() { for _ in 0..CAP / 10 { for i in 0..50 { - s.send(i).await; + s.send(i).await.unwrap(); assert_eq!(s.len(), i + 1); } @@ -188,7 +188,7 @@ fn len() { assert_eq!(r.len(), 0); for i in 0..CAP { - s.send(i).await; + s.send(i).await.unwrap(); assert_eq!(s.len(), i + 1); } @@ -211,7 +211,7 @@ fn len() { }); for i in 0..COUNT { - s.send(i).await; + s.send(i).await.unwrap(); let len = s.len(); assert!(len <= CAP); } @@ -256,7 +256,7 @@ fn spsc() { }); for i in 0..COUNT { - s.send(i).await; + s.send(i).await.unwrap(); } drop(s); @@ -292,7 +292,7 @@ fn mpmc() { let s = s.clone(); tasks.push(spawn(async move { for i in 0..COUNT { - s.send(i).await; + s.send(i).await.unwrap(); } })); } @@ -320,7 +320,7 @@ fn oneshot() { let c2 = spawn(async move { s.send(0).await }); c1.await; - c2.await; + c2.await.unwrap(); } }) } @@ -360,13 +360,13 @@ fn drops() { }); for _ in 0..steps { - s.send(DropCounter).await; + s.send(DropCounter).await.unwrap(); } child.await; for _ in 0..additional { - s.send(DropCounter).await; + s.send(DropCounter).await.unwrap(); } assert_eq!(DROPS.load(Ordering::SeqCst), steps); diff --git a/tests/stream.rs b/tests/stream.rs index 3a192339f..ecbf9c7a4 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -36,7 +36,7 @@ fn merging_delayed_streams_work() { task::block_on(async move { task::sleep(std::time::Duration::from_millis(500)).await; - sender.send(92).await; + sender.send(92).await.unwrap(); drop(sender); let xs = t.await; assert_eq!(xs, vec![92]) @@ -55,7 +55,7 @@ fn merging_delayed_streams_work() { task::block_on(async move { task::sleep(std::time::Duration::from_millis(500)).await; - sender.send(92).await; + sender.send(92).await.unwrap(); drop(sender); let xs = t.await; assert_eq!(xs, vec![92]) From 4e7be6fda9e670f7e38a4a057cb88181d8c4ed60 Mon Sep 17 00:00:00 2001 From: Curtis Maves Date: Wed, 28 Oct 2020 22:07:51 -0400 Subject: [PATCH 3/3] Fix documentation warnings in channel.rs --- src/sync/channel.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 81791ff9c..b5bc59d3e 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -43,12 +43,12 @@ use crate::sync::WakerSet; /// let (s, r) = channel(1); /// /// // This call returns immediately because there is enough space in the channel. -/// s.send(1usize).await; +/// s.send(1usize).await.unwrap(); /// /// task::spawn(async move { /// // This call will have to wait because the channel is full. /// // It will be able to complete only after the first message is received. -/// s.send(2).await; +/// s.send(2).await.unwrap(); /// }); /// /// task::sleep(Duration::from_secs(1)).await; @@ -124,8 +124,8 @@ impl Sender { /// let (s, r) = channel(1); /// /// task::spawn(async move { - /// s.send(1).await; - /// s.send(2).await; + /// s.send(1).await.unwrap(); + /// s.send(2).await.unwrap(); /// }); /// /// assert_eq!(r.recv().await?, 1); @@ -243,7 +243,7 @@ impl Sender { /// let (s, r) = channel(1); /// /// assert!(s.is_empty()); - /// s.send(0).await; + /// s.send(0).await.unwrap(); /// assert!(!s.is_empty()); /// # /// # }) @@ -264,7 +264,7 @@ impl Sender { /// let (s, r) = channel(1); /// /// assert!(!s.is_full()); - /// s.send(0).await; + /// s.send(0).await.unwrap(); /// assert!(s.is_full()); /// # /// # }) @@ -285,8 +285,8 @@ impl Sender { /// let (s, r) = channel(2); /// assert_eq!(s.len(), 0); /// - /// s.send(1).await; - /// s.send(2).await; + /// s.send(1).await.unwrap(); + /// s.send(2).await.unwrap(); /// assert_eq!(s.len(), 2); /// # /// # }) @@ -349,9 +349,9 @@ impl fmt::Debug for Sender { /// let (s, r) = channel(100); /// /// task::spawn(async move { -/// s.send(1usize).await; +/// s.send(1usize).await.unwrap(); /// task::sleep(Duration::from_secs(1)).await; -/// s.send(2).await; +/// s.send(2).await.unwrap(); /// }); /// /// assert_eq!(r.recv().await?, 1); // Received immediately. @@ -389,8 +389,8 @@ impl Receiver { /// let (s, r) = channel(1); /// /// task::spawn(async move { - /// s.send(1usize).await; - /// s.send(2).await; + /// s.send(1usize).await.unwrap(); + /// s.send(2).await.unwrap(); /// // Then we drop the sender /// }); /// @@ -449,7 +449,7 @@ impl Receiver { /// /// let (s, r) = channel(1); /// - /// s.send(1u8).await; + /// s.send(1u8).await.unwrap(); /// /// assert!(r.try_recv().is_ok()); /// assert!(r.try_recv().is_err()); @@ -486,7 +486,7 @@ impl Receiver { /// let (s, r) = channel(1); /// /// assert!(r.is_empty()); - /// s.send(0).await; + /// s.send(0).await.unwrap(); /// assert!(!r.is_empty()); /// # /// # }) @@ -507,7 +507,7 @@ impl Receiver { /// let (s, r) = channel(1); /// /// assert!(!r.is_full()); - /// s.send(0).await; + /// s.send(0).await.unwrap(); /// assert!(r.is_full()); /// # /// # }) @@ -528,8 +528,8 @@ impl Receiver { /// let (s, r) = channel(2); /// assert_eq!(r.len(), 0); /// - /// s.send(1).await; - /// s.send(2).await; + /// s.send(1).await.unwrap(); + /// s.send(2).await.unwrap(); /// assert_eq!(r.len(), 2); /// # /// # })