Skip to content

Commit

Permalink
add send_datagram_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
devsnek committed Apr 13, 2024
1 parent 6b3f568 commit b3f6cc8
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
71 changes: 70 additions & 1 deletion quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,22 @@ impl Connection {
}
}

/// Transmit `data` as an unreliable, unordered application datagram
///
/// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
/// conditions, which effectively prioritizes old datagrams over new datagrams.
///
/// See [`send_datagram()`] for details.
///
/// [`send_datagram()`]: Connection::send_datagram
pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
SendDatagram {
conn: &self.0,
data: Some(data),
notify: self.0.shared.datagrams_unblocked.notified(),
}
}

/// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
///
/// Returns `None` if datagrams are unsupported by the peer or disabled locally.
Expand Down Expand Up @@ -753,6 +769,55 @@ impl Future for ReadDatagram<'_> {
}
}

pin_project! {
/// Future produced by [`Connection::send_datagram_wait`]
pub struct SendDatagram<'a> {
conn: &'a ConnectionRef,
data: Option<Bytes>,
#[pin]
notify: Notified<'a>,
}
}

impl Future for SendDatagram<'_> {
type Output = Result<(), SendDatagramError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("SendDatagram::poll");
if let Some(ref e) = state.error {
return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
}
use proto::SendDatagramError::*;
match state
.inner
.datagrams()
.send(this.data.take().unwrap(), false)
{
Ok(()) => {
state.wake();
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(match e {
Blocked(data) => {
this.data.replace(data);
loop {
match this.notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagrams_unblocked.notified()),
}
}
}
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
})),
}
}
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

Expand Down Expand Up @@ -842,6 +907,7 @@ pub(crate) struct Shared {
/// Notified when the peer has initiated a new stream
stream_incoming: [Notify; 2],
datagram_received: Notify,
datagrams_unblocked: Notify,
closed: Notify,
}

Expand Down Expand Up @@ -974,7 +1040,9 @@ impl State {
DatagramReceived => {
shared.datagram_received.notify_waiters();
}
DatagramsUnblocked => {}
DatagramsUnblocked => {
shared.datagrams_unblocked.notify_waiters();
}
Stream(StreamEvent::Readable { id }) => {
if let Some(reader) = self.blocked_readers.remove(&id) {
reader.wake();
Expand Down Expand Up @@ -1082,6 +1150,7 @@ impl State {
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
shared.datagram_received.notify_waiters();
shared.datagrams_unblocked.notify_waiters();
for (_, x) in self.finishing.drain() {
let _ = x.send(Some(WriteError::ConnectionLost(reason.clone())));
}
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ async fn two_datagram_readers() {
async {
server.send_datagram(b"one"[..].into()).unwrap();
done.notified().await;
server.send_datagram(b"two"[..].into()).unwrap();
server.send_datagram_wait(b"two"[..].into()).await.unwrap();
}
);
assert!(*a == *b"one" || *b == *b"one");
Expand Down

0 comments on commit b3f6cc8

Please sign in to comment.