From 0fedb4f3ceeab1c47130ec3a521e73035996c939 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 20 Jun 2024 22:22:06 +0900 Subject: [PATCH] Avoid unneeded start_session() when spawning --- unified-scheduler-pool/src/lib.rs | 87 ++++++++++++++----------------- 1 file changed, 38 insertions(+), 49 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c69df9c606bc64..a5304968b88ddb 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -764,23 +764,6 @@ struct ThreadManager, TH: TaskHandler> { handler_threads: Vec>, } -impl PooledScheduler { - fn do_spawn( - pool: Arc>, - initial_context: SchedulingContext, - result_with_timings: ResultWithTimings, - ) -> Self { - Self::from_inner( - PooledSchedulerInner:: { - thread_manager: ThreadManager::new(pool), - usage_queue_loader: UsageQueueLoader::default(), - }, - initial_context, - result_with_timings, - ) - } -} - struct HandlerPanicked; type HandlerResult = std::result::Result, HandlerPanicked>; @@ -850,7 +833,11 @@ impl, TH: TaskHandler> ThreadManager { ); } - fn start_threads(&mut self, context: &SchedulingContext) { + fn start_threads( + &mut self, + context: SchedulingContext, + mut result_with_timings: ResultWithTimings, + ) { // Firstly, setup bi-directional messaging between the scheduler and handlers to pass // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to // the handlers and the other for finished tasks from the handlers to the scheduler). @@ -928,7 +915,7 @@ impl, TH: TaskHandler> ThreadManager { // prioritization further. Consequently, this also contributes to alleviate the known // heuristic's caveat for the first task of linearized runs, which is described above. let (mut runnable_task_sender, runnable_task_receiver) = - chained_channel::unbounded::(context.clone()); + chained_channel::unbounded::(context); // Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks, // because it is more likely that a blocked task will have more blocked tasks behind it, // which should be scheduled while minimizing the delay to clear buffered linearized runs @@ -1011,29 +998,8 @@ impl, TH: TaskHandler> ThreadManager { let mut state_machine = unsafe { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; - let mut result_with_timings = initialized_result_with_timings(); 'nonaborted_main_loop: loop { - match new_task_receiver.recv() { - Ok(NewTaskPayload::OpenSubchannel(( - new_context, - new_result_with_timings, - ))) => { - // signal about new SchedulingContext to handler threads - runnable_task_sender - .send_chained_channel(new_context, handler_count) - .unwrap(); - result_with_timings = new_result_with_timings; - } - Ok(_) => { - unreachable!(); - } - Err(_) => { - // This unusual condition must be triggered by ThreadManager::drop(); - break 'nonaborted_main_loop; - } - } - let mut is_finished = false; while !is_finished { // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl, @@ -1119,6 +1085,26 @@ impl, TH: TaskHandler> ThreadManager { .expect("always outlived receiver"); session_ending = false; } + + match new_task_receiver.recv() { + Ok(NewTaskPayload::OpenSubchannel(( + new_context, + new_result_with_timings, + ))) => { + // signal about new SchedulingContext to handler threads + runnable_task_sender + .send_chained_channel(new_context, handler_count) + .unwrap(); + result_with_timings = new_result_with_timings; + } + Ok(_) => { + unreachable!(); + } + Err(_) => { + // This unusual condition must be triggered by ThreadManager::drop(); + break 'nonaborted_main_loop; + } + } } // There are several code-path reaching here out of the preceding unconditional @@ -1325,13 +1311,14 @@ impl, TH: TaskHandler> ThreadManager { fn start_session( &mut self, - context: &SchedulingContext, + context: SchedulingContext, result_with_timings: ResultWithTimings, ) { + assert!(!self.are_threads_joined()); assert_matches!(self.session_result_with_timings, None); self.new_task_sender .send(NewTaskPayload::OpenSubchannel(( - context.clone(), + context, result_with_timings, ))) .expect("no new session after aborted"); @@ -1377,21 +1364,23 @@ impl SpawnableScheduler for PooledScheduler { ) -> Self { inner .thread_manager - .start_session(&context, result_with_timings); + .start_session(context.clone(), result_with_timings); Self { inner, context } } fn spawn( pool: Arc>, - initial_context: SchedulingContext, + context: SchedulingContext, result_with_timings: ResultWithTimings, ) -> Self { - let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings); - scheduler - .inner + let mut inner = Self::Inner { + thread_manager: ThreadManager::new(pool), + usage_queue_loader: UsageQueueLoader::default(), + }; + inner .thread_manager - .start_threads(&scheduler.context); - scheduler + .start_threads(context.clone(), result_with_timings); + Self { inner, context } } }