From d056825cd10e7f4303481a4daa88b584b29e85f0 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 16 Apr 2025 18:14:46 +0200 Subject: [PATCH 1/8] WIP: Send on multiple paths --- quinn-proto/src/connection/mod.rs | 172 ++++++++++++++++++++++----- quinn-proto/src/connection/paths.rs | 5 + quinn-proto/src/connection/spaces.rs | 15 +-- 3 files changed, 153 insertions(+), 39 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 1267926f1..d7f4843bf 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::VecDeque, + collections::{BTreeMap, VecDeque}, convert::TryFrom, fmt, io, mem, net::{IpAddr, SocketAddr}, @@ -61,7 +61,7 @@ mod packet_crypto; use packet_crypto::{PrevCrypto, ZeroRttCrypto}; mod paths; -use paths::{PathData, PathResponses}; +use paths::PathData; pub use paths::{PathId, RttEstimator}; mod send_buffer; @@ -143,7 +143,13 @@ pub struct Connection { /// The "real" local IP address which was was used to receive the initial packet. /// This is only populated for the server case, and if known local_ip: Option, - paths: FxHashMap, + /// The [`PathData`] for each path + /// + /// This needs to be ordered because [`Connection::poll_transmit`] needs to + /// deterministically select the next PathId to send on. + /// + /// TODO(flub): well does it really? But deterministic is nice for now. 
+ paths: BTreeMap, /// Whether MTU detection is supported in this environment allow_mtud: bool, state: State, @@ -197,8 +203,7 @@ pub struct Connection { // // Queued non-retransmittable 1-RTT data // - /// Responses to PATH_CHALLENGE frames - path_responses: PathResponses, + /// If the CONNECTION_CLOSE frame needs to be sent close: bool, // @@ -303,8 +308,8 @@ impl Connection { handshake_cid: loc_cid, rem_handshake_cid: rem_cid, local_cid_state, - paths: FxHashMap::from_iter([( - PathId::default(), + paths: BTreeMap::from_iter([( + PathId(0), PathMigrationData { path, prev_path: None, @@ -346,7 +351,6 @@ impl Connection { timers: TimerTable::default(), authentication_failures: 0, error: None, - path_responses: PathResponses::default(), close: false, ack_frequency: AckFrequencyState::new(get_max_ack_delay( @@ -495,22 +499,58 @@ impl Connection { true => max_datagrams, }; + // Each call to poll_transmit can only send datagrams to one destination, because + // all datagrams in a GSO batch are for the same destination. Therefore only + // datagrams for one Path ID are produced for each poll_transmit call. + + // First all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. + + // For all AVAILABLE paths: + // - Is the path congestion blocked or pacing blocked? + // - call maybe_queue_ to ensure a tail-loss probe would be sent? + // - do we need to send a close message? + // - call can_send + // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths + + // What about PATH_CHALLENGE or PATH_RESPONSE? We need to check if we need to send + // any of those. + // TODO(flub): We only have PathId(0) for now. For multipath we need to figure // out which path we want to send the packet on before we start building it. 
let path_id = PathId(0); + for space in SpaceId::iter() { + let request_immediate_ack = + space == SpaceId::Data && self.peer_supports_ack_frequency(); + for (path_id, pns) in self.spaces[space].iter_paths() { + if self.spaces[space].for_path(*path_id).loss_probes > 0 { + self.spaces[space].maybe_queue_probe( + *path_id, + request_immediate_ack, + &self.streams, + ); + break; + } + } + } + let mut buf = TransmitBuf::new( buf, max_datagrams, self.path_data(path_id).current_mtu().into(), ); - if let Some(challenge) = self.send_path_challenge(now, &mut buf, path_id) { + for (path_id, path) in self.paths.iter() { + let path = &path.path; + todo!(); + } + + if let Some(challenge) = self.send_prev_path_challenge(now, &mut buf, path_id) { return Some(challenge); } // If we need to send a probe, make sure we have something to send. - // TODO(flub): We need to populate each path_id. + // TODO(flub): When we select the PathId we need this. for space in SpaceId::iter() { let request_immediate_ack = space == SpaceId::Data && self.peer_supports_ack_frequency(); @@ -764,6 +804,10 @@ impl Connection { // Send a close frame in every possible space for robustness, per RFC9000 // "Immediate Close during the Handshake". Don't bother trying to send anything // else. + // TODO(flub): Pretty sure with GSO disabled this doesn't work if + // closing during the handshake. The next datagram would not be + // allowed, on the next call to poll_transmit you would send + // CONNECTION_CLOSE in the initial or handshake space again. next_space_id = self.next_send_space(space_id.next(), path_id, &buf, close); continue; } @@ -772,8 +816,12 @@ impl Connection { // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path // validation can occur while the link is saturated. 
if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { - let remote = self.path_data(path_id).remote; - if let Some((token, remote)) = self.path_responses.pop_off_path(remote) { + let path = self.path_data_mut(path_id); + let remote = path.remote; + if let Some((token, remote)) = path.path_responses.pop_off_path(remote) { + // TODO(flub): We need to use the right CID! We shouldn't use the same + // CID as the current active one for the path. Though see also + // https://github.com/quinn-rs/quinn/issues/2184 trace!("PATH_RESPONSE {:08x} (off-path)", token); builder .frame_space_mut() @@ -1061,7 +1109,10 @@ impl Connection { } /// Send PATH_CHALLENGE for a previous path if necessary - fn send_path_challenge( + /// + /// QUIC-TRANSPORT section 9.3.3 + /// https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding + fn send_prev_path_challenge( &mut self, now: Instant, buf: &mut TransmitBuf<'_>, @@ -1507,6 +1558,8 @@ impl Connection { /// Multipath is only enabled after the handshake is completed and if it was negotiated /// by both peers. pub fn is_multipath_enabled(&self) -> bool { + // TODO(flub): I believe it might be a TRANSPORT_ERROR if multipath is enabled but + // there's a zero-lenth CID. !self.is_handshaking() && !self.handshake_cid.is_empty() && !self.rem_handshake_cid.is_empty() @@ -2065,6 +2118,7 @@ impl Connection { } /// Probe Timeout + // TODO(flub): This needs a PathId parameter fn pto(&self, space: SpaceId) -> Duration { let max_ack_delay = match space { SpaceId::Initial | SpaceId::Handshake => Duration::ZERO, @@ -3024,8 +3078,29 @@ impl Connection { close = Some(reason); } Frame::PathChallenge(token) => { - self.path_responses.push(number, token, remote); - if remote == self.path_data(path_id).remote { + // A PATH_CHALLENGE can create a new path. 
+ let path = &mut self + .paths + .entry(path_id) + .or_insert_with(|| { + let peer_max_udp_payload_size = + u16::try_from(self.peer_params.max_udp_payload_size.into_inner()) + .unwrap_or(u16::MAX); + let data = PathData::new( + remote, + self.allow_mtud, + Some(peer_max_udp_payload_size), + now, + &self.config, + ); + PathMigrationData { + path: data, + prev_path: None, + } + }) + .path; + path.path_responses.push(number, token, remote); + if remote == path.remote { // PATH_CHALLENGE on active path, possible off-path packet forwarding // attack. Send a non-probing packet to recover the active path. match self.peer_supports_ack_frequency() { @@ -3035,7 +3110,7 @@ impl Connection { } } Frame::PathResponse(token) => { - // TODO(@divma): make an effort ot move to path + // TODO(@divma): make an effort to move to path let path_data = self.paths.get_mut(&path_id).expect("known path"); if path_data.path.challenge == Some(token) && remote == path_data.path.remote { trace!("new path validated"); @@ -3487,7 +3562,6 @@ impl Connection { ) -> SentFrames { let mut sent = SentFrames::default(); let is_multipath_enabled = self.is_multipath_enabled(); - let path_data_remote = self.path_data(path_id).remote; let space = &mut self.spaces[space_id]; let path = &mut self.paths.get_mut(&path_id).expect("known path").path; let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none(); @@ -3626,7 +3700,7 @@ impl Connection { // PATH_RESPONSE if buf.remaining_mut() > 9 && space_id == SpaceId::Data { - if let Some(token) = self.path_responses.pop_on_path(path.remote) { + if let Some(token) = path.path_responses.pop_on_path(path.remote) { sent.non_retransmits = true; sent.requires_padding = true; trace!("PATH_RESPONSE {:08x}", token); @@ -3804,7 +3878,7 @@ impl Connection { panic!("NEW_TOKEN frames should not be enqueued by clients"); }; - if remote_addr != path_data_remote { + if remote_addr != path.remote { // NEW_TOKEN frames contain tokens bound to a client's IP address, and are 
only // useful if used from the same IP address. Thus, we abandon enqueued NEW_TOKEN // frames upon an path change. Instead, when the new path becomes validated, @@ -4162,21 +4236,59 @@ impl Connection { self.path_data(PathId(0)).current_mtu() } + fn next_send_path(&self) -> Option { + if !self.is_multipath_enabled() { + return Some(PathId(0)); + } + + // If we still have initial or handshake spaces we first check those. + if self.highest_space < SpaceId::Data { + todo!(); + } + + // First look for pending PATH_CHALLENGE or PATH_RESPONSE frames. + for space in SpaceId::iter() { + for (path_id, pns) in self.spaces[space].iter_paths() { + let challenge_pending = self.paths.get(path_id).is_some_and(|p| { + p.path.challenge_pending + || p.prev_path + .as_ref() + .is_some_and(|(_, path_data)| path_data.challenge_pending) + }); + let response_pending = self + .paths + .get(path_id) + .is_some_and(|p| !p.path.path_responses.is_empty()); + if challenge_pending || response_pending { + return Some(*path_id); + } + } + } + + // + + todo!(); + None + } + /// Whether we have 1-RTT data to send /// - /// See also `self.space(SpaceId::Data).can_send()` - fn can_send_1rtt(&self, max_size: usize) -> bool { - // TODO(@divma): needs work for multipath - let chanllenge_pending = { - let path_mig_data = self.paths.get(&PathId(0)).expect("known path"); - path_mig_data.path.challenge_pending - || path_mig_data - .prev_path + /// This checks for frames that can only be sent in the data space (1-RTT): + /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated. + /// - Pending data to send in STREAM frames. + /// - Pending DATAGRAM frames to send. + /// + /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that + /// may need to be sent. 
+ fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> bool { + let challenge_pending = self.paths.get(&path_id).is_some_and(|p| { + p.path.challenge_pending + || p.prev_path .as_ref() - .is_some_and(|(_, x)| x.challenge_pending) - }; + .is_some_and(|(_, path)| path.challenge_pending) + }); self.streams.can_send_stream_data() - || chanllenge_pending + || challenge_pending || !self.path_responses.is_empty() || self .datagrams diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index 69313c790..ea138c7fe 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -70,6 +70,8 @@ pub(super) struct PathData { pub(super) pacing: Pacer, pub(super) challenge: Option, pub(super) challenge_pending: bool, + /// Pending responses to PATH_CHALLENGE frames + pub(super) path_responses: PathResponses, /// Whether we're certain the peer can both send and receive on this address /// /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every @@ -123,6 +125,7 @@ impl PathData { congestion, challenge: None, challenge_pending: false, + path_responses: PathResponses::default(), validated: false, total_sent: 0, total_recvd: 0, @@ -163,6 +166,7 @@ impl PathData { congestion, challenge: None, challenge_pending: false, + path_responses: PathResponses::default(), validated: false, total_sent: 0, total_recvd: 0, @@ -414,6 +418,7 @@ impl PathResponses { struct PathResponse { /// The packet number the corresponding PATH_CHALLENGE was received in packet: u64, + /// The token of the PATH_CHALLENGE token: u64, /// The address the corresponding PATH_CHALLENGE was received from remote: SocketAddr, diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index a2fbc8cb5..2a1030cb3 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -157,24 +157,21 @@ impl PacketSpace { } } - /// Whether there is anything to send. 
+ /// Whether there is anything to send in this space + /// + /// For the data space [`Connection::can_send_1rtt`] also needs to be consulted. pub(super) fn can_send(&self, path_id: PathId, streams: &StreamsState) -> SendableFrames { - let immediate_ack_pending = self - .number_spaces - .get(&path_id) - .map(|pns| pns.immediate_ack_pending) - .unwrap_or_default(); let acks = self.pending_acks.can_send(); let other = !self.pending.is_empty(streams) || self .number_spaces - .values() - .any(|s| s.ping_pending || immediate_ack_pending); + .get(&path_id) + .is_some_and(|s| s.ping_pending || s.immediate_ack_pending); SendableFrames { acks, other } } - /// The number of packets sent with the current crypto keys. + /// The number of packets sent with the current crypto keys /// /// Used to know if a key update is needed. pub(super) fn sent_with_keys(&self) -> u64 { From 722047b0318a7305ab342fe0a6f2e4d339e9be5a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 7 May 2025 18:19:02 +0200 Subject: [PATCH 2/8] Something that starts to work --- quinn-proto/src/connection/mod.rs | 542 +++++++++++++-------------- quinn-proto/src/connection/spaces.rs | 24 +- 2 files changed, 275 insertions(+), 291 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index d7f4843bf..9a6e8e077 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -180,7 +180,7 @@ pub struct Connection { spin: bool, /// Packet number spaces: initial, handshake, 1-RTT spaces: [PacketSpace; 3], - /// Highest usable packet number space + /// Highest usable [`SpaceId`] highest_space: SpaceId, /// 1-RTT keys used prior to a key update prev_crypto: Option, @@ -503,7 +503,8 @@ impl Connection { // all datagrams in a GSO batch are for the same destination. Therefore only // datagrams for one Path ID are produced for each poll_transmit call. - // First all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. 
+ // First, if we have to send a close, select a path for that. + // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. // For all AVAILABLE paths: // - Is the path congestion blocked or pacing blocked? @@ -519,44 +520,16 @@ impl Connection { // out which path we want to send the packet on before we start building it. let path_id = PathId(0); - for space in SpaceId::iter() { - let request_immediate_ack = - space == SpaceId::Data && self.peer_supports_ack_frequency(); - for (path_id, pns) in self.spaces[space].iter_paths() { - if self.spaces[space].for_path(*path_id).loss_probes > 0 { - self.spaces[space].maybe_queue_probe( - *path_id, - request_immediate_ack, - &self.streams, - ); - break; - } - } - } - let mut buf = TransmitBuf::new( buf, max_datagrams, self.path_data(path_id).current_mtu().into(), ); - for (path_id, path) in self.paths.iter() { - let path = &path.path; - todo!(); - } - if let Some(challenge) = self.send_prev_path_challenge(now, &mut buf, path_id) { return Some(challenge); } - // If we need to send a probe, make sure we have something to send. - // TODO(flub): When we select the PathId we need this. 
- for space in SpaceId::iter() { - let request_immediate_ack = - space == SpaceId::Data && self.peer_supports_ack_frequency(); - self.spaces[space].maybe_queue_probe(path_id, request_immediate_ack, &self.streams); - } - // Check whether we need to send a close message let close = match self.state { State::Drained => { @@ -576,13 +549,13 @@ impl Connection { }; // Check whether we need to send an ACK_FREQUENCY frame - let rtt = self - .paths - .values() - .map(|p| p.path.rtt.get()) - .min() - .expect("one path exists"); if let Some(config) = &self.config.ack_frequency_config { + let rtt = self + .paths + .values() + .map(|p| p.path.rtt.get()) + .min() + .expect("one path exists"); self.spaces[SpaceId::Data].pending.ack_frequency = self .ack_frequency .should_send_ack_frequency(rtt, config, &self.peer_params) @@ -605,102 +578,87 @@ impl Connection { // The packet number of the last built packet. let mut last_packet_number = None; - // Iterate over all spaces and find data to send - // - // Each loop builds one packet, which is finished before the next iteration of the - // loop. When packets are coalesced a datagram is filled over multiple loops. - let mut next_space_id = self.next_send_space(SpaceId::Initial, path_id, &buf, close); - while let Some(space_id) = next_space_id { - // Whether the next packet will contain ack-eliciting frames. - let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams) - || self.spaces[space_id].for_path(path_id).ping_pending - || self.spaces[space_id] - .for_path(path_id) - .immediate_ack_pending; - if space_id == SpaceId::Data { - let pn = self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number(); - let frame_space_1rtt = buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - ack_eliciting |= self.can_send_1rtt(frame_space_1rtt); - } - - // If the datagram is full, we need to start a new one. - if buf.datagram_remaining_mut() == 0 { - // Is 1 more datagram allowed? 
- if buf.num_datagrams() >= buf.max_datagrams() { - // No more datagrams allowed - break; + let mut path_id = *self.paths.first_key_value().expect("one path must exist").0; + let mut space_id = match path_id { + PathId(0) => SpaceId::Initial, + _ => SpaceId::Data, + }; + loop { + // Can anything be sent on this packet number space? If not advance either the + // SpaceId or PathId and try again. + let send_ready = self.space_ready_to_send(path_id, space_id, &buf, close, now); + let can_send = match send_ready { + SendReady::Frames(can_send) if !can_send.is_empty() => can_send, + SendReady::Frames(_) if space_id < SpaceId::Data => { + space_id = space_id.next(); + continue; } - - // Anti-amplification is only based on `total_sent`, which gets - // updated at the end of this method. Therefore we pass the amount - // of bytes for datagrams that are already created, as well as 1 byte - // for starting another datagram. If there is any anti-amplification - // budget left, we always allow a full MTU to be sent - // (see https://github.com/quinn-rs/quinn/issues/1082) - if self.path_data(path_id).anti_amplification_blocked( - (buf.segment_size() * buf.num_datagrams()) as u64 + 1, - ) { - trace!("blocked by anti-amplification"); - break; + SendReady::CongestionBlocked if space_id < SpaceId::Data => { + // Higher spaces might still have tail-loss probes to send, which are + // not congestion blocked. + congestion_blocked = true; + space_id = space_id.next(); + continue; } - - // Congestion control and pacing checks - // Tail loss probes must not be blocked by congestion, or a deadlock could arise - if ack_eliciting && self.spaces[space_id].for_path(path_id).loss_probes == 0 { - let bytes_to_send = buf.segment_size() as u64; - if self.path_data(path_id).in_flight.bytes + bytes_to_send - >= self.path_data(path_id).congestion.window() - { - next_space_id = self.next_send_space(space_id.next(), path_id, &buf, close); + _ => { + // Nothing more to send on this path. 
+ if !matches!(send_ready, SendReady::Frames(_)) { congestion_blocked = true; - trace!("blocked by congestion control"); - if next_space_id == Some(space_id) { - // We are in the highest space, nothing more to do. - break; - } else { - // We continue looking for packets in higher spaces because we - // might still have to send loss probes in them, which are not - // congestion controlled. - continue; - } } - // Check whether the next datagram is blocked by pacing - if let Some(delay) = - self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) - { - self.timers.set(Timer::Pacing(path_id), delay); - congestion_blocked = true; - // Loss probes should be subject to pacing, even though - // they are not congestion controlled. - trace!("blocked by pacing"); + // If there are any datagrams in the transmit, packets for another path + // can not be built anymore. + if buf.num_datagrams() > 0 { break; } + + // TODO(flub): We want to prioritise active paths and those needing path + // challenges/responses. + match self.paths.keys().find(|&&path| path > path_id) { + Some(new_path_id) => { + // See if this next path can send anything. + path_id = *new_path_id; + space_id = SpaceId::Data; + continue; + } + None => { + // Nothing more to send. + break; + } + } + } + }; + + // If the datagram is full, we need to start a new one. + if buf.datagram_remaining_mut() == 0 { + if buf.num_datagrams() >= buf.max_datagrams() { + // No more datagrams allowed + break; } - // Start the next datagram match self.spaces[space_id].for_path(path_id).loss_probes { 0 => buf.start_new_datagram(), _ => { self.spaces[space_id].for_path(path_id).loss_probes -= 1; - // Clamp the datagram to at most the minimum MTU to ensure that loss probes - // can get through and enable recovery even if the path MTU has shrank - // unexpectedly. + // Clamp the datagram to at most the minimum MTU to ensure that loss + // probes can get through and enable recovery even if the path MTU + // has shrank unexpectedly. 
buf.start_new_datagram_with_size(std::cmp::min( usize::from(INITIAL_MTU), buf.segment_size(), )); } - }; + } + trace!(count = buf.num_datagrams(), "new datagram started"); coalesce = true; pad_datagram = false; } - debug_assert!(buf.datagram_remaining_mut() >= MIN_PACKET_SPACE); + // If coalescing another packet into the existing datagram, there should + // still be enough space for a whole packet. + if buf.datagram_start_offset() < buf.len() { + debug_assert!(buf.datagram_remaining_mut() >= MIN_PACKET_SPACE); + } // // From here on, we've determined that a packet will definitely be sent. @@ -727,17 +685,17 @@ impl Connection { path_id, self.rem_cids.get(&path_id).unwrap().active(), &mut buf, - ack_eliciting, + can_send.other, self, )?; last_packet_number = Some(builder.exact_number); coalesce = coalesce && !builder.short_header; - // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1 + // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 pad_datagram |= - space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting); + space_id == SpaceId::Initial && (self.side.is_client() || can_send.other); - if close { + if can_send.close { trace!("sending CONNECTION_CLOSE"); // Encode ACKs before the ConnectionClose message, to give the receiver // a better approximate on what data has been processed. This is @@ -797,28 +755,24 @@ impl Connection { builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); if space_id == self.highest_space { // Don't send another close packet + // TODO(flub): Is it worth sending CONNECTION_CLOSE on all paths? self.close = false; // `CONNECTION_CLOSE` is the final packet break; } else { - // Send a close frame in every possible space for robustness, per RFC9000 - // "Immediate Close during the Handshake". Don't bother trying to send anything - // else. - // TODO(flub): Pretty sure with GSO disabled this doesn't work if - // closing during the handshake. 
The next datagram would not be - // allowed, on the next call to poll_transmit you would send - // CONNECTION_CLOSE in the initial or handshake space again. - next_space_id = self.next_send_space(space_id.next(), path_id, &buf, close); + // Send a close frame in every possible space for robustness, per + // RFC9000 "Immediate Close during the Handshake". Don't bother trying + // to send anything else. + space_id = space_id.next(); continue; } } - // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path - // validation can occur while the link is saturated. + // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that + // path validation can occur while the link is saturated. if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { let path = self.path_data_mut(path_id); - let remote = path.remote; - if let Some((token, remote)) = path.path_responses.pop_off_path(remote) { + if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) { // TODO(flub): We need to use the right CID! We shouldn't use the same // CID as the current active one for the path. Though see also // https://github.com/quinn-rs/quinn/issues/2184 @@ -861,29 +815,15 @@ impl Connection { // only checked if the full MTU is available and when potentially large fixed-size // frames aren't queued, so that lack of space in the datagram isn't the reason for just // writing ACKs. 
- { - let pn = if builder.space == SpaceId::Data { - builder.exact_number - } else { - self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number() - }; - let frame_space_1rtt = builder - .buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - let can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); - debug_assert!( - !(sent_frames.is_ack_only(&self.streams) - && !can_send.acks - && can_send.other - && builder.buf.segment_size() - == self.path_data(path_id).current_mtu() as usize - && self.datagrams.outgoing.is_empty()), - "SendableFrames was {can_send:?}, but only ACKs have been written" - ); - } + debug_assert!( + !(sent_frames.is_ack_only(&self.streams) + && !can_send.acks + && can_send.other + && builder.buf.segment_size() + == self.path_data(path_id).current_mtu() as usize + && self.datagrams.outgoing.is_empty()), + "SendableFrames was {can_send:?}, but only ACKs have been written" + ); pad_datagram |= sent_frames.requires_padding; if sent_frames.largest_acked.is_some() { @@ -895,86 +835,66 @@ impl Connection { // be coalescing the next packet into this one, or will be ending the datagram // as well. Because if this is the last packet in the datagram more padding // might be needed because of the packet type, or to fill the GSO segment size. - next_space_id = self.next_send_space(space_id, path_id, builder.buf, close); - if let Some(next_space_id) = next_space_id { - // Are we allowed to coalesce AND is there enough space for another *packet* - // in this datagram? - if coalesce - && builder - .buf - .datagram_remaining_mut() - .saturating_sub(builder.predict_packet_end()) - > MIN_PACKET_SPACE - { - // We can append/coalesce the next packet into the current - // datagram. Finish the current packet without adding extra padding. - builder.finish_and_track(now, self, path_id, sent_frames, false); - } else { - // We need a new datagram for the next packet. Finish the current - // packet with padding. 
- if builder.buf.num_datagrams() > 1 { - // If too many padding bytes would be required to continue the - // GSO batch after this packet, end the GSO batch here. Ensures - // that fixed-size frames with heterogeneous sizes - // (e.g. application datagrams) won't inadvertently waste large - // amounts of bandwidth. The exact threshold is a bit arbitrary - // and might benefit from further tuning, though there's no - // universally optimal value. - // - // Additionally, if this datagram is a loss probe and - // `segment_size` is larger than `INITIAL_MTU`, then padding it - // to `segment_size` to continue the GSO batch would risk - // failure to recover from a reduction in path MTU. Loss probes - // are the only packets for which we might grow `buf_capacity` - // by less than `segment_size`. - const MAX_PADDING: usize = 16; - if builder.buf.datagram_remaining_mut() - > builder.predict_packet_end() + MAX_PADDING - { - trace!( - "GSO truncated by demand for {} padding bytes", - builder.buf.datagram_remaining_mut() - builder.predict_packet_end() - ); - builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); - break; - } - // Pad the current datagram to GSO segment size so it can be - // included in the GSO batch. - builder.pad_to(builder.buf.segment_size() as u16); + // Are we allowed to coalesce AND is there enough space for another *packet* in + // this datagram AND is there another packet to send in this or the next space? + if coalesce + && builder + .buf + .datagram_remaining_mut() + .saturating_sub(builder.predict_packet_end()) + > MIN_PACKET_SPACE + && (matches!( + self.space_ready_to_send(path_id, space_id, builder.buf, close, now), + SendReady::Frames(can_send) if !can_send.is_empty(), + ) || matches!( + self.space_ready_to_send(path_id, space_id.next(), builder.buf, close, now), + SendReady::Frames(can_send) if !can_send.is_empty(), + )) + { + // We can append/coalesce the next packet into the current + // datagram. 
Finish the current packet without adding extra padding. + builder.finish_and_track(now, self, path_id, sent_frames, false); + } else { + // We need a new datagram for the next packet. Finish the current + // packet with padding. + if builder.buf.num_datagrams() > 1 { + // If too many padding bytes would be required to continue the + // GSO batch after this packet, end the GSO batch here. Ensures + // that fixed-size frames with heterogeneous sizes + // (e.g. application datagrams) won't inadvertently waste large + // amounts of bandwidth. The exact threshold is a bit arbitrary + // and might benefit from further tuning, though there's no + // universally optimal value. + // + // Additionally, if this datagram is a loss probe and + // `segment_size` is larger than `INITIAL_MTU`, then padding it + // to `segment_size` to continue the GSO batch would risk + // failure to recover from a reduction in path MTU. Loss probes + // are the only packets for which we might grow `buf_capacity` + // by less than `segment_size`. + const MAX_PADDING: usize = 16; + if builder.buf.datagram_remaining_mut() + > builder.predict_packet_end() + MAX_PADDING + { + trace!( + "GSO truncated by demand for {} padding bytes", + builder.buf.datagram_remaining_mut() - builder.predict_packet_end() + ); + builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); + break; } - builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); - - if buf.num_datagrams() == 1 { - buf.clip_datagram_size(); - if next_space_id == SpaceId::Data { - // Now that we know the size of the first datagram, check whether - // the data we planned to send will fit in the next segment. If - // not, bail out and leave it for the next GSO batch. We can't - // easily compute the right segment size before the original call to - // `space_can_send`, because at that time we haven't determined - // whether we're going to coalesce with the first datagram or - // potentially pad it to `MIN_INITIAL_SIZE`. 
- let pn = self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number(); - let frame_space_1rtt = buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - if self - .space_can_send(next_space_id, path_id, frame_space_1rtt) - .is_empty() - { - break; - } - } - } + // Pad the current datagram to GSO segment size so it can be + // included in the GSO batch. + builder.pad_to(builder.buf.segment_size() as u16); } - } else { - // Nothing more to send. This was the last packet. + builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); - break; + + if buf.num_datagrams() == 1 { + buf.clip_datagram_size(); + } } } @@ -1071,16 +991,67 @@ impl Connection { }) } - /// Returns the [`SpaceId`] of the next packet space which has data to send + // /// Returns the next path on which data is ready to be sent + // fn next_send_path(&mut self) -> Option { + // if !self.is_multipath_enabled() { + // return Some(PathId(0)); + // } + + // // If we still have initial or handshake spaces we prefer those. + // if self.highest_space < SpaceId::Data { + // for space_id in SpaceId::iter() { + // let space = &mut self.spaces[space_id]; + // if !space.can_send(PathId(0), &self.streams).is_empty() + // || space.for_path(PathId(0)).loss_probes > 0 + // { + // // TODO(flub): Check the path is not anti-aplification, congestion or + // // pacing blocked. + // return Some(PathId(0)); + // } + // } + // } + + // // From now on we only need to check SpaceId::Data. + // let space = &mut self.spaces[SpaceId::Data]; + + // // First look for pending PATH_CHALLENGE or PATH_RESPONSE frames. + + // // PATH_CHALLENGE probes for new paths so is not anti-amplification blocked. + // // PATH_RESPONSE is anti-amplification blocked. 
However we can send a PATH_RESPONSE that is + // for (path_id, path) in self.paths.iter() { + // let challenge_pending = path.path.challenge_pending + // || path + // .prev_path + // .as_ref() + // .is_some_and(|(_cid, path_data)| path_data.challenge_pending); + + // let response_pending = !path.path.path_responses.is_empty(); + // if challenge_pending || response_pending { + // // PATH_CHALLENGE and PATH_RESPONSE frames are not anti-amplification blocked. The + + // TODO(flub): Check the path is not congestion or pacing blocked + // return Some(*path_id); + // } + // } + + // + + // todo!(); + // None + // } + + /// Whether anything needs to be sent in this packet number space /// - /// This takes into account the space available to frames in the next datagram. - fn next_send_space( + /// This checks whether there is anything to send on the `(PathId, SpaceId)` tuple and + /// whether sending is allowed by the congestion controller etc. fn space_ready_to_send( &mut self, - current_space_id: SpaceId, path_id: PathId, - buf: &TransmitBuf<'_>, + space_id: SpaceId, + transmit: &TransmitBuf<'_>, close: bool, - ) -> Option { + now: Instant, + ) -> SendReady { // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed // to be able to send an individual frame at least this large in the next 1-RTT // packet.
This could be generalized to support every space, but it's only needed to @@ -1090,22 +1061,63 @@ impl Connection { let pn = self.spaces[SpaceId::Data] .for_path(path_id) .peek_tx_number(); - let frame_space_1rtt = buf + let frame_space_1rtt = transmit .segment_size() .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - let mut space_id = current_space_id; - loop { - let can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); - if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { - return Some(space_id); - } - space_id = match space_id { - SpaceId::Initial => SpaceId::Handshake, - SpaceId::Handshake => SpaceId::Data, - SpaceId::Data => break, + let mut can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); + can_send.close = close && self.spaces[space_id].crypto.is_some(); + let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0; + + if can_send.is_empty() && !need_loss_probe { + return SendReady::Frames(can_send); + } + + // Anti-amplification is only based on `total_sent`, which gets updated after the + // transmit is sent. Therefore we pass the amount of bytes for datagrams that are + // already created, as well as 1 byte for starting another datagram. If there is any + // anti-amplification budget left, we always allow a full MTU to be sent (see + // https://github.com/quinn-rs/quinn/issues/1082). + if self + .path_data(path_id) + .anti_amplification_blocked(transmit.len() as u64 + 1) + { + return SendReady::AntiAmplificationBlocked; + } + + // Congestion control check. + // Tail loss probes must not be blocked by congestion, or a deadlock could arise. 
+ let bytes_to_send = transmit.segment_size() as u64; + if can_send.other && !need_loss_probe && !can_send.close { + let path = self.path_data(path_id); + if path.in_flight.bytes + bytes_to_send >= path.congestion.window() { + trace!("blocked by congestion control"); + return SendReady::CongestionBlocked; } } - None + + // Pacing check. + if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) { + // TODO(@divma): this needs fixing asap + self.timers.set(Timer::Pacing(path_id), delay); + // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though + // they are not congestion controlled. + trace!("blocked by pacing"); + return SendReady::PacingBlocked; + } + + // Ensure there is something to send if a loss probe is needed. + if need_loss_probe && can_send.is_empty() { + let request_immediate_ack = + space_id == SpaceId::Data && self.peer_supports_ack_frequency(); + self.spaces[space_id].maybe_queue_probe(path_id, request_immediate_ack, &self.streams); + can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); + debug_assert!( + can_send.other, + "tail-loss probe must have something to send" + ); + } + + SendReady::Frames(can_send) } /// Send PATH_CHALLENGE for a previous path if necessary @@ -1184,7 +1196,7 @@ impl Connection { } let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams); if space_id == SpaceId::Data { - can_send.other |= self.can_send_1rtt(frame_space_1rtt); + can_send.other |= self.can_send_1rtt(path_id, frame_space_1rtt); } can_send } @@ -4236,41 +4248,6 @@ impl Connection { self.path_data(PathId(0)).current_mtu() } - fn next_send_path(&self) -> Option { - if !self.is_multipath_enabled() { - return Some(PathId(0)); - } - - // If we still have initial or handshake spaces we first check those. - if self.highest_space < SpaceId::Data { - todo!(); - } - - // First look for pending PATH_CHALLENGE or PATH_RESPONSE frames. 
- for space in SpaceId::iter() { - for (path_id, pns) in self.spaces[space].iter_paths() { - let challenge_pending = self.paths.get(path_id).is_some_and(|p| { - p.path.challenge_pending - || p.prev_path - .as_ref() - .is_some_and(|(_, path_data)| path_data.challenge_pending) - }); - let response_pending = self - .paths - .get(path_id) - .is_some_and(|p| !p.path.path_responses.is_empty()); - if challenge_pending || response_pending { - return Some(*path_id); - } - } - } - - // - - todo!(); - None - } - /// Whether we have 1-RTT data to send /// /// This checks for frames that can only be sent in the data space (1-RTT): @@ -4289,7 +4266,7 @@ impl Connection { }); self.streams.can_send_stream_data() || challenge_pending - || !self.path_responses.is_empty() + || !self.path_data(path_id).path_responses.is_empty() || self .datagrams .outgoing @@ -4402,6 +4379,13 @@ impl fmt::Debug for Connection { } } +enum SendReady { + Frames(SendableFrames), + AntiAmplificationBlocked, + CongestionBlocked, + PacingBlocked, +} + /// Fields of `Connection` specific to it being client-side or server-side enum ConnectionSide { Client { diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index 2a1030cb3..4c6b103c8 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -105,9 +105,6 @@ impl PacketSpace { /// waiting to be sent, then we retransmit in-flight data to reduce odds of loss. If there's no /// in-flight data either, we're probably a client guarding against a handshake /// anti-amplification deadlock and we just make something up. - // TODO(flub): This is still wrong! The probe needs to be sent for each path - // separately, and ON the path so that it gets a higher packet number than those that - // might be lost. pub(super) fn maybe_queue_probe( &mut self, path_id: PathId, @@ -147,13 +144,8 @@ impl PacketSpace { // Nothing new to send and nothing to retransmit, so fall back on a ping. 
This should only // happen in rare cases during the handshake when the server becomes blocked by // anti-amplification. - // TODO(flub): Sending a ping on all paths is wasteful, but we also need per-path - // pings so doing this is easier for now. Maybe later introduce a - // connection-level ping again. if !self.for_path(path_id).immediate_ack_pending { - self.number_spaces - .values_mut() - .for_each(|s| s.ping_pending = true); + self.for_path(path_id).ping_pending = true; } } @@ -167,8 +159,11 @@ impl PacketSpace { .number_spaces .get(&path_id) .is_some_and(|s| s.ping_pending || s.immediate_ack_pending); - - SendableFrames { acks, other } + SendableFrames { + acks, + other, + close: false, + } } /// The number of packets sent with the current crypto keys @@ -752,8 +747,12 @@ impl Dedup { /// Indicates which data is available for sending #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub(super) struct SendableFrames { + /// Whether there are ACK frames to send pub(super) acks: bool, + /// Whether there are any other frames to send, these are ack-eliciting pub(super) other: bool, + /// Whether there is a CONNECTION_CLOSE to send pub(super) close: bool, } impl SendableFrames { @@ -762,12 +761,13 @@ impl SendableFrames { Self { acks: false, other: false, + close: false, } } /// Whether no data is sendable pub(super) fn is_empty(&self) -> bool { - !self.acks && !self.other + !self.acks && !self.other && !self.close } } From 2a01ddce6a069d629d1df030d3dca710673c4b58 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 8 May 2025 15:33:50 +0200 Subject: [PATCH 3/8] Check the 3rd space too, this is very hacky --- quinn-proto/src/connection/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 9a6e8e077..7aa21e056 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -850,6 +850,9 @@ impl Connection { ) || matches!( self.space_ready_to_send(path_id,
space_id.next(), builder.buf, close, now), SendReady::Frames(can_send) if !can_send.is_empty(), + ) || matches!( + self.space_ready_to_send(path_id, space_id.next().next(), builder.buf, close, now), + SendReady::Frames(can_send) if !can_send.is_empty(), )) { // We can append/coalesce the next packet into the current From 135aaf02fdeb57c197bb610c5797c4bd167cd583 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 9 May 2025 10:04:46 +0200 Subject: [PATCH 4/8] Better next space check, only check congestion for new dgram --- quinn-proto/src/connection/mod.rs | 127 ++++++++++++++++++++---------- quinn-proto/src/tests/mod.rs | 1 + 2 files changed, 88 insertions(+), 40 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 7aa21e056..2f8d376c7 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -566,8 +566,8 @@ impl Connection { // Whether this packet can be coalesced with another one in the same datagram. let mut coalesce = true; - // Whether the last packet in the datagram must be padded to at least - // MIN_INITIAL_SIZE. + // Whether the last packet in the datagram must be padded so the datagram takes up + // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller. let mut pad_datagram = false; // Whether congestion control stopped the next packet from being sent. 
Further @@ -590,6 +590,7 @@ impl Connection { let can_send = match send_ready { SendReady::Frames(can_send) if !can_send.is_empty() => can_send, SendReady::Frames(_) if space_id < SpaceId::Data => { + trace!(?space_id, ?path_id, "nothing left to send in this space"); space_id = space_id.next(); continue; } @@ -844,16 +845,19 @@ impl Connection { .datagram_remaining_mut() .saturating_sub(builder.predict_packet_end()) > MIN_PACKET_SPACE - && (matches!( - self.space_ready_to_send(path_id, space_id, builder.buf, close, now), - SendReady::Frames(can_send) if !can_send.is_empty(), - ) || matches!( - self.space_ready_to_send(path_id, space_id.next(), builder.buf, close, now), - SendReady::Frames(can_send) if !can_send.is_empty(), - ) || matches!( - self.space_ready_to_send(path_id, space_id.next().next(), builder.buf, close, now), - SendReady::Frames(can_send) if !can_send.is_empty(), - )) + && self + .next_send_space(space_id, path_id, builder.buf, close) + .is_some() + // && (matches!( + // self.space_ready_to_send(path_id, space_id, builder.buf, close, now), + // SendReady::Frames(can_send) if !can_send.is_empty(), + // ) || matches!( + // self.space_ready_to_send(path_id, space_id.next(), builder.buf, close, now), + // SendReady::Frames(can_send) if !can_send.is_empty(), + // ) || matches!( + // self.space_ready_to_send(path_id, space_id.next().next(), builder.buf, close, now), + // SendReady::Frames(can_send) if !can_send.is_empty(), + // )) { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. 
@@ -968,6 +972,8 @@ impl Connection { } trace!( + segment_size = buf.segment_size(), + last_datagram_len = buf.len() % buf.segment_size(), "sending {} bytes in {} datagrams", buf.len(), buf.num_datagrams() @@ -994,6 +1000,44 @@ impl Connection { }) } + /// Returns the [`SpaceId`] of the next packet space which has data to send + /// + /// This takes into account the space available to frames in the next datagram. + // TODO(flub): This duplication is not nice. + fn next_send_space( + &mut self, + current_space_id: SpaceId, + path_id: PathId, + buf: &TransmitBuf<'_>, + close: bool, + ) -> Option { + // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed + // to be able to send an individual frame at least this large in the next 1-RTT + // packet. This could be generalized to support every space, but it's only needed to + // handle large fixed-size frames, which only exist in 1-RTT (application + // datagrams). We don't account for coalesced packets potentially occupying space + // because frames can always spill into the next datagram. + let pn = self.spaces[SpaceId::Data] + .for_path(path_id) + .peek_tx_number(); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); + let mut space_id = current_space_id; + loop { + let can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); + if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { + return Some(space_id); + } + space_id = match space_id { + SpaceId::Initial => SpaceId::Handshake, + SpaceId::Handshake => SpaceId::Data, + SpaceId::Data => break, + } + } + None + } + // /// Returns the next path on which data is ready to be sent // fn next_send_path(&mut self) -> Option { // if !self.is_multipath_enabled() { @@ -1075,37 +1119,40 @@ impl Connection { return SendReady::Frames(can_send); } - // Anti-amplification is only based on `total_sent`, which gets updated after the - // transmit is sent. 
Therefore we pass the amount of bytes for datagrams that are - // already created, as well as 1 byte for starting another datagram. If there is any - // anti-amplification budget left, we always allow a full MTU to be sent (see - // https://github.com/quinn-rs/quinn/issues/1082). - if self - .path_data(path_id) - .anti_amplification_blocked(transmit.len() as u64 + 1) - { - return SendReady::AntiAmplificationBlocked; - } + // Only if a new datagram is needed do we need to check anti-amplification, + // congestion control and pacing. + if transmit.datagram_remaining_mut() == 0 { + // Anti-amplification is only based on `total_sent`, which gets updated after + // the transmit is sent. Therefore we pass the amount of bytes for datagrams + // that are already created, as well as 1 byte for starting another datagram. If + // there is any anti-amplification budget left, we always allow a full MTU to be + // sent (see https://github.com/quinn-rs/quinn/issues/1082). + if self + .path_data(path_id) + .anti_amplification_blocked(transmit.len() as u64 + 1) + { + return SendReady::AntiAmplificationBlocked; + } - // Congestion control check. - // Tail loss probes must not be blocked by congestion, or a deadlock could arise. - let bytes_to_send = transmit.segment_size() as u64; - if can_send.other && !need_loss_probe && !can_send.close { - let path = self.path_data(path_id); - if path.in_flight.bytes + bytes_to_send >= path.congestion.window() { - trace!("blocked by congestion control"); - return SendReady::CongestionBlocked; + // Congestion control check. + // Tail loss probes must not be blocked by congestion, or a deadlock could arise. 
+ let bytes_to_send = transmit.segment_size() as u64; + if can_send.other && !need_loss_probe && !can_send.close { + let path = self.path_data(path_id); + if path.in_flight.bytes + bytes_to_send >= path.congestion.window() { + trace!(?space_id, %path_id, "blocked by congestion control"); + return SendReady::CongestionBlocked; + } } - } - // Pacing check. - if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) { - // TODO(@divma): this needs fixing asap - self.timers.set(Timer::Pacing(path_id), delay); - // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though - // they are not congestion controlled. - trace!("blocked by pacing"); - return SendReady::PacingBlocked; + // Pacing check. + if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) { + self.timers.set(Timer::Pacing(path_id), delay); + // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though + // they are not congestion controlled. + trace!("blocked by pacing"); + return SendReady::PacingBlocked; + } } // Ensure there is something to send if a loss probe is needed. diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 453d14634..1c75e8711 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -1242,6 +1242,7 @@ fn server_hs_retransmit() { pair.step(); assert!(!pair.client.inbound.is_empty()); // Initial + Handshakes pair.client.inbound.clear(); + info!("client inbound queue cleared"); pair.drive(); assert_matches!( pair.client_conn_mut(client_ch).poll(), From 25645818fa7ec018467568f0f214822577836505 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 11 May 2025 18:44:41 +0200 Subject: [PATCH 5/8] Always call maybe_queue_probe We need to make sure there's something ACK-eliciting. So there needs to be a can_send.other. But the maybe_queue_probe function already has this functionality in it. Just call it, even if the work that function does has weird lines. 
For now keep it the same. --- quinn-proto/src/connection/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 2f8d376c7..cce28bd3f 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1156,7 +1156,7 @@ impl Connection { } // Ensure there is something to send if a loss probe is needed. - if need_loss_probe && can_send.is_empty() { + if need_loss_probe { let request_immediate_ack = space_id == SpaceId::Data && self.peer_supports_ack_frequency(); self.spaces[space_id].maybe_queue_probe(path_id, request_immediate_ack, &self.streams); From 8ba7262786e686a7a86283695d189b0759a8dfe3 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 12 May 2025 12:44:14 +0200 Subject: [PATCH 6/8] clippy --- quinn-proto/src/connection/timer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quinn-proto/src/connection/timer.rs b/quinn-proto/src/connection/timer.rs index 0d256eb54..90c554b97 100644 --- a/quinn-proto/src/connection/timer.rs +++ b/quinn-proto/src/connection/timer.rs @@ -112,7 +112,7 @@ impl TimerTable { pub(super) fn values(&self) -> Vec { let mut values = self.timeout_queue.clone().into_sorted_vec(); values.retain(|entry| self.most_recent_timeout.get(&entry.timer) == Some(&entry.time)); - return values; + values } } From a90eafd60bd84adec1f4fcf36cdbaeba969531cf Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 12 May 2025 12:47:21 +0200 Subject: [PATCH 7/8] bump sccache to stop using deprecated github cache servers --- .github/workflows/rust.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 8d3231847..d02f0574a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -118,7 +118,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: mozilla-actions/sccache-action@v0.0.4 + - uses: 
mozilla-actions/sccache-action@v0.0.9 - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.rust }} @@ -181,7 +181,7 @@ jobs: SCCACHE_GHA_ENABLED: "on" steps: - uses: actions/checkout@v4 - - uses: mozilla-actions/sccache-action@v0.0.4 + - uses: mozilla-actions/sccache-action@v0.0.9 - uses: dtolnay/rust-toolchain@1.71.0 - uses: Swatinem/rust-cache@v2 - run: cargo check --locked --lib --all-features -p iroh-quinn-udp -p iroh-quinn-proto -p iroh-quinn @@ -193,7 +193,7 @@ jobs: SCCACHE_GHA_ENABLED: "on" steps: - uses: actions/checkout@v4 - - uses: mozilla-actions/sccache-action@v0.0.4 + - uses: mozilla-actions/sccache-action@v0.0.9 - uses: dtolnay/rust-toolchain@stable with: components: rustfmt, clippy From bbbc0cb63cab128498c829e2cbda051354e81e94 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 12 May 2025 12:56:44 +0200 Subject: [PATCH 8/8] fixup docs --- quinn-proto/src/connection/mod.rs | 2 +- quinn-proto/src/connection/spaces.rs | 2 ++ quinn-proto/src/connection/timer.rs | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index cce28bd3f..2e269077d 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1173,7 +1173,7 @@ impl Connection { /// Send PATH_CHALLENGE for a previous path if necessary /// /// QUIC-TRANSPORT section 9.3.3 - /// https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding + /// fn send_prev_path_challenge( &mut self, now: Instant, diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index 4c6b103c8..d3ece980f 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -152,6 +152,8 @@ impl PacketSpace { /// Whether there is anything to send in this space /// /// For the data space [`Connection::can_send_1rtt`] also needs to be consulted. 
+ /// + /// [`Connection::can_send_1rtt`]: super::Connection::can_send_1rtt pub(super) fn can_send(&self, path_id: PathId, streams: &StreamsState) -> SendableFrames { let acks = self.pending_acks.can_send(); let other = !self.pending.is_empty(streams) diff --git a/quinn-proto/src/connection/timer.rs b/quinn-proto/src/connection/timer.rs index 90c554b97..9704f47f8 100644 --- a/quinn-proto/src/connection/timer.rs +++ b/quinn-proto/src/connection/timer.rs @@ -30,7 +30,7 @@ pub(crate) enum Timer { /// Keeps track of the nearest timeout for each `Timer` /// -/// The [`TimerTable`] is advanced with [`TimerTable::expire_timers`]. +/// The [`TimerTable`] is advanced with [`TimerTable::expire_before`]. #[derive(Debug, Clone, Default)] pub(crate) struct TimerTable { most_recent_timeout: FxHashMap,