Skip to content

Commit

Permalink
udp: listen: linux: Implement receive_broadcasts option
Browse files Browse the repository at this point in the history
This option mimics the Windows behavior to also receive broadcasts on
sockets bound to a specific IP.

Signed-off-by: Konrad Gräfe <k.graefe@gateware.de>
  • Loading branch information
kgraefe committed Mar 22, 2023
1 parent 4b11857 commit 97eb83a
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 3 deletions.
32 changes: 31 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ url = { version = "2.2", optional = true }
integer-encoding = "3.0.2"
lazy_static = "1.4.0"

[target.'cfg(target_os = "linux")'.dependencies.nix]
version = "0.26.2"
default-features = false
features = ["socket", "uio", "net"]

[target.'cfg(target_os = "linux")'.dependencies.libc]
version = "0.2.137"
default-features = false

[dev-dependencies]
bincode = "1.3.1"
criterion = "0.4"
Expand Down
159 changes: 157 additions & 2 deletions src/adapters/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,20 @@ use mio::event::{Source};

use socket2::{Socket, Domain, Type, Protocol};

#[cfg(target_os = "linux")]
use nix::errno::{Errno};
#[cfg(target_os = "linux")]
use nix::sys::socket::{self, sockopt, MsgFlags, SockaddrStorage, ControlMessageOwned};
#[cfg(target_os = "linux")]
use nix::ifaddrs::{getifaddrs};

use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
#[cfg(target_os = "linux")]
use std::net::{IpAddr, Ipv6Addr};
use std::io::{self, ErrorKind};
use std::mem::{MaybeUninit};
#[cfg(target_os = "linux")]
use std::os::fd::AsRawFd;

