diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index c9f93163a6..0f8ab73f16 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -152,7 +152,12 @@ impl CongestionControl for ClassicCongestionControl { } // Multi-packet version of OnPacketAckedCC - fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { + fn on_packets_acked( + &mut self, + acked_pkts: &[SentPacket], + min_rtt: Duration, + now: Instant, + ) -> bool { // Check whether we are app limited before acked packets are removed // from bytes_in_flight. let is_app_limited = self.app_limited(); @@ -166,6 +171,7 @@ impl CongestionControl for ClassicCongestionControl { MAX_DATAGRAM_SIZE * PACING_BURST_SIZE, ); + let mut exiting_recovery = false; let mut new_acked = 0; for pkt in acked_pkts { qinfo!( @@ -190,6 +196,7 @@ impl CongestionControl for ClassicCongestionControl { if self.state.in_recovery() { self.set_state(State::CongestionAvoidance); + exiting_recovery = true; qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]); } @@ -199,7 +206,7 @@ impl CongestionControl for ClassicCongestionControl { if is_app_limited { self.cc_algorithm.on_app_limited(); qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); - return; + return exiting_recovery; } // Slow start, up to the slow start threshold. @@ -250,6 +257,7 @@ impl CongestionControl for ClassicCongestionControl { ], ); qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); + exiting_recovery } /// Update congestion controller state based on lost packets. diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index 5cd5676747..d189fb3727 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug { #[must_use] fn cwnd_avail(&self) -> usize; - fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant); + fn on_packets_acked( + &mut self, + acked_pkts: &[SentPacket], + min_rtt: Duration, + now: Instant, + ) -> bool; /// Returns true if the congestion window was reduced. fn on_packets_lost( diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 3a25a1bea9..783164a619 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -957,10 +957,10 @@ impl Path { } /// Record packets as acknowledged with the sender. - pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) { + pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool { debug_assert!(self.is_primary()); self.sender - .on_packets_acked(acked_pkts, self.rtt.minimum(), now); + .on_packets_acked(acked_pkts, self.rtt.minimum(), now) } /// Record packets as lost with the sender. diff --git a/neqo-transport/src/recovery.rs b/neqo-transport/src/recovery.rs index 9256a0727c..2987f0a2e2 100644 --- a/neqo-transport/src/recovery.rs +++ b/neqo-transport/src/recovery.rs @@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace { /// This is `None` if there were no out-of-order packets detected. /// When set to `Some(T)`, time-based loss detection should be enabled. first_ooo_time: Option, + /// If no reordering has been observed, TODO: just say reo_wnd_mult != 0 + reordering_seen: bool, + /// the RTO is RTT * (reo_wnd_mult + 9) / 8 + /// + /// this is basically the index of the first non-zero entry of `reo_wnd_persist` + reorder_window_mult: u32, + reorder_window_persist: u32, } impl LossRecoverySpace { @@ -197,6 +204,9 @@ impl LossRecoverySpace { in_flight_outstanding: 0, sent_packets: BTreeMap::default(), first_ooo_time: None, + reorder_window_mult: 0, + reorder_window_persist: 0, + reordering_seen: false, } } @@ -384,18 +394,20 @@ impl LossRecoverySpace { pub fn detect_lost_packets( &mut self, now: Instant, - loss_delay: Duration, + rtt_estimate: Duration, cleanup_delay: Duration, lost_packets: &mut Vec, ) { // Housekeeping. self.remove_old_lost(now, cleanup_delay); + let loss_delay = rtt_estimate * (self.reorder_window_mult + 9) / 8; qtrace!( - "detect lost {}: now={:?} delay={:?}", + "detect lost {}: now={:?} delay={:?}, multiplier={}", self.space, now, loss_delay, + self.reorder_window_mult ); self.first_ooo_time = None; @@ -418,7 +430,7 @@ impl LossRecoverySpace { packet.time_sent, loss_delay ); - } else if largest_acked >= Some(*pn + PACKET_THRESHOLD) { + } else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) { qtrace!( "lost={}, is >= {} from largest acked {:?}", pn, @@ -438,6 +450,60 @@ impl LossRecoverySpace { lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone())); } + + pub fn detect_reordered_packets( + &mut self, + now: Instant, + acked_pkts: &[SentPacket], + rtt_estimate: Duration, + ) { + // detect packet reordering + let mut max_rtt = Duration::default(); + if let Some(largest_ack) = self.largest_acked { + for pkt in acked_pkts + .iter() + .filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack) + { + qinfo!("detect_reordered_packets largest_ack={}, pn={}", largest_ack, pkt.pn); + // reordering event + self.reordering_seen = true; + max_rtt = max(max_rtt, now.duration_since(pkt.time_sent)); + } + } + // update reo_wnd + if max_rtt > rtt_estimate && !rtt_estimate.is_zero() { + // calculate reo_wnd necessary to accept the reordering event + // inverse of + // lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8; + // <=> self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9 + let multiplier = min( + (max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1, + 8, + ); + let multiplier = u32::try_from(multiplier).unwrap(); + qinfo!( + "detect_reordered_packets max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}", + max_rtt.as_micros(), + rtt_estimate.as_micros(), + (rtt_estimate * (self.reorder_window_mult + 9) / 8).as_micros(), + (rtt_estimate * (multiplier + 9) / 8).as_micros() + ); + self.reorder_window_mult = max(self.reorder_window_mult, multiplier); + } + } + + pub fn on_exiting_recovery(&mut self) { + if self.reorder_window_persist != 0 { + self.reorder_window_persist -= 1; + if self.reorder_window_persist == 0 { + self.reorder_window_mult = 0; + } + } + qinfo!( + "on_exiting_recovery reorder_window_persist={}, reorder_window_mult={}", + self.reorder_window_persist, self.reorder_window_mult + ); + } } #[derive(Debug)] @@ -680,6 +746,9 @@ impl LossRecovery { return (Vec::new(), Vec::new()); } + let rtt_estimate = primary_path.borrow().rtt().estimated_upper(); + space.detect_reordered_packets(now, &acked_packets, rtt_estimate); + // Track largest PN acked per space let prev_largest_acked = space.largest_acked_sent_time; if Some(largest_acked) > space.largest_acked { @@ -704,12 +773,11 @@ impl LossRecovery { // We need to ensure that we have sent any PTO probes before they are removed // as we rely on the count of in-flight packets to determine whether to send // another probe. Removing them too soon would result in not sending on PTO. - let loss_delay = primary_path.borrow().rtt().loss_delay(); let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space); let mut lost = Vec::new(); self.spaces.get_mut(pn_space).unwrap().detect_lost_packets( now, - loss_delay, + rtt_estimate, cleanup_delay, &mut lost, ); @@ -725,9 +793,12 @@ impl LossRecovery { // This must happen after on_packets_lost. If in recovery, this could // take us out, and then lost packets will start a new recovery period // when it shouldn't. - primary_path + if primary_path .borrow_mut() - .on_packets_acked(&acked_packets, now); + .on_packets_acked(&acked_packets, now) + { + self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery(); + } self.pto_state = None; diff --git a/neqo-transport/src/rtt.rs b/neqo-transport/src/rtt.rs index 3d6d0e70f8..bd080f94ad 100644 --- a/neqo-transport/src/rtt.rs +++ b/neqo-transport/src/rtt.rs @@ -146,6 +146,16 @@ impl RttEstimate { max(rtt * 9 / 8, GRANULARITY) } + /// Frin RFC9002 Section 6.1.2 Time Treshhold + /// Using `max(smoothed_rtt, latest_rtt)` protects from the two following cases: + // * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the + // acknowledgment encountered a shorter path; + // * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained + // increase in the actual RTT, but the smoothed RTT has not yet caught up. + pub fn estimated_upper(&self) -> Duration { + max(self.latest_rtt, self.smoothed_rtt) + } + pub fn first_sample_time(&self) -> Option { self.first_sample_time } diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index 05cf9740bb..2ac16cce26 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -63,8 +63,13 @@ impl PacketSender { self.cc.cwnd_avail() } - pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { - self.cc.on_packets_acked(acked_pkts, min_rtt, now); + pub fn on_packets_acked( + &mut self, + acked_pkts: &[SentPacket], + min_rtt: Duration, + now: Instant, + ) -> bool { + self.cc.on_packets_acked(acked_pkts, min_rtt, now) } /// Called when packets are lost. Returns true if the congestion window was reduced.