Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fold UdpState into AsyncUdpSocket #1612

Merged
merged 3 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quinn-udp"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
rust-version = "1.63"
license = "MIT OR Apache-2.0"
Expand Down
10 changes: 10 additions & 0 deletions quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ impl UdpSocketState {
};
Ok(1)
}

#[inline]
pub fn max_gso_segments(&self) -> usize {
1
}

#[inline]
pub fn gro_segments(&self) -> usize {
1
}
}

impl Default for UdpSocketState {
Expand Down
67 changes: 1 addition & 66 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@
use std::os::unix::io::AsFd;
#[cfg(windows)]
use std::os::windows::io::AsSocket;
#[cfg(not(windows))]
use std::sync::atomic::AtomicBool;
use std::{
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::{
atomic::{AtomicUsize, Ordering},
Mutex,
},
sync::Mutex,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -48,66 +43,6 @@ pub fn may_fragment() -> bool {
/// Number of UDP packets to send/receive at a time
pub const BATCH_SIZE: usize = imp::BATCH_SIZE;

/// The capabilities a UDP socket supports on a certain platform
#[derive(Debug)]
pub struct UdpState {
max_gso_segments: AtomicUsize,
gro_segments: usize,

/// True if we have received EINVAL error from `sendmsg` or `sendmmsg` system call at least once.
///
/// If enabled, we assume that old kernel is used and switch to fallback mode.
/// In particular, we do not use IP_TOS cmsg_type in this case,
/// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
#[cfg(not(windows))]
sendmsg_einval: AtomicBool,
}

impl UdpState {
pub fn new() -> Self {
imp::udp_state()
}

/// The maximum amount of segments which can be transmitted if a platform
/// supports Generic Send Offload (GSO).
///
/// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
/// while using GSO.
#[inline]
pub fn max_gso_segments(&self) -> usize {
self.max_gso_segments.load(Ordering::Relaxed)
}

/// The number of segments to read when GRO is enabled. Used as a factor to
/// compute the receive buffer size.
///
/// Returns 1 if the platform doesn't support GRO.
#[inline]
pub fn gro_segments(&self) -> usize {
self.gro_segments
}

/// Returns true if we previously got an EINVAL error from `sendmsg` or `sendmmsg` syscall.
#[inline]
#[cfg(not(windows))]
fn sendmsg_einval(&self) -> bool {
self.sendmsg_einval.load(Ordering::Relaxed)
}

/// Sets the flag indicating we got EINVAL error from `sendmsg` or `sendmmsg` syscall.
#[inline]
#[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))]
fn set_sendmsg_einval(&self) {
self.sendmsg_einval.store(true, Ordering::Relaxed)
}
}

impl Default for UdpState {
fn default() -> Self {
Self::new()
}
}

#[derive(Debug, Copy, Clone)]
pub struct RecvMeta {
pub addr: SocketAddr,
Expand Down
79 changes: 50 additions & 29 deletions quinn-udp/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
os::unix::io::AsRawFd,
sync::{
atomic::{AtomicBool, AtomicUsize},
atomic::{AtomicBool, AtomicUsize, Ordering},
Mutex,
},
time::Instant,
Expand All @@ -16,8 +16,7 @@ use std::{
use socket2::SockRef;

use super::{
cmsg, log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, UdpState,
IO_ERROR_LOG_INTERVAL,
cmsg, log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL,
};

#[cfg(target_os = "freebsd")]
Expand All @@ -32,27 +31,34 @@ type IpTosTy = libc::c_int;
#[derive(Debug)]
pub struct UdpSocketState {
last_send_error: Mutex<Instant>,
max_gso_segments: AtomicUsize,
gro_segments: usize,

/// True if we have received EINVAL error from `sendmsg` or `sendmmsg` system call at least once.
///
/// If enabled, we assume that old kernel is used and switch to fallback mode.
/// In particular, we do not use IP_TOS cmsg_type in this case,
/// which is not supported on Linux <3.13 and results in not sending the UDP packet at all.
sendmsg_einval: AtomicBool,
}

impl UdpSocketState {
pub fn new() -> Self {
let now = Instant::now();
Self {
last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
gro_segments: gro::gro_segments(),
sendmsg_einval: AtomicBool::new(false),
}
}

pub fn configure(sock: UdpSockRef<'_>) -> io::Result<()> {
init(sock.0)
}

pub fn send(
&self,
socket: UdpSockRef<'_>,
state: &UdpState,
transmits: &[Transmit],
) -> Result<usize, io::Error> {
send(state, socket.0, &self.last_send_error, transmits)
pub fn send(&self, socket: UdpSockRef<'_>, transmits: &[Transmit]) -> Result<usize, io::Error> {
send(self, socket.0, transmits)
}

pub fn recv(
Expand All @@ -63,6 +69,36 @@ impl UdpSocketState {
) -> io::Result<usize> {
recv(socket.0, bufs, meta)
}

/// The maximum amount of segments which can be transmitted if a platform
/// supports Generic Send Offload (GSO).
///
/// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
/// while using GSO.
#[inline]
pub fn max_gso_segments(&self) -> usize {
self.max_gso_segments.load(Ordering::Relaxed)
}

/// The number of segments to read when GRO is enabled. Used as a factor to
/// compute the receive buffer size.
///
/// Returns 1 if the platform doesn't support GRO.
#[inline]
pub fn gro_segments(&self) -> usize {
self.gro_segments
}

/// Returns true if we previously got an EINVAL error from `sendmsg` or `sendmmsg` syscall.
fn sendmsg_einval(&self) -> bool {
self.sendmsg_einval.load(Ordering::Relaxed)
}

/// Sets the flag indicating we got EINVAL error from `sendmsg` or `sendmmsg` syscall.
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
fn set_sendmsg_einval(&self) {
self.sendmsg_einval.store(true, Ordering::Relaxed)
}
}

impl Default for UdpSocketState {
Expand Down Expand Up @@ -157,9 +193,8 @@ fn init(io: SockRef<'_>) -> io::Result<()> {
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
fn send(
#[allow(unused_variables)] // only used on Linux
state: &UdpState,
state: &UdpSocketState,
io: SockRef<'_>,
last_send_error: &Mutex<Instant>,
transmits: &[Transmit],
) -> io::Result<usize> {
#[allow(unused_mut)] // only mutable on FreeBSD
Expand Down Expand Up @@ -245,7 +280,7 @@ fn send(
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
log_sendmsg_error(last_send_error, e, &transmits[0]);
log_sendmsg_error(&state.last_send_error, e, &transmits[0]);

// The ERRORS section in https://man7.org/linux/man-pages/man2/sendmmsg.2.html
// describes that errors will only be returned if no message could be transmitted
Expand All @@ -260,12 +295,7 @@ fn send(
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
fn send(
state: &UdpState,
io: SockRef<'_>,
last_send_error: &Mutex<Instant>,
transmits: &[Transmit],
) -> io::Result<usize> {
fn send(state: &UdpSocketState, io: SockRef<'_>, transmits: &[Transmit]) -> io::Result<usize> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
let mut iov: libc::iovec = unsafe { mem::zeroed() };
let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
Expand Down Expand Up @@ -299,7 +329,7 @@ fn send(
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
log_sendmsg_error(last_send_error, e, &transmits[sent]);
log_sendmsg_error(&state.last_send_error, e, &transmits[sent]);
sent += 1;
}
}
Expand Down Expand Up @@ -505,15 +535,6 @@ unsafe fn recvmmsg_fallback(
}
}

/// Returns the platforms UDP socket capabilities
pub(crate) fn udp_state() -> UdpState {
UdpState {
max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
gro_segments: gro::gro_segments(),
sendmsg_einval: AtomicBool::new(false),
}
}

const CMSG_LEN: usize = 88;

fn prepare_msg(
Expand Down
36 changes: 21 additions & 15 deletions quinn-udp/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use windows_sys::Win32::Networking::WinSock;

use super::{log_sendmsg_error, RecvMeta, Transmit, UdpSockRef, UdpState, IO_ERROR_LOG_INTERVAL};
use super::{log_sendmsg_error, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL};

/// QUIC-friendly UDP interface for Windows
#[derive(Debug)]
Expand Down Expand Up @@ -80,12 +80,7 @@ impl UdpSocketState {
Ok(())
}

pub fn send(
&self,
socket: UdpSockRef<'_>,
_state: &UdpState,
transmits: &[Transmit],
) -> Result<usize, io::Error> {
pub fn send(&self, socket: UdpSockRef<'_>, transmits: &[Transmit]) -> Result<usize, io::Error> {
let mut sent = 0;
for transmit in transmits {
match socket.0.send_to(
Expand Down Expand Up @@ -137,6 +132,25 @@ impl UdpSocketState {
};
Ok(1)
}

/// The maximum amount of segments which can be transmitted if a platform
/// supports Generic Send Offload (GSO).
///
/// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
Ralith marked this conversation as resolved.
Show resolved Hide resolved
/// while using GSO.
#[inline]
pub fn max_gso_segments(&self) -> usize {
1
}

/// The number of segments to read when GRO is enabled. Used as a factor to
/// compute the receive buffer size.
///
/// Returns 1 if the platform doesn't support GRO.
#[inline]
pub fn gro_segments(&self) -> usize {
1
}
}

impl Default for UdpSocketState {
Expand All @@ -145,14 +159,6 @@ impl Default for UdpSocketState {
}
}

/// Returns the platforms UDP socket capabilities
pub(crate) fn udp_state() -> super::UdpState {
super::UdpState {
max_gso_segments: std::sync::atomic::AtomicUsize::new(1),
gro_segments: 1,
}
}

pub(crate) const BATCH_SIZE: usize = 1;

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ rustls = { version = "0.21.0", default-features = false, features = ["quic"], op
thiserror = "1.0.21"
tracing = "0.1.10"
tokio = { version = "1.28.1", features = ["sync"] }
udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.4", default-features = false }
udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.5", default-features = false }

[dev-dependencies]
anyhow = "1.0.22"
Expand Down
Loading
Loading