Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement accept, close, connect, fsync ops on windows #188

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions monoio/src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ pub(crate) trait OpAble {
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry;

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(super::legacy::ready::Direction, usize)>;
#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_call(&mut self) -> io::Result<u32>;
}

Expand Down Expand Up @@ -116,7 +116,7 @@ impl<T> Op<T> {
where
T: OpAble,
{
#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
if is_legacy() {
return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() {
OpCanceller {
Expand All @@ -132,7 +132,7 @@ impl<T> Op<T> {
}
OpCanceller {
index: self.index,
#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
direction: None,
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ pub(crate) fn is_legacy() -> bool {
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub(crate) struct OpCanceller {
pub(super) index: usize,
#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
pub(super) direction: Option<super::legacy::ready::Direction>,
}

Expand Down
45 changes: 35 additions & 10 deletions monoio/src/driver/op/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,48 @@ use std::{

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(all(unix, feature = "legacy"))]
#[cfg(windows)]
use {
crate::{driver::legacy::ready::Direction, syscall_u32},
std::os::unix::prelude::AsRawFd,
crate::syscall,
std::os::windows::prelude::AsRawSocket,
windows_sys::Win32::Networking::WinSock::{
accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE,
},
};
#[cfg(all(unix, feature = "legacy"))]
use {crate::syscall_u32, std::os::unix::prelude::AsRawFd};

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(feature = "legacy")]
use crate::driver::legacy::ready::Direction;

/// Accept
pub(crate) struct Accept {
#[allow(unused)]
pub(crate) fd: SharedFd,
#[cfg(unix)]
pub(crate) addr: Box<(MaybeUninit<libc::sockaddr_storage>, libc::socklen_t)>,
#[cfg(windows)]
pub(crate) addr: Box<(MaybeUninit<SOCKADDR_STORAGE>, socklen_t)>,
}

impl Op<Accept> {
#[cfg(unix)]
/// Accept a connection
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Self> {
#[cfg(unix)]
let addr = Box::new((
MaybeUninit::uninit(),
size_of::<libc::sockaddr_storage>() as libc::socklen_t,
));

#[cfg(windows)]
let addr = Box::new((
MaybeUninit::uninit(),
size_of::<SOCKADDR_STORAGE>() as socklen_t,
));

Op::submit_with(Accept {
fd: fd.clone(),
addr: Box::new((
MaybeUninit::uninit(),
size_of::<libc::sockaddr_storage>() as libc::socklen_t,
)),
addr,
})
}
}
Expand All @@ -46,11 +62,20 @@ impl OpAble for Accept {
.build()
}

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
self.fd.registered_index().map(|idx| (Direction::Read, idx))
}

#[cfg(windows)]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_socket();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;

syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET)
}

#[cfg(all(unix, feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_fd();
Expand Down
29 changes: 24 additions & 5 deletions monoio/src/driver/op/close.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
use std::io;
#[cfg(unix)]
use std::{io, os::unix::io::RawFd};
use std::os::unix::io::RawFd;

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(windows)]
use {
crate::syscall, std::os::windows::io::RawSocket,
windows_sys::Win32::Networking::WinSock::closesocket,
};

use super::{Op, OpAble};
#[cfg(feature = "legacy")]
use crate::driver::legacy::ready::Direction;
#[cfg(all(unix, feature = "legacy"))]
use crate::{driver::legacy::ready::Direction, syscall_u32};
use crate::syscall_u32;

pub(crate) struct Close {
#[cfg(unix)]
fd: RawFd,
#[cfg(windows)]
fd: RawSocket,
}

impl Op<Close> {
Expand All @@ -19,6 +29,11 @@ impl Op<Close> {
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
Op::try_submit_with(Close { fd })
}

#[cfg(windows)]
pub(crate) fn close(fd: RawSocket) -> io::Result<Op<Close>> {
Op::try_submit_with(Close { fd })
}
}

impl OpAble for Close {
Expand All @@ -27,13 +42,17 @@ impl OpAble for Close {
opcode::Close::new(types::Fd(self.fd)).build()
}

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
None
}

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_call(&mut self) -> io::Result<u32> {
syscall_u32!(close(self.fd))
#[cfg(unix)]
return syscall_u32!(close(self.fd));

#[cfg(windows)]
return syscall!(closesocket(self.fd), PartialEq::ne, 0);
}
}
84 changes: 81 additions & 3 deletions monoio/src/driver/op/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ use std::{io, net::SocketAddr};

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(windows)]
use windows_sys::Win32::Networking::WinSock::{
connect, socklen_t, AF_INET, AF_INET6, IN6_ADDR, IN6_ADDR_0, IN_ADDR, IN_ADDR_0, SOCKADDR_IN,
SOCKADDR_IN6, SOCKADDR_IN6_0, SOCKET_ERROR,
};

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
use crate::driver::legacy::ready::Direction;

