From a4ac438c8c94826983fd1dc28122a24d3e238bce Mon Sep 17 00:00:00 2001 From: Manuel Bucher Date: Thu, 2 Nov 2023 19:05:22 +0100 Subject: [PATCH] Implement RACK This is a modified version of [RACK] for QUIC. [RACK]: https://datatracker.ietf.org/doc/html/rfc8985 --- neqo-transport/src/cc/classic_cc.rs | 12 +++- neqo-transport/src/cc/mod.rs | 7 ++- neqo-transport/src/path.rs | 4 +- neqo-transport/src/recovery.rs | 96 ++++++++++++++++++++++++++--- neqo-transport/src/rtt.rs | 10 +++ neqo-transport/src/sender.rs | 9 ++- 6 files changed, 124 insertions(+), 14 deletions(-) diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 39b98e49a5..d3b05e8948 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -149,7 +149,12 @@ impl CongestionControl for ClassicCongestionControl { } // Multi-packet version of OnPacketAckedCC - fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { + fn on_packets_acked( + &mut self, + acked_pkts: &[SentPacket], + min_rtt: Duration, + now: Instant, + ) -> bool { // Check whether we are app limited before acked packets are removed // from bytes_in_flight. 
let is_app_limited = self.app_limited(); @@ -163,6 +168,7 @@ impl CongestionControl for ClassicCongestionControl { MAX_DATAGRAM_SIZE * PACING_BURST_SIZE, ); + let mut exiting_recovery = false; let mut new_acked = 0; for pkt in acked_pkts { qinfo!( @@ -187,6 +193,7 @@ impl CongestionControl for ClassicCongestionControl { if self.state.in_recovery() { self.set_state(State::CongestionAvoidance); + exiting_recovery = true; qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]); } @@ -196,7 +203,7 @@ impl CongestionControl for ClassicCongestionControl { if is_app_limited { self.cc_algorithm.on_app_limited(); qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); - return; + return exiting_recovery; } // Slow start, up to the slow start threshold. @@ -247,6 +254,7 @@ impl CongestionControl for ClassicCongestionControl { ], ); qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); + exiting_recovery } /// Update congestion controller state based on lost packets. diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index 5cd5676747..d189fb3727 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug { #[must_use] fn cwnd_avail(&self) -> usize; - fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant); + fn on_packets_acked( + &mut self, + acked_pkts: &[SentPacket], + min_rtt: Duration, + now: Instant, + ) -> bool; /// Returns true if the congestion window was reduced. 
fn on_packets_lost( diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 3a25a1bea9..783164a619 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -957,10 +957,10 @@ impl Path { } /// Record packets as acknowledged with the sender. - pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) { + pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool { debug_assert!(self.is_primary()); self.sender - .on_packets_acked(acked_pkts, self.rtt.minimum(), now); + .on_packets_acked(acked_pkts, self.rtt.minimum(), now) } /// Record packets as lost with the sender. diff --git a/neqo-transport/src/recovery.rs b/neqo-transport/src/recovery.rs index 9256a0727c..759f197b56 100644 --- a/neqo-transport/src/recovery.rs +++ b/neqo-transport/src/recovery.rs @@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace { /// This is `None` if there were no out-of-order packets detected. /// When set to `Some(T)`, time-based loss detection should be enabled. first_ooo_time: Option, + /// Whether a reordering event has been observed. TODO: just say reo_wnd_mult != 0 + reordering_seen: bool, + /// the loss delay is rtt_estimate * (reo_wnd_mult + 9) / 8 + /// + /// this is basically the index of the first zero entry of `reo_wnd_persist` + reo_wnd_mult: u32, + reo_wnd_persist: [u8; 16], } impl LossRecoverySpace { @@ -197,6 +204,9 @@ impl LossRecoverySpace { in_flight_outstanding: 0, sent_packets: BTreeMap::default(), first_ooo_time: None, + reo_wnd_mult: 0, + reo_wnd_persist: Default::default(), + reordering_seen: false, } } @@ -384,18 +394,20 @@ impl LossRecoverySpace { pub fn detect_lost_packets( &mut self, now: Instant, - loss_delay: Duration, + rtt_estimate: Duration, cleanup_delay: Duration, lost_packets: &mut Vec, ) { // Housekeeping. 
self.remove_old_lost(now, cleanup_delay); + let loss_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8; qtrace!( - "detect lost {}: now={:?} delay={:?}", + "detect lost {}: now={:?} delay={:?}, multiplier={}", self.space, now, loss_delay, + self.reo_wnd_mult ); self.first_ooo_time = None; @@ -418,7 +430,7 @@ impl LossRecoverySpace { packet.time_sent, loss_delay ); - } else if largest_acked >= Some(*pn + PACKET_THRESHOLD) { + } else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) { qtrace!( "lost={}, is >= {} from largest acked {:?}", pn, @@ -438,6 +450,71 @@ impl LossRecoverySpace { lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone())); } + + pub fn detect_reordered_packets( + &mut self, + now: Instant, + acked_pkts: &[SentPacket], + rtt_estimate: Duration, + ) { + // detect packet reordering + let mut max_rtt = Duration::default(); + if let Some(largest_ack) = self.largest_acked { + for pkt in acked_pkts + .iter() + .filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack) + { + println!("largest_ack={}, pn={}", largest_ack, pkt.pn); + // reordering event + self.reordering_seen = true; + max_rtt = max(max_rtt, now.duration_since(pkt.time_sent)); + } + } + // update reo_wnd + if max_rtt > rtt_estimate && !rtt_estimate.is_zero() { + // calculate reo_wnd necessary to accept the reordering event + + // inverse of lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8; + // self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9 + let new_reo_wnd = min( + (max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1, + self.reo_wnd_persist.len() as u128, + ); + let new_reo_wnd = usize::try_from(new_reo_wnd).unwrap(); + for el in 0..new_reo_wnd { + self.reo_wnd_persist[el] = 16; + } + println!( + "max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}", + max_rtt.as_millis(), + rtt_estimate.as_millis(), + (rtt_estimate * (self.reo_wnd_mult + 9) / 8).as_millis(), + (rtt_estimate * (new_reo_wnd as u32 + 9) / 
8).as_millis() + ); + println!( + "detect_reordered_packets old={}, new={}, reo_wnd_persist={:?}", + self.reo_wnd_mult, + (max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1, + self.reo_wnd_persist + ); + self.reo_wnd_mult = max(self.reo_wnd_mult, u32::try_from(new_reo_wnd).unwrap()); + } + } + + pub fn on_exiting_recovery(&mut self) { + let old = self.reo_wnd_mult; + for (i, el) in self.reo_wnd_persist.iter_mut().enumerate() { + if *el == 0 { + self.reo_wnd_mult = u32::try_from(i).unwrap(); + break; + } + *el = el.saturating_sub(1); + } + println!( + "detect_lost_packets old={}, new={}, reo_wnd_persist={:?}", + old, self.reo_wnd_mult, self.reo_wnd_persist + ); + } } #[derive(Debug)] @@ -680,6 +757,9 @@ impl LossRecovery { return (Vec::new(), Vec::new()); } + let rtt_estimate = primary_path.borrow().rtt().estimated_upper(); + space.detect_reordered_packets(now, &acked_packets, rtt_estimate); + // Track largest PN acked per space let prev_largest_acked = space.largest_acked_sent_time; if Some(largest_acked) > space.largest_acked { @@ -704,12 +784,11 @@ impl LossRecovery { // We need to ensure that we have sent any PTO probes before they are removed // as we rely on the count of in-flight packets to determine whether to send // another probe. Removing them too soon would result in not sending on PTO. - let loss_delay = primary_path.borrow().rtt().loss_delay(); let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space); let mut lost = Vec::new(); self.spaces.get_mut(pn_space).unwrap().detect_lost_packets( now, - loss_delay, + rtt_estimate, cleanup_delay, &mut lost, ); @@ -725,9 +804,12 @@ impl LossRecovery { // This must happen after on_packets_lost. If in recovery, this could // take us out, and then lost packets will start a new recovery period // when it shouldn't. 
- primary_path + if primary_path .borrow_mut() - .on_packets_acked(&acked_packets, now); + .on_packets_acked(&acked_packets, now) + { + self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery(); + } self.pto_state = None; diff --git a/neqo-transport/src/rtt.rs b/neqo-transport/src/rtt.rs index 3d6d0e70f8..811d485506 100644 --- a/neqo-transport/src/rtt.rs +++ b/neqo-transport/src/rtt.rs @@ -146,6 +146,16 @@ impl RttEstimate { max(rtt * 9 / 8, GRANULARITY) } + /// From RFC9002 Section 6.1.2 Time Threshold + /// Using max(smoothed_rtt, latest_rtt) protects from the two following cases: + /// * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the + /// acknowledgment encountered a shorter path; + /// * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained + /// increase in the actual RTT, but the smoothed RTT has not yet caught up. + pub fn estimated_upper(&self) -> Duration { + max(self.latest_rtt, self.smoothed_rtt) + } + pub fn first_sample_time(&self) -> Option { self.first_sample_time } diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index 05cf9740bb..2ac16cce26 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -63,8 +63,13 @@ impl PacketSender { self.cc.cwnd_avail() } - pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) { - self.cc.on_packets_acked(acked_pkts, min_rtt, now); + pub fn on_packets_acked( + &mut self, + acked_pkts: &[SentPacket], + min_rtt: Duration, + now: Instant, + ) -> bool { + self.cc.on_packets_acked(acked_pkts, min_rtt, now) } /// Called when packets are lost. Returns true if the congestion window was reduced.