diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs
index e84677e6007fff..aeededab8ee784 100644
--- a/runtime/src/installed_scheduler_pool.rs
+++ b/runtime/src/installed_scheduler_pool.rs
@@ -25,8 +25,8 @@ use {
     log::*,
     solana_program_runtime::timings::ExecuteTimings,
     solana_sdk::{
+        clock::Slot,
         hash::Hash,
-        slot_history::Slot,
         transaction::{Result, SanitizedTransaction, TransactionError},
     },
     std::{
@@ -34,6 +34,7 @@ use {
         mem,
         ops::Deref,
         sync::{Arc, RwLock},
+        thread,
     },
 };
 #[cfg(feature = "dev-context-only-utils")]
@@ -623,7 +624,7 @@ impl BankWithSchedulerInner {
             "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
             bank.slot(),
             reason,
-            std::thread::current(),
+            thread::current(),
         );
 
         let mut scheduler = scheduler.write().unwrap();
@@ -656,7 +657,7 @@ impl BankWithSchedulerInner {
             reason,
             was_noop,
             result_with_timings.as_ref().map(|(result, _)| result),
-            std::thread::current(),
+            thread::current(),
         );
         trace!(
             "wait_for_scheduler_termination(result_with_timings: {:?})",
@@ -667,7 +668,7 @@ impl BankWithSchedulerInner {
     }
 
     fn drop_scheduler(&self) {
-        if std::thread::panicking() {
+        if thread::panicking() {
             error!(
                 "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
                 self.bank.slot(),
diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 1a2706fb284601..c57217285f4762 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -761,23 +761,6 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     handler_threads: Vec<JoinHandle<()>>,
 }
 
-impl<TH: TaskHandler> PooledScheduler<TH> {
-    fn do_spawn(
-        pool: Arc<SchedulerPool<Self, TH>>,
-        initial_context: SchedulingContext,
-        result_with_timings: ResultWithTimings,
-    ) -> Self {
-        Self::from_inner(
-            PooledSchedulerInner::<Self, TH> {
-                thread_manager: ThreadManager::new(pool),
-                usage_queue_loader: UsageQueueLoader::default(),
-            },
-            initial_context,
-            result_with_timings,
-        )
-    }
-}
-
 struct HandlerPanicked;
 type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;
 
@@ -847,7 +830,15 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         );
     }
 
-    fn start_threads(&mut self, context: &SchedulingContext) {
+    // This method must take the same set of session-related arguments as start_session() to
+    // avoid unneeded channel operations and minimize overhead; starting threads already incurs
+    // a very high cost. Pre-creating the threads isn't desirable either, to avoid `Option`-ed
+    // types for type safety.
+    fn start_threads(
+        &mut self,
+        context: SchedulingContext,
+        mut result_with_timings: ResultWithTimings,
+    ) {
         // Firstly, setup bi-directional messaging between the scheduler and handlers to pass
         // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to
         // the handlers and the other for finished tasks from the handlers to the scheduler).
@@ -925,7 +916,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // prioritization further. Consequently, this also contributes to alleviate the known
         // heuristic's caveat for the first task of linearized runs, which is described above.
         let (mut runnable_task_sender, runnable_task_receiver) =
-            chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
+            chained_channel::unbounded::<Task, SchedulingContext>(context);
         // Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks,
         // because it is more likely that a blocked task will have more blocked tasks behind it,
         // which should be scheduled while minimizing the delay to clear buffered linearized runs
@@ -944,7 +935,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // 4. the handler thread processes the dispatched task.
         // 5. the handler thread reply back to the scheduler thread as an executed task.
         // 6. the scheduler thread post-processes the executed task.
-        let scheduler_main_loop = || {
+        let scheduler_main_loop = {
             let handler_count = self.pool.handler_count;
             let session_result_sender = self.session_result_sender.clone();
             // Taking new_task_receiver here is important to ensure there's a single receiver. In
@@ -1008,29 +999,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 let mut state_machine = unsafe {
                     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
                 };
-                let mut result_with_timings = initialized_result_with_timings();
+                // The following loop maintains and updates ResultWithTimings as its
+                // externally-provided mutable state for each session in this way:
+                //
+                // 1. The initial result_with_timings is propagated implicitly by the moved variable.
+                // 2. Subsequent result_with_timings are propagated explicitly from
+                //    the new_task_receiver.recv() invocation located at the end of the loop.
 
                 'nonaborted_main_loop: loop {
-                    match new_task_receiver.recv() {
-                        Ok(NewTaskPayload::OpenSubchannel((
-                            new_context,
-                            new_result_with_timings,
-                        ))) => {
-                            // signal about new SchedulingContext to handler threads
-                            runnable_task_sender
-                                .send_chained_channel(new_context, handler_count)
-                                .unwrap();
-                            result_with_timings = new_result_with_timings;
-                        }
-                        Ok(_) => {
-                            unreachable!();
-                        }
-                        Err(_) => {
-                            // This unusual condition must be triggered by ThreadManager::drop();
-                            break 'nonaborted_main_loop;
-                        }
-                    }
-
                     let mut is_finished = false;
                     while !is_finished {
                         // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
@@ -1081,9 +1057,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                             Ok(NewTaskPayload::CloseSubchannel) => {
                                 session_ending = true;
                             }
-                            Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => {
-                                unreachable!();
-                            }
+                            Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) =>
+                                unreachable!(),
                             Err(RecvError) => {
                                 // Mostly likely is that this scheduler is dropped for pruned blocks of
                                 // abandoned forks...
@@ -1106,15 +1081,36 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                         is_finished = session_ending && state_machine.has_no_active_task();
                     }
 
-                    if session_ending {
-                        state_machine.reinitialize();
-                        session_result_sender
-                            .send(std::mem::replace(
-                                &mut result_with_timings,
-                                initialized_result_with_timings(),
-                            ))
-                            .expect("always outlived receiver");
-                        session_ending = false;
+                    // Finalize the current session after asserting that it was explicitly requested.
+                    assert!(session_ending);
+                    // Send the result first because this is blocking the replay code-path.
+                    session_result_sender
+                        .send(result_with_timings)
+                        .expect("always outlived receiver");
+                    state_machine.reinitialize();
+                    session_ending = false;
+
+                    // Prepare for the new session.
+                    match new_task_receiver.recv() {
+                        Ok(NewTaskPayload::OpenSubchannel((
+                            new_context,
+                            new_result_with_timings,
+                        ))) => {
+                            // We just received a subsequent (= not initial) session and are about
+                            // to re-enter the preceding `while (!is_finished) {...}` loop.
+                            // Before that, propagate the new SchedulingContext to handler threads.
+                            runnable_task_sender
+                                .send_chained_channel(new_context, handler_count)
+                                .unwrap();
+                            result_with_timings = new_result_with_timings;
+                        }
+                        Err(_) => {
+                            // This unusual condition must be triggered by ThreadManager::drop().
+                            // Initialize result_with_timings with a harmless value...
+                            result_with_timings = initialized_result_with_timings();
+                            break 'nonaborted_main_loop;
+                        }
+                        Ok(_) => unreachable!(),
                     }
                 }
 
@@ -1147,6 +1143,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             let finished_blocked_task_sender = finished_blocked_task_sender.clone();
             let finished_idle_task_sender = finished_idle_task_sender.clone();
 
+            // The following loop maintains and updates SchedulingContext as its
+            // externally-provided state for each session in this way:
+            //
+            // 1. The initial context is propagated implicitly by the moved runnable_task_receiver,
+            //    which is clone()-d just above for this particular thread.
+            // 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of
+            //    `select_biased!`; they are sent from `.send_chained_channel()` in the scheduler
+            //    thread for all-but-initial sessions.
             move || loop {
                 let (task, sender) = select_biased! {
                     recv(runnable_task_receiver.for_select()) -> message => {
@@ -1201,7 +1205,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         self.scheduler_thread = Some(
             thread::Builder::new()
                 .name("solScheduler".to_owned())
-                .spawn_tracked(scheduler_main_loop())
+                .spawn_tracked(scheduler_main_loop)
                 .unwrap(),
         );
 
@@ -1322,13 +1326,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
 
     fn start_session(
         &mut self,
-        context: &SchedulingContext,
+        context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) {
+        assert!(!self.are_threads_joined());
         assert_matches!(self.session_result_with_timings, None);
         self.new_task_sender
             .send(NewTaskPayload::OpenSubchannel((
-                context.clone(),
+                context,
                 result_with_timings,
             )))
             .expect("no new session after aborted");
@@ -1348,7 +1353,7 @@ pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
 
     fn spawn(
         pool: Arc<SchedulerPool<Self, TH>>,
-        initial_context: SchedulingContext,
+        context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) -> Self
     where
@@ -1374,21 +1379,23 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
     ) -> Self {
         inner
             .thread_manager
-            .start_session(&context, result_with_timings);
+            .start_session(context.clone(), result_with_timings);
         Self { inner, context }
     }
 
     fn spawn(
         pool: Arc<SchedulerPool<Self, TH>>,
-        initial_context: SchedulingContext,
+        context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) -> Self {
-        let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings);
-        scheduler
-            .inner
+        let mut inner = Self::Inner {
+            thread_manager: ThreadManager::new(pool),
+            usage_queue_loader: UsageQueueLoader::default(),
+        };
+        inner
             .thread_manager
-            .start_threads(&scheduler.context);
-        scheduler
+            .start_threads(context.clone(), result_with_timings);
+        Self { inner, context }
     }
 }
 
@@ -2770,13 +2777,13 @@ mod tests {
 
         fn spawn(
             pool: Arc<SchedulerPool<Self, DefaultTaskHandler>>,
-            initial_context: SchedulingContext,
+            context: SchedulingContext,
             _result_with_timings: ResultWithTimings,
         ) -> Self {
            AsyncScheduler::<TRIGGER_RACE_CONDITION>(
                 Mutex::new(initialized_result_with_timings()),
                 Mutex::new(vec![]),
-                initial_context,
+                context,
                 pool,
             )
         }
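
The core behavioral change above is that the scheduler thread now owns the session's ResultWithTimings for its whole lifetime: spawn() moves the initial (context, result) pair directly into start_threads(), the main loop hands the finished result back at session end, and only then blocks on recv() for the next OpenSubchannel. The sketch below is illustrative only and not agave code: it uses std::sync::mpsc instead of crossbeam, a String stands in for ResultWithTimings and a u64 for SchedulingContext, and task dispatch, CloseSubchannel/session_ending handling, handler threads, and abort handling are all omitted.

use std::{sync::mpsc, thread};

// Stand-in payload; the real NewTaskPayload also carries task and CloseSubchannel variants.
enum NewTaskPayload {
    OpenSubchannel((u64 /* context */, String /* result_with_timings */)),
}

fn main() {
    let (new_task_sender, new_task_receiver) = mpsc::channel();
    let (session_result_sender, session_result_receiver) = mpsc::channel();

    // The initial session's state is moved into the thread up front, mirroring
    // start_threads(context, result_with_timings): no extra channel round-trip.
    let mut result_with_timings = String::from("session-0");
    let scheduler = thread::spawn(move || {
        'nonaborted_main_loop: loop {
            // ... per-session scheduling work would happen here ...

            // Finalize the current session: send the result first, because the
            // other side (replay) is blocked waiting on it.
            session_result_sender
                .send(result_with_timings)
                .expect("always outlived receiver");

            // Prepare for the new session (the recv() at the end of the loop).
            match new_task_receiver.recv() {
                Ok(NewTaskPayload::OpenSubchannel((_context, new_result))) => {
                    result_with_timings = new_result;
                }
                // Sender dropped (ThreadManager::drop() analogue): shut down.
                Err(_) => break 'nonaborted_main_loop,
            }
        }
    });

    assert_eq!(session_result_receiver.recv().unwrap(), "session-0");
    new_task_sender
        .send(NewTaskPayload::OpenSubchannel((1, "session-1".to_string())))
        .unwrap();
    assert_eq!(session_result_receiver.recv().unwrap(), "session-1");
    drop(new_task_sender);
    scheduler.join().unwrap();
}

The recv() at the loop tail, rather than at the loop head, is what lets the initial session piggyback on the thread spawn itself while keeping every subsequent session handoff on the same code path.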