diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index 8758eb1179ef9..1fe84275f83a9 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -187,7 +187,6 @@ use std::vec;
 use std::sync::arc::UnsafeArc;
 
 use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
-use sleeper_list::SleeperList;
 use stack::StackPool;
 use task::GreenTask;
 
@@ -303,7 +302,6 @@ pub struct SchedPool {
     priv next_friend: uint,
     priv stack_pool: StackPool,
     priv deque_pool: deque::BufferPool<~task::GreenTask>,
-    priv sleepers: SleeperList,
     priv factory: fn() -> ~rtio::EventLoop,
     priv task_state: TaskState,
     priv tasks_done: Port<()>,
@@ -340,7 +338,6 @@ impl SchedPool {
             handles: ~[],
             stealers: ~[],
             id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
-            sleepers: SleeperList::new(),
             stack_pool: StackPool::new(),
             deque_pool: deque::BufferPool::new(),
             next_friend: 0,
@@ -358,17 +355,23 @@ impl SchedPool {
         // Now that we've got all our work queues, create one scheduler per
         // queue, spawn the scheduler into a thread, and be sure to keep a
         // handle to the scheduler and the thread to keep them alive.
-        for worker in workers.move_iter() {
+        let mut scheds = workers.move_iter()
+                                .map(|worker| ~Scheduler::new(pool.id,
+                                                              (pool.factory)(),
+                                                              worker,
+                                                              pool.stealers.clone(),
+                                                              pool.task_state.clone()))
+                                .to_owned_vec();
+        // Assign left and right neighbors to each scheduler
+        for i in range(0, scheds.len()) {
+            let left = if i > 0 { i - 1 } else { scheds.len() - 1 };
+            let right = if (i + 1) < scheds.len() { i + 1 } else { 0 };
+            scheds[i].left_sched = Some(scheds[left].make_handle());
+            scheds[i].right_sched = Some(scheds[right].make_handle());
+        }
+        for mut sched in scheds.move_iter() {
             rtdebug!("inserting a regular scheduler");
-
-            let mut sched = ~Scheduler::new(pool.id,
-                                            (pool.factory)(),
-                                            worker,
-                                            pool.stealers.clone(),
-                                            pool.sleepers.clone(),
-                                            pool.task_state.clone());
             pool.handles.push(sched.make_handle());
-            let sched = sched;
             pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
         }
 
@@ -426,11 +429,10 @@ impl SchedPool {
                                              (self.factory)(),
                                              worker,
                                              self.stealers.clone(),
-                                             self.sleepers.clone(),
                                              self.task_state.clone());
+        // TODO(derekchiang): set up the scheduler's neighbors
         let ret = sched.make_handle();
         self.handles.push(sched.make_handle());
-        let sched = sched;
         self.threads.push(Thread::start(proc() { sched.bootstrap() }));
 
         return ret;
@@ -447,7 +449,6 @@ impl SchedPool {
     /// native tasks or extern pools will not be waited on
    pub fn shutdown(mut self) {
         self.stealers = ~[];
-
         // Wait for everyone to exit. We may have reached a 0-task count
         // multiple times in the past, meaning there could be several buffered
         // messages on the `tasks_done` port. We're guaranteed that after *some*
@@ -456,7 +457,6 @@ impl SchedPool {
         while self.task_state.active() {
             self.tasks_done.recv();
         }
-
         // Now that everyone's gone, tell everything to shut down.
         for mut handle in replace(&mut self.handles, ~[]).move_iter() {
             handle.send(Shutdown);
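The neighbor assignment in `SchedPool::new` above wires the schedulers into a ring: scheduler `i` is linked to slots `i - 1` and `i + 1`, wrapping at both ends, so a pool of one scheduler becomes its own left and right neighbor. A minimal sketch of the same index arithmetic in modern Rust (the `ring_neighbors` helper is hypothetical, for illustration only):

    // Hypothetical helper mirroring the wraparound arithmetic in
    // SchedPool::new: for n slots, slot i neighbors i-1 and i+1.
    fn ring_neighbors(i: usize, n: usize) -> (usize, usize) {
        assert!(n > 0 && i < n);
        let left = if i > 0 { i - 1 } else { n - 1 };
        let right = if i + 1 < n { i + 1 } else { 0 };
        (left, right)
    }

    fn main() {
        assert_eq!(ring_neighbors(0, 4), (3, 1)); // first slot wraps left
        assert_eq!(ring_neighbors(3, 4), (2, 0)); // last slot wraps right
        assert_eq!(ring_neighbors(0, 1), (0, 0)); // a lone scheduler neighbors itself
    }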
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index ad32ba7ba6d1c..cf4be1742a8e8 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -9,11 +9,14 @@
 // except according to those terms.
 
 use std::cast;
+use std::libc::funcs::posix88::unistd;
 use std::rand::{XorShiftRng, Rng, Rand};
 use std::rt::local::Local;
 use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
 use std::rt::task::BlockedTask;
 use std::rt::task::Task;
+use std::sync::arc::UnsafeArc;
+use std::sync::atomics::{AtomicBool, SeqCst};
 use std::sync::deque;
 use std::unstable::mutex::NativeMutex;
 use std::unstable::raw;
@@ -21,11 +24,15 @@ use std::unstable::raw;
 use TaskState;
 use context::Context;
 use coroutine::Coroutine;
-use sleeper_list::SleeperList;
 use stack::StackPool;
 use task::{TypeSched, GreenTask, HomeSched, AnySched};
 use msgq = message_queue;
 
+/// By default, a scheduler tries to back off three times before it
+/// goes to sleep.
+/// TODO: Make this value configurable.
+static EXPONENTIAL_BACKOFF_FACTOR: uint = 3;
+
 /// A scheduler is responsible for coordinating the execution of Tasks
 /// on a single thread. The scheduler runs inside a slightly modified
 /// Rust Task. When not running this task is stored in the scheduler
@@ -50,19 +57,21 @@ pub struct Scheduler {
     message_queue: msgq::Consumer<SchedMessage>,
     /// Producer used to clone sched handles from
     message_producer: msgq::Producer<SchedMessage>,
-    /// A shared list of sleeping schedulers. We'll use this to wake
-    /// up schedulers when pushing work onto the work queue.
-    sleeper_list: SleeperList,
     /// Indicates that we have previously pushed a handle onto the
     /// SleeperList but have not yet received the Wake message.
     /// Being `true` does not necessarily mean that the scheduler is
     /// not active since there are multiple event sources that may
     /// wake the scheduler. It just prevents the scheduler from pushing
     /// multiple handles onto the sleeper list.
-    sleepy: bool,
+    sleepy: UnsafeArc<AtomicBool>,
     /// A flag to indicate we've received the shutdown message and should
     /// no longer try to go to sleep, but exit instead.
     no_sleep: bool,
+    /// We only go to sleep when backoff_counter hits EXPONENTIAL_BACKOFF_FACTOR.
+    backoff_counter: uint,
+    /// A scheduler only ever tries to wake up its two neighboring schedulers.
+    left_sched: Option<SchedHandle>,
+    right_sched: Option<SchedHandle>,
     stack_pool: StackPool,
     /// The scheduler runs on a special task. When it is not running
     /// it is stored here instead of the work queue.
@@ -124,12 +133,11 @@ impl Scheduler {
                event_loop: ~EventLoop,
                work_queue: deque::Worker<~GreenTask>,
                work_queues: ~[deque::Stealer<~GreenTask>],
-               sleeper_list: SleeperList,
               state: TaskState)
             -> Scheduler {
 
         Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
-                               sleeper_list, true, None, state)
+                               true, None, state)
 
     }
 
@@ -137,7 +145,6 @@ impl Scheduler {
                        event_loop: ~EventLoop,
                        work_queue: deque::Worker<~GreenTask>,
                        work_queues: ~[deque::Stealer<~GreenTask>],
-                       sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>,
                        state: TaskState)
@@ -146,11 +153,13 @@ impl Scheduler {
         let (consumer, producer) = msgq::queue();
         let mut sched = Scheduler {
             pool_id: pool_id,
-            sleeper_list: sleeper_list,
             message_queue: consumer,
             message_producer: producer,
-            sleepy: false,
+            sleepy: UnsafeArc::new(AtomicBool::new(false)),
             no_sleep: false,
+            backoff_counter: 0,
+            left_sched: None,
+            right_sched: None,
             event_loop: event_loop,
             work_queue: work_queue,
             work_queues: work_queues,
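The `sleepy` field above changes from a plain `bool` to an atomically shared flag, so a handle held by a neighboring scheduler can check whether this scheduler has parked itself without a message round-trip. A rough modern-Rust sketch of the sharing pattern, substituting the safe `Arc` for `UnsafeArc`; the types and the `println!` stand-in are illustrative, not the patch's exact API:

    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    struct Scheduler { sleepy: Arc<AtomicBool> }
    struct SchedHandle { sleepy: Arc<AtomicBool> }

    impl Scheduler {
        fn new() -> Scheduler {
            Scheduler { sleepy: Arc::new(AtomicBool::new(false)) }
        }
        // Every handle shares the flag the scheduler flips before parking.
        fn make_handle(&self) -> SchedHandle {
            SchedHandle { sleepy: self.sleepy.clone() }
        }
    }

    impl SchedHandle {
        // Counterpart of wakeup_if_sleepy below: only send a Wake message
        // when the peer has actually declared itself asleep.
        fn wakeup_if_sleepy(&self) {
            if self.sleepy.load(Ordering::SeqCst) {
                println!("waking peer"); // self.send(Wake) in the real scheduler
            }
        }
    }

    fn main() {
        let sched = Scheduler::new();
        let handle = sched.make_handle();
        handle.wakeup_if_sleepy();                  // no-op: peer is awake
        sched.sleepy.store(true, Ordering::SeqCst); // peer parks itself
        handle.wakeup_if_sleepy();                  // prints "waking peer"
    }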
@@ -324,18 +333,24 @@ impl Scheduler {
         // If we got here then there was no work to do.
         // Generate a SchedHandle and push it to the sleeper list so
         // somebody can wake us up later.
-        if !sched.sleepy && !sched.no_sleep {
-            rtdebug!("scheduler has no work to do, going to sleep");
-            sched.sleepy = true;
-            let handle = sched.make_handle();
-            sched.sleeper_list.push(handle);
-            // Since we are sleeping, deactivate the idle callback.
-            sched.idle_callback.get_mut_ref().pause();
-        } else {
-            rtdebug!("not sleeping, already doing so or no_sleep set");
-            // We may not be sleeping, but we still need to deactivate
-            // the idle callback.
-            sched.idle_callback.get_mut_ref().pause();
+        unsafe {
+            if !sched.no_sleep && !(*sched.sleepy.get()).load(SeqCst) {
+                if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR {
+                    sched.backoff_counter = 0;
+                    rtdebug!("scheduler has no work to do, going to sleep");
+                    (*sched.sleepy.get()).store(true, SeqCst);
+                    // Since we are sleeping, deactivate the idle callback.
+                    sched.idle_callback.get_mut_ref().pause();
+                } else if !sched.event_loop.has_active_io() {
+                    unistd::usleep((1 << sched.backoff_counter) * 1000u as u32);
+                    sched.backoff_counter += 1;
+                }
+            } else {
+                rtdebug!("not sleeping, already doing so or no_sleep set");
+                // We may not be sleeping, but we still need to deactivate
+                // the idle callback.
+                sched.idle_callback.get_mut_ref().pause();
+            }
         }
 
         // Finished a cycle without using the Scheduler. Place it back
@@ -397,30 +412,30 @@ impl Scheduler {
                 (sched, task, true)
             }
             Some(Wake) => {
-                self.sleepy = false;
-                (self, stask, true)
+                unsafe {
+                    (*self.sleepy.get()).store(false, SeqCst);
+                    (self, stask, true)
+                }
             }
             Some(Shutdown) => {
                 rtdebug!("shutting down");
-                if self.sleepy {
-                    // There may be an outstanding handle on the
-                    // sleeper list. Pop them all to make sure that's
-                    // not the case.
-                    loop {
-                        match self.sleeper_list.pop() {
-                            Some(handle) => {
-                                let mut handle = handle;
-                                handle.send(Wake);
-                            }
-                            None => break
-                        }
+                unsafe {
+                    if (*self.sleepy.get()).load(SeqCst) {
+                        match self.left_sched.take() {
+                            Some(mut sched) => sched.wakeup_if_sleepy(),
+                            None => ()
+                        };
+                        match self.right_sched.take() {
+                            Some(mut sched) => sched.wakeup_if_sleepy(),
+                            None => ()
+                        };
                     }
+                    // No more sleeping. After there are no outstanding
+                    // event loop references we will shut down.
+                    self.no_sleep = true;
+                    (*self.sleepy.get()).store(false, SeqCst);
+                    (self, stask, true)
                 }
-                // No more sleeping. After there are no outstanding
-                // event loop references we will shut down.
-                self.no_sleep = true;
-                self.sleepy = false;
-                (self, stask, true)
             }
             Some(NewNeighbor(neighbor)) => {
                 self.work_queues.push(neighbor);
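The idle path above backs off before parking: each idle pass with no work and no active I/O sleeps for `(1 << backoff_counter)` milliseconds (1 ms, then 2 ms, then 4 ms), and only once the counter reaches `EXPONENTIAL_BACKOFF_FACTOR` does the scheduler set its sleepy flag and pause the idle callback. A standalone sketch of that policy, assuming `std::thread::sleep` as a stand-in for the `usleep` call:

    use std::thread;
    use std::time::Duration;

    const EXPONENTIAL_BACKOFF_FACTOR: u32 = 3;

    // One idle pass: returns true once the scheduler should really park.
    fn idle_pass(backoff_counter: &mut u32) -> bool {
        if *backoff_counter == EXPONENTIAL_BACKOFF_FACTOR {
            *backoff_counter = 0; // reset for the next idle episode
            true                  // park: set sleepy flag, pause idle callback
        } else {
            // 1 << 0 = 1 ms, then 2 ms, then 4 ms.
            thread::sleep(Duration::from_millis(1u64 << *backoff_counter));
            *backoff_counter += 1;
            false
        }
    }

    fn main() {
        let mut counter = 0;
        let mut naps = 0;
        while !idle_pass(&mut counter) {
            naps += 1;
        }
        assert_eq!(naps, 3); // dozed 1 + 2 + 4 ms before truly sleeping
    }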
@@ -586,14 +601,9 @@ impl Scheduler {
 
         // We've made work available. Notify a
         // sleeping scheduler.
-
-        match self.sleeper_list.casual_pop() {
-            Some(handle) => {
-                let mut handle = handle;
-                handle.send(Wake)
-            }
-            None => { (/* pass */) }
-        };
+        if !self.left_sched.mutate(|mut sched| { sched.wakeup_if_sleepy(); sched }) {
+            self.right_sched.mutate(|mut sched| { sched.wakeup_if_sleepy(); sched });
+        }
     }
 
     // * Core Context Switching Functions
@@ -864,6 +874,7 @@ impl Scheduler {
 
         return SchedHandle {
             remote: remote,
+            sleepy: self.sleepy.clone(),
             queue: self.message_producer.clone(),
             sched_id: self.sched_id()
         }
@@ -887,6 +898,9 @@ pub enum SchedMessage {
 
 pub struct SchedHandle {
     priv remote: ~RemoteCallback,
     priv queue: msgq::Producer<SchedMessage>,
+    // Under the current design, a scheduler always outlives the handles
+    // pointing to it, so it's safe to use an unsafe pointer here.
+    priv sleepy: UnsafeArc<AtomicBool>,
     sched_id: uint
 }
@@ -895,6 +909,14 @@ impl SchedHandle {
         self.queue.push(msg);
         self.remote.fire();
     }
+
+    fn wakeup_if_sleepy(&mut self) {
+        unsafe {
+            if (*self.sleepy.get()).load(SeqCst) {
+                self.send(Wake);
+            }
+        }
+    }
 }
 
 struct SchedRunner;
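With the sleeper list gone, making work available pings at most one ring neighbor: `Option::mutate` runs the closure and reports whether the option held a value, so the right neighbor is consulted only when no left handle exists. A sketch of the same fallback in modern Rust (which has no `Option::mutate`), with a stub `SchedHandle` standing in for the real handle type:

    struct SchedHandle;

    impl SchedHandle {
        fn wakeup_if_sleepy(&mut self) {
            // Would check the shared sleepy flag and send(Wake) if set.
            println!("checked a neighbor");
        }
    }

    // Prefer the left neighbor; fall back to the right only when no left
    // handle exists, matching the mutate-based chain in the hunk above.
    fn notify_neighbors(left: &mut Option<SchedHandle>,
                        right: &mut Option<SchedHandle>) {
        if let Some(h) = left.as_mut() {
            h.wakeup_if_sleepy();
        } else if let Some(h) = right.as_mut() {
            h.wakeup_if_sleepy();
        }
    }

    fn main() {
        let mut left = Some(SchedHandle);
        let mut right = Some(SchedHandle);
        notify_neighbors(&mut left, &mut right); // pings the left handle only
        left = None;
        notify_neighbors(&mut left, &mut right); // falls back to the right handle
    }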