Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added Receiver::recv_owned #83

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,19 @@ impl<T> Receiver<T> {
})
}

/// Receives a message from the channel.
///
/// Like [`Receiver::recv`], but uses a cloned `Receiver`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Like [`Receiver::recv`], but uses a cloned `Receiver`.
/// This function is similar to [`Receiver::recv`], but it uses a cloned `Receiver`.

Avoid using sentence fragments in documentation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it could be useful for send.

I'll clean up my doc comments.

///
/// Good for using `Recv` in `Future` impls.
pub fn recv_owned(&self) -> RecvOwned<T> {
RecvOwned::_new(RecvOwnedInner {
receiver: self.clone(),
listener: None,
_pin: PhantomPinned,
})
}

Comment on lines +588 to +595
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an example for this function, preferably one that points out where it could be useful.

/// Receives a message from the channel using the blocking strategy.
///
/// If the channel is empty, this method waits until there is a message.
Expand Down Expand Up @@ -1201,6 +1214,61 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
}
}

easy_wrapper! {
/// A future returned by [`Receiver::recv_owned()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvOwned<T>(RecvOwnedInner<T> => Result<T, RecvError>);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct RecvOwnedInner<T> {
// Reference to the receiver.
receiver: Receiver<T>,

// Listener waiting on the channel.
listener: Option<EventListener>,

// Keeping this type `!Unpin` enables future optimizations.
#[pin]
_pin: PhantomPinned
}
}

impl<T> EventListenerFuture for RecvOwnedInner<T> {
type Output = Result<T, RecvError>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let this = self.project();

loop {
// Attempt to receive a message.
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
if this.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, &mut *this.listener, cx));
} else {
*this.listener = Some(this.receiver.channel.recv_ops.listen());
}
}
}
}

Comment on lines +1217 to +1271
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than duplicating all of this code from Recv, why not extract it into a common type? For instance, you could do this:

struct RecvInner<T, Channel: Borrow<Receiver<T>> { ... }

...then do this:

pub struct Recv<'a, T>(RecvInner<T, &'a Receiver<T>>);
pub struct RecvOwned<T>(RecvInner<T, Receiver<T>>);

This would prevent duplicating around 250 lines of code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm playing around with this suggestion, but the compiler complains about T being unused.

I'll figure it out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use PhantomData to ensure that T is captured

Copy link
Member

@fogti fogti Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose 01fbd7d (it should be possible to fast-forward to that commit if no further adjustments to it are necessary)

#[cfg(feature = "std")]
use std::process::abort;

Expand Down
25 changes: 25 additions & 0 deletions tests/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ fn send() {
.run();
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn send_recv_owned() {
let (s, r) = bounded(1);

Parallel::new()
.add(|| {
future::block_on(s.send(7)).unwrap();
sleep(ms(1000));
future::block_on(s.send(8)).unwrap();
sleep(ms(1000));
future::block_on(s.send(9)).unwrap();
sleep(ms(1000));
future::block_on(s.send(10)).unwrap();
})
.add(|| {
sleep(ms(1500));
assert_eq!(future::block_on(r.recv_owned()), Ok(7));
assert_eq!(future::block_on(r.recv_owned()), Ok(8));
assert_eq!(future::block_on(r.recv_owned()), Ok(9));
assert_eq!(future::block_on(r.recv_owned()), Ok(10));
})
.run();
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn send_after_close() {
Expand Down
25 changes: 25 additions & 0 deletions tests/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,31 @@ fn send() {
assert_eq!(future::block_on(s.send(777)), Err(SendError(777)));
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn send_recv_owned() {
let (s, r) = unbounded();

Parallel::new()
.add(|| {
future::block_on(s.send(7)).unwrap();
sleep(ms(1000));
future::block_on(s.send(8)).unwrap();
sleep(ms(1000));
future::block_on(s.send(9)).unwrap();
sleep(ms(1000));
future::block_on(s.send(10)).unwrap();
})
.add(|| {
sleep(ms(1500));
assert_eq!(future::block_on(r.recv_owned()), Ok(7));
assert_eq!(future::block_on(r.recv_owned()), Ok(8));
assert_eq!(future::block_on(r.recv_owned()), Ok(9));
assert_eq!(future::block_on(r.recv_owned()), Ok(10));
})
.run();
}

#[test]
fn send_after_close() {
let (s, r) = unbounded();
Expand Down
Loading