From f830910ea296b90f815d4f0fe0ac3b0716aebca7 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 21 Feb 2021 21:46:00 -0800 Subject: [PATCH] Generic Send Offload (GSO) Support This change implements an initial version of GSO support [1][2][3] for Linux, which improves the efficiency of sending data. The approach taken in this change is to create a buffer which contains multiple datagrams in `Connection::poll_transmit`. This was picked over trying to merge packets in the endpoint task, since packets seem to need to be padded to a common segment size in order to make them sendable via GSO. In order to do this in an efficient fashion, the `poll_transmit` method was restructured. Instead of selecting spaces upfront, it will now loop through all possible packet spaces, check if there is pending data to send and create packets and datagrams out of this. The last packet which was written to a datagram buffer is not finalized until it is clear whether follow-up packets need to be written, since we need to know whether this packet should get padded to MTU length or not. This change doesn't enable GSO yet, since it will only produce a single datagram. I will create a separate change for this. 
Performance measurements: **No GSO:** ``` Sent 1073741824 bytes on 1 streams in 4.31s (237.66 MiB/s) ``` **With GSO (up to 8 packets):** ``` Sent 1073741824 bytes on 1 streams in 3.02s (339.18 MiB/s) ``` [1] http://vger.kernel.org/lpc_net2018_talks/willemdebruijn-lpc2018-udpgso-paper-DRAFT-1.pdf [2] http://vger.kernel.org/lpc_net2018_talks/willemdebruijn-lpc2018-udpgso-presentation-20181104.pdf [3] https://lwn.net/Articles/752956/ --- quinn-proto/src/connection/mod.rs | 166 +++++++++++++++++++++++++----- 1 file changed, 138 insertions(+), 28 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index c9a87c0f0..77b8c6e54 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -345,6 +345,9 @@ where /// - a call was made to `handle_timeout` #[must_use] pub fn poll_transmit(&mut self, now: Instant) -> Option { + // This will become a parameter to `poll_transmit` in an additional update + const MAX_DATAGRAMS: usize = 1; + let mut num_datagrams = 0; // Send PATH_CHALLENGE for a previous path if necessary @@ -402,67 +405,159 @@ where } } - // Select the set of spaces that have data to send so we can try to coalesce them - let (spaces, close) = match self.state { + // Check whether we need to send a close message + let mut close = match self.state { State::Drained => { self.app_limited = true; return None; } State::Draining | State::Closed(_) => { + // TODO: This seems to lose the `close` flag if it can't be + // immediately enqueued if mem::replace(&mut self.close, false) { - (vec![self.highest_space], true) + true } else { self.app_limited = true; return None; } } - _ => ( - SpaceId::iter() - .filter(|&x| self.space_can_send(x)) - .collect(), - false, - ), + _ => false, }; - let mut buf = Vec::with_capacity(self.path.mtu as usize); + let mut buf = Vec::new(); // Reserving capacity can provide more capacity than we asked for. // However we are not allowed to write more than MTU size. 
Therefore // the maximum capacity is tracked separately. - let buf_capacity = self.path.mtu as usize; + let mut buf_capacity = 0; + + let mut coalesce = true; + let mut builder: Option = None; + let mut sent_frames = None; let mut pad_datagram = false; - num_datagrams += 1; + let mut congestion_blocked = false; - let mut coalesce = spaces.len() > 1; + // Iterate over all spaces and find data to send + let mut space_idx = 0; + let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data]; + while space_idx < spaces.len() { + let space_id = spaces[space_idx]; - let mut congestion_blocked = false; + if close && space_id != self.highest_space { + // We ignore data in this space, since the close message + // has higher priority + space_idx += 1; + continue; + } + + // Is there data or a close message to send in this space? + if !self.space_can_send(space_id) && !close { + space_idx += 1; + continue; + } - for space_id in spaces { let mut ack_eliciting = !self.spaces[space_id].pending.is_empty() || self.spaces[space_id].ping_pending; if space_id == SpaceId::Data { ack_eliciting |= self.can_send_1rtt(); + } + + // Can we append more data into the current buffer? + // It is not safe to assume that `buf.len()` is the end of the data, + // since the last packet might not have been finished. + let buf_end = if let Some(builder) = &builder { + buf.len().max(builder.min_size) + builder.tag_len + } else { + buf.len() + }; + + if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE { + // We need to send 1 more datagram and extend the buffer for that. + + // Is 1 more datagram allowed? + if buf_capacity >= self.path.mtu as usize * MAX_DATAGRAMS { + // No more datagrams allowed + break; + } + + // Anti-amplification is only based on `total_sent`, which gets + // updated at the end of this method. Therefore we pass the accumulated + // amount of datagrams and bytes here. 
+ if self + .path + .anti_amplification_blocked(self.path.mtu as u64 * (num_datagrams + 1) as u64) + { + trace!("blocked by anti-amplification"); + break; + } + + // Congestion control and pacing checks // Tail loss probes must not be blocked by congestion, or a deadlock could arise if ack_eliciting && self.spaces[space_id].loss_probes == 0 { - if self.congestion_blocked(u64::from(self.path.mtu)) { + // Assume the current packet will get padded to fill the full MTU + let untracked_bytes = if let Some(builder) = &builder { + buf_capacity - builder.partial_encode.start + } else { + 0 + } as u64; + debug_assert!(untracked_bytes <= self.path.mtu as u64); + + let bytes_to_send = u64::from(self.path.mtu) + untracked_bytes; + + if self.congestion_blocked(bytes_to_send) { + space_idx += 1; congestion_blocked = true; + // We continue instead of breaking here in order to avoid + // blocking loss probes queued for higher spaces. continue; } + + // Check whether the next datagram is blocked by pacing let smoothed_rtt = self.path.rtt.get(); - let window = self.path.congestion.window(); if let Some(delay) = self.path.pacing.delay( smoothed_rtt, - self.path.mtu as u64, + bytes_to_send, self.path.mtu, - window, + self.path.congestion.window(), now, ) { self.timers.set(Timer::Pacing, delay); congestion_blocked = true; - continue; + // Loss probes should be subject to pacing, even though + // they are not congestion controlled. + break; } } + + // Finish current packet + if let Some(mut builder) = builder.take() { + // Pad the packet to make it suitable for sending with GSO + // which will always send the maximum PDU. 
+ builder.pad_to_mtu_size(self.path.mtu); + + self.finish_and_track_packet(now, builder, sent_frames.take(), &mut buf); + + debug_assert_eq!(buf.len(), buf_capacity, "Packet must be padded"); + } + + // Allocate space for another datagram + buf_capacity += self.path.mtu as usize; + if buf.capacity() < buf_capacity { + buf.reserve(buf_capacity - buf.capacity()); + } + num_datagrams += 1; + coalesce = true; + pad_datagram = false; + } else { + // We can append/coalesce the next packet into the current + // datagram. + // Finish current packet without adding extra padding + if let Some(builder) = builder.take() { + self.finish_and_track_packet(now, builder, sent_frames.take(), &mut buf); + } } + debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE); + // // From here on, we've determined that a packet will definitely be sent. // @@ -479,17 +574,25 @@ where prev.update_unacked = false; } - let mut builder = self.begin_packet( + debug_assert!( + builder.is_none() && sent_frames.is_none(), + "Previous packet must have been finished" + ); + + // This should really be `builder.insert()`, but `Option::insert` + // is not stable yet. Since we `debug_assert!(builder.is_none())` it + // doesn't make any functional difference. + let builder = builder.get_or_insert(self.begin_packet( now, space_id, &mut buf, buf_capacity, (num_datagrams - 1) * (self.path.mtu as usize), ack_eliciting, - )?; + )?); coalesce = coalesce && !builder.short_header; - let sent_frames = if close { + sent_frames = if close { trace!("sending CONNECTION_CLOSE"); match self.state { State::Closed(state::Closed { ref reason }) => { @@ -517,6 +620,8 @@ where // A close frame in the initial space requires padding pad_datagram = true; coalesce = false; + // We don't want to send 2 close packets + close = false; None } else { Some(self.populate_packet(space_id, &mut buf, buf_capacity - builder.tag_len)) @@ -534,15 +639,16 @@ where pad_datagram |= sent.requires_padding; } + // Don't increment space_idx. 
+ // We stay in the current space and check if there is more data to send. + } + + // Finish the last packet + if let Some(mut builder) = builder { if pad_datagram { builder.pad_to_min_initial_size(); } - self.finish_and_track_packet(now, builder, sent_frames, &mut buf); - - if !coalesce || buf_capacity - buf.len() < MIN_PACKET_SPACE { - break; - } } self.app_limited = buf.is_empty() && !congestion_blocked; @@ -3533,6 +3639,10 @@ impl PacketBuilder { fn pad_to_min_initial_size(&mut self) { self.min_size = self.datagram_start + MIN_INITIAL_SIZE - self.tag_len; } + + fn pad_to_mtu_size(&mut self, mtu: u16) { + self.min_size = self.datagram_start + (mtu as usize) - self.tag_len; + } } /// Perform key updates this many packets before the AEAD confidentiality limit.