diff --git a/quic/s2n-quic-transport/src/connection/connection_impl.rs b/quic/s2n-quic-transport/src/connection/connection_impl.rs index dc8db7f922..87ac407017 100644 --- a/quic/s2n-quic-transport/src/connection/connection_impl.rs +++ b/quic/s2n-quic-transport/src/connection/connection_impl.rs @@ -936,6 +936,13 @@ impl connection::Trait for ConnectionImpl { packet_interceptor, ); + // If anything was transmitted, notify the space manager + // that a burst of packets has completed transmission + if count > 0 { + self.space_manager + .on_transmit_burst_complete(self.path_manager.active_path(), timestamp); + } + let mut publisher = self.event_context.publisher(timestamp, subscriber); if outcome.bytes_progressed > 0 { publisher.on_tx_stream_progress(TxStreamProgress { diff --git a/quic/s2n-quic-transport/src/recovery/manager.rs b/quic/s2n-quic-transport/src/recovery/manager.rs index 647b6e7c0d..f5cf0bd9be 100644 --- a/quic/s2n-quic-transport/src/recovery/manager.rs +++ b/quic/s2n-quic-transport/src/recovery/manager.rs @@ -60,6 +60,11 @@ pub struct Manager { // The total ecn counts for outstanding (unacknowledged) packets sent_packet_ecn_counts: EcnCounts, + + // An update to the PTO timer is needed. + // + // Used for updating the PTO timer at the end of a transmission burst. + pto_update_pending: bool, } //= https://www.rfc-editor.org/rfc/rfc9002#section-6.1.1 @@ -124,6 +129,7 @@ impl Manager { time_of_last_ack_eliciting_packet: None, baseline_ecn_counts: EcnCounts::default(), sent_packet_ecn_counts: EcnCounts::default(), + pto_update_pending: false, } } @@ -161,6 +167,8 @@ impl Manager { context: &mut Ctx, publisher: &mut Pub, ) { + debug_assert!(!self.pto_update_pending); + if self.loss_timer.is_armed() { if self.loss_timer.poll_expiration(timestamp).is_ready() { self.detect_and_remove_lost_packets( @@ -251,16 +259,27 @@ impl Manager { .on_packet_sent(ecn, path_event!(path, path_id), publisher); self.sent_packet_ecn_counts.increment(ecn); - if outcome.is_congestion_controlled { - if outcome.ack_elicitation.is_ack_eliciting() { - self.time_of_last_ack_eliciting_packet = Some(time_sent); - } + if outcome.ack_elicitation.is_ack_eliciting() { + self.time_of_last_ack_eliciting_packet = Some(time_sent); //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1 //# A sender SHOULD restart its PTO timer every time an ack-eliciting //# packet is sent or acknowledged, - let is_handshake_confirmed = context.is_handshake_confirmed(); - let path = context.path_mut_by_id(context.path_id()); - self.update_pto_timer(path, time_sent, is_handshake_confirmed); + self.pto_update_pending = true; + } + } + + /// Invoked after a burst of packets has completed transmitting + pub fn on_transmit_burst_complete( + &mut self, + active_path: &Path, + now: Timestamp, + is_handshake_confirmed: bool, + ) { + debug_assert!(active_path.is_active()); + if self.pto_update_pending { + // Update the PTO timer once per transmission burst to reduce CPU cost + self.update_pto_timer(active_path, now, is_handshake_confirmed); + debug_assert!(!self.pto_update_pending); } } @@ -271,6 +290,8 @@ impl Manager { now: Timestamp, is_handshake_confirmed: bool, ) { + self.pto_update_pending = false; + //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1 //# If no additional data can be sent, the server's PTO timer MUST NOT be //# armed until datagrams have been received from the client, because @@ -281,9 +302,10 @@ impl Manager { return; } - let ack_eliciting_packets_in_flight = self.sent_packets.iter().any(|(_, sent_info)| { - sent_info.congestion_controlled && sent_info.ack_elicitation.is_ack_eliciting() - }); + let ack_eliciting_packets_in_flight = self + .sent_packets + .iter() + .any(|(_, sent_info)| sent_info.ack_elicitation.is_ack_eliciting()); //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1 //# it is the client's responsibility to send packets to unblock the server diff --git a/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__on_transmit_burst_complete.snap b/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__on_transmit_burst_complete.snap new file mode 100644 index 0000000000..064fe10379 --- /dev/null +++ b/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__on_transmit_burst_complete.snap @@ -0,0 +1,5 @@ +--- +source: quic/s2n-quic-transport/src/recovery/manager/tests.rs +expression: "" +--- + diff --git a/quic/s2n-quic-transport/src/recovery/manager/tests.rs b/quic/s2n-quic-transport/src/recovery/manager/tests.rs index b357e81e6f..eed6187551 100644 --- a/quic/s2n-quic-transport/src/recovery/manager/tests.rs +++ b/quic/s2n-quic-transport/src/recovery/manager/tests.rs @@ -73,7 +73,6 @@ fn one_second_pto_when_no_previous_rtt_available() { //= type=test #[test] fn on_packet_sent() { - let max_ack_delay = Duration::from_millis(100); let now = time::now(); let mut time_sent = now; let ecn = ExplicitCongestionNotification::Ect0; @@ -88,14 +87,11 @@ fn on_packet_sent() { // simulate receiving a handshake packet to force path validation context.path_mut().on_handshake_packet(); - // PTO = smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay - // PTO = DEFAULT_INITIAL_RTT + 4*DEFAULT_INITIAL_RTT/2 + 10 - let expected_pto_duration = DEFAULT_INITIAL_RTT + 2 * DEFAULT_INITIAL_RTT + max_ack_delay; let mut expected_bytes_in_flight = 0; for i in 1..=10 { - // Reset the timer so we can confirm it was set correctly - manager.pto.timer.cancel(); + // Reset pto_update_pending so we can confirm it was set correctly + manager.pto_update_pending = false; let sent_packet = space.new_packet_number(VarInt::from_u8(i)); let ack_elicitation = if i % 2 == 0 { @@ -138,36 +134,23 @@ fn on_packet_sent() { if outcome.is_congestion_controlled { assert_eq!(actual_sent_packet.sent_bytes as usize, outcome.bytes_sent); - - if outcome.ack_elicitation.is_ack_eliciting() { - //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1 - //= type=test - //# A sender SHOULD restart its PTO timer every time an ack-eliciting - //# packet is sent - assert!(manager.pto.timer.is_armed()); - assert_eq!( - Some(time_sent + expected_pto_duration), - manager.pto.timer.next_expiration() - ); - } else if let Some(time_of_last_ack_eliciting_packet) = - manager.time_of_last_ack_eliciting_packet - { - assert!(manager.pto.timer.is_armed()); - assert_eq!( - Some(time_of_last_ack_eliciting_packet + expected_pto_duration), - manager.pto.timer.next_expiration() - ); - } else { - // No ack eliciting packets have been sent yet - assert!(!manager.pto.timer.is_armed()); - assert_eq!(None, manager.pto.timer.next_expiration()); - } - expected_bytes_in_flight += outcome.bytes_sent; } else { assert_eq!(actual_sent_packet.sent_bytes, 0); } + if outcome.ack_elicitation.is_ack_eliciting() { + assert_eq!(Some(time_sent), manager.time_of_last_ack_eliciting_packet); + //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1 + //= type=test + //# A sender SHOULD restart its PTO timer every time an ack-eliciting + //# packet is sent + assert!(manager.pto_update_pending); + } else { + // No ack eliciting packets have been sent yet + assert!(!manager.pto_update_pending); + } + time_sent += Duration::from_millis(10); } @@ -212,9 +195,6 @@ fn on_packet_sent() { // - bytes sent, congestion_controlled, time_sent match that of sent // - pto is armed fn on_packet_sent_across_multiple_paths() { - // let space = PacketNumberSpace::ApplicationData; - let max_ack_delay = Duration::from_millis(100); - // let mut manager = Manager::new(space, max_ack_delay); let now = time::now(); let ecn = ExplicitCongestionNotification::default(); let mut time_sent = now; @@ -231,12 +211,8 @@ fn on_packet_sent_across_multiple_paths() { // simulate receiving a handshake packet to force path validation context.path_mut().on_handshake_packet(); - // PTO = smoothed_rtt + max(4*rttvar, kGranularity) + max_ack_delay - // PTO = DEFAULT_INITIAL_RTT + 4*DEFAULT_INITIAL_RTT/2 + 10 - let expected_pto_duration = DEFAULT_INITIAL_RTT + 2 * DEFAULT_INITIAL_RTT + max_ack_delay; - - // Reset the timer so we can confirm it was set correctly - manager.pto.timer.cancel(); + // Reset pto_update_pending so we can confirm it was set correctly + manager.pto_update_pending = false; // Trigger 1: let sent_packet = space.new_packet_number(VarInt::from_u8(1)); @@ -276,9 +252,7 @@ fn on_packet_sent_across_multiple_paths() { //= type=test //# A sender SHOULD restart its PTO timer every time an ack-eliciting //# packet is sent - let expected_pto = time_sent + expected_pto_duration; - assert!(manager.pto.timer.is_armed()); - assert_eq!(Some(expected_pto), manager.pto.timer.next_expiration()); + assert!(manager.pto_update_pending); // Setup 2: // send 2nd packet on path 2nd path @@ -291,8 +265,8 @@ fn on_packet_sent_across_multiple_paths() { bytes_progressed: 0, }; - // Reset the timer so we can confirm it was set correctly - manager.pto.timer.cancel(); + // Reset pto_update_pending so we can confirm it was set correctly + manager.pto_update_pending = false; // Trigger 2: context.set_path_id(second_path_id); @@ -323,9 +297,7 @@ fn on_packet_sent_across_multiple_paths() { //= type=test //# A sender SHOULD restart its PTO timer every time an ack-eliciting //# packet is sent - let expected_pto = time_sent + expected_pto_duration; - assert!(manager.pto.timer.is_armed()); - assert_eq!(Some(expected_pto), manager.pto.timer.next_expiration()); + assert!(manager.pto_update_pending); } //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.7 @@ -2477,6 +2449,7 @@ fn update_pto_timer() { context.path_mut().on_bytes_transmitted((1200 * 3) + 1); // Arm the PTO so we can verify it is cancelled manager.pto.timer.set(now + Duration::from_secs(10)); + manager.pto_update_pending = true; manager.update_pto_timer(context.path(), now, is_handshake_confirmed); //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1 @@ -2485,9 +2458,11 @@ fn update_pto_timer() { //# armed until datagrams have been received from the client, because //# packets sent on PTO count against the anti-amplification limit. assert!(!manager.pto.timer.is_armed()); + assert!(!manager.pto_update_pending); // Arm the PTO so we can verify it is cancelled manager.pto.timer.set(now + Duration::from_secs(10)); + manager.pto_update_pending = true; // Validate the path so it is not at the anti-amplification limit // // simulate receiving a handshake packet to force path validation @@ -2497,6 +2472,7 @@ fn update_pto_timer() { // Since the path is peer validated and sent packets is empty, PTO is cancelled assert!(!manager.pto.timer.is_armed()); + assert!(!manager.pto_update_pending); // Reset the path back to not peer validated context.path_manager[unsafe { path::Id::new(0) }] = Path::new( @@ -2511,6 +2487,7 @@ fn update_pto_timer() { // simulate receiving a handshake packet to force path validation context.path_mut().on_handshake_packet(); context.path_mut().pto_backoff = 2; + manager.pto_update_pending = true; let is_handshake_confirmed = false; manager.update_pto_timer(context.path(), now, is_handshake_confirmed); @@ -2519,13 +2496,16 @@ fn update_pto_timer() { //# An endpoint MUST NOT set its PTO timer for the Application Data //# packet number space until the handshake is confirmed. assert!(!manager.pto.timer.is_armed()); + assert!(!manager.pto_update_pending); // Set is handshake confirmed back to true let is_handshake_confirmed = true; + manager.pto_update_pending = true; manager.update_pto_timer(context.path(), now, is_handshake_confirmed); // Now the PTO is armed assert!(manager.pto.timer.is_armed()); + assert!(!manager.pto_update_pending); // Send a packet to validate behavior when sent_packets is not empty manager.on_packet_sent( @@ -2554,6 +2534,7 @@ fn update_pto_timer() { true, space, ); + manager.pto_update_pending = true; manager.update_pto_timer(context.path(), now, is_handshake_confirmed); //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1 @@ -2568,6 +2549,7 @@ fn update_pto_timer() { manager.pto.timer.next_expiration().unwrap(), expected_pto_base_timestamp + Duration::from_millis(12020) ); + assert!(!manager.pto_update_pending); } //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1 @@ -2682,6 +2664,7 @@ fn on_timeout() { &mut context, &mut publisher, ); + manager.on_transmit_burst_complete(context.path(), now, true); // Loss timer is armed and expired, on_packet_loss is called manager.loss_timer.set(now - Duration::from_secs(1)); @@ -3105,6 +3088,49 @@ fn packet_declared_lost_less_than_1_ms_from_loss_threshold() { assert_eq!(1, context.on_packet_loss_count); } +#[test] +fn on_transmit_burst_complete() { + let space = PacketNumberSpace::ApplicationData; + let mut manager = Manager::new(space); + let now = time::now() + Duration::from_secs(10); + let is_handshake_confirmed = true; + let mut path_manager = helper_generate_path_manager(Duration::from_millis(10)); + let ecn = ExplicitCongestionNotification::default(); + let mut context = MockContext::new(&mut path_manager); + let mut publisher = Publisher::snapshot(); + + // Send an ack-eliciting packet to trigger a PTO timer update + manager.on_packet_sent( + space.new_packet_number(VarInt::from_u8(1)), + transmission::Outcome { + ack_elicitation: AckElicitation::Eliciting, + is_congestion_controlled: true, + bytes_sent: 1, + bytes_progressed: 0, + }, + now, + ecn, + transmission::Mode::Normal, + None, + &mut context, + &mut publisher, + ); + + // Validate the path so the PTO timer can be set + context.path_mut().on_handshake_packet(); + context.path_mut().on_peer_validated(); + + assert!(manager.pto_update_pending); + manager.on_transmit_burst_complete(path_manager.active_path(), now, is_handshake_confirmed); + assert!(manager.pto.timer.is_armed()); + assert!(!manager.pto_update_pending); + + // Cancel the PTO timer to validate it isn't re-armed when not needed + manager.pto.timer.cancel(); + manager.on_transmit_burst_complete(path_manager.active_path(), now, is_handshake_confirmed); + assert!(!manager.pto.timer.is_armed()); +} + fn helper_generate_multi_path_manager( space: PacketNumberSpace, publisher: &mut Publisher, diff --git a/quic/s2n-quic-transport/src/space/application.rs b/quic/s2n-quic-transport/src/space/application.rs index e3bdf84c40..2255e90fff 100644 --- a/quic/s2n-quic-transport/src/space/application.rs +++ b/quic/s2n-quic-transport/src/space/application.rs @@ -248,6 +248,19 @@ impl ApplicationSpace { Ok((outcome, buffer)) } + pub(super) fn on_transmit_burst_complete( + &mut self, + active_path: &Path, + timestamp: Timestamp, + is_handshake_confirmed: bool, + ) { + self.recovery_manager.on_transmit_burst_complete( + active_path, + timestamp, + is_handshake_confirmed, + ); + } + pub(super) fn on_transmit_close<'a>( &mut self, context: &mut ConnectionTransmissionContext, diff --git a/quic/s2n-quic-transport/src/space/handshake.rs b/quic/s2n-quic-transport/src/space/handshake.rs index cc2334cade..6919e3e690 100644 --- a/quic/s2n-quic-transport/src/space/handshake.rs +++ b/quic/s2n-quic-transport/src/space/handshake.rs @@ -185,6 +185,19 @@ impl HandshakeSpace { Ok((outcome, buffer)) } + pub(super) fn on_transmit_burst_complete( + &mut self, + active_path: &Path, + timestamp: Timestamp, + is_handshake_confirmed: bool, + ) { + self.recovery_manager.on_transmit_burst_complete( + active_path, + timestamp, + is_handshake_confirmed, + ); + } + pub(super) fn on_transmit_close<'a>( &mut self, context: &mut ConnectionTransmissionContext, diff --git a/quic/s2n-quic-transport/src/space/initial.rs b/quic/s2n-quic-transport/src/space/initial.rs index b4ec4cc8a2..075df78fc7 100644 --- a/quic/s2n-quic-transport/src/space/initial.rs +++ b/quic/s2n-quic-transport/src/space/initial.rs @@ -234,6 +234,19 @@ impl InitialSpace { Ok((outcome, buffer)) } + pub(super) fn on_transmit_burst_complete( + &mut self, + active_path: &Path, + timestamp: Timestamp, + is_handshake_confirmed: bool, + ) { + self.recovery_manager.on_transmit_burst_complete( + active_path, + timestamp, + is_handshake_confirmed, + ); + } + pub(super) fn on_transmit_close<'a>( &mut self, context: &mut ConnectionTransmissionContext, diff --git a/quic/s2n-quic-transport/src/space/mod.rs b/quic/s2n-quic-transport/src/space/mod.rs index 24005aaf17..956ffab9b2 100644 --- a/quic/s2n-quic-transport/src/space/mod.rs +++ b/quic/s2n-quic-transport/src/space/mod.rs @@ -307,6 +307,35 @@ impl PacketSpaceManager { } } + /// Called after a burst of one or more packets have finished being transmitted + pub fn on_transmit_burst_complete(&mut self, active_path: &Path, timestamp: Timestamp) { + debug_assert!(active_path.is_active()); + + if let Some((space, handshake_status)) = self.initial_mut() { + space.on_transmit_burst_complete( + active_path, + timestamp, + handshake_status.is_confirmed(), + ); + } + + if let Some((space, handshake_status)) = self.handshake_mut() { + space.on_transmit_burst_complete( + active_path, + timestamp, + handshake_status.is_confirmed(), + ); + } + + if let Some((space, handshake_status)) = self.application_mut() { + space.on_transmit_burst_complete( + active_path, + timestamp, + handshake_status.is_confirmed(), + ); + } + } + pub fn requires_probe(&self) -> bool { core::iter::empty() .chain(self.initial.iter().map(|space| space.requires_probe()))