Skip to content

Commit

Permalink
Implement RACK
Browse files Browse the repository at this point in the history
This is a modified version of [RACK] for QUIC.

[RACK]: https://datatracker.ietf.org/doc/html/rfc8985
  • Loading branch information
mb committed Nov 2, 2023
1 parent 887d256 commit a4ac438
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 14 deletions.
12 changes: 10 additions & 2 deletions neqo-transport/src/cc/classic_cc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
}

// Multi-packet version of OnPacketAckedCC
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool {
// Check whether we are app limited before acked packets are removed
// from bytes_in_flight.
let is_app_limited = self.app_limited();
Expand All @@ -163,6 +168,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
);

let mut exiting_recovery = false;
let mut new_acked = 0;
for pkt in acked_pkts {
qinfo!(
Expand All @@ -187,6 +193,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {

if self.state.in_recovery() {
self.set_state(State::CongestionAvoidance);
exiting_recovery = true;
qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]);
}

Expand All @@ -196,7 +203,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if is_app_limited {
self.cc_algorithm.on_app_limited();
qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
return;
return exiting_recovery;
}

// Slow start, up to the slow start threshold.
Expand Down Expand Up @@ -247,6 +254,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
],
);
qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
exiting_recovery
}

/// Update congestion controller state based on lost packets.
Expand Down
7 changes: 6 additions & 1 deletion neqo-transport/src/cc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug {
#[must_use]
fn cwnd_avail(&self) -> usize;

fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant);
fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool;

/// Returns true if the congestion window was reduced.
fn on_packets_lost(
Expand Down
4 changes: 2 additions & 2 deletions neqo-transport/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,10 @@ impl Path {
}

/// Record packets as acknowledged with the sender.
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) {
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool {
debug_assert!(self.is_primary());
self.sender
.on_packets_acked(acked_pkts, self.rtt.minimum(), now);
.on_packets_acked(acked_pkts, self.rtt.minimum(), now)
}

/// Record packets as lost with the sender.
Expand Down
96 changes: 89 additions & 7 deletions neqo-transport/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace {
/// This is `None` if there were no out-of-order packets detected.
/// When set to `Some(T)`, time-based loss detection should be enabled.
first_ooo_time: Option<Instant>,
/// If no reordering has been observed, TODO: just say reo_wnd_mult != 0
reordering_seen: bool,
/// the RTO is RTT * (reo_wnd_mult + 9) / 8
///
/// this is basically the index of the first non-zero entry of `reo_wnd_persist`
reo_wnd_mult: u32,
reo_wnd_persist: [u8; 16],
}

impl LossRecoverySpace {
Expand All @@ -197,6 +204,9 @@ impl LossRecoverySpace {
in_flight_outstanding: 0,
sent_packets: BTreeMap::default(),
first_ooo_time: None,
reo_wnd_mult: 0,
reo_wnd_persist: Default::default(),
reordering_seen: false,
}
}

Expand Down Expand Up @@ -384,18 +394,20 @@ impl LossRecoverySpace {
pub fn detect_lost_packets(
&mut self,
now: Instant,
loss_delay: Duration,
rtt_estimate: Duration,
cleanup_delay: Duration,
lost_packets: &mut Vec<SentPacket>,
) {
// Housekeeping.
self.remove_old_lost(now, cleanup_delay);

let loss_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
qtrace!(
"detect lost {}: now={:?} delay={:?}",
"detect lost {}: now={:?} delay={:?}, multiplier={}",
self.space,
now,
loss_delay,
self.reo_wnd_mult
);
self.first_ooo_time = None;

Expand All @@ -418,7 +430,7 @@ impl LossRecoverySpace {
packet.time_sent,
loss_delay
);
} else if largest_acked >= Some(*pn + PACKET_THRESHOLD) {
} else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) {
qtrace!(
"lost={}, is >= {} from largest acked {:?}",
pn,
Expand All @@ -438,6 +450,71 @@ impl LossRecoverySpace {

lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone()));
}

pub fn detect_reordered_packets(
&mut self,
now: Instant,
acked_pkts: &[SentPacket],
rtt_estimate: Duration,
) {
// detect packet reordering
let mut max_rtt = Duration::default();
if let Some(largest_ack) = self.largest_acked {
for pkt in acked_pkts
.iter()
.filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack)
{
println!("largest_ack={}, pn={}", largest_ack, pkt.pn);
// reordering event
self.reordering_seen = true;
max_rtt = max(max_rtt, now.duration_since(pkt.time_sent));
}
}
// update reo_wnd
if max_rtt > rtt_estimate && !rtt_estimate.is_zero() {
// calculate reo_wnd necessary to accept the reordering event

// inverse of lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
// self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9
let new_reo_wnd = min(
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
self.reo_wnd_persist.len() as u128,
);
let new_reo_wnd = usize::try_from(new_reo_wnd).unwrap();
for el in 0..new_reo_wnd {
self.reo_wnd_persist[el] = 16;
}
println!(
"max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}",
max_rtt.as_millis(),
rtt_estimate.as_millis(),
(rtt_estimate * (self.reo_wnd_mult + 9) / 8).as_millis(),
(rtt_estimate * (new_reo_wnd as u32 + 9) / 8).as_millis()
);
println!(
"detect_reordered_packets old={}, new={}, reo_wnd_persist={:?}",
self.reo_wnd_mult,
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
self.reo_wnd_persist
);
self.reo_wnd_mult = max(self.reo_wnd_mult, u32::try_from(new_reo_wnd).unwrap());
}
}

