Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Single-packet RTX in Initial/Handshake + RTX handshake ACKs #2027

Merged
merged 14 commits into from
Aug 13, 2024
15 changes: 9 additions & 6 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2235,7 +2235,7 @@ impl Connection {
// Include an ACK frame with the CONNECTION_CLOSE.
let limit = builder.limit();
builder.set_limit(limit - ClosingFrame::MIN_LENGTH);
self.acks.immediate_ack(now);
self.acks.immediate_ack(space, now);
self.acks.write_frame(
space,
now,
Expand Down Expand Up @@ -2798,10 +2798,8 @@ impl Connection {
// prepare to resend them.
self.stats.borrow_mut().frame_rx.ping += 1;
self.crypto.resend_unacked(space);
if space == PacketNumberSpace::ApplicationData {
// Send an ACK immediately if we might not otherwise do so.
self.acks.immediate_ack(now);
}
// Send an ACK immediately if we might not otherwise do so.
self.acks.immediate_ack(space, now);
}
Frame::Ack {
largest_acknowledged,
Expand Down Expand Up @@ -2960,7 +2958,12 @@ impl Connection {
for token in lost.tokens() {
qdebug!([self], "Lost: {:?}", token);
match token {
RecoveryToken::Ack(_) => {}
RecoveryToken::Ack(ack_token) => {
// If we lost an ACK frame during the handshake, send another one.
if ack_token.space() != PacketNumberSpace::ApplicationData {
self.acks.immediate_ack(ack_token.space(), lost.time_sent());
}
}
RecoveryToken::Crypto(ct) => self.crypto.lost(ct),
RecoveryToken::HandshakeDone => self.state_signaling.handshake_done(),
RecoveryToken::NewToken(seqno) => self.new_token.lost(*seqno),
Expand Down
60 changes: 60 additions & 0 deletions neqo-transport/src/connection/tests/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::{
},
events::ConnectionEvent,
server::ValidateAddress,
stats::FrameStats,
tparams::{TransportParameter, MIN_ACK_DELAY},
tracking::DEFAULT_ACK_DELAY,
CloseReason, ConnectionParameters, EmptyConnectionIdGenerator, Error, Pmtud, StreamType,
Expand Down Expand Up @@ -1194,3 +1195,62 @@ fn emit_authentication_needed_once() {
_ = client.process(server2.as_dgram_ref(), now());
assert_eq!(0, authentication_needed_count(&mut client));
}

#[test]
fn client_initial_retransmits_identical() {
let mut now = now();
let mut client = default_client();

// Force the client to retransmit its Initial packet a number of times and make sure the
// retranmissions are identical to the original. Also, verify the PTO durations.
for i in 1..=5 {
let ci = client.process(None, now).dgram().unwrap();
assert_eq!(ci.len(), client.plpmtu());
assert_eq!(
client.stats().frame_tx,
FrameStats {
crypto: i,
all: i,
..Default::default()
}
);
let pto = client.process(None, now).callback();
assert_eq!(pto, DEFAULT_RTT * 3 * (1 << (i - 1)));
now += pto;
}
}

#[test]
fn server_initial_retransmits_identical() {
let mut now = now();
let mut client = default_client();
let mut ci = client.process(None, now).dgram();

// Force the server to retransmit its Initial packet a number of times and make sure the
// retranmissions are identical to the original. Also, verify the PTO durations.
let mut server = default_server();
let mut total_ptos: Duration = Duration::from_secs(0);
for i in 1..=3 {
let si = server.process(ci.take().as_ref(), now).dgram().unwrap();
assert_eq!(si.len(), server.plpmtu());
assert_eq!(
server.stats().frame_tx,
FrameStats {
crypto: i * 2,
ack: i,
all: i * 3,
..Default::default()
}
);

let pto = server.process(None, now).callback();
if i < 3 {
assert_eq!(pto, DEFAULT_RTT * 3 * (1 << (i - 1)));
} else {
// Server is amplification-limited after three (re)transmissions.
assert_eq!(pto, server.conn_params.get_idle_timeout() - total_ptos);
}
now += pto;
total_ptos += pto;
}
}
11 changes: 4 additions & 7 deletions neqo-transport/src/connection/tests/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,15 @@ fn idle_caching() {
client.process_input(&handshake.unwrap(), start);

// Perform an exchange and keep the connection alive.
// Only allow a packet containing a PING to pass.
let middle = start + AT_LEAST_PTO;
mem::drop(client.process_output(middle));
// This is the RTX of the client Initial.
let dgram = client.process_output(middle).dgram();

// Get the server to send its first probe and throw that away.
mem::drop(server.process_output(middle).dgram());
// Now let the server process the client PING. This causes the server
// Now let the server process the RTX'ed client Initial. This causes the server
// to send CRYPTO frames again, so manually extract and discard those.
let ping_before_s = server.stats().frame_rx.ping;
server.process_input(&dgram.unwrap(), middle);
assert_eq!(server.stats().frame_rx.ping, ping_before_s + 1);
let mut tokens = Vec::new();
server.crypto.streams.write_frame(
PacketNumberSpace::Initial,
Expand All @@ -330,10 +327,10 @@ fn idle_caching() {
// Now only allow the Initial packet from the server through;
// it shouldn't contain a CRYPTO frame.
let (initial, _) = split_datagram(&dgram.unwrap());
let ping_before_c = client.stats().frame_rx.ping;
let crypto_before_c = client.stats().frame_rx.crypto;
let ack_before = client.stats().frame_rx.ack;
client.process_input(&initial, middle);
assert_eq!(client.stats().frame_rx.ping, ping_before_c + 1);
assert_eq!(client.stats().frame_rx.crypto, crypto_before_c);
assert_eq!(client.stats().frame_rx.ack, ack_before + 1);

let end = start + default_timeout() + (AT_LEAST_PTO / 2);
Expand Down
8 changes: 7 additions & 1 deletion neqo-transport/src/connection/tests/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ fn pto_handshake_complete() {
// We'll use that packet to force the server to acknowledge 1-RTT.
let stream_id = client.stream_create(StreamType::UniDi).unwrap();
client.stream_close_send(stream_id).unwrap();
now += HALF_RTT * 6;
let pkt3 = client.process(None, now).dgram();
assert_handshake(pkt3.as_ref().unwrap());
let (pkt3_hs, pkt3_1rtt) = split_datagram(&pkt3.unwrap());
Expand Down Expand Up @@ -581,6 +582,9 @@ fn loss_time_past_largest_acked() {
assert!(s_pto < RTT);
let s_hs2 = server.process(None, now + s_pto).dgram();
assert!(s_hs2.is_some());
let s_pto = server.process(None, now).callback();
assert_ne!(s_pto, Duration::from_secs(0));
assert!(s_pto < RTT);
let s_hs3 = server.process(None, now + s_pto).dgram();
assert!(s_hs3.is_some());

Expand Down Expand Up @@ -623,7 +627,9 @@ fn loss_time_past_largest_acked() {

// Now the client should start its loss recovery timer based on the ACK.
now += RTT / 2;
let c_ack = client.process(Some(&s_hs_ack), now).dgram();
let _c_ack = client.process(Some(&s_hs_ack), now).dgram();
// This ACK triggers an immediate ACK, due to an ACK loss during handshake.
let c_ack = client.process(None, now).dgram();
assert!(c_ack.is_none());
// The client should now have the loss recovery timer active.
let lr_time = client.process(None, now).callback();
Expand Down
33 changes: 15 additions & 18 deletions neqo-transport/src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,32 +426,33 @@ struct PtoState {
impl PtoState {
/// The number of packets we send on a PTO.
/// And the number to declare lost when the PTO timer is hit.
fn pto_packet_count(space: PacketNumberSpace, rx_count: usize) -> usize {
if space == PacketNumberSpace::Initial && rx_count == 0 {
// For the Initial space, we only send one packet on PTO if we have not received any
// packets from the peer yet. This avoids sending useless PING-only packets
// when the Client Initial is deemed lost.
1
} else {
fn pto_packet_count(space: PacketNumberSpace) -> usize {
if space == PacketNumberSpace::ApplicationData {
MAX_PTO_PACKET_COUNT
} else {
// For the Initial and Handshake spaces, we only send one packet on PTO. This avoids
// sending useless PING-only packets when only a single packet was lost, which is the
// common case. These PINGs use cwnd and amplification window space, and sending them
// hence makes the handshake more brittle.
1
}
}

pub fn new(space: PacketNumberSpace, probe: PacketNumberSpaceSet, rx_count: usize) -> Self {
pub fn new(space: PacketNumberSpace, probe: PacketNumberSpaceSet) -> Self {
debug_assert!(probe[space]);
Self {
space,
count: 1,
packets: Self::pto_packet_count(space, rx_count),
packets: Self::pto_packet_count(space),
probe,
}
}

pub fn pto(&mut self, space: PacketNumberSpace, probe: PacketNumberSpaceSet, rx_count: usize) {
pub fn pto(&mut self, space: PacketNumberSpace, probe: PacketNumberSpaceSet) {
debug_assert!(probe[space]);
self.space = space;
self.count += 1;
self.packets = Self::pto_packet_count(space, rx_count);
self.packets = Self::pto_packet_count(space);
self.probe = probe;
}

Expand Down Expand Up @@ -803,11 +804,10 @@ impl LossRecovery {
}

fn fire_pto(&mut self, pn_space: PacketNumberSpace, allow_probes: PacketNumberSpaceSet) {
let rx_count = self.stats.borrow().packets_rx;
if let Some(st) = &mut self.pto_state {
st.pto(pn_space, allow_probes, rx_count);
st.pto(pn_space, allow_probes);
} else {
self.pto_state = Some(PtoState::new(pn_space, allow_probes, rx_count));
self.pto_state = Some(PtoState::new(pn_space, allow_probes));
}

self.pto_state
Expand Down Expand Up @@ -839,10 +839,7 @@ impl LossRecovery {
let space = self.spaces.get_mut(*pn_space).unwrap();
lost.extend(
space
.pto_packets(PtoState::pto_packet_count(
*pn_space,
self.stats.borrow().packets_rx,
))
.pto_packets(PtoState::pto_packet_count(*pn_space))
.cloned(),
);

Expand Down
23 changes: 19 additions & 4 deletions neqo-transport/src/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ pub struct AckToken {
ranges: Vec<PacketRange>,
}

impl AckToken {
/// Get the space for this token.
pub const fn space(&self) -> PacketNumberSpace {
self.space
}
}

/// A structure that tracks what packets have been received,
/// and what needs acknowledgement for a packet number space.
#[derive(Debug)]
Expand Down Expand Up @@ -280,7 +287,12 @@ impl RecvdPackets {
ack_frequency_seqno: 0,
ack_delay: DEFAULT_ACK_DELAY,
unacknowledged_count: 0,
unacknowledged_tolerance: DEFAULT_ACK_PACKET_TOLERANCE,
unacknowledged_tolerance: if space == PacketNumberSpace::ApplicationData {
DEFAULT_ACK_PACKET_TOLERANCE
} else {
larseggert marked this conversation as resolved.
Show resolved Hide resolved
// ACK more aggressively
0
},
ignore_order: false,
ecn_count: EcnCount::default(),
}
Expand Down Expand Up @@ -487,6 +499,9 @@ impl RecvdPackets {
.take(max_ranges)
.cloned()
.collect::<Vec<_>>();
if ranges.is_empty() {
return;
}

builder.encode_varint(if self.ecn_count.is_some() {
FRAME_TYPE_ACK_ECN
Expand Down Expand Up @@ -574,9 +589,9 @@ impl AckTracker {
}
}

// Force an ACK to be generated immediately (a PING was received).
pub fn immediate_ack(&mut self, now: Instant) {
if let Some(space) = self.get_mut(PacketNumberSpace::ApplicationData) {
/// Force an ACK to be generated immediately.
pub fn immediate_ack(&mut self, space: PacketNumberSpace, now: Instant) {
if let Some(space) = self.get_mut(space) {
space.immediate_ack(now);
}
}
Expand Down
Loading