Skip to content

Commit

Permalink
fix(s2n-quic-core): always wake application on available datagram cap…
Browse files Browse the repository at this point in the history
…acity
  • Loading branch information
camshaft committed Jun 13, 2024
1 parent d8ee4e6 commit b047674
Showing 1 changed file with 92 additions and 25 deletions.
117 changes: 92 additions & 25 deletions quic/s2n-quic-core/src/datagram/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ pub struct Sender {
capacity: usize,
min_packet_space: usize,
max_packet_space: usize,
dropped_datagrams: u64,
smoothed_packet_size: f64,
waker: Option<Waker>,
max_datagram_payload: u64,
Expand Down Expand Up @@ -449,43 +450,64 @@ impl Sender {
pub fn smoothed_packet_space(&self) -> usize {
self.smoothed_packet_size as usize
}

/// Returns the number of datagrams that have been dropped by the sender
///
/// The cause of drops is due to the datagrams being larger than the current path MTU. If this
/// number is non-zero, applications should try to send smaller datagrams.
#[inline]
pub fn dropped_datagrams(&self) -> u64 {
self.dropped_datagrams
}
}

impl super::Sender for Sender {
#[inline]
fn on_transmit<P: Packet>(&mut self, packet: &mut P) {
// Cede space to stream data when datagrams are not prioritized
if packet.has_pending_streams() && !packet.datagrams_prioritized() {
return;
}

self.record_capacity_stats(packet.remaining_capacity());

let mut has_written = false;

while packet.remaining_capacity() > 0 {
if let Some(datagram) = self.queue.pop_front() {
// Ensure there is enough space in the packet to send a datagram
if packet.remaining_capacity() >= datagram.data.len() {
match packet.write_datagram(&datagram.data) {
Ok(()) => has_written = true,
Err(_error) => {
continue;
}
}
// Since a datagram was popped off the queue, wake the
// stored waker if we have one to let the application know
// that there is space on the queue for more datagrams.
if let Some(w) = self.waker.take() {
w.wake();
}
} else {
// This check keeps us from popping all the datagrams off the
// queue when packet space remaining is smaller than the datagram.
if has_written {
self.queue.push_front(datagram);
return;
}
let Some(datagram) = self.queue.pop_front() else {
break;
};

// Ensure there is enough space in the packet to send a datagram
if packet.remaining_capacity() < datagram.data.len() {
// This check keeps us from popping all the datagrams off the
// queue when packet space remaining is smaller than the datagram.
if has_written {
self.queue.push_front(datagram);
break;
}

// the datagram is too large for the current packet and unlikely to ever fit so
// record a metric and try the next datagram in the queue
self.dropped_datagrams += 1;
continue;
}

match packet.write_datagram(&datagram.data) {
Ok(()) => has_written = true,
Err(_error) => {
// TODO log this
self.dropped_datagrams += 1;
continue;
}
} else {
// If there are no datagrams on the queue we return
return;
}
}

// If we now have additional capacity wake the stored waker if we have one to
// let the application know that there is space on the queue for more datagrams.
if self.capacity > self.queue.len() {
if let Some(w) = self.waker.take() {
w.wake();
}
}
}
Expand Down Expand Up @@ -540,6 +562,7 @@ impl SenderBuilder {
queue: VecDeque::with_capacity(self.queue_capacity),
capacity: self.queue_capacity,
max_datagram_payload: self.max_datagram_payload,
dropped_datagrams: 0,
max_packet_space: 0,
min_packet_space: 0,
smoothed_packet_size: 0.0,
Expand Down Expand Up @@ -780,6 +803,50 @@ mod tests {
assert!(!default_sender.queue.is_empty());
}

/// Ensures the application waker is called when capacity becomes available
#[test]
fn wake_with_capacity() {
let (waker, wake_count) = new_count_waker();
let mut cx = Context::from_waker(&waker);

let conn_info = ConnectionInfo::new(100, waker.clone());

let mut default_sender = Sender::builder()
.with_capacity(1)
.with_connection_info(&conn_info)
.build()
.unwrap();

let datagram = bytes::Bytes::from_static(&[1, 2, 3]);

assert!(default_sender
.poll_send_datagram(&mut datagram.clone(), &mut cx)
.is_ready());
assert!(default_sender
.poll_send_datagram(&mut datagram.clone(), &mut cx)
.is_pending());

assert_eq!(wake_count.get(), 0);

// Packet size is just enough to write the first datagram with some
// room left over, but not enough to write the second.
let mut packet = MockPacket {
remaining_capacity: 2,
has_pending_streams: false,
datagrams_prioritized: false,
};
crate::datagram::Sender::on_transmit(&mut default_sender, &mut packet);

// Packet capacity has not changed
assert_eq!(packet.remaining_capacity, 2);
// Send queue is completely depleted
assert!(default_sender.queue.is_empty());
// The waker was called since we now have capacity
assert_eq!(wake_count.get(), 1);
// The sender should record the number of dropped datagrams
assert_eq!(default_sender.dropped_datagrams(), 1);
}

fn fake_receive_context() -> crate::datagram::ReceiveContext<'static> {
crate::datagram::ReceiveContext {
path: crate::event::api::Path {
Expand Down

0 comments on commit b047674

Please sign in to comment.