Skip to content

try support IOCP #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
a51c248
support completion io for windows
loongs-zhang Nov 13, 2024
31a425f
fix workflow
loongs-zhang Nov 13, 2024
11442dc
fix linux CI
loongs-zhang Nov 13, 2024
9445411
hook connect
loongs-zhang Nov 13, 2024
1d40dda
hook connect
loongs-zhang Nov 13, 2024
1b688f0
hook connect
loongs-zhang Nov 13, 2024
ae65113
定位IOCP bug
loongs-zhang Nov 13, 2024
1014fec
定位IOCP bug
loongs-zhang Nov 13, 2024
471aaf6
定位IOCP bug
loongs-zhang Nov 13, 2024
dc2045e
定位IOCP bug
loongs-zhang Nov 13, 2024
76b02d4
定位IOCP bug
loongs-zhang Nov 13, 2024
eb4118b
定位IOCP bug
loongs-zhang Nov 13, 2024
382de04
定位IOCP bug
loongs-zhang Nov 13, 2024
aff1562
定位IOCP bug
loongs-zhang Nov 13, 2024
01fdaca
fix IOCP accept bug
loongs-zhang Nov 13, 2024
356097d
try fix IOCP
loongs-zhang Nov 13, 2024
192bf15
try fix IOCP
loongs-zhang Nov 13, 2024
2354ce5
try fix IOCP
loongs-zhang Nov 13, 2024
a725ec1
try fix IOCP
loongs-zhang Nov 13, 2024
e7e3ba0
try fix IOCP
loongs-zhang Nov 14, 2024
e0e161c
try fix IOCP
loongs-zhang Nov 14, 2024
56f2fb0
try fix IOCP
loongs-zhang Nov 14, 2024
a02b86e
try fix IOCP
loongs-zhang Nov 14, 2024
3975986
try fix IOCP
loongs-zhang Nov 14, 2024
9c7918f
try fix IOCP
loongs-zhang Nov 14, 2024
fdb7d04
try fix IOCP
loongs-zhang Nov 14, 2024
d1715e9
try fix IOCP
loongs-zhang Nov 14, 2024
39c68d2
add log
loongs-zhang Nov 14, 2024
c92f5c8
add log
loongs-zhang Nov 14, 2024
bb33f7c
add log
loongs-zhang Nov 14, 2024
dc8c18a
add log
loongs-zhang Nov 14, 2024
cce5961
add log
loongs-zhang Nov 14, 2024
8244491
Merge branch 'master' into dev-iocp4
loongs-zhang Nov 15, 2024
ffe34cb
make CI faster
loongs-zhang Nov 15, 2024
054174b
try pass IOCP CI
loongs-zhang Nov 15, 2024
37e7853
test accept IOCP
loongs-zhang Nov 15, 2024
193c73b
test accept IOCP
loongs-zhang Nov 15, 2024
d9f2e41
test accept IOCP
loongs-zhang Nov 15, 2024
04e996e
test WSARecv/WSASend IOCP
loongs-zhang Nov 15, 2024
19c8453
test send IOCP
loongs-zhang Nov 15, 2024
aaaf15e
test send IOCP
loongs-zhang Nov 15, 2024
11d96b7
test send IOCP
loongs-zhang Nov 15, 2024
e0a8c34
Merge branch 'master' into dev-iocp4
loongs-zhang Jan 6, 2025
7af988e
try IOCP
loongs-zhang Jan 6, 2025
43be1fc
Merge branch 'master' into dev-iocp4
loongs-zhang Jan 11, 2025
15e7a29
Merge branch 'master' into dev-iocp4
loongs-zhang Jan 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci-preemptive.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci --release
fi

# test IOCP
if [ "${OS}" = "windows-latest" ]; then
cd "${PROJECT_DIR}"/core
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive --release
cd "${PROJECT_DIR}"/open-coroutine
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive --release
fi
17 changes: 17 additions & 0 deletions .github/workflows/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ fi
export RUST_TEST_THREADS=1
export RUST_BACKTRACE=1

# todo remove this
if [ "${OS}" = "windows-latest" ]; then
cd "${PROJECT_DIR}"/open-coroutine
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release
fi

# test open-coroutine-core mod
cd "${PROJECT_DIR}"/core
"${CARGO}" test --target "${TARGET}" --features ci
Expand All @@ -34,3 +41,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci --release
fi

# test IOCP
if [ "${OS}" = "windows-latest" ]; then
cd "${PROJECT_DIR}"/core
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release
cd "${PROJECT_DIR}"/open-coroutine
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release
fi
6 changes: 6 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,11 @@ net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"]
# Provide io_uring adaptation, this feature only works in linux.
io_uring = ["net", "io-uring"]

# Provide IOCP adaptation, this feature only works in windows.
iocp = ["net"]

# Provide completion IOCP adaptation
completion_io = ["io_uring", "iocp"]

# Provide syscall implementation.
syscall = ["net"]
107 changes: 103 additions & 4 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,34 @@ cfg_if::cfg_if! {
}
}

cfg_if::cfg_if! {
if #[cfg(all(windows, feature = "iocp"))] {
use std::ffi::c_uint;
use windows_sys::core::{PCSTR, PSTR};
use windows_sys::Win32::Networking::WinSock::{
setsockopt, LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT, WSABUF,
};
use windows_sys::Win32::System::IO::OVERLAPPED;
}
}

