-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rt(threaded): adjust transition_from_parked behavior after introducing disable_lifo_slot feature #5753
Conversation
I'm sorry, I tried but couldn't come up with this test case. As shown in the code below, my intention was to prevent other workers from leaving the park state through on_thread_unpark, but it seems that run_task -> transition_from_searching -> transition_worker_from_searching always wakes up a parked worker. #[test]
fn issue_5751() {
use std::thread::ThreadId;
use std::collections::HashMap;
struct ParkState {
park_cnt: HashMap<ThreadId, (usize /* park */, usize /* unpark */)>,
no_unpark: bool,
}
impl ParkState {
fn new() -> Self {
Self {
park_cnt: HashMap::new(),
no_unpark: false,
}
}
}
fn wait_park(park_state: &Mutex<ParkState>, exclude: Option<ThreadId>, thread_num: usize) {
'outer: loop {
std::thread::sleep(std::time::Duration::from_millis(10));
let park_state = park_state.lock().unwrap();
if park_state.park_cnt.len() < thread_num {
continue;
}
for (&thdid, &cnts) in &park_state.park_cnt {
if let Some(exclude) = exclude {
if exclude == thdid {
continue;
}
}
if cnts.0 <= cnts.1 {
continue 'outer;
}
}
break;
}
}
let thread_num = 4;
let park_state = Arc::new(Mutex::new(ParkState::new()));
let rt = runtime::Builder::new_multi_thread()
.disable_lifo_slot()
.worker_threads(thread_num)
.on_thread_park({
let park_state = Arc::clone(&park_state);
move || {
let current_thd = std::thread::current().id();
let mut park_state = park_state.lock().unwrap();
if let Some(v) = park_state.park_cnt.get_mut(¤t_thd) {
v.0 += 1;
} else {
park_state.park_cnt.insert(current_thd, (1, 0));
}
}
})
.on_thread_unpark({
let park_state = Arc::clone(&park_state);
move || {
let current_thd = std::thread::current().id();
let mut park_state = park_state.lock().unwrap();
if park_state.no_unpark {
panic!("unexpected unpark");
}
park_state.park_cnt.get_mut(¤t_thd).unwrap().1 += 1;
}
})
.build()
.unwrap();
// wait for all workers to enter the park state
wait_park(&park_state, None, thread_num);
rt.block_on(async {
tokio::spawn(async move {
std::thread::sleep(std::time::Duration::from_secs(7));
let current_thd = std::thread::current().id();
wait_park(&park_state, Some(current_thd), thread_num);
{
let mut park_state = park_state.lock().unwrap();
park_state.no_unpark = true;
}
tokio::task::yield_now().await;
assert_eq!(current_thd, std::thread::current().id());
std::thread::sleep(std::time::Duration::from_secs(7));
{
let mut park_state = park_state.lock().unwrap();
park_state.no_unpark = false;
}
})
.await
.unwrap();
})
} |
Thanks, the PR is helpful for me to understand the issue better. I will try to write a test for this. |
I dug into writing a test for this. It turns out that some other issue (relatively minor) counters this one and prevents any bugs 🤷 The other issue is I wouldn't worry too much about this, I am in the process of rewriting all of that logic, so I will fix it then. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, one smaller change is requested.
pub(crate) fn is_stealable(&self) -> bool { | ||
!self.inner.is_empty() | ||
/// Returns the number of entries in the queue | ||
pub(crate) fn tasks_num(&self) -> (usize /* stealable */, usize /* total */) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be fine to return a single usize
here. I am not sure which comment you are referencing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"protect" some tasks from stealing
tokio/tokio/src/runtime/scheduler/multi_thread/queue.rs
Lines 122 to 128 in fb4d430
/// Returns false if there are any entries in the queue /// /// Separate to is_stealable so that refactors of is_stealable to "protect" /// some tasks from stealing won't affect this pub(crate) fn has_tasks(&self) -> bool { !self.inner.is_empty() }
@carllerche @Darksonn Is there anything else I need to do here? |
Motivation
Fix #5751
Solution
As described in #5751