From 34a9691d4cbb0e767676511572466ac473eda869 Mon Sep 17 00:00:00 2001 From: toddaaro Date: Thu, 15 Aug 2013 19:46:23 -0700 Subject: [PATCH] A round of code cleaning for the primary scheduler code. Comments have been updated, a minor amount of support type restructuring has happened, methods have been reordered, and some duplicate code has been purged. --- src/libstd/rt/sched.rs | 484 ++++++++++++++++++----------------------- src/libstd/rt/task.rs | 2 +- src/libstd/rt/util.rs | 3 +- 3 files changed, 209 insertions(+), 280 deletions(-) diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ce4e64c47d2ef..a6e2096e688ed 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -31,10 +31,11 @@ use rand::{XorShiftRng, RngUtil}; use iterator::{range}; use vec::{OwnedVector}; -/// The Scheduler is responsible for coordinating execution of Coroutines -/// on a single thread. When the scheduler is running it is owned by -/// thread local storage and the running task is owned by the -/// scheduler. +/// A scheduler is responsible for coordinating the execution of Tasks +/// on a single thread. The scheduler runs inside a slightly modified +/// Rust Task. When not running this task is stored in the scheduler +/// struct. The scheduler struct acts like a baton, all scheduling +/// actions are transfers of the baton. /// /// XXX: This creates too many callbacks to run_sched_once, resulting /// in too much allocation and too many events. @@ -64,11 +65,12 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler runs on a special task. + /// The scheduler runs on a special task. When it is not running + /// it is stored here instead of the work queue. 
sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch - priv cleanup_job: Option, + cleanup_job: Option, metrics: SchedMetrics, /// Should this scheduler run any task, or only pinned tasks? run_anything: bool, @@ -77,30 +79,11 @@ pub struct Scheduler { friend_handle: Option, /// A fast XorShift rng for scheduler use rng: XorShiftRng - -} - -pub struct SchedHandle { - priv remote: ~RemoteCallbackObject, - priv queue: MessageQueue, - sched_id: uint -} - -pub enum SchedMessage { - Wake, - Shutdown, - PinnedTask(~Task), - TaskFromFriend(~Task) -} - -enum CleanupJob { - DoNothing, - GiveTask(~Task, UnsafeTaskReceiver) } impl Scheduler { - pub fn sched_id(&self) -> uint { to_uint(self) } + // * Initialization Functions pub fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, @@ -114,8 +97,6 @@ impl Scheduler { } - // When you create a scheduler it isn't yet "in" a task, so the - // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], @@ -173,7 +154,6 @@ impl Scheduler { let sched = Local::take::(); rtdebug!("starting scheduler %u", sched.sched_id()); - sched.run(); // Now that we are done with the scheduler, clean up the @@ -181,15 +161,15 @@ impl Scheduler { // cleaning up the memory it uses. As we didn't actually call // task.run() on the scheduler task we never get through all // the cleanup code it runs. 
- let mut stask = Local::take::(); + let mut sched_task = Local::take::(); - rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id()); + rtdebug!("stopping scheduler %u", sched_task.sched.get_ref().sched_id()); // Should not have any messages - let message = stask.sched.get_mut_ref().message_queue.pop(); + let message = sched_task.sched.get_mut_ref().message_queue.pop(); assert!(message.is_none()); - stask.destroyed = true; + sched_task.destroyed = true; } // This does not return a scheduler, as the scheduler is placed @@ -221,11 +201,11 @@ impl Scheduler { } } - // One iteration of the scheduler loop, always run at least once. + // * Execution Functions - Core Loop Logic // The model for this function is that you continue through it // until you either use the scheduler while performing a schedule - // action, in which case you give it away and do not return, or + // action, in which case you give it away and return early, or // you reach the end and sleep. In the case that a scheduler // action is performed the loop is evented such that this function // is called again. @@ -237,39 +217,18 @@ impl Scheduler { // scheduler where we get the scheduler this way. let sched = Local::take::(); - // Our first task is to read mail to see if we have important - // messages. - - // 1) A wake message is easy, mutate sched struct and return - // it. - // 2) A shutdown is also easy, shutdown. - // 3) A pinned task - we resume immediately and do not return - // here. - // 4) A message from another scheduler with a non-homed task - // to run here. - - let result = sched.interpret_message_queue(); - let sched = match result { - Some(sched) => { - // We did not resume a task, so we returned. - sched - } - None => { - return; - } + // First we check for scheduler messages, these are higher + // priority than regular tasks. 
+ let sched = match sched.interpret_message_queue() { + Some(sched) => sched, + None => return }; - // Second activity is to try resuming a task from the queue. - - let result = sched.do_work(); - let mut sched = match result { - Some(sched) => { - // Failed to dequeue a task, so we return. - sched - } - None => { - return; - } + // This helper will use a randomized work-stealing algorithm + // to find work. + let mut sched = match sched.do_work() { + Some(sched) => sched, + None => return }; // If we got here then there was no work to do. @@ -291,62 +250,10 @@ impl Scheduler { Local::put(sched); } - pub fn make_handle(&mut self) -> SchedHandle { - let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); - - return SchedHandle { - remote: remote, - queue: self.message_queue.clone(), - sched_id: self.sched_id() - }; - } - - /// Schedule a task to be executed later. - /// - /// Pushes the task onto the work stealing queue and tells the - /// event loop to run it later. Always use this instead of pushing - /// to the work queue directly. - pub fn enqueue_task(&mut self, task: ~Task) { - - let this = self; - - // We push the task onto our local queue clone. - this.work_queue.push(task); - this.event_loop.callback(Scheduler::run_sched_once); - - // We've made work available. Notify a - // sleeping scheduler. - - // XXX: perf. Check for a sleeper without - // synchronizing memory. It's not critical - // that we always find it. - - // XXX: perf. If there's a sleeper then we - // might as well just send it the task - // directly instead of pushing it to the - // queue. That is essentially the intent here - // and it is less work. - match this.sleeper_list.pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake) - } - None => { (/* pass */) } - }; - } - - /// As enqueue_task, but with the possibility for the blocked task to - /// already have been killed. 
- pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) { - do blocked_task.wake().map_move |task| { - self.enqueue_task(task); - }; - } - - // * Scheduler-context operations - // This function returns None if the scheduler is "used", or it - // returns the still-available scheduler. + // returns the still-available scheduler. At this point all + // message-handling will count as a turn of work, and as a result + // return None. fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; @@ -361,12 +268,14 @@ impl Scheduler { Some(TaskFromFriend(task)) => { this.event_loop.callback(Scheduler::run_sched_once); rtdebug!("got a task from a friend. lovely!"); - return this.sched_schedule_task(task); + return this.process_task(task, + Scheduler::resume_task_immediately_cl); } Some(Wake) => { this.event_loop.callback(Scheduler::run_sched_once); this.sleepy = false; - return Some(this); + Local::put(this); + return None; } Some(Shutdown) => { this.event_loop.callback(Scheduler::run_sched_once); @@ -388,11 +297,8 @@ impl Scheduler { // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - // YYY: Does a shutdown count as a "use" of the - // scheduler? This seems to work - so I'm leaving it - // this way despite not having a solid rational for - // why I should return the scheduler here. - return Some(this); + Local::put(this); + return None; } None => { return Some(this); @@ -400,30 +306,19 @@ impl Scheduler { } } - /// Given an input Coroutine sends it back to its home scheduler. 
- fn send_task_home(task: ~Task) { - let mut task = task; - let mut home = task.take_unwrap_home(); - match home { - Sched(ref mut home_handle) => { - home_handle.send(PinnedTask(task)); - } - AnySched => { - rtabort!("error: cannot send anysched task home"); - } - } - } + fn do_work(~self) -> Option<~Scheduler> { + let mut this = self; - /// Take a non-homed task we aren't allowed to run here and send - /// it to the designated friend scheduler to execute. - fn send_to_friend(&mut self, task: ~Task) { - rtdebug!("sending a task to friend"); - match self.friend_handle { - Some(ref mut handle) => { - handle.send(TaskFromFriend(task)); + rtdebug!("scheduler calling do work"); + match this.find_work() { + Some(task) => { + rtdebug!("found some work! processing the task"); + return this.process_task(task, + Scheduler::resume_task_immediately_cl); } None => { - rtabort!("tried to send task to a friend but scheduler has no friends"); + rtdebug!("no work was found, returning the scheduler struct"); + return Some(this); } } } @@ -447,8 +342,8 @@ impl Scheduler { None => { // Our naive stealing, try kinda hard. rtdebug!("scheduler trying to steal"); - let _len = self.work_queues.len(); - return self.try_steals(2); + let len = self.work_queues.len(); + return self.try_steals(len/2); } } } @@ -462,7 +357,8 @@ impl Scheduler { let work_queues = &mut self.work_queues; match work_queues[index].steal() { Some(task) => { - rtdebug!("found task by stealing"); return Some(task) + rtdebug!("found task by stealing"); + return Some(task) } None => () } @@ -471,8 +367,11 @@ impl Scheduler { return None; } - // Given a task, execute it correctly. - fn process_task(~self, task: ~Task) -> Option<~Scheduler> { + // * Task Routing Functions - Make sure tasks end up in the right + // place. 
+ + fn process_task(~self, task: ~Task, + schedule_fn: SchedulingFn) -> Option<~Scheduler> { let mut this = self; let mut task = task; @@ -489,15 +388,13 @@ impl Scheduler { } else { rtdebug!("running task here"); task.give_home(Sched(home_handle)); - this.resume_task_immediately(task); - return None; + return schedule_fn(this, task); } } AnySched if this.run_anything => { rtdebug!("running anysched task here"); task.give_home(AnySched); - this.resume_task_immediately(task); - return None; + return schedule_fn(this, task); } AnySched => { rtdebug!("sending task to friend"); @@ -508,98 +405,71 @@ impl Scheduler { } } - // Bundle the helpers together. - fn do_work(~self) -> Option<~Scheduler> { - let mut this = self; - - rtdebug!("scheduler calling do work"); - match this.find_work() { - Some(task) => { - rtdebug!("found some work! processing the task"); - return this.process_task(task); + fn send_task_home(task: ~Task) { + let mut task = task; + let mut home = task.take_unwrap_home(); + match home { + Sched(ref mut home_handle) => { + home_handle.send(PinnedTask(task)); } - None => { - rtdebug!("no work was found, returning the scheduler struct"); - return Some(this); + AnySched => { + rtabort!("error: cannot send anysched task home"); } } } - /// Called by a running task to end execution, after which it will - /// be recycled by the scheduler for reuse in a new task. - pub fn terminate_current_task(~self) { - // Similar to deschedule running task and then, but cannot go through - // the task-blocking path. The task is already dying. - let mut this = self; - let stask = this.sched_task.take_unwrap(); - do this.change_task_context(stask) |sched, mut dead_task| { - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); + /// Take a non-homed task we aren't allowed to run here and send + /// it to the designated friend scheduler to execute. 
+ fn send_to_friend(&mut self, task: ~Task) { + rtdebug!("sending a task to friend"); + match self.friend_handle { + Some(ref mut handle) => { + handle.send(TaskFromFriend(task)); + } + None => { + rtabort!("tried to send task to a friend but scheduler has no friends"); + } } } - // Scheduling a task requires a few checks to make sure the task - // ends up in the appropriate location. The run_anything flag on - // the scheduler and the home on the task need to be checked. This - // helper performs that check. It takes a function that specifies - // how to queue the the provided task if that is the correct - // action. This is a "core" function that requires handling the - // returned Option correctly. - - pub fn schedule_task(~self, task: ~Task, - schedule_fn: ~fn(sched: ~Scheduler, task: ~Task)) - -> Option<~Scheduler> { - - // is the task home? - let is_home = task.is_home_no_tls(&self); + /// Schedule a task to be executed later. + /// + /// Pushes the task onto the work stealing queue and tells the + /// event loop to run it later. Always use this instead of pushing + /// to the work queue directly. + pub fn enqueue_task(&mut self, task: ~Task) { - // does the task have a home? - let homed = task.homed(); + let this = self; - let mut this = self; + // We push the task onto our local queue clone. + this.work_queue.push(task); + this.event_loop.callback(Scheduler::run_sched_once); - if is_home || (!homed && this.run_anything) { - // here we know we are home, execute now OR we know we - // aren't homed, and that this sched doesn't care - rtdebug!("task: %u is on ok sched, executing", to_uint(task)); - schedule_fn(this, task); - return None; - } else if !homed && !this.run_anything { - // the task isn't homed, but it can't be run here - this.send_to_friend(task); - return Some(this); - } else { - // task isn't home, so don't run it here, send it home - Scheduler::send_task_home(task); - return Some(this); - } - } + // We've made work available. 
Notify a + // sleeping scheduler. - // There are two contexts in which schedule_task can be called: - // inside the scheduler, and inside a task. These contexts handle - // executing the task slightly differently. In the scheduler - // context case we want to receive the scheduler as an input, and - // manually deal with the option. In the task context case we want - // to use TLS to find the scheduler, and deal with the option - // inside the helper. - - pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> { - do self.schedule_task(task) |sched, next_task| { - sched.resume_task_immediately(next_task); - } + // XXX: perf. Check for a sleeper without + // synchronizing memory. It's not critical + // that we always find it. + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake) + } + None => { (/* pass */) } + }; } - // Task context case - use TLS. - pub fn run_task(task: ~Task) { - let sched = Local::take::(); - let opt = do sched.schedule_task(task) |sched, next_task| { - do sched.switch_running_tasks_and_then(next_task) |sched, last_task| { - sched.enqueue_blocked_task(last_task); - } + /// As enqueue_task, but with the possibility for the blocked task to + /// already have been killed. + pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) { + do blocked_task.wake().map_move |task| { + self.enqueue_task(task); }; - opt.map_move(Local::put); } + // * Core Context Switching Functions + // The primary function for changing contexts. In the current // design the scheduler is just a slightly modified GreenTask, so // all context swaps are from Task to Task. The only difference @@ -629,7 +499,7 @@ impl Scheduler { // The current task is placed inside an enum with the cleanup // function. This enum is then placed inside the scheduler. 
- this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); + this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque)); // The scheduler is then placed inside the next task. let mut next_task = next_task; @@ -645,12 +515,9 @@ impl Scheduler { transmute_mut_region(*next_task.sched.get_mut_ref()); let current_task: &mut Task = match sched.cleanup_job { - Some(GiveTask(ref task, _)) => { + Some(CleanupJob { task: ref task, _ }) => { transmute_mut_region(*transmute_mut_unsafe(task)) } - Some(DoNothing) => { - rtabort!("no next task"); - } None => { rtabort!("no cleanup job"); } @@ -684,19 +551,42 @@ impl Scheduler { } } - // Old API for task manipulation implemented over the new core - // function. + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; + unsafe { + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) + } + } + + // * Context Swapping Helpers - Here be ugliness! 
- pub fn resume_task_immediately(~self, task: ~Task) { + pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> { do self.change_task_context(task) |sched, stask| { sched.sched_task = Some(stask); } + return None; + } + + fn resume_task_immediately_cl(sched: ~Scheduler, + task: ~Task) -> Option<~Scheduler> { + sched.resume_task_immediately(task) } + pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) { match blocked_task.wake() { - Some(task) => self.resume_task_immediately(task), - None => Local::put(self), + Some(task) => { self.resume_task_immediately(task); } + None => Local::put(self) }; } @@ -735,54 +625,75 @@ impl Scheduler { } } - // A helper that looks up the scheduler and runs a task later by - // enqueuing it. + fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> { + do sched.switch_running_tasks_and_then(task) |sched, last_task| { + sched.enqueue_blocked_task(last_task); + }; + return None; + } + + // * Task Context Helpers + + /// Called by a running task to end execution, after which it will + /// be recycled by the scheduler for reuse in a new task. + pub fn terminate_current_task(~self) { + // Similar to deschedule running task and then, but cannot go through + // the task-blocking path. The task is already dying. + let mut this = self; + let stask = this.sched_task.take_unwrap(); + do this.change_task_context(stask) |sched, mut dead_task| { + let coroutine = dead_task.coroutine.take_unwrap(); + coroutine.recycle(&mut sched.stack_pool); + } + } + + pub fn run_task(task: ~Task) { + let sched = Local::take::(); + sched.process_task(task, Scheduler::switch_task).map_move(Local::put); + } + pub fn run_task_later(next_task: ~Task) { - // We aren't performing a scheduler operation, so we want to - // put the Scheduler back when we finish. 
let next_task = Cell::new(next_task); do Local::borrow:: |sched| { sched.enqueue_task(next_task.take()); }; } - // Returns a mutable reference to both contexts involved in this - // swap. This is unsafe - we are getting mutable internal - // references to keep even when we don't own the tasks. It looks - // kinda safe because we are doing transmutes before passing in - // the arguments. - pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> - (&'a mut Context, &'a mut Context) { - let current_task_context = - &mut current_task.coroutine.get_mut_ref().saved_context; - let next_task_context = - &mut next_task.coroutine.get_mut_ref().saved_context; - unsafe { - (transmute_mut_region(current_task_context), - transmute_mut_region(next_task_context)) - } - } + // * Utility Functions - pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - self.cleanup_job = Some(job); - } + pub fn sched_id(&self) -> uint { to_uint(self) } pub fn run_cleanup_job(&mut self) { - rtdebug!("running cleanup job"); let cleanup_job = self.cleanup_job.take_unwrap(); - match cleanup_job { - DoNothing => { } - GiveTask(task, f) => f.to_fn()(self, task) - } + cleanup_job.run(self); } + + pub fn make_handle(&mut self) -> SchedHandle { + let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); + + return SchedHandle { + remote: remote, + queue: self.message_queue.clone(), + sched_id: self.sched_id() + }; + } +} + +// Supporting types + +type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>; + +pub enum SchedMessage { + Wake, + Shutdown, + PinnedTask(~Task), + TaskFromFriend(~Task) } -// The cases for the below function. 
-enum ResumeAction { - SendHome, - Requeue, - ResumeNow, - Homeless +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject, + priv queue: MessageQueue, + sched_id: uint } impl SchedHandle { @@ -792,6 +703,25 @@ impl SchedHandle { } } +struct CleanupJob { + task: ~Task, + f: UnsafeTaskReceiver +} + +impl CleanupJob { + pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob { + CleanupJob { + task: task, + f: f + } + } + + pub fn run(self, sched: &mut Scheduler) { + let CleanupJob { task: task, f: f } = self; + f.to_fn()(sched, task) + } +} + // XXX: Some hacks to put a &fn in Scheduler without borrowck // complaining type UnsafeTaskReceiver = raw::Closure; diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 708166518bb89..698c59805a4cf 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -360,7 +360,7 @@ impl Coroutine { // Again - might work while safe, or it might not. do Local::borrow:: |sched| { - (sched).run_cleanup_job(); + sched.run_cleanup_job(); } // To call the run method on a task we need a direct diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs index 6280b64ecf51c..8a2541e73b384 100644 --- a/src/libstd/rt/util.rs +++ b/src/libstd/rt/util.rs @@ -38,8 +38,7 @@ pub fn default_sched_threads() -> uint { pub fn dumb_println(s: &str) { use io::WriterUtil; let dbg = ::libc::STDERR_FILENO as ::io::fd_t; - dbg.write_str(s); - dbg.write_str("\n"); + dbg.write_str(s + "\n"); } pub fn abort(msg: &str) -> ! {