#[repr(C)]
#[derive(Debug)]
pub(crate) struct EventLoop<'e> {
stop: Arc<(Mutex<bool>, Condvar)>,
shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
cpu: usize,
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
operator: crate::net::operator::Operator<'e>,
#[allow(clippy::type_complexity)]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
syscall_wait_table: DashMap<usize, Arc<(Mutex<Option<c_longlong>>, Condvar)>>,
selector: Poller,
pool: CoroutinePool<'e>,
Expand Down Expand Up @@ -87,9 +105,15 @@ impl<'e> EventLoop<'e> {
stop: Arc::new((Mutex::new(false), Condvar::new())),
shared_stop,
cpu,
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
operator: crate::net::operator::Operator::new(cpu)?,
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
syscall_wait_table: DashMap::new(),
selector: Poller::new()?,
pool: CoroutinePool::new(name, stack_size, min_size, max_size, keep_alive_time),
Expand Down Expand Up @@ -222,6 +246,8 @@ impl<'e> EventLoop<'e> {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
left_time = self.adapt_io_uring(left_time)?;
} else if #[cfg(all(windows, feature = "iocp"))] {
left_time = self.adapt_iocp(left_time)?;
}
}

Expand Down Expand Up @@ -267,6 +293,51 @@ impl<'e> EventLoop<'e> {
Ok(left_time)
}

#[cfg(all(windows, feature = "iocp"))]
fn adapt_iocp(&self, mut left_time: Option<Duration>) -> std::io::Result<Option<Duration>> {
// use IOCP
let (count, mut cq, left) = self.operator.select(left_time, 0)?;
if count > 0 {
for cqe in &mut cq {
let token = cqe.token;
let bytes_transferred = cqe.bytes_transferred;
// resolve completed read/write tasks
// todo refactor IOCP impl
let result = match cqe.syscall {
Syscall::accept => unsafe {
if setsockopt(
cqe.socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
std::ptr::from_ref(&cqe.from_fd).cast(),
c_int::try_from(size_of::<SOCKET>()).expect("overflow"),
) == 0
{
cqe.socket.try_into().expect("result overflow")
} else {
-c_longlong::from(windows_sys::Win32::Foundation::GetLastError())
}
},
Syscall::recv | Syscall::WSARecv | Syscall::send | Syscall::WSASend => {
bytes_transferred.into()
}
_ => panic!("unsupported"),
};
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
let (lock, cvar) = &*pair;
let mut pending = lock.lock().expect("lock failed");
*pending = Some(result);
cvar.notify_one();
}
unsafe { self.resume(token) };
}
}
if left != left_time {
left_time = Some(left.unwrap_or(Duration::ZERO));
}
Ok(left_time)
}

unsafe fn resume(&self, token: usize) {
if COROUTINE_TOKENS.remove(&token).is_none() {
return;
Expand Down Expand Up @@ -446,6 +517,34 @@ impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);

macro_rules! impl_iocp {
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
#[cfg(all(windows, feature = "iocp"))]
impl EventLoop<'_> {
#[allow(non_snake_case, clippy::too_many_arguments)]
pub(super) fn $syscall(
&self,
$($arg: $arg_type),*
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
let token = EventLoop::token(Syscall::$syscall);
self.operator.$syscall(token, $($arg, )*)?;
let arc = Arc::new((Mutex::new(None), Condvar::new()));
assert!(
self.syscall_wait_table.insert(token, arc.clone()).is_none(),
"The previous token was not retrieved in a timely manner"
);
Ok(arc)
}
}
}
}

impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);

#[cfg(all(test, not(all(unix, feature = "preemptive"))))]
mod tests {
use crate::net::event_loop::EventLoop;
Expand Down
38 changes: 37 additions & 1 deletion core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,29 @@ cfg_if::cfg_if! {
}
}

cfg_if::cfg_if! {
if #[cfg(all(windows, feature = "iocp"))] {
use std::ffi::c_uint;
use windows_sys::core::{PCSTR, PSTR};
use windows_sys::Win32::Networking::WinSock::{
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
};
use windows_sys::Win32::System::IO::OVERLAPPED;
}
}

/// 做C兼容时会用到
pub type UserFunc = extern "C" fn(usize) -> usize;

mod selector;

#[allow(clippy::too_many_arguments)]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
mod operator;
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
pub(crate) mod operator;

#[allow(missing_docs)]
pub mod event_loop;
Expand Down Expand Up @@ -280,3 +295,24 @@ impl_io_uring!(fsync(fd: c_int) -> c_int);
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);

macro_rules! impl_iocp {
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
#[allow(non_snake_case)]
#[cfg(all(windows, feature = "iocp"))]
impl EventLoops {
#[allow(missing_docs)]
pub fn $syscall(
$($arg: $arg_type),*
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
Self::event_loop().$syscall($($arg, )*)
}
}
}
}

impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
6 changes: 6 additions & 0 deletions core/src/net/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@
mod linux;
#[cfg(all(target_os = "linux", feature = "io_uring"))]
pub(crate) use linux::*;

#[allow(non_snake_case)]
#[cfg(all(windows, feature = "iocp"))]
mod windows;
#[cfg(all(windows, feature = "iocp"))]
pub(crate) use windows::*;
Loading
Loading