pub fn on_exiting_recovery(&mut self) {
let old = self.reo_wnd_mult;
for (i, el) in self.reo_wnd_persist.iter_mut().enumerate() {
if *el == 0 {
self.reo_wnd_mult = u32::try_from(i).unwrap();
break;
}
*el = el.saturating_sub(1);
}
println!(
"detect_lost_packets old={}, new={}, reo_wnd_persist={:?}",
old, self.reo_wnd_mult, self.reo_wnd_persist
);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -680,6 +757,9 @@ impl LossRecovery {
return (Vec::new(), Vec::new());
}

let rtt_estimate = primary_path.borrow().rtt().estimated_upper();
space.detect_reordered_packets(now, &acked_packets, rtt_estimate);

// Track largest PN acked per space
let prev_largest_acked = space.largest_acked_sent_time;
if Some(largest_acked) > space.largest_acked {
Expand All @@ -704,12 +784,11 @@ impl LossRecovery {
// We need to ensure that we have sent any PTO probes before they are removed
// as we rely on the count of in-flight packets to determine whether to send
// another probe. Removing them too soon would result in not sending on PTO.
let loss_delay = primary_path.borrow().rtt().loss_delay();
let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space);
let mut lost = Vec::new();
self.spaces.get_mut(pn_space).unwrap().detect_lost_packets(
now,
loss_delay,
rtt_estimate,
cleanup_delay,
&mut lost,
);
Expand All @@ -725,9 +804,12 @@ impl LossRecovery {
// This must happen after on_packets_lost. If in recovery, this could
// take us out, and then lost packets will start a new recovery period
// when it shouldn't.
primary_path
if primary_path
.borrow_mut()
.on_packets_acked(&acked_packets, now);
.on_packets_acked(&acked_packets, now)
{
self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery();
}

self.pto_state = None;

Expand Down
10 changes: 10 additions & 0 deletions neqo-transport/src/rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ impl RttEstimate {
max(rtt * 9 / 8, GRANULARITY)
}

/// Frin RFC9002 Section 6.1.2 Time Treshhold
/// Using max(smoothed_rtt, latest_rtt) protects from the two following cases:
// * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the
// acknowledgment encountered a shorter path;
// * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained
// increase in the actual RTT, but the smoothed RTT has not yet caught up.
pub fn estimated_upper(&self) -> Duration {
max(self.latest_rtt, self.smoothed_rtt)
}

pub fn first_sample_time(&self) -> Option<Instant> {
self.first_sample_time
}
Expand Down
9 changes: 7 additions & 2 deletions neqo-transport/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ impl PacketSender {
self.cc.cwnd_avail()
}

pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
self.cc.on_packets_acked(acked_pkts, min_rtt, now);
pub fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool {
self.cc.on_packets_acked(acked_pkts, min_rtt, now)
}

/// Called when packets are lost. Returns true if the congestion window was reduced.
Expand Down

0 comments on commit a4ac438

Please sign in to comment.