diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index d282bb983..347f8ff02 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -406,6 +406,22 @@ impl Connection { } } + /// Transmit `data` as an unreliable, unordered application datagram + /// + /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion + /// conditions, which effectively prioritizes old datagrams over new datagrams. + /// + /// See [`send_datagram()`] for details. + /// + /// [`send_datagram()`]: Connection::send_datagram + pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> { + SendDatagram { + conn: &self.0, + data: Some(data), + notify: self.0.shared.datagrams_unblocked.notified(), + } + } + /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`]. /// /// Returns `None` if datagrams are unsupported by the peer or disabled locally. @@ -753,6 +769,55 @@ impl Future for ReadDatagram<'_> { } } +pin_project! { + /// Future produced by [`Connection::send_datagram_wait`] + pub struct SendDatagram<'a> { + conn: &'a ConnectionRef, + data: Option, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for SendDatagram<'_> { + type Output = Result<(), SendDatagramError>; + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let mut state = this.conn.state.lock("SendDatagram::poll"); + if let Some(ref e) = state.error { + return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone()))); + } + use proto::SendDatagramError::*; + match state + .inner + .datagrams() + .send(this.data.take().unwrap(), false) + { + Ok(()) => { + state.wake(); + Poll::Ready(Ok(())) + } + Err(e) => Poll::Ready(Err(match e { + Blocked(data) => { + this.data.replace(data); + loop { + match this.notify.as_mut().poll(ctx) { + Poll::Pending => return Poll::Pending, + // Spurious wakeup, get a new future + Poll::Ready(()) => this + .notify + .set(this.conn.shared.datagrams_unblocked.notified()), + } + } + } + UnsupportedByPeer => SendDatagramError::UnsupportedByPeer, + Disabled => SendDatagramError::Disabled, + TooLarge => SendDatagramError::TooLarge, + })), + } + } +} + #[derive(Debug)] pub(crate) struct ConnectionRef(Arc); @@ -842,6 +907,7 @@ pub(crate) struct Shared { /// Notified when the peer has initiated a new stream stream_incoming: [Notify; 2], datagram_received: Notify, + datagrams_unblocked: Notify, closed: Notify, } @@ -974,7 +1040,9 @@ impl State { DatagramReceived => { shared.datagram_received.notify_waiters(); } - DatagramsUnblocked => {} + DatagramsUnblocked => { + shared.datagrams_unblocked.notify_waiters(); + } Stream(StreamEvent::Readable { id }) => { if let Some(reader) = self.blocked_readers.remove(&id) { reader.wake(); @@ -1082,6 +1150,7 @@ impl State { shared.stream_incoming[Dir::Uni as usize].notify_waiters(); shared.stream_incoming[Dir::Bi as usize].notify_waiters(); shared.datagram_received.notify_waiters(); + shared.datagrams_unblocked.notify_waiters(); for (_, x) in self.finishing.drain() { let _ = x.send(Some(WriteError::ConnectionLost(reason.clone()))); } diff --git a/quinn/src/tests.rs b/quinn/src/tests.rs index afe4b2039..5f3ada9d4 100755 --- a/quinn/src/tests.rs +++ b/quinn/src/tests.rs @@ -775,7 +775,7 @@ async fn two_datagram_readers() { async { server.send_datagram(b"one"[..].into()).unwrap(); done.notified().await; - server.send_datagram(b"two"[..].into()).unwrap(); + server.send_datagram_wait(b"two"[..].into()).await.unwrap(); } ); assert!(*a == *b"one" || *b == *b"one");