pub(crate) struct Connect {
pub(crate) fd: SharedFd,
socket_addr: Box<SocketAddrCRepr>,
#[cfg(windows)]
socket_addr_len: socklen_t,
#[cfg(unix)]
socket_addr_len: libc::socklen_t,
#[cfg(any(target_os = "ios", target_os = "macos"))]
tfo: bool,
Expand Down Expand Up @@ -44,12 +52,12 @@ impl OpAble for Connect {
.build()
}

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
None
}

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_call(&mut self) -> io::Result<u32> {
// For ios/macos, if tfo is enabled, we will
// call connectx here.
Expand All @@ -76,6 +84,7 @@ impl OpAble for Connect {
};
}

#[cfg(unix)]
match crate::syscall_u32!(connect(
self.fd.raw_fd(),
self.socket_addr.as_ptr(),
Expand All @@ -84,6 +93,20 @@ impl OpAble for Connect {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
}

#[cfg(windows)]
match crate::syscall!(
connect(
self.fd.raw_socket(),
self.socket_addr.as_ptr(),
self.socket_addr_len,
),
PartialEq::eq,
SOCKET_ERROR
) {
Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
}
}
}

Expand Down Expand Up @@ -147,8 +170,14 @@ impl OpAble for ConnectUnix {
// Copied from mio.
#[repr(C)]
pub(crate) union SocketAddrCRepr {
#[cfg(unix)]
v4: libc::sockaddr_in,
#[cfg(unix)]
v6: libc::sockaddr_in6,
#[cfg(windows)]
v4: SOCKADDR_IN,
#[cfg(windows)]
v6: SOCKADDR_IN6,
}

impl SocketAddrCRepr {
Expand All @@ -157,6 +186,55 @@ impl SocketAddrCRepr {
}
}

#[cfg(windows)]
pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) {
match addr {
SocketAddr::V4(ref addr) => {
// `s_addr` is stored as BE on all machine and the array is in BE order.
// So the native endian conversion method is used so that it's never swapped.
let sin_addr = unsafe {
let mut s_un = std::mem::zeroed::<IN_ADDR_0>();
s_un.S_addr = u32::from_ne_bytes(addr.ip().octets());
IN_ADDR { S_un: s_un }
};

let sockaddr_in = SOCKADDR_IN {
sin_family: AF_INET as u16, // 1
sin_port: addr.port().to_be(),
sin_addr,
sin_zero: [0; 8],
};

let sockaddr = SocketAddrCRepr { v4: sockaddr_in };
(sockaddr, std::mem::size_of::<SOCKADDR_IN>() as i32)
}
SocketAddr::V6(ref addr) => {
let sin6_addr = unsafe {
let mut u = std::mem::zeroed::<IN6_ADDR_0>();
u.Byte = addr.ip().octets();
IN6_ADDR { u }
};
let u = unsafe {
let mut u = std::mem::zeroed::<SOCKADDR_IN6_0>();
u.sin6_scope_id = addr.scope_id();
u
};

let sockaddr_in6 = SOCKADDR_IN6 {
sin6_family: AF_INET6 as u16, // 23
sin6_port: addr.port().to_be(),
sin6_addr,
sin6_flowinfo: addr.flowinfo(),
Anonymous: u,
};

let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 };
(sockaddr, std::mem::size_of::<SOCKADDR_IN6>() as i32)
}
}
}

#[cfg(unix)]
/// Converts a Rust `SocketAddr` into the system representation.
pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_t) {
match addr {
Expand Down
19 changes: 17 additions & 2 deletions monoio/src/driver/op/fsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ use std::io;

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(windows)]
use windows_sys::Win32::Storage::FileSystem::FlushFileBuffers;

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(feature = "legacy")]
use crate::driver::legacy::ready::Direction;
#[cfg(windows)]
use crate::syscall;
#[cfg(all(unix, feature = "legacy"))]
use crate::{driver::legacy::ready::Direction, syscall_u32};
use crate::syscall_u32;

pub(crate) struct Fsync {
#[allow(unused)]
Expand Down Expand Up @@ -42,11 +48,20 @@ impl OpAble for Fsync {
opc.build()
}

#[cfg(all(unix, feature = "legacy"))]
#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
None
}

#[cfg(all(windows, feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
syscall!(
FlushFileBuffers(self.handle.as_raw_handle()),
PartialEq::eq,
0
)
}

#[cfg(all(unix, not(target_os = "linux"), feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
syscall_u32!(fsync(self.fd.raw_fd()))
Expand Down
9 changes: 7 additions & 2 deletions monoio/src/driver/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ macro_rules! syscall {
#[cfg(windows)]
#[macro_export]
macro_rules! syscall {
($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
unimplemented!()
($fn: ident ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{
let res = unsafe { $fn($($arg, )*) };
if $err_test(&res, &$err_value) {
Err(io::Error::last_os_error())
} else {
Ok(res)
}
}};
}

Expand Down
Loading