Skip to content

windows signal support part1 #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions open-coroutine-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ uuid = { version = "1.3.0", features = [
crossbeam-utils = "0.8.15"
crossbeam-deque = "0.8.2"
polling = "2.8.0"
core_affinity = "0.8.0"
open-coroutine-timer = { version = "0.1.0", path = "../open-coroutine-timer" }
open-coroutine-queue = { version = "0.1.2", path = "../open-coroutine-queue" }

[target."cfg(windows)".dependencies]
windows-sys = { version = "0.48.0", features = ["Win32_System_SystemInformation", "Win32_System_Diagnostics_Debug"] }

[target.'cfg(target_os = "linux")'.dependencies]
core_affinity = "0.8.0"
windows-sys = { version = "0.48.0", features = [
"Win32_System_SystemInformation",
"Win32_System_Diagnostics_Debug",
"Win32_Foundation",
"Win32_System_Console"
] }

[dev-dependencies]
backtrace = "0.3.67"
Expand Down
4 changes: 2 additions & 2 deletions open-coroutine-core/src/event_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub type UserFunc = extern "C" fn(*const Suspender<(), ()>, usize) -> usize;
#[derive(Debug, Copy, Clone)]
pub struct EventLoops {}

#[cfg(target_os = "linux")]
#[cfg(any(target_os = "linux", windows))]
static BIND: Lazy<bool> = Lazy::new(|| unsafe { EVENT_LOOPS.len() } <= num_cpus::get());

