From 3b9fc98c72fc97afac9a250724c47eea12fe8e3a Mon Sep 17 00:00:00 2001 From: Matt Andrews Date: Fri, 22 Mar 2024 13:20:10 +1100 Subject: [PATCH 1/2] added `Receiver::recv_owned` Added `Receiver::recv_owned` as well as supporting types `RecvOwned` and `RecvOwnedInner`. --- src/lib.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++++ tests/bounded.rs | 25 +++++++++++++++++ tests/unbounded.rs | 25 +++++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 9c9e3b9..1ea2a05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -580,6 +580,19 @@ impl Receiver { }) } + /// Receives a message from the channel. + /// + /// Like [`Receiver::recv`], but uses a cloned `Receiver`. + /// + /// Good for using `Recv` in `Future` impls. + pub fn recv_owned(&self) -> RecvOwned { + RecvOwned::_new(RecvOwnedInner { + receiver: self.clone(), + listener: None, + _pin: PhantomPinned, + }) + } + /// Receives a message from the channel using the blocking strategy. /// /// If the channel is empty, this method waits until there is a message. @@ -1201,6 +1214,61 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> { } } +easy_wrapper! { + /// A future returned by [`Receiver::recv_owned()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct RecvOwned(RecvOwnedInner => Result); + #[cfg(all(feature = "std", not(target_family = "wasm")))] + pub(crate) wait(); +} + +pin_project! { + #[derive(Debug)] + #[project(!Unpin)] + struct RecvOwnedInner { + // Reference to the receiver. + receiver: Receiver, + + // Listener waiting on the channel. + listener: Option, + + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned + } +} + +impl<'a, T> EventListenerFuture for RecvOwnedInner { + type Output = Result; + + /// Run this future with the given `Strategy`. + fn poll_with_strategy<'x, S: Strategy<'x>>( + self: Pin<&mut Self>, + strategy: &mut S, + cx: &mut S::Context, + ) -> Poll> { + let this = self.project(); + + loop { + // Attempt to receive a message. + match this.receiver.try_recv() { + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), + Err(TryRecvError::Empty) => {} + } + + // Receiving failed - now start listening for notifications or wait for one. + if this.listener.is_some() { + // Poll using the given strategy + ready!(S::poll(strategy, &mut *this.listener, cx)); + } else { + *this.listener = Some(this.receiver.channel.recv_ops.listen()); + } + } + } +} + #[cfg(feature = "std")] use std::process::abort; diff --git a/tests/bounded.rs b/tests/bounded.rs index 4b691f7..16eb339 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -184,6 +184,31 @@ fn send() { .run(); } +#[cfg(not(target_family = "wasm"))] +#[test] +fn send_recv_owned() { + let (s, r) = bounded(1); + + Parallel::new() + .add(|| { + future::block_on(s.send(7)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(8)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(9)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(10)).unwrap(); + }) + .add(|| { + sleep(ms(1500)); + assert_eq!(future::block_on(r.recv_owned()), Ok(7)); + assert_eq!(future::block_on(r.recv_owned()), Ok(8)); + assert_eq!(future::block_on(r.recv_owned()), Ok(9)); + assert_eq!(future::block_on(r.recv_owned()), Ok(10)); + }) + .run(); +} + #[cfg(not(target_family = "wasm"))] #[test] fn send_after_close() { diff --git a/tests/unbounded.rs b/tests/unbounded.rs index 90cb375..7cb03eb 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -147,6 +147,31 @@ fn send() { assert_eq!(future::block_on(s.send(777)), Err(SendError(777))); } +#[cfg(not(target_family = "wasm"))] +#[test] +fn send_recv_owned() { + let (s, r) = unbounded(); + + Parallel::new() + .add(|| { + future::block_on(s.send(7)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(8)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(9)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(10)).unwrap(); + }) + .add(|| { + sleep(ms(1500)); + assert_eq!(future::block_on(r.recv_owned()), Ok(7)); + assert_eq!(future::block_on(r.recv_owned()), Ok(8)); + assert_eq!(future::block_on(r.recv_owned()), Ok(9)); + assert_eq!(future::block_on(r.recv_owned()), Ok(10)); + }) + .run(); +} + #[test] fn send_after_close() { let (s, r) = unbounded(); From 9a522014dd1aac874c28d54304b87fcd3ab27d8e Mon Sep 17 00:00:00 2001 From: Matt Andrews Date: Fri, 22 Mar 2024 13:55:17 +1100 Subject: [PATCH 2/2] fix clippy lint --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 1ea2a05..8f4dd99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1239,7 +1239,7 @@ pin_project! { } } -impl<'a, T> EventListenerFuture for RecvOwnedInner { +impl EventListenerFuture for RecvOwnedInner { type Output = Result; /// Run this future with the given `Strategy`.