Skip to content

Commit

Permalink
Generic Send Offload (GSO) Support
Browse files Browse the repository at this point in the history
This change implements an initial version of GSO support [1][2][3]
for Linux, which improves the efficiency of sending data.

The approach taken in this change is to create a buffer which contains
multiple datagrams in `Connection::poll_transmit`. This was picked
over trying to merge packets in the endpoint task, since packets
seem to need to be padded to a common segment size in order to
make them sendable via GSO.

In order to do this in an efficient fashion, the `poll_transmit`
method was restructured. Instead of selecting spaces upfront, it
will now loop through all possible packet spaces, check if there is
pending data to send and create packets and datagrams out of this.

The last packet which was written to a datagram buffer is not
finalized until it is clear whether follow-up packets need to be
written, since we need to know whether this packet should get padded
to MTU length or not.

This change doesn't enable GSO yet, since it will only produce a
single datagram. I will create a separate change for this.

Performance measurements:

**No GSO:**

```
Sent 1073741824 bytes on 1 streams in 4.31s (237.66 MiB/s)
```

**With GSO (up to 8 packets):**

```
Sent 1073741824 bytes on 1 streams in 3.02s (339.18 MiB/s)
```

[1] http://vger.kernel.org/lpc_net2018_talks/willemdebruijn-lpc2018-udpgso-paper-DRAFT-1.pdf
[2] http://vger.kernel.org/lpc_net2018_talks/willemdebruijn-lpc2018-udpgso-presentation-20181104.pdf
[3] https://lwn.net/Articles/752956/
  • Loading branch information
Matthias247 authored and djc committed Feb 22, 2021
1 parent f1628f8 commit f830910
Showing 1 changed file with 138 additions and 28 deletions.
166 changes: 138 additions & 28 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ where
/// - a call was made to `handle_timeout`
#[must_use]
pub fn poll_transmit(&mut self, now: Instant) -> Option<Transmit> {
// This will become a parameter to `poll_transmit` in an additional update
const MAX_DATAGRAMS: usize = 1;

let mut num_datagrams = 0;

// Send PATH_CHALLENGE for a previous path if necessary
Expand Down Expand Up @@ -402,67 +405,159 @@ where
}
}