static mut INDEX: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
Expand Down Expand Up @@ -82,7 +82,7 @@ impl EventLoops {
std::thread::Builder::new()
.name(format!("open-coroutine-event-loop-{i}"))
.spawn(move || {
#[cfg(target_os = "linux")]
#[cfg(any(target_os = "linux", windows))]
if *BIND {
assert!(
core_affinity::set_for_current(core_affinity::CoreId {
Expand Down
3 changes: 2 additions & 1 deletion open-coroutine-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ macro_rules! unbreakable {
};
}

#[cfg(all(unix, feature = "preemptive-schedule"))]
#[allow(dead_code)]
#[cfg(feature = "preemptive-schedule")]
mod monitor;

#[allow(
Expand Down
112 changes: 88 additions & 24 deletions open-coroutine-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ impl Monitor {
target_os = "android",
target_os = "emscripten"))] {
libc::SIGRTMIN()
} else if #[cfg(windows)] {
windows_sys::Win32::System::Console::CTRL_C_EVENT.try_into().unwrap()
} else {
libc::SIGURG
}
Expand All @@ -48,34 +50,51 @@ impl Monitor {
}
}

fn new() -> Self {
#[allow(clippy::fn_to_numeric_cast)]
unsafe extern "C" fn sigurg_handler(_signal: libc::c_int) {
// invoke by Monitor::signal()
if let Some(s) = crate::coroutine::suspender::Suspender::<(), ()>::current() {
//获取当前信号屏蔽集
let mut current_mask: libc::sigset_t = std::mem::zeroed();
assert_eq!(
0,
libc::pthread_sigmask(libc::SIG_BLOCK, std::ptr::null(), &mut current_mask),
);
//删除对Monitor::signum()信号的屏蔽,使信号处理函数即使在处理中,也可以再次进入信号处理函数
assert_eq!(0, libc::sigdelset(&mut current_mask, Monitor::signum()));
assert_eq!(
0,
libc::pthread_sigmask(libc::SIG_SETMASK, &current_mask, std::ptr::null_mut())
);
s.suspend();
fn init_signal_handler() {
cfg_if::cfg_if! {
if #[cfg(windows)] {
unsafe extern "system" fn sigint_handler(_: u32) -> windows_sys::Win32::Foundation::BOOL {
// invoke by Monitor::signal()
if let Some(s) = crate::coroutine::suspender::Suspender::<(), ()>::current() {
s.suspend();
}
windows_sys::Win32::Foundation::TRUE
}
assert_eq!(windows_sys::Win32::Foundation::TRUE,
unsafe{ windows_sys::Win32::System::Console::SetConsoleCtrlHandler(Some(sigint_handler), windows_sys::Win32::Foundation::TRUE) });
} else {
#[allow(clippy::fn_to_numeric_cast)]
unsafe extern "C" fn sigurg_handler(_signal: libc::c_int) {
// invoke by Monitor::signal()
if let Some(s) = crate::coroutine::suspender::Suspender::<(), ()>::current() {
//删除对Monitor::signum()信号的屏蔽,使信号处理函数即使在处理中,也可以再次进入信号处理函数
let mut current_mask: libc::sigset_t = std::mem::zeroed();
assert_eq!(
0,
libc::pthread_sigmask(libc::SIG_BLOCK, std::ptr::null(), &mut current_mask),
);
assert_eq!(0, libc::sigdelset(&mut current_mask, Monitor::signum()));
assert_eq!(
0,
libc::pthread_sigmask(libc::SIG_SETMASK, &current_mask, std::ptr::null_mut())
);
s.suspend();
}
}
Monitor::register_handler(sigurg_handler as libc::sighandler_t);
}
}
Monitor::register_handler(sigurg_handler as libc::sighandler_t);
}

fn new() -> Self {
Monitor::init_signal_handler();
//通过这种方式来初始化monitor线程
_ = MONITOR.get_or_init(|| {
std::thread::Builder::new()
.name("open-coroutine-monitor".to_string())
.spawn(|| {
// todo pin this thread to the CPU core closest to the network card
#[cfg(target_os = "linux")]
#[cfg(any(target_os = "linux", windows))]
assert!(
core_affinity::set_for_current(core_affinity::CoreId { id: 0 }),
"pin monitor thread to a single CPU core failed !"
Expand Down Expand Up @@ -117,10 +136,19 @@ impl Monitor {
if CoroutineState::Running == (*coroutine).get_state() {
//只对陷入重度计算的协程发送信号抢占,对陷入执行系统调用的协程
//不发送信号(如果发送信号,会打断系统调用,进而降低总体性能)
#[cfg(unix)]
assert_eq!(
0,
libc::pthread_kill(node.get_pthread(), Monitor::signum())
);
#[cfg(windows)]
assert_ne!(
0,
windows_sys::Win32::System::Console::GenerateConsoleCtrlEvent(
Monitor::signum().try_into().unwrap(),
0
)
);
}
}
}
Expand All @@ -130,7 +158,13 @@ impl Monitor {

pub(crate) fn add_task(time: u64, coroutine: Option<*const SchedulableCoroutine>) {
unsafe {
let pthread = libc::pthread_self();
cfg_if::cfg_if! {
if #[cfg(windows)] {
let pthread = windows_sys::Win32::System::Threading::GetCurrentThread();
} else {
let pthread = libc::pthread_self();
}
}
Monitor::global()
.task
.insert(time, TaskNode::new(pthread, coroutine));
Expand All @@ -140,7 +174,13 @@ impl Monitor {
pub(crate) fn clean_task(time: u64) {
if let Some(entry) = Monitor::global().task.get_entry(time) {
unsafe {
let pthread = libc::pthread_self();
cfg_if::cfg_if! {
if #[cfg(windows)] {
let pthread = windows_sys::Win32::System::Threading::GetCurrentThread();
} else {
let pthread = libc::pthread_self();
}
}
if !entry.is_empty() {
_ = entry.remove(&TaskNode::new(pthread, None));
}
Expand All @@ -149,12 +189,12 @@ impl Monitor {
}
}

#[cfg(all(test, unix))]
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

#[ignore]
#[cfg(unix)]
#[test]
fn test() {
extern "C" fn sigurg_handler(_signal: libc::c_int) {
Expand All @@ -168,6 +208,30 @@ mod tests {
}

#[ignore]
#[cfg(windows)]
#[test]
fn test() {
unsafe extern "system" fn sigint_handler(_: u32) -> windows_sys::Win32::Foundation::BOOL {
println!("sigint handled");
windows_sys::Win32::Foundation::TRUE
}
unsafe {
assert_eq!(
windows_sys::Win32::Foundation::TRUE,
windows_sys::Win32::System::Console::SetConsoleCtrlHandler(
Some(sigint_handler),
windows_sys::Win32::Foundation::TRUE
)
)
};
let time = open_coroutine_timer::get_timeout_time(Duration::from_millis(10));
Monitor::add_task(time, None);
std::thread::sleep(Duration::from_millis(20));
Monitor::clean_task(time);
}

#[ignore]
#[cfg(unix)]
#[test]
fn test_clean() {
extern "C" fn sigurg_handler(_signal: libc::c_int) {
Expand Down
19 changes: 19 additions & 0 deletions open-coroutine-core/src/monitor/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,38 @@ use crate::scheduler::SchedulableCoroutine;

#[derive(Debug)]
pub(crate) struct TaskNode {
#[cfg(windows)]
pthread: windows_sys::Win32::Foundation::HANDLE,
#[cfg(unix)]
pthread: libc::pthread_t,
coroutine: Option<*const SchedulableCoroutine>,
}

#[allow(dead_code)]
impl TaskNode {
#[cfg(unix)]
pub fn new(pthread: libc::pthread_t, coroutine: Option<*const SchedulableCoroutine>) -> Self {
TaskNode { pthread, coroutine }
}

#[cfg(windows)]
pub fn new(
pthread: windows_sys::Win32::Foundation::HANDLE,
coroutine: Option<*const SchedulableCoroutine>,
) -> Self {
TaskNode { pthread, coroutine }
}

#[cfg(unix)]
pub fn get_pthread(&self) -> libc::pthread_t {
self.pthread
}

#[cfg(windows)]
pub fn get_pthread(&self) -> windows_sys::Win32::Foundation::HANDLE {
self.pthread
}

pub fn get_coroutine(&self) -> Option<*const SchedulableCoroutine> {
self.coroutine
}
Expand Down
67 changes: 65 additions & 2 deletions open-coroutine-core/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Scheduler {
Some(mut coroutine) => {
_ = coroutine.set_scheduler(self);
cfg_if::cfg_if! {
if #[cfg(all(unix, feature = "preemptive-schedule"))] {
if #[cfg(feature = "preemptive-schedule")] {
let start = open_coroutine_timer::get_timeout_time(Duration::from_millis(10));
crate::monitor::Monitor::add_task(start, Some(&coroutine));
}
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Scheduler {
_ => unreachable!("should never execute to here"),
};
cfg_if::cfg_if! {
if #[cfg(all(unix, feature = "preemptive-schedule"))] {
if #[cfg(feature = "preemptive-schedule")] {
//还没执行到10ms就主动yield或者执行完毕了,此时需要清理任务
//否则下一个协程执行不到10ms就会被抢占调度
crate::monitor::Monitor::clean_task(start);
Expand Down Expand Up @@ -319,6 +319,69 @@ mod tests {
scheduler.try_schedule();
}

#[cfg(windows)]
#[test]
fn simple_preemptive_schedule() -> std::io::Result<()> {
//fixme not success now
use std::sync::{Arc, Condvar, Mutex};
static mut TEST_FLAG0: bool = true;
let pair = Arc::new((Mutex::new(true), Condvar::new()));
let pair2 = Arc::clone(&pair);
let handler = std::thread::Builder::new()
.name("test_preemptive_schedule".to_string())
.spawn(move || {
let scheduler = Box::leak(Box::new(Scheduler::new()));
_ = scheduler.submit(
|_, _| {
unsafe {
while TEST_FLAG0 {
windows_sys::Win32::System::Threading::Sleep(10);
}
}
1
},
None,
);
_ = scheduler.submit(
|_, _| {
unsafe { TEST_FLAG0 = false };
2
},
None,
);
scheduler.try_schedule();

let (lock, cvar) = &*pair2;
let mut pending = lock.lock().unwrap();
*pending = false;
// notify the condvar that the value has changed.
cvar.notify_one();
})
.expect("failed to spawn thread");

// wait for the thread to start up
let (lock, cvar) = &*pair;
let result = cvar
.wait_timeout_while(
lock.lock().unwrap(),
Duration::from_millis(3000),
|&mut pending| pending,
)
.unwrap();
if result.1.timed_out() {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"preemptive schedule failed",
))
} else {
unsafe {
handler.join().unwrap();
assert!(!TEST_FLAG0);
}
Ok(())
}
}

#[cfg(all(unix, feature = "preemptive-schedule"))]
#[test]
fn preemptive_schedule() -> std::io::Result<()> {
Expand Down