Avoid unneeded start_session() when spawning #1815

Merged 2 commits on Jun 24, 2024

Changes from all commits
113 changes: 61 additions & 52 deletions unified-scheduler-pool/src/lib.rs
@@ -766,23 +766,6 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
handler_threads: Vec<JoinHandle<()>>,
}

impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(
pool: Arc<SchedulerPool<Self, TH>>,
initial_context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> Self {
Self::from_inner(

Review comment:
from_inner() calls start_session() internally, which sends the initial OpenSubchannel.

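To make the comment above concrete, here is a minimal, self-contained sketch of why the old spawn path put an extra OpenSubchannel message onto the channel that the scheduler thread then had to receive before doing any real work. The types are simplified stand-ins, not the real ones.

```rust
// Illustrative sketch only: a trivial stand-in for the new-task channel,
// showing the extra round-trip the old do_spawn()/from_inner() path implied.
use std::sync::mpsc;

#[derive(Debug)]
enum NewTaskPayload {
    // Stands in for (SchedulingContext, ResultWithTimings).
    OpenSubchannel(&'static str),
}

fn main() {
    let (new_task_sender, new_task_receiver) = mpsc::channel();

    // Old shape: spawn() -> do_spawn() -> from_inner() -> start_session(),
    // which always sent the initial OpenSubchannel...
    new_task_sender
        .send(NewTaskPayload::OpenSubchannel("initial session"))
        .unwrap();

    // ...only for the scheduler thread's first recv() to pull it straight
    // back out before its first real iteration.
    let first = new_task_receiver.recv().unwrap();
    println!("received {first:?} just to bootstrap the first session");

    // New shape: the initial context and result_with_timings are passed to
    // start_threads() as plain arguments, so the channel is only used from
    // the second session onward.
}
```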
PooledSchedulerInner::<Self, TH> {
thread_manager: ThreadManager::new(pool),
usage_queue_loader: UsageQueueLoader::default(),
},
initial_context,
result_with_timings,
)
}
}

struct HandlerPanicked;
type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;

@@ -852,7 +835,15 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
);
}

fn start_threads(&mut self, context: &SchedulingContext) {
// This method must take the same set of session-related arguments as start_session() to avoid
// unneeded channel operations and minimize overhead. Starting threads already incurs a very
// high cost... Also, pre-creating threads isn't desirable either, so as to avoid `Option`-ed
// types for type safety.
fn start_threads(
&mut self,
context: SchedulingContext,
Review comment (ryoqun, author):
All of the relevant API now takes ownership of SchedulingContext because the implementations always consume it; passing it by reference wouldn't reduce the number of .clone() calls.

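A generic Rust illustration of the point above (the type and function names here are stand-ins, not the real SchedulingContext API): when the callee ultimately needs an owned value, accepting a reference only moves the .clone() inside the callee instead of removing it, whereas taking the value by ownership keeps the cost explicit at the call site and avoids it entirely when the caller is done with the value.

```rust
// Illustrative sketch only: `Context` stands in for SchedulingContext.
#[derive(Clone, Debug)]
struct Context(String);

// Takes a reference, but the implementation always consumes an owned value,
// so it has to clone internally; the caller saves nothing.
fn open_session_by_ref(context: &Context) -> Context {
    context.clone()
}

// Takes ownership; a clone only happens if the caller still needs the value.
fn open_session_by_value(context: Context) -> Context {
    context
}

fn main() {
    let context = Context("slot 42".to_string());

    let _a = open_session_by_ref(&context); // one clone, hidden in the callee
    let _b = open_session_by_value(context.clone()); // one clone, visible here
    let _c = open_session_by_value(context); // no clone: caller is done with it
}
```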
mut result_with_timings: ResultWithTimings,
) {
// Firstly, setup bi-directional messaging between the scheduler and handlers to pass
// around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to
// the handlers and the other for finished tasks from the handlers to the scheduler).
@@ -930,7 +921,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// prioritization further. Consequently, this also contributes to alleviate the known
// heuristic's caveat for the first task of linearized runs, which is described above.
let (mut runnable_task_sender, runnable_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
chained_channel::unbounded::<Task, SchedulingContext>(context);
// Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks,
// because it is more likely that a blocked task will have more blocked tasks behind it,
// which should be scheduled while minimizing the delay to clear buffered linearized runs
@@ -1013,29 +1004,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
};
let mut result_with_timings = initialized_result_with_timings();

// The following loop maintains and updates ResultWithTimings as its
// externally-provieded mutable state for each session in this way:

Review comment:
Typo:

Suggested change:
- // externally-provieded mutable state for each session in this way:
+ // externally-provided mutable state for each session in this way:

Review comment (ryoqun, author, Jun 24, 2024):
Err, I'll merge this PR as-is. There's another PR for cosmetic fixes in tomorrow's work...

Review comment (apfitzge, Jun 24, 2024):
All good, that's why I approved 😉

I usually don't want to block on these typos, but I try to call them out as I see them during my read.

//
// 1. Initial result_with_timing is propagated implicitly by the moved variable.
// 2. Subsequent result_with_timings are propagated explicitly from
// the new_task_receiver.recv() invocation located at the end of loop.
'nonaborted_main_loop: loop {
match new_task_receiver.recv() {

Review comment:
The order here changed; the initial iteration does not receive an OpenSubchannel before the is_finished loop.

Review comment:
So I'm actually confused here now.

We call do_take_resumed_scheduler, which calls spawn, which calls start_threads.

But I'm lost on how the handler threads are getting the sub-channel opened on their end: we no longer receive the initial OpenSubchannel message before the first inner loop, and so we do not call send_chained_channel.

Review comment (ryoqun, author):
Thanks for detailing your confusion. I think I added some good comments to address that: bbe54f3

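The short answer is visible elsewhere in this diff: the chained channel is constructed with the initial context (chained_channel::unbounded::<Task, SchedulingContext>(context)), so every handler thread's cloned receiver starts out already holding it, and send_chained_channel() is only needed for sessions after the first; this is also what the comments added in bbe54f3 (shown later in this diff) explain. Below is a rough sketch of that idea using only std types; the real chained_channel API differs, and the names here are stand-ins.

```rust
// Rough sketch only: plain std types standing in for chained_channel, to show
// why the first session needs no explicit OpenSubchannel/send_chained_channel.
use std::sync::mpsc;

#[derive(Clone, Debug)]
struct SchedulingContext(u64); // stand-in: just a slot number

struct HandlerSideReceiver {
    current_context: SchedulingContext, // seeded at construction time
    context_updates: mpsc::Receiver<SchedulingContext>,
}

fn unbounded(initial: SchedulingContext) -> (mpsc::Sender<SchedulingContext>, HandlerSideReceiver) {
    let (sender, receiver) = mpsc::channel();
    // The receiver is born already knowing the initial context.
    (
        sender,
        HandlerSideReceiver {
            current_context: initial,
            context_updates: receiver,
        },
    )
}

fn main() {
    let (context_sender, mut handler_side) = unbounded(SchedulingContext(1));

    // Handler threads can work on the initial session immediately:
    println!("initial session: {:?}", handler_side.current_context);

    // Only *subsequent* sessions need an explicit message; this is the role
    // that send_chained_channel() plays in the scheduler thread.
    context_sender.send(SchedulingContext(2)).unwrap();
    handler_side.current_context = handler_side.context_updates.recv().unwrap();
    println!("next session: {:?}", handler_side.current_context);
}
```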
Ok(NewTaskPayload::OpenSubchannel((
new_context,
new_result_with_timings,
))) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Ok(_) => {
unreachable!();
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop();
break 'nonaborted_main_loop;
}
}

let mut is_finished = false;
while !is_finished {
// ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
Expand Down Expand Up @@ -1121,6 +1097,28 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.expect("always outlived receiver");
session_ending = false;
}

match new_task_receiver.recv() {

Review comment:
After is_finished, wait for a new subchannel to open or for the channel to be dropped.

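A compressed, runnable sketch of the loop shape this comment describes (the payload type and session state are simplified stand-ins): the first session's state arrives as arguments, and recv() is only consulted between sessions, at the bottom of the loop.

```rust
// Control-flow sketch only: the first session's state arrives as arguments,
// and recv() is only consulted *between* sessions, at the bottom of the loop.
use std::sync::mpsc::{channel, Receiver};

enum NewTaskPayload {
    OpenSubchannel(u64), // stand-in for (SchedulingContext, ResultWithTimings)
}

fn scheduler_main_loop(mut session: u64, new_task_receiver: Receiver<NewTaskPayload>) {
    'nonaborted_main_loop: loop {
        // ... run the current session until it is finished ...
        println!("session {session} finished");

        // After is_finished: block until the next session is opened, or stop
        // when the sender side is dropped (ThreadManager::drop()).
        match new_task_receiver.recv() {
            Ok(NewTaskPayload::OpenSubchannel(next)) => session = next,
            Err(_) => break 'nonaborted_main_loop,
        }
    }
}

fn main() {
    let (sender, receiver) = channel();
    sender.send(NewTaskPayload::OpenSubchannel(2)).unwrap();
    drop(sender); // no more sessions, so the loop exits after session 2
    scheduler_main_loop(1, receiver);
}
```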
Ok(NewTaskPayload::OpenSubchannel((
new_context,
new_result_with_timings,
))) => {
// We just received subsequent (= not initial) session and about to
// enter into the preceding `while(!is_finished) {...}` loop again.
// Before that, propagate new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Ok(_) => {
unreachable!();
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop();
break 'nonaborted_main_loop;
}
}
}

// There are several code-path reaching here out of the preceding unconditional
Expand Down Expand Up @@ -1152,6 +1150,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let finished_blocked_task_sender = finished_blocked_task_sender.clone();
let finished_idle_task_sender = finished_idle_task_sender.clone();

// The following loop maintains and updates SchedulingContext as its
// externally-provided state for each session in this way:
//
// 1. Initial context is propagated implicitly by the moved runnable_task_receiver,
// which is clone()-d just above for this particular thread.
// 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of
// `select_biased!`, which are sent from `.send_chained_channel()` in the scheduler
// thread for all-but-initial sessions.
move || loop {
let (task, sender) = select_biased! {
recv(runnable_task_receiver.for_select()) -> message => {
@@ -1327,13 +1333,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

fn start_session(
Review comment (ryoqun, author):
Now start_session()'s and spawn()'s argument types are aligned as much as possible.

&mut self,
context: &SchedulingContext,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) {
assert!(!self.are_threads_joined());
Review comment (ryoqun, author):
Placing this sane assert! was an original intention of mine.

Review comment (ryoqun, author, Jun 21, 2024):
To recap, the previous code called start_session() before calling start_threads() inside from_inner().

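In other words, the ordering flipped. Below is a schematic sketch that contrasts the two flows and why the new invariant behind these asserts holds; the struct is stubbed out and spawn_old/spawn_new are hypothetical names, not the real signatures.

```rust
// Schematic sketch only: bodies are stubbed out; this just contrasts the call
// order before and after this PR.
struct ThreadManager;

impl ThreadManager {
    fn start_threads(&mut self /* , context, result_with_timings */) {}

    fn start_session(&mut self /* , context, result_with_timings */) {
        // New invariant: this is only ever called for a *subsequent* session,
        // on a thread manager whose threads are already running and not yet
        // joined, which is what the assert! and assert_matches! encode.
    }
}

fn spawn_old(thread_manager: &mut ThreadManager) {
    // Old flow (via do_spawn()/from_inner()): the initial session was opened
    // through the channel before the threads consuming it were even started.
    thread_manager.start_session(/* initial context + result_with_timings */);
    thread_manager.start_threads(/* initial context only */);
}

fn spawn_new(thread_manager: &mut ThreadManager) {
    // New flow: the initial session rides along with thread startup, so
    // start_session() is reserved for resumed (non-initial) sessions.
    thread_manager.start_threads(/* initial context + result_with_timings */);
}

fn main() {
    spawn_old(&mut ThreadManager);
    spawn_new(&mut ThreadManager);
}
```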
assert_matches!(self.session_result_with_timings, None);
self.new_task_sender
.send(NewTaskPayload::OpenSubchannel((
context.clone(),
context,
result_with_timings,
)))
.expect("no new session after aborted");
@@ -1353,7 +1360,7 @@ pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {

fn spawn(
pool: Arc<SchedulerPool<Self, TH>>,
initial_context: SchedulingContext,
Review comment (ryoqun, author):
I don't think initial_context adds any substantial value for readability, so I preferred consistency by naming this just context.

Review comment (ryoqun, author):
Otherwise, I'd have to name result_with_timings as initial_result_with_timings, which is a mouthful, IMO.

context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> Self
where
Expand All @@ -1379,21 +1386,23 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
) -> Self {
inner
.thread_manager
.start_session(&context, result_with_timings);
.start_session(context.clone(), result_with_timings);
Review comment (ryoqun, author, Jun 21, 2024):
Also, .start_session() and .start_threads() are named very similarly and their arguments are consistent, for beautiful consistency.

Self { inner, context }
}

fn spawn(
pool: Arc<SchedulerPool<Self, TH>>,
initial_context: SchedulingContext,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> Self {
let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings);
scheduler
.inner
let mut inner = Self::Inner {
thread_manager: ThreadManager::new(pool),
usage_queue_loader: UsageQueueLoader::default(),
};
inner
.thread_manager
.start_threads(&scheduler.context);
scheduler
.start_threads(context.clone(), result_with_timings);
Self { inner, context }
}
}

@@ -2775,13 +2784,13 @@ mod tests {

fn spawn(
pool: Arc<SchedulerPool<Self, DefaultTaskHandler>>,
initial_context: SchedulingContext,
context: SchedulingContext,
_result_with_timings: ResultWithTimings,
) -> Self {
AsyncScheduler::<TRIGGER_RACE_CONDITION>(
Mutex::new(initialized_result_with_timings()),
Mutex::new(vec![]),
initial_context,
context,
pool,
)
}