// Select the set of spaces that have data to send so we can try to coalesce them
let (spaces, close) = match self.state {
// Check whether we need to send a close message
let mut close = match self.state {
State::Drained => {
self.app_limited = true;
return None;
}
State::Draining | State::Closed(_) => {
// TODO: This seems to lose the `close` flag if it can't be
// immediately enqueued
if mem::replace(&mut self.close, false) {
(vec![self.highest_space], true)
true
} else {
self.app_limited = true;
return None;
}
}
_ => (
SpaceId::iter()
.filter(|&x| self.space_can_send(x))
.collect(),
false,
),
_ => false,
};

let mut buf = Vec::with_capacity(self.path.mtu as usize);
let mut buf = Vec::new();
// Reserving capacity can provide more capacity than we asked for.
// However we are not allowed to write more than MTU size. Therefore
// the maximum capacity is tracked separately.
let buf_capacity = self.path.mtu as usize;
let mut buf_capacity = 0;

let mut coalesce = true;
let mut builder: Option<PacketBuilder> = None;
let mut sent_frames = None;
let mut pad_datagram = false;
num_datagrams += 1;
let mut congestion_blocked = false;

let mut coalesce = spaces.len() > 1;
// Iterate over all spaces and find data to send
let mut space_idx = 0;
let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
while space_idx < spaces.len() {
let space_id = spaces[space_idx];

let mut congestion_blocked = false;
if close && space_id != self.highest_space {
// We ignore data in this space, since the close message
// has higher priority
space_idx += 1;
continue;
}

// Is there data or a close message to send in this space?
if !self.space_can_send(space_id) && !close {
space_idx += 1;
continue;
}

for space_id in spaces {
let mut ack_eliciting =
!self.spaces[space_id].pending.is_empty() || self.spaces[space_id].ping_pending;
if space_id == SpaceId::Data {
ack_eliciting |= self.can_send_1rtt();
}

// Can we append more data into the current buffer?
// It is not safe to assume that `buf.len()` is the end of the data,
// since the last packet might not have been finished.
let buf_end = if let Some(builder) = &builder {
buf.len().max(builder.min_size) + builder.tag_len
} else {
buf.len()
};

if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE {
// We need to send 1 more datagram and extend the buffer for that.

// Is 1 more datagram allowed?
if buf_capacity >= self.path.mtu as usize * MAX_DATAGRAMS {
// No more datagrams allowed
break;
}

// Anti-amplification is only based on `total_sent`, which gets
// updated at the end of this method. Therefore we pass the accumulated
// amount of datagrams and bytes here.
if self
.path
.anti_amplification_blocked(self.path.mtu as u64 * (num_datagrams + 1) as u64)
{
trace!("blocked by anti-amplification");
break;
}

// Congestion control and pacing checks
// Tail loss probes must not be blocked by congestion, or a deadlock could arise
if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
if self.congestion_blocked(u64::from(self.path.mtu)) {
// Assume the current packet will get padded to fill the full MTU
let untracked_bytes = if let Some(builder) = &builder {
buf_capacity - builder.partial_encode.start
} else {
0
} as u64;
debug_assert!(untracked_bytes <= self.path.mtu as u64);

let bytes_to_send = u64::from(self.path.mtu) + untracked_bytes;

if self.congestion_blocked(bytes_to_send) {
space_idx += 1;
congestion_blocked = true;
// We continue instead of breaking here in order to avoid
// blocking loss probes queued for higher spaces.
continue;
}

// Check whether the next datagram is blocked by pacing
let smoothed_rtt = self.path.rtt.get();
let window = self.path.congestion.window();
if let Some(delay) = self.path.pacing.delay(
smoothed_rtt,
self.path.mtu as u64,
bytes_to_send,
self.path.mtu,
window,
self.path.congestion.window(),
now,
) {
self.timers.set(Timer::Pacing, delay);
congestion_blocked = true;
continue;
// Loss probes should be subject to pacing, even though
// they are not congestion controlled.
break;
}
}

// Finish current packet
if let Some(mut builder) = builder.take() {
// Pad the packet to make it suitable for sending with GSO
// which will always send the maximum PDU.
builder.pad_to_mtu_size(self.path.mtu);

self.finish_and_track_packet(now, builder, sent_frames.take(), &mut buf);

debug_assert_eq!(buf.len(), buf_capacity, "Packet must be padded");
}

// Allocate space for another datagram
buf_capacity += self.path.mtu as usize;
if buf.capacity() < buf_capacity {
buf.reserve(buf_capacity - buf.capacity());
}
num_datagrams += 1;
coalesce = true;
pad_datagram = false;
} else {
// We can append/coalesce the next packet into the current
// datagram.
// Finish current packet without adding extra padding
if let Some(builder) = builder.take() {
self.finish_and_track_packet(now, builder, sent_frames.take(), &mut buf);
}
}

debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

//
// From here on, we've determined that a packet will definitely be sent.
//
Expand All @@ -479,17 +574,25 @@ where
prev.update_unacked = false;
}

let mut builder = self.begin_packet(
debug_assert!(
builder.is_none() && sent_frames.is_none(),
"Previous packet must have been finished"
);

// This should really be `builder.insert()`, but `Option::insert`
// is not stable yet. Since we `debug_assert!(builder.is_none())` it
// doesn't make any functional difference.
let builder = builder.get_or_insert(self.begin_packet(
now,
space_id,
&mut buf,
buf_capacity,
(num_datagrams - 1) * (self.path.mtu as usize),
ack_eliciting,
)?;
)?);
coalesce = coalesce && !builder.short_header;

let sent_frames = if close {
sent_frames = if close {
trace!("sending CONNECTION_CLOSE");
match self.state {
State::Closed(state::Closed { ref reason }) => {
Expand Down Expand Up @@ -517,6 +620,8 @@ where
// A close frame in the initial space requires padding
pad_datagram = true;
coalesce = false;
// We don't want to send 2 close packets
close = false;
None
} else {
Some(self.populate_packet(space_id, &mut buf, buf_capacity - builder.tag_len))
Expand All @@ -534,15 +639,16 @@ where
pad_datagram |= sent.requires_padding;
}

// Don't increment space_idx.
// We stay in the current space and check if there is more data to send.
}

// Finish the last packet
if let Some(mut builder) = builder {
if pad_datagram {
builder.pad_to_min_initial_size();
}

self.finish_and_track_packet(now, builder, sent_frames, &mut buf);

if !coalesce || buf_capacity - buf.len() < MIN_PACKET_SPACE {
break;
}
}

self.app_limited = buf.is_empty() && !congestion_blocked;
Expand Down Expand Up @@ -3533,6 +3639,10 @@ impl PacketBuilder {
/// Ensures the packet is padded so the final datagram reaches the
/// minimum size required for an Initial packet.
///
/// The padding target accounts for the AEAD tag, which is appended
/// after `min_size` bytes have been written.
fn pad_to_min_initial_size(&mut self) {
    let padded_payload_end = self.datagram_start + MIN_INITIAL_SIZE - self.tag_len;
    self.min_size = padded_payload_end;
}

/// Ensures the packet is padded so the datagram it lives in fills the
/// full `mtu`, as required for sending a batch of equal-sized segments
/// via GSO.
///
/// The AEAD tag (`tag_len` bytes) is written after `min_size`, so it is
/// subtracted from the padding target.
fn pad_to_mtu_size(&mut self, mtu: u16) {
    // u16 -> usize is a lossless widening conversion on all platforms.
    let padded_payload_end = self.datagram_start + usize::from(mtu) - self.tag_len;
    self.min_size = padded_payload_end;
}
}

/// Perform key updates this many packets before the AEAD confidentiality limit.
Expand Down

0 comments on commit f830910

Please sign in to comment.