From b746f336f34e6203d7ac85b0e3ab59a8bf3a69f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konrad=20Gr=C3=A4fe?= Date: Wed, 22 Mar 2023 13:51:22 +0100 Subject: [PATCH] udp: listen: linux: Implement receive_broadcasts option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This option mimics the Windows behavior to also receive broadcasts on sockets bound to a specific IP. Signed-off-by: Konrad Gräfe --- Cargo.lock | 32 ++++++++- Cargo.toml | 9 +++ src/adapters/udp.rs | 171 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 209 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63f77c5..5787bcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -221,7 +221,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.6.5", "once_cell", "scopeguard", ] @@ -434,6 +434,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "message-io" version = "0.14.8" @@ -448,8 +457,10 @@ dependencies = [ "httparse", "integer-encoding", "lazy_static", + "libc", "log", "mio", + "nix", "rand", "serde", "socket2", @@ -471,6 +482,19 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nix" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset 0.7.1", + "static_assertions", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -732,6 +756,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strum" version = "0.24.1" diff --git a/Cargo.toml b/Cargo.toml index f70c4f4..6d2dac8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,15 @@ url = { version = "2.2", optional = true } integer-encoding = "3.0.2" lazy_static = "1.4.0" +[target.'cfg(target_os = "linux")'.dependencies.nix] +version = "0.26.2" +default-features = false +features = ["socket", "uio", "net"] + +[target.'cfg(target_os = "linux")'.dependencies.libc] +version = "0.2.137" +default-features = false + [dev-dependencies] bincode = "1.3.1" criterion = "0.4" diff --git a/src/adapters/udp.rs b/src/adapters/udp.rs index fd47e6f..d1bbe15 100644 --- a/src/adapters/udp.rs +++ b/src/adapters/udp.rs @@ -9,9 +9,20 @@ use mio::event::{Source}; use socket2::{Socket, Domain, Type, Protocol}; +#[cfg(target_os = "linux")] +use nix::errno::{Errno}; +#[cfg(target_os = "linux")] +use nix::sys::socket::{self, sockopt, MsgFlags, SockaddrStorage, ControlMessageOwned}; +#[cfg(target_os = "linux")] +use nix::ifaddrs::{getifaddrs}; + use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr}; +#[cfg(target_os = "linux")] +use std::net::{IpAddr, Ipv6Addr}; use std::io::{self, ErrorKind}; use std::mem::{MaybeUninit}; +#[cfg(target_os = "linux")] +use std::os::fd::AsRawFd; /// Maximun payload that UDP can send over the internet to be mostly compatible. pub const MAX_INTERNET_PAYLOAD_LEN: usize = 1500 - 20 - 8; @@ -64,6 +75,12 @@ pub struct UdpListenConfig { /// [`Endpoint::from_listener`](crate::network::Endpoint::from_listener). pub send_broadcasts: bool, + /// On Windows, when listening on a specific IP address, the sockets also receives + /// corresponding subnet broadcasts and global broadcasts ([`std::net::Ipv4Addr::BROADCAST`]) + /// received on the interface matching the IP. When this option is set, message-io mimics this + /// behavior on Linux. + pub receive_broadcasts: bool, + /// Set value for the `SO_REUSEADDR` option on this socket. This indicates that futher calls to /// `bind` may allow reuse of local addresses. pub reuse_address: bool, @@ -159,6 +176,8 @@ impl Remote for RemoteResource { pub(crate) struct LocalResource { socket: UdpSocket, + #[cfg(target_os = "linux")] + ingress_addresses: Option>, } impl Resource for LocalResource { @@ -167,10 +186,79 @@ impl Resource for LocalResource { } } +#[cfg(target_os = "linux")] +impl LocalResource { + fn accept_filtered( + &self, + ingress_addresses: &[IpAddr], + mut accept_remote: impl FnMut(AcceptedType<'_, RemoteResource>), + ) { + let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + let mut control_buffer = nix::cmsg_space!(libc::sockaddr_storage); + + loop { + let result = socket::recvmsg::( + self.socket.as_raw_fd(), + &mut [io::IoSliceMut::new(&mut input_buffer)], + Some(&mut control_buffer), + MsgFlags::empty(), + ); + + match result { + Ok(msg) => { + let size = msg.bytes; + + let ingress_ip = match msg.cmsgs().find_map(|cmsg| match cmsg { + ControlMessageOwned::Ipv4PacketInfo(pktinfo) => { + Some(Ipv4Addr::from(pktinfo.ipi_addr.s_addr.to_be()).into()) + } + ControlMessageOwned::Ipv6PacketInfo(pktinfo) => { + Some(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr).into()) + } + _ => None, + }) { + Some(ingress_ip) => ingress_ip, + None => continue, + }; + + if !ingress_addresses.contains(&ingress_ip) { + continue + } + + fn convert_sockaddr(addr: SockaddrStorage) -> Option { + if let Some(addr) = addr.as_sockaddr_in() { + return Some(SocketAddr::V4((*addr).into())) + } + if let Some(addr) = addr.as_sockaddr_in6() { + return Some(SocketAddr::V6((*addr).into())) + } + None + } + + let addr = match msg.address.and_then(convert_sockaddr) { + Some(addr) => addr, + None => continue, + }; + + let data = &mut input_buffer[..size]; + accept_remote(AcceptedType::Data(addr, data)) + } + Err(Errno::EWOULDBLOCK) => break, + Err(err) => break log::error!("UDP accept error: {}", err), // Should never happen + } + } + } +} + impl Local for LocalResource { type Remote = RemoteResource; - fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { + fn listen_with( + config: TransportListen, + #[cfg(not(target_os = "linux"))] addr: SocketAddr, + #[cfg(target_os = "linux")] mut addr: SocketAddr, + ) -> io::Result> { let config = match config { TransportListen::Udp(config) => config, _ => panic!("Internal error: Got wrong config"), @@ -200,6 +288,70 @@ impl Local for LocalResource { } socket.set_broadcast(config.send_broadcasts)?; + #[cfg(target_os = "linux")] + let ingress_addresses = if config.receive_broadcasts { + // enable the socket packet info option + match addr { + SocketAddr::V4 { .. } => { + socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv4PacketInfo, &true)? + } + SocketAddr::V6 { .. } => { + socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv6RecvPacketInfo, &true)? + } + } + + // find ifaddr matching the listen addr + let ifaddr = getifaddrs()?.find_map(|ifaddr| { + ifaddr.address.and_then(|ss| { + match ( + ss.as_sockaddr_in().map(|si| Ipv4Addr::from(si.ip())), + ss.as_sockaddr_in6().map(|si| si.ip()), + ) { + (Some(ip4), _) if IpAddr::V4(ip4) == addr.ip() => Some(ifaddr), + (_, Some(ip6)) if IpAddr::V6(ip6) == addr.ip() => Some(ifaddr), + _ => None, + } + }) + }); + + match ifaddr { + None => return Err(ErrorKind::AddrNotAvailable.into()), + Some(ifaddr) => { + // Get allowed ingress IP addresses + let mut ingress_addresses = vec![addr.ip()]; + + // Some interfaces like VPN adapters don't have broadcast support. + if let Some(broadcast_ss) = ifaddr.broadcast { + if let Some(si) = broadcast_ss.as_sockaddr_in() { + ingress_addresses.push(Ipv4Addr::from(si.ip()).into()); + ingress_addresses.push(Ipv4Addr::BROADCAST.into()); + } + if let Some(si) = broadcast_ss.as_sockaddr_in6() { + ingress_addresses.push(si.ip().into()); + } + } + + // Bind the socket to the specific interface + socket::setsockopt( + socket.as_raw_fd(), + sockopt::BindToDevice, + &ifaddr.interface_name.into(), + )?; + + // Listen on UNSPECIFIED + addr.set_ip(match addr { + SocketAddr::V4 { .. } => Ipv4Addr::UNSPECIFIED.into(), + SocketAddr::V6 { .. } => Ipv6Addr::UNSPECIFIED.into(), + }); + + Some(ingress_addresses) + } + } + } + else { + None + }; + if let Some(multicast) = multicast { socket.join_multicast_v4(multicast.ip(), &Ipv4Addr::UNSPECIFIED)?; socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port()).into())?; @@ -210,10 +362,25 @@ impl Local for LocalResource { let socket = UdpSocket::from_std(socket.into()); let local_addr = socket.local_addr().unwrap(); - Ok(ListeningInfo { local: { LocalResource { socket } }, local_addr }) + Ok(ListeningInfo { + local: { + LocalResource { + socket, + #[cfg(target_os = "linux")] + ingress_addresses, + } + }, + local_addr, + }) } fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { + #[cfg(target_os = "linux")] + if let Some(ingress_addresses) = &self.ingress_addresses { + self.accept_filtered(ingress_addresses, accept_remote); + return + } + let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit(); let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array