/// Maximun payload that UDP can send over the internet to be mostly compatible.
pub const MAX_INTERNET_PAYLOAD_LEN: usize = 1500 - 20 - 8;
Expand Down Expand Up @@ -64,6 +75,12 @@ pub struct UdpListenConfig {
/// [`Endpoint::from_listener`](crate::network::Endpoint::from_listener).
pub send_broadcasts: bool,

/// On Windows, when listening on a specific IP address, the sockets also receives
/// corresponding subnet broadcasts and global broadcasts ([`std::net::Ipv4Addr::BROADCAST`])
/// received on the interface matching the IP. When this option is set, message-io mimics this
/// behavior on Linux.
pub receive_broadcasts: bool,

/// Set value for the `SO_REUSEADDR` option on this socket. This indicates that futher calls to
/// `bind` may allow reuse of local addresses.
pub reuse_address: bool,
Expand Down Expand Up @@ -159,6 +176,8 @@ impl Remote for RemoteResource {

pub(crate) struct LocalResource {
socket: UdpSocket,
#[cfg(target_os = "linux")]
ingress_addresses: Vec<IpAddr>,
}

impl Resource for LocalResource {
Expand All @@ -170,7 +189,11 @@ impl Resource for LocalResource {
impl Local for LocalResource {
type Remote = RemoteResource;

fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
fn listen_with(
config: TransportListen,
#[cfg(not(target_os = "linux"))] addr: SocketAddr,
#[cfg(target_os = "linux")] mut addr: SocketAddr,
) -> io::Result<ListeningInfo<Self>> {
let config = match config {
TransportListen::Udp(config) => config,
_ => panic!("Internal error: Got wrong config"),
Expand Down Expand Up @@ -202,6 +225,68 @@ impl Local for LocalResource {
socket.set_broadcast(true)?;
}

#[cfg(target_os = "linux")]
match addr {
SocketAddr::V4 { .. } => {
socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv4PacketInfo, &true)?
}
SocketAddr::V6 { .. } => {
socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv6RecvPacketInfo, &true)?
}
}

#[cfg(target_os = "linux")]
let mut ingress_addresses = vec![];

#[cfg(target_os = "linux")]
if config.receive_broadcasts {
// find ifaddr matching the listen addr
let ifaddr = getifaddrs()?.find_map(|ifaddr| {
ifaddr.address.and_then(|ss| {
match (
ss.as_sockaddr_in().map(|si| Ipv4Addr::from(si.ip())),
ss.as_sockaddr_in6().map(|si| si.ip()),
) {
(Some(ip4), _) if IpAddr::V4(ip4) == addr.ip() => Some(ifaddr),
(_, Some(ip6)) if IpAddr::V6(ip6) == addr.ip() => Some(ifaddr),
_ => None,
}
})
});

match ifaddr {
None => return Err(ErrorKind::AddrNotAvailable.into()),
Some(ifaddr) => {
// Get allowed ingress IP addresses
ingress_addresses.push(addr.ip());

// Some interfaces like VPN adapters don't have broadcast support.
if let Some(broadcast_ss) = ifaddr.broadcast {
if let Some(si) = broadcast_ss.as_sockaddr_in() {
ingress_addresses.push(Ipv4Addr::from(si.ip()).into());
ingress_addresses.push(Ipv4Addr::BROADCAST.into());
}
if let Some(si) = broadcast_ss.as_sockaddr_in6() {
ingress_addresses.push(si.ip().into());
}
}

// Bind the socket to the specific interface
socket::setsockopt(
socket.as_raw_fd(),
sockopt::BindToDevice,
&ifaddr.interface_name.into(),
)?;

// Listen on UNSPECIFIED
addr.set_ip(match addr {
SocketAddr::V4 { .. } => Ipv4Addr::UNSPECIFIED.into(),
SocketAddr::V6 { .. } => Ipv6Addr::UNSPECIFIED.into(),
});
}
}
}

if let Some(multicast) = multicast {
socket.join_multicast_v4(multicast.ip(), &Ipv4Addr::UNSPECIFIED)?;
socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port()).into())?;
Expand All @@ -212,9 +297,19 @@ impl Local for LocalResource {

let socket = UdpSocket::from_std(socket.into());
let local_addr = socket.local_addr().unwrap();
Ok(ListeningInfo { local: { LocalResource { socket } }, local_addr })
Ok(ListeningInfo {
local: {
LocalResource {
socket,
#[cfg(target_os = "linux")]
ingress_addresses,
}
},
local_addr,
})
}

#[cfg(not(target_os = "linux"))]
fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array
Expand All @@ -231,6 +326,66 @@ impl Local for LocalResource {
}
}

#[cfg(target_os = "linux")]
fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
let mut input_buffer = vec![0; MAX_LOCAL_PAYLOAD_LEN];
let mut control_buffer = nix::cmsg_space!(libc::sockaddr_storage);

loop {
let result = socket::recvmsg::<SockaddrStorage>(
self.socket.as_raw_fd(),
&mut [io::IoSliceMut::new(&mut input_buffer)],
Some(&mut control_buffer),
MsgFlags::empty(),
);

match result {
Ok(msg) => {
let size = msg.bytes;

let ingress_ip = match msg.cmsgs().find_map(|cmsg| match cmsg {
ControlMessageOwned::Ipv4PacketInfo(pktinfo) => {
Some(Ipv4Addr::from(pktinfo.ipi_addr.s_addr.to_be()).into())
}
ControlMessageOwned::Ipv6PacketInfo(pktinfo) => {
Some(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr).into())
}
_ => None,
}) {
Some(ingress_ip) => ingress_ip,
None => continue,
};

if !self.ingress_addresses.is_empty()
&& !self.ingress_addresses.contains(&ingress_ip)
{
continue
}

fn convert_sockaddr(addr: SockaddrStorage) -> Option<SocketAddr> {
if let Some(addr) = addr.as_sockaddr_in() {
return Some(SocketAddr::V4((*addr).into()))
}
if let Some(addr) = addr.as_sockaddr_in6() {
return Some(SocketAddr::V6((*addr).into()))
}
None
}

let addr = match msg.address.map(convert_sockaddr) {
Some(Some(addr)) => addr,
_ => continue,
};

let data = &mut input_buffer[..size];
accept_remote(AcceptedType::Data(addr, data))
}
Err(Errno::EWOULDBLOCK) => break,
Err(err) => break log::error!("UDP accept error: {}", err), // Should never happen
}
}
}

fn send_to(&self, addr: SocketAddr, data: &[u8]) -> SendStatus {
send_packet(data, |data| self.socket.send_to(data, addr))
}
Expand Down

0 comments on commit 97eb83a

Please sign in to comment.