Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udp): use sendmmsg and recvmmsg #1741

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ impl super::Client for Connection {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
fn process_multiple_input<I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I: IntoIterator<Item = Datagram>,
{
self.process_multiple_input(dgrams, now);
}
Expand Down
4 changes: 2 additions & 2 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ impl super::Client for Http3Client {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
fn process_multiple_input<I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I: IntoIterator<Item = Datagram>,
{
self.process_multiple_input(dgrams, now);
}
Expand Down
41 changes: 26 additions & 15 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,10 @@ trait Handler {
/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
// TODO: Take &'a Datagram
fn process_multiple_input<I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>;
I: IntoIterator<Item = Datagram>;
fn has_events(&self) -> bool;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
Expand All @@ -370,6 +371,7 @@ struct Runner<'a, H: Handler> {
handler: H,
timeout: Option<Pin<Box<Sleep>>>,
args: &'a Args,
dgrams: Vec<Datagram>,
}

impl<'a, H: Handler> Runner<'a, H> {
Expand Down Expand Up @@ -419,35 +421,42 @@ impl<'a, H: Handler> Runner<'a, H> {
}

async fn process_output(&mut self) -> Result<(), io::Error> {
// Accumulate up to BATCH_SIZE datagrams before sending.
loop {
match self.client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
self.dgrams.push(dgram);
}
Output::Callback(new_timeout) => {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => {
qdebug!("Output::None");
maybe_callback => {
if let Output::Callback(new_timeout) = maybe_callback {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
}
break;
}
}

// Reached BATCH_SIZE. Send batch.
if self.dgrams.len() == udp::BATCH_SIZE {
self.socket.send(self.dgrams.drain(..)).await?;
}
}

// About to exit. Send remaining datagrams.
if !self.dgrams.is_empty() {
self.socket.send(self.dgrams.drain(..)).await?;
}

Ok(())
}

async fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
let mut dgrams = self.socket.recv(&self.local_addr)?.peekable();
if dgrams.peek().is_none() {
break;
}
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
self.client.process_multiple_input(dgrams, Instant::now());
self.process_output().await?;
}

Expand Down Expand Up @@ -566,6 +575,7 @@ pub async fn client(mut args: Args) -> Res<()> {
local_addr: real_local,
socket: &mut socket,
timeout: None,
dgrams: vec![],
}
.run()
.await?
Expand All @@ -582,6 +592,7 @@ pub async fn client(mut args: Args) -> Res<()> {
local_addr: real_local,
socket: &mut socket,
timeout: None,
dgrams: vec![],
}
.run()
.await?
Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Default for QuicParameters {
max_streams_uni: 16,
idle_timeout: 30,
congestion_control: CongestionControlAlgorithm::NewReno,
no_pacing: false,
no_pacing: true,
preferred_address_v4: None,
preferred_address_v6: None,
}
Expand Down
75 changes: 42 additions & 33 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
use clap::Parser;
use futures::{
future::{select, select_all, Either},
FutureExt,
FutureExt, TryFutureExt,
};
use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header};
use neqo_crypto::{
Expand Down Expand Up @@ -432,7 +432,7 @@ struct ServersRunner {
args: Args,
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, udp::Socket)>,
sockets: HashMap<SocketAddr, (udp::Socket, Vec<Datagram>)>,
}

impl ServersRunner {
Expand All @@ -449,7 +449,7 @@ impl ServersRunner {
let local_addr = socket.local_addr()?;
qinfo!("Server waiting for connection on: {local_addr:?}");

Ok((host, socket))
Ok((host, (socket, vec![])))
})
.collect::<Result<_, io::Error>>()?;
let server = Self::create_server(&args);
Expand Down Expand Up @@ -495,38 +495,45 @@ impl ServersRunner {
svr
}

/// Tries to find a socket, but then just falls back to sending from the first.
fn find_socket(&mut self, addr: SocketAddr) -> &mut udp::Socket {
let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
rest.iter_mut()
.map(|(_host, socket)| socket)
.find(|socket| {
socket
.local_addr()
.ok()
.map_or(false, |socket_addr| socket_addr == addr)
})
.unwrap_or(first_socket)
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
// Accumulate up to BATCH_SIZE datagrams per socket before sending.
loop {
match self.server.process(dgram.take(), self.args.now()) {
Output::Datagram(dgram) => {
let socket = self.find_socket(dgram.source());
socket.writable().await?;
socket.send(dgram)?;
self.sockets
.get_mut(&dgram.source())
.expect("TODO")
.1
.push(dgram);
}
Output::Callback(new_timeout) => {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => {
maybe_callback => {
if let Output::Callback(new_timeout) = maybe_callback {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
}
break;
}
}

// Reached BATCH_SIZE for one socket. Send batch.
if let Some((_, (socket, dgrams))) = self
.sockets
.iter_mut()
.find(|(_, (_, dgrams))| dgrams.len() == udp::BATCH_SIZE)
{
socket.send(dgrams.drain(..)).await?;
}
}

// About to exit. Send remaining datagrams.
for (_, (socket, dgrams)) in self
.sockets
.iter_mut()
.filter(|(_, (_, dgrams))| !dgrams.is_empty())
{
socket.send(dgrams.drain(..)).await?;
}

Ok(())
}

Expand All @@ -535,10 +542,10 @@ impl ServersRunner {
let sockets_ready = select_all(
self.sockets
.iter()
.map(|(_host, socket)| Box::pin(socket.readable())),
.map(|(addr, (socket, _))| Box::pin(socket.readable().map_ok(|()| *addr))),
)
.map(|(res, inx, _)| match res {
Ok(()) => Ok(Ready::Socket(inx)),
.map(|(res, _, _)| match res {
Ok(addr) => Ok(Ready::Socket(addr)),
Err(e) => Err(e),
});
let timeout_ready = self
Expand All @@ -560,9 +567,11 @@ impl ServersRunner {
}

match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
Ready::Socket(addr) => loop {
let (socket, _) = self.sockets.get_mut(&addr).unwrap();
// TODO: Remove collect.
// TODO: Rename to input_dgrams?
let dgrams: Vec<_> = socket.recv(&addr)?.collect();
if dgrams.is_empty() {
break;
}
Expand All @@ -580,7 +589,7 @@ impl ServersRunner {
}

enum Ready {
Socket(usize),
Socket(SocketAddr),
Timeout,
}

Expand Down
Loading
Loading