Skip to content

Commit 5fea266

Browse files
committed
Implement the Send future
1 parent ede84ad commit 5fea266

File tree

2 files changed

+67
-19
lines changed

2 files changed

+67
-19
lines changed

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ categories = ["asynchronous", "concurrency"]
1515
concurrent-queue = "1.2.2"
1616
event-listener = "2.4.0"
1717
futures-core = "0.3.5"
18-
pin-project-lite = "0.2.4"
1918

2019
[dev-dependencies]
2120
blocking = "1"

src/lib.rs

+67-18
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use std::usize;
4141
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4242
use event_listener::{Event, EventListener};
4343
use futures_core::stream::Stream;
44-
use pin_project_lite::pin_project;
4544

4645
struct Channel<T> {
4746
/// Inner message queue.
@@ -528,7 +527,10 @@ impl<T> Receiver<T> {
528527
/// # });
529528
/// ```
530529
pub fn recv(&self) -> Recv<'_, T> {
531-
Recv::new(self)
530+
Recv {
531+
receiver: self,
532+
listener: None,
533+
}
532534
}
533535

534536
/// Closes the channel.
@@ -906,29 +908,76 @@ impl fmt::Display for TryRecvError {
906908
}
907909
}
908910

909-
pin_project! {
910-
/// A future returned by [`Receiver::recv()`].
911-
#[must_use = "futures do nothing unless .awaited"]
912-
pub struct Recv<'a, T> {
913-
receiver: &'a Receiver<T>,
914-
listener: Option<EventListener>,
915-
}
911+
/// A future returned by [`Sender::send()`].
912+
#[derive(Debug)]
913+
#[must_use = "futures do nothing unless .awaited"]
914+
pub struct Send<'a, T> {
915+
sender: &'a Sender<T>,
916+
listener: Option<EventListener>,
917+
msg: Option<T>,
916918
}
917919

918-
impl<'a, T> Recv<'a, T> {
919-
fn new(receiver: &'a Receiver<T>) -> Self {
920-
Self {
921-
receiver,
922-
listener: None,
920+
impl<'a, T> Unpin for Send<'a, T> {}
921+
922+
impl<'a, T> Future for Send<'a, T> {
923+
type Output = Result<(), SendError<T>>;
924+
925+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
926+
let mut this = Pin::new(self);
927+
928+
loop {
929+
let msg = this.msg.take().unwrap();
930+
// Attempt to send a message.
931+
match this.sender.try_send(msg) {
932+
Ok(()) => {
933+
// If the capacity is larger than 1, notify another blocked send operation.
934+
match this.sender.channel.queue.capacity() {
935+
Some(1) => {}
936+
Some(_) | None => this.sender.channel.send_ops.notify(1),
937+
}
938+
return Poll::Ready(Ok(()));
939+
}
940+
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
941+
Err(TrySendError::Full(m)) => this.msg = Some(m),
942+
}
943+
944+
// Sending failed - now start listening for notifications or wait for one.
945+
match &mut this.listener {
946+
None => {
947+
// Start listening and then try receiving again.
948+
this.listener = Some(this.sender.channel.send_ops.listen());
949+
}
950+
Some(l) => {
951+
// Wait for a notification.
952+
match Pin::new(l).poll(cx) {
953+
Poll::Ready(_) => {
954+
this.listener = None;
955+
continue;
956+
}
957+
958+
Poll::Pending => return Poll::Pending,
959+
}
960+
}
961+
}
923962
}
924963
}
925964
}
926965

966+
/// A future returned by [`Receiver::recv()`].
967+
#[derive(Debug)]
968+
#[must_use = "futures do nothing unless .awaited"]
969+
pub struct Recv<'a, T> {
970+
receiver: &'a Receiver<T>,
971+
listener: Option<EventListener>,
972+
}
973+
974+
impl<'a, T> Unpin for Recv<'a, T> {}
975+
927976
impl<'a, T> Future for Recv<'a, T> {
928977
type Output = Result<T, RecvError>;
929978

930979
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
931-
let mut this = self.project();
980+
let mut this = Pin::new(self);
932981

933982
loop {
934983
// Attempt to receive a message.
@@ -951,13 +1000,13 @@ impl<'a, T> Future for Recv<'a, T> {
9511000
match &mut this.listener {
9521001
None => {
9531002
// Start listening and then try receiving again.
954-
*this.listener = Some(this.receiver.channel.recv_ops.listen());
1003+
this.listener = Some(this.receiver.channel.recv_ops.listen());
9551004
}
9561005
Some(l) => {
9571006
// Wait for a notification.
958-
match dbg!(Pin::new(l).poll(cx)) {
1007+
match Pin::new(l).poll(cx) {
9591008
Poll::Ready(_) => {
960-
*this.listener = None;
1009+
this.listener = None;
9611010
continue;
9621011
}
9631012

0 commit comments

Comments
 (0)