Skip to content

Commit

Permalink
Avoid unneeded start_session() when spawning
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jun 21, 2024
1 parent 2107adc commit 0fedb4f
Showing 1 changed file with 38 additions and 49 deletions.
87 changes: 38 additions & 49 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,23 +764,6 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
handler_threads: Vec<JoinHandle<()>>,
}

impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(
pool: Arc<SchedulerPool<Self, TH>>,
initial_context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> Self {
Self::from_inner(
PooledSchedulerInner::<Self, TH> {
thread_manager: ThreadManager::new(pool),
usage_queue_loader: UsageQueueLoader::default(),
},
initial_context,
result_with_timings,
)
}
}

struct HandlerPanicked;
type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;

Expand Down Expand Up @@ -850,7 +833,11 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
);
}

fn start_threads(&mut self, context: &SchedulingContext) {
fn start_threads(
&mut self,
context: SchedulingContext,
mut result_with_timings: ResultWithTimings,
) {
// Firstly, setup bi-directional messaging between the scheduler and handlers to pass
// around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to
// the handlers and the other for finished tasks from the handlers to the scheduler).
Expand Down Expand Up @@ -928,7 +915,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// prioritization further. Consequently, this also contributes to alleviate the known
// heuristic's caveat for the first task of linearized runs, which is described above.
let (mut runnable_task_sender, runnable_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
chained_channel::unbounded::<Task, SchedulingContext>(context);
// Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks,
// because it is more likely that a blocked task will have more blocked tasks behind it,
// which should be scheduled while minimizing the delay to clear buffered linearized runs
Expand Down Expand Up @@ -1011,29 +998,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
};
let mut result_with_timings = initialized_result_with_timings();

'nonaborted_main_loop: loop {
match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel((
new_context,
new_result_with_timings,
))) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Ok(_) => {
unreachable!();
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop();
break 'nonaborted_main_loop;
}
}

let mut is_finished = false;
while !is_finished {
// ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
Expand Down Expand Up @@ -1119,6 +1085,26 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.expect("always outlived receiver");
session_ending = false;
}

match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel((
new_context,
new_result_with_timings,
))) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Ok(_) => {
unreachable!();
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop();
break 'nonaborted_main_loop;
}
}
}

// There are several code-path reaching here out of the preceding unconditional
Expand Down Expand Up @@ -1325,13 +1311,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

fn start_session(
&mut self,
context: &SchedulingContext,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) {
assert!(!self.are_threads_joined());
assert_matches!(self.session_result_with_timings, None);
self.new_task_sender
.send(NewTaskPayload::OpenSubchannel((
context.clone(),
context,
result_with_timings,
)))
.expect("no new session after aborted");
Expand Down Expand Up @@ -1377,21 +1364,23 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
) -> Self {
inner
.thread_manager
.start_session(&context, result_with_timings);
.start_session(context.clone(), result_with_timings);
Self { inner, context }
}

fn spawn(
pool: Arc<SchedulerPool<Self, TH>>,
initial_context: SchedulingContext,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> Self {
let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings);
scheduler
.inner
let mut inner = Self::Inner {
thread_manager: ThreadManager::new(pool),
usage_queue_loader: UsageQueueLoader::default(),
};
inner
.thread_manager
.start_threads(&scheduler.context);
scheduler
.start_threads(context.clone(), result_with_timings);
Self { inner, context }
}
}

Expand Down

0 comments on commit 0fedb4f

Please sign in to comment.