From d01111923cf5f60f574047b99d87802be1f4445f Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Thu, 16 Jan 2014 16:56:21 +0100 Subject: [PATCH] Implement Unix domain sockets in libnative --- src/libnative/io/mod.rs | 4 +- src/libnative/io/net.rs | 286 ++++++++++++++++++++++++++++++++++++++ src/libstd/io/net/unix.rs | 9 +- src/libstd/libc.rs | 28 +++- src/libstd/rt/rtio.rs | 5 + 5 files changed, 325 insertions(+), 7 deletions(-) diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index f1bec440547e1..9a52476431d4c 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -181,10 +181,10 @@ impl rtio::IoFactory for IoFactory { net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket) } fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> { - Err(unimpl()) + net::UnixListener::bind(_path).map(|s| ~s as ~RtioUnixListener) } fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> { - Err(unimpl()) + net::UnixStream::connect(_path, libc::SOCK_STREAM).map(|s| ~s as ~RtioPipe) } fn get_host_addresses(&mut self, _host: Option<&str>, _servname: Option<&str>, _hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index adcd21f0ac4c5..dff1a688c90e4 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -15,6 +15,7 @@ use std::libc; use std::mem; use std::rt::rtio; use std::unstable::intrinsics; +use std::c_str::CString; use super::{IoResult, retry}; use super::file::keep_going; @@ -88,6 +89,22 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { } } +fn addr_to_sockaddr_un(addr: &CString) -> (libc::sockaddr_storage, uint) { + unsafe { + let storage: libc::sockaddr_storage = intrinsics::init(); + let s: *mut libc::sockaddr_un = cast::transmute(&storage); + (*s).sun_family = libc::AF_UNIX as libc::sa_family_t; + let mut i = 0; + for c in addr.iter() { + (*s).sun_path[i] = c; + i += 1; + } + + let len = mem::size_of::() + i + 1; //count the null terminator + return (storage, len); + } +} + fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult { unsafe { let fam = match addr.ip { @@ -101,6 +118,15 @@ fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult { } } +fn unix_socket(ty: libc::c_int) -> IoResult { + unsafe { + match libc::socket(libc::AF_UNIX, ty, 0) { + -1 => Err(super::last_error()), + fd => Ok(fd) + } + } +} + fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload: T) -> IoResult<()> { unsafe { @@ -176,6 +202,25 @@ fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } } +fn sockaddr_to_unix(storage: &libc::sockaddr_storage, + len: uint) -> IoResult { + match storage.ss_family as libc::c_int { + libc::AF_UNIX => { + assert!(len as uint <= mem::size_of::()); + let storage: &libc::sockaddr_un = unsafe { + cast::transmute(storage) + }; + unsafe { + let buf:[libc::c_char, ..libc::sun_len] = storage.sun_path; + Ok(CString::new(buf.as_ptr(), false)) + } + } + _ => { + Err(io::standard_error(io::OtherIoError)) + } + } +} + #[cfg(unix)] pub fn init() {} @@ -584,3 +629,244 @@ impl rtio::RtioUdpSocket for UdpSocket { impl Drop for UdpSocket { fn drop(&mut self) { unsafe { close(self.fd) } } } + + +#[cfg(not(windows))] +//////////////////////////////////////////////////////////////////////////////// +// Unix +//////////////////////////////////////////////////////////////////////////////// + + +//////////////////////////////////////////////////////////////////////////////// +// Unix streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv fd: sock_t, +} + +impl UnixStream { + pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + // the sun_path length is limited to SUN_LEN (with null) + if(addr.len() > libc::sun_len -1) { + return Err(io::IoError { + kind: io::OtherIoError, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + unsafe { + unix_socket(ty).and_then(|fd| { + let (addr, len) = addr_to_sockaddr_un(addr); + let ret = UnixStream{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| { + libc::connect(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + }) + } + } + + pub fn dgram_bind(addr: &CString) -> IoResult { + // the sun_path length is limited to SUN_LEN (with null) + if(addr.len() > libc::sun_len - 1) { + return Err(io::IoError { + kind: io::OtherIoError, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + unsafe { + unix_socket(libc::SOCK_DGRAM).and_then(|fd| { + let (addr, len) = addr_to_sockaddr_un(addr); + let ret = UnixStream{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let ret = retry(|| { + unsafe { + libc::recv(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as wrlen, + 0) as libc::c_int + } + }); + if ret == 0 { + Err(io::standard_error(io::EndOfFile)) + } else if ret < 0 { + Err(super::last_error()) + } else { + Ok(ret as uint) + } + } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let ret = keep_going(buf, |buf, len| { + unsafe { + libc::send(self.fd, + buf as *mut libc::c_void, + len as wrlen, + 0) as i64 + } + }); + if ret < 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } +} + +impl rtio::RtioDatagramPipe for UnixStream { + fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { + unsafe { + let mut storage: libc::sockaddr_storage = intrinsics::init(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::() as libc::socklen_t; + let ret = retry(|| { + libc::recvfrom(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as msglen_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen) as libc::c_int + }); + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) + } + } + + fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { + let (dst, len) = addr_to_sockaddr_un(dst); + let dstp = &dst as *libc::sockaddr_storage; + unsafe { + let ret = retry(|| { + libc::sendto(self.fd, + buf.as_ptr() as *libc::c_void, + buf.len() as msglen_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t) as libc::c_int + }); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) + } + } + + } +} + +impl Drop for UnixStream { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv fd: sock_t, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult { + // the sun_path length is limited to SUN_LEN (with null) + if(addr.len() > libc::sun_len - 1) { + return Err(io::IoError { + kind: io::OtherIoError, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + unsafe { + unix_socket(libc::SOCK_STREAM).and_then(|fd| { + let (addr, len) = addr_to_sockaddr_un(addr); + let ret = UnixListener{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } + + pub fn native_listen(self, backlog: int) -> IoResult { + match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { + -1 => Err(super::last_error()), + _ => Ok(UnixAcceptor { listener: self }) + } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, +} + +impl UnixAcceptor { + pub fn fd(&self) -> sock_t { self.listener.fd } + + pub fn native_accept(&mut self) -> IoResult { + unsafe { + let mut storage: libc::sockaddr_storage = intrinsics::init(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) as sock_t { + -1 => Err(super::last_error()), + fd => Ok(UnixStream { fd: fd }) + } + } + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} + + diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 232ca67059732..e301c597383bc 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -52,12 +52,14 @@ impl UnixStream { /// /// # Example /// + ///```rust /// use std::io::net::unix::UnixStream; + /// use std::path::posix::Path; /// - /// let server = Path("path/to/my/socket"); + /// let server = Path::new("path/to/my/socket"); /// let mut stream = UnixStream::connect(&server); /// stream.write([1, 2, 3]); - /// + ///``` pub fn connect(path: &P) -> Option { LocalIo::maybe_raise(|io| { io.unix_connect(&path.to_c_str()).map(UnixStream::new) @@ -91,6 +93,7 @@ impl UnixListener { /// /// # Example /// + ///````rust /// use std::io::net::unix::UnixListener; /// /// let server = Path("path/to/my/socket"); @@ -99,7 +102,7 @@ impl UnixListener { /// let mut client = client; /// client.write([1, 2, 3, 4]); /// } - /// + ///``` pub fn bind(path: &P) -> Option { LocalIo::maybe_raise(|io| { io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s }) diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 9cf94e5a1b894..68d1a9d217188 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -267,8 +267,9 @@ pub mod types { pub enum timezone {} } pub mod bsd44 { - use libc::types::os::arch::c95::c_uint; + use libc::types::os::arch::c95::{c_char, c_uint}; + pub static sun_len:uint = 108; pub type socklen_t = u32; pub type sa_family_t = u16; pub type in_port_t = u16; @@ -309,6 +310,10 @@ pub mod types { ipv6mr_multiaddr: in6_addr, ipv6mr_interface: c_uint, } + pub struct sockaddr_un { + sun_family: sa_family_t, + sun_path: [c_char, ..108] + } } } @@ -626,6 +631,7 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::c_uint; + pub static sun_len:uint = 104; pub type socklen_t = u32; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -671,6 +677,11 @@ pub mod types { ipv6mr_multiaddr: in6_addr, ipv6mr_interface: c_uint, } + pub struct sockaddr_un { + sun_len: u8, + sun_family: sa_family_t, + sun_path: [c_char, ..104] + } } } @@ -813,6 +824,7 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_int, c_uint}; + pub static sun_len:uint = 108; pub type SOCKET = c_uint; pub type socklen_t = c_int; pub type sa_family_t = u16; @@ -854,6 +866,10 @@ pub mod types { ipv6mr_multiaddr: in6_addr, ipv6mr_interface: c_uint, } + pub struct sockaddr_un { + sun_family: sa_family_t, + sun_path: [c_char, ..108] + } } } @@ -1121,8 +1137,9 @@ pub mod types { } pub mod bsd44 { - use libc::types::os::arch::c95::{c_int, c_uint}; + use libc::types::os::arch::c95::{c_char, c_int, c_uint}; + pub static sun_len:uint = 104; pub type socklen_t = c_int; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -1168,6 +1185,11 @@ pub mod types { ipv6mr_multiaddr: in6_addr, ipv6mr_interface: c_uint, } + pub struct sockaddr_un { + sun_len: u8, + sun_family: sa_family_t, + sun_path: [c_char, ..104] + } } } @@ -2207,6 +2229,7 @@ pub mod consts { pub static MADV_UNMERGEABLE : c_int = 13; pub static MADV_HWPOISON : c_int = 100; + pub static AF_UNIX: c_int = 1; pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 10; pub static SOCK_STREAM: c_int = 1; @@ -3022,6 +3045,7 @@ pub mod consts { pub static MINCORE_REFERENCED_OTHER : c_int = 0x8; pub static MINCORE_MODIFIED_OTHER : c_int = 0x10; + pub static AF_UNIX: c_int = 1; pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 30; pub static SOCK_STREAM: c_int = 1; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 6b3d50a76ac8f..fb8497f157eff 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -266,6 +266,11 @@ pub trait RtioPipe { fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } +pub trait RtioDatagramPipe : RtioPipe { + fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, CString), IoError>; + fn sendto(&mut self, buf: &[u8], dst: &CString) -> Result<(), IoError>; +} + pub trait RtioUnixListener { fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>; }