Skip to content

Commit

Permalink
feat(udp): use recvmmsg
Browse files Browse the repository at this point in the history
Read up to `BATCH_SIZE = 32` with single `recvmmsg` syscall.

Previously `neqo_bin::udp::Socket::recv` would use `recvmmsg`, but provide a
single buffer to write into only, effectively using `recvmsg` instead of
`recvmmsg`.

With this commit `Socket::recv` provides `BATCH_SIZE` number of buffers on each
`recvmmsg` syscall, thus reading more than one datagram at a time if available.
  • Loading branch information
mxinden committed Mar 15, 2024
1 parent 30a02ea commit 35efc39
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 141 deletions.
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ impl super::Client for Connection {
self.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

Check warning on line 148 in neqo-bin/src/bin/client/http09.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/client/http09.rs#L146-L148

Added lines #L146 - L148 were not covered by tests

fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
where
S: AsRef<str> + std::fmt::Display,
Expand Down
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl super::Client for Http3Client {
self.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

Check warning on line 124 in neqo-bin/src/bin/client/http3.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/client/http3.rs#L122-L124

Added lines #L122 - L124 were not covered by tests

fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display,
Expand Down
12 changes: 8 additions & 4 deletions neqo-bin/src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ trait Handler {

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
// TODO: datagram option needed?
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_input(&mut self, dgram: &Datagram, now: Instant);
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand Down Expand Up @@ -365,11 +367,13 @@ impl<'a, H: Handler> Runner<'a, H> {
match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
let mut is_empty = true;
for dgram in dgrams {
is_empty = false;
self.client.process_input(&dgram, Instant::now());

Check warning on line 373 in neqo-bin/src/bin/client/main.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/client/main.rs#L370-L373

Added lines #L370 - L373 were not covered by tests
}
for dgram in &dgrams {
self.process(Some(dgram)).await?;
if is_empty {
break;

Check warning on line 376 in neqo-bin/src/bin/client/main.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/client/main.rs#L375-L376

Added lines #L375 - L376 were not covered by tests
}
self.handler.maybe_key_update(&mut self.client)?;
},
Expand Down
16 changes: 12 additions & 4 deletions neqo-bin/src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ fn qns_read_response(filename: &str) -> Option<Vec<u8>> {
}

trait HttpServer: Display {
// TODO: Remove the option?
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_input(&mut self, dgram: &Datagram, now: Instant);
fn process_events(&mut self, args: &Args, now: Instant);
fn set_qlog_dir(&mut self, dir: Option<PathBuf>);
fn set_ciphers(&mut self, ciphers: &[Cipher]);
Expand Down Expand Up @@ -304,6 +306,10 @@ impl HttpServer for SimpleServer {
self.server.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.server.process_input(dgram, now);
}

Check warning on line 311 in neqo-bin/src/bin/server/main.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/server/main.rs#L309-L311

Added lines #L309 - L311 were not covered by tests

fn process_events(&mut self, args: &Args, _now: Instant) {
while let Some(event) = self.server.next_event() {
match event {
Expand Down Expand Up @@ -544,11 +550,13 @@ impl ServersRunner {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
break;
}
let mut is_empty = true;

Check warning on line 553 in neqo-bin/src/bin/server/main.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/server/main.rs#L553

Added line #L553 was not covered by tests
for dgram in dgrams {
self.process(Some(&dgram)).await?;
is_empty = false;
self.server.process_input(&dgram, Instant::now());
}
if is_empty {
break;

Check warning on line 559 in neqo-bin/src/bin/server/main.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/server/main.rs#L555-L559

Added lines #L555 - L559 were not covered by tests
}
},
Ready::Timeout => {
Expand Down
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/server/old_https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ impl HttpServer for Http09Server {
self.server.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.server.process_input(dgram, now);
}

Check warning on line 199 in neqo-bin/src/bin/server/old_https.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/bin/server/old_https.rs#L197-L199

Added lines #L197 - L199 were not covered by tests

fn process_events(&mut self, args: &Args, now: Instant) {
let active_conns = self.server.active_connections();
for mut acr in active_conns {
Expand Down
198 changes: 66 additions & 132 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::{
io::{self, IoSliceMut},
mem::MaybeUninit,
net::{SocketAddr, ToSocketAddrs},
slice,
};
Expand All @@ -17,6 +18,13 @@ use neqo_common::{Datagram, IpTos};
use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
use tokio::io::Interest;

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub(crate) const BATCH_SIZE: usize = 32;

#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) const BATCH_SIZE: usize = 1;

/// Socket receive buffer size.
///
/// Allows reading multiple datagrams in a single [`Socket::recv`] call.
Expand All @@ -25,7 +33,8 @@ const RECV_BUF_SIZE: usize = u16::MAX as usize;
pub struct Socket {
socket: tokio::net::UdpSocket,
state: UdpSocketState,
recv_buf: Vec<u8>,
// TODO: Rename
recv_buf: [Vec<u8>; BATCH_SIZE],
}

impl Socket {
Expand All @@ -36,7 +45,11 @@ impl Socket {
Ok(Self {
state: quinn_udp::UdpSocketState::new((&socket).into())?,
socket: tokio::net::UdpSocket::from_std(socket)?,
recv_buf: vec![0; RECV_BUF_SIZE],
recv_buf: (0..BATCH_SIZE)
.map(|_| vec![0; RECV_BUF_SIZE])
.collect::<Vec<_>>()
.try_into()
.expect("successful array instantiation"),

Check warning on line 52 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L46-L52

Added lines #L46 - L52 were not covered by tests
})
}

Check warning on line 54 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L54

Added line #L54 was not covered by tests

Expand Down Expand Up @@ -76,147 +89,68 @@ impl Socket {
}

Check warning on line 89 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L88-L89

Added lines #L88 - L89 were not covered by tests

/// Receive a UDP datagram on the specified socket.
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
self.state.recv(
(&self.socket).into(),
&mut [IoSliceMut::new(&mut self.recv_buf)],
slice::from_mut(&mut meta),
)
pub fn recv<'a>(
&'a mut self,
local_address: &'a SocketAddr,
) -> Result<impl Iterator<Item = Datagram> + 'a, io::Error> {
let mut metas = [RecvMeta::default(); BATCH_SIZE];

// TODO: Safe?
let mut iovs = MaybeUninit::<[IoSliceMut<'_>; BATCH_SIZE]>::uninit();
for (i, buf) in self.recv_buf.iter_mut().enumerate() {
unsafe {
iovs.as_mut_ptr()
.cast::<IoSliceMut>()
.add(i)
.write(IoSliceMut::new(buf));
};
}
let mut iovs = unsafe { iovs.assume_init() };

Check warning on line 108 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L92-L108

Added lines #L92 - L108 were not covered by tests

let msgs = match self.socket.try_io(Interest::READABLE, || {
self.state
.recv((&self.socket).into(), &mut iovs, &mut metas)
}) {
Ok(n) => {
assert_eq!(n, 1, "only passed one slice");
}
Ok(n) => n,
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(vec![])
0

Check warning on line 119 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L110-L119

Added lines #L110 - L119 were not covered by tests
}
Err(err) => {
return Err(err);

Check warning on line 122 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L121-L122

Added lines #L121 - L122 were not covered by tests
}
};

if meta.len == 0 {
eprintln!("zero length datagram received?");
return Ok(vec![]);
}
if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);
}

Ok(self.recv_buf[0..meta.len]
.chunks(meta.stride.min(self.recv_buf.len()))
.map(|d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
.collect())
}
}

#[cfg(test)]
mod tests {
use neqo_common::{IpTosDscp, IpTosEcn};

use super::*;

#[tokio::test]
async fn datagram_tos() -> Result<(), io::Error> {
let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;

let datagram = Datagram::new(
sender.local_addr()?,
receiver.local_addr()?,
IpTos::from((IpTosDscp::Le, IpTosEcn::Ect1)),
None,
"Hello, world!".as_bytes().to_vec(),
);

sender.writable().await?;
sender.send(datagram.clone())?;

receiver.readable().await?;
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
// TODO
// if meta.len == 0 {
// eprintln!("zero length datagram received?");
// return Ok(vec![]);
// }
// if meta.len == self.recv_buf.len() {
// eprintln!(
// "Might have received more than {} bytes",
// self.recv_buf.len()
// );
// }

Ok(metas
.into_iter()
.next()
.expect("receive to yield datagram");

// Assert that the ECN is correct.
assert_eq!(
IpTosEcn::from(datagram.tos()),
IpTosEcn::from(received_datagram.tos())
);

Ok(())
}

/// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read.
#[tokio::test]
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
async fn many_datagrams_through_gro() -> Result<(), io::Error> {
const SEGMENT_SIZE: usize = 128;

let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;

// `neqo_common::udp::Socket::send` does not yet
// (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
// `quinn_udp` directly.
let max_gso_segments = sender.state.max_gso_segments();
let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
let transmit = Transmit {
destination: receiver.local_addr()?,
ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from((
IpTosDscp::Le,
IpTosEcn::Ect1,
)))),
contents: msg.clone().into(),
segment_size: Some(SEGMENT_SIZE),
src_ip: None,
};
sender.writable().await?;
let n = sender.socket.try_io(Interest::WRITABLE, || {
sender
.state
.send((&sender.socket).into(), slice::from_ref(&transmit))
})?;
assert_eq!(n, 1, "only passed one slice");

// Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
let mut num_received = 0;
while num_received < max_gso_segments {
receiver.readable().await?;
receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.for_each(|d| {
assert_eq!(
SEGMENT_SIZE,
d.len(),
"Expect received datagrams to have same length as sent datagrams."
);
num_received += 1;
});
}

Ok(())
.zip(self.recv_buf.iter())
.take(msgs)
.flat_map(move |(meta, buf)| {
buf[0..meta.len]
.chunks(meta.stride.min(buf.len()))
.map(move |d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
}))
}

Check warning on line 155 in neqo-bin/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-bin/src/udp.rs#L138-L155

Added lines #L138 - L155 were not covered by tests
}
4 changes: 4 additions & 0 deletions neqo-http3/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ impl Http3Server {
self.server.ech_config()
}

pub fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.server.process_input(dgram, now);
}

Check warning on line 118 in neqo-http3/src/server.rs

View check run for this annotation

Codecov / codecov/patch

neqo-http3/src/server.rs#L116-L118

Added lines #L116 - L118 were not covered by tests

pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
qtrace!([self], "Process.");
let out = self.server.process(dgram, now);
Expand Down
5 changes: 4 additions & 1 deletion neqo-transport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,10 @@ impl Server {
}
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) -> Option<Datagram> {
/// TODO
/// # Panics
/// TODO
pub fn process_input(&mut self, dgram: &Datagram, now: Instant) -> Option<Datagram> {
qtrace!("Process datagram: {}", hex(&dgram[..]));

// This is only looking at the first packet header in the datagram.
Expand Down

0 comments on commit 35efc39

Please sign in to comment.