Improve green scheduler 2 #12254

Closed
32 changes: 16 additions & 16 deletions src/libgreen/lib.rs
@@ -187,7 +187,6 @@ use std::vec;
use std::sync::arc::UnsafeArc;

use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
use sleeper_list::SleeperList;
use stack::StackPool;
use task::GreenTask;

@@ -303,7 +302,6 @@ pub struct SchedPool
priv next_friend: uint,
priv stack_pool: StackPool,
priv deque_pool: deque::BufferPool<~task::GreenTask>,
priv sleepers: SleeperList,
priv factory: fn() -> ~rtio::EventLoop,
priv task_state: TaskState,
priv tasks_done: Port<()>,
@@ -340,7 +338,6 @@ impl SchedPool {
handles: ~[],
stealers: ~[],
id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
sleepers: SleeperList::new(),
stack_pool: StackPool::new(),
deque_pool: deque::BufferPool::new(),
next_friend: 0,
@@ -358,17 +355,23 @@ impl SchedPool {
// Now that we've got all our work queues, create one scheduler per
// queue, spawn the scheduler into a thread, and be sure to keep a
// handle to the scheduler and the thread to keep them alive.
for worker in workers.move_iter() {
let mut scheds = workers.move_iter()
.map(|worker| ~Scheduler::new(pool.id,
(pool.factory)(),
worker,
pool.stealers.clone(),
pool.task_state.clone()))
.to_owned_vec();
// Assign left and right neighbors to each scheduler
for i in range(0, scheds.len()) {
let left = if i > 0 { i - 1 } else { scheds.len() - 1 };
let right = if (i + 1) < scheds.len() { i + 1 } else { 0 };
scheds[i].left_sched = Some(scheds[left].make_handle());
scheds[i].right_sched = Some(scheds[right].make_handle());
}
for mut sched in scheds.move_iter() {
rtdebug!("inserting a regular scheduler");

let mut sched = ~Scheduler::new(pool.id,
(pool.factory)(),
worker,
pool.stealers.clone(),
pool.sleepers.clone(),
pool.task_state.clone());
pool.handles.push(sched.make_handle());
let sched = sched;
pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
}

@@ -426,11 +429,10 @@ impl SchedPool {
(self.factory)(),
worker,
self.stealers.clone(),
self.sleepers.clone(),
self.task_state.clone());
// TODO(derekchiang): set up the scheduler's neighbors
let ret = sched.make_handle();
self.handles.push(sched.make_handle());
let sched = sched;
self.threads.push(Thread::start(proc() { sched.bootstrap() }));

return ret;
@@ -447,7 +449,6 @@
/// native tasks or extern pools will not be waited on
pub fn shutdown(mut self) {
self.stealers = ~[];

// Wait for everyone to exit. We may have reached a 0-task count
// multiple times in the past, meaning there could be several buffered
// messages on the `tasks_done` port. We're guaranteed that after *some*
@@ -456,7 +457,6 @@
while self.task_state.active() {
self.tasks_done.recv();
}

// Now that everyone's gone, tell everything to shut down.
for mut handle in replace(&mut self.handles, ~[]).move_iter() {
handle.send(Shutdown);
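
As a quick sanity check of the neighbor-assignment loop earlier in this file's diff, here is a standalone sketch of the same wrap-around index arithmetic in present-day Rust; the function name and test values are illustrative only, not part of the PR:

    // Left/right neighbors of scheduler `i` in a ring of `n` schedulers,
    // matching the if/else wrap-around logic in the assignment loop above.
    fn ring_neighbors(i: usize, n: usize) -> (usize, usize) {
        assert!(n > 0 && i < n);
        ((i + n - 1) % n, (i + 1) % n)
    }

    fn main() {
        assert_eq!(ring_neighbors(0, 4), (3, 1)); // wraps to the last scheduler on the left
        assert_eq!(ring_neighbors(3, 4), (2, 0)); // wraps to the first scheduler on the right
        assert_eq!(ring_neighbors(0, 1), (0, 0)); // a lone scheduler is its own neighbor
    }
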
120 changes: 71 additions & 49 deletions src/libgreen/sched.rs
@@ -9,23 +9,30 @@
// except according to those terms.

use std::cast;
use std::libc::funcs::posix88::unistd;
use std::rand::{XorShiftRng, Rng, Rand};
use std::rt::local::Local;
use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
use std::rt::task::BlockedTask;
use std::rt::task::Task;
use std::sync::arc::UnsafeArc;
use std::sync::atomics::{AtomicBool, SeqCst};
use std::sync::deque;
use std::unstable::mutex::NativeMutex;
use std::unstable::raw;

use TaskState;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
use stack::StackPool;
use task::{TypeSched, GreenTask, HomeSched, AnySched};
use msgq = message_queue;

/// By default, a scheduler tries to back off three times before it
/// goes to sleep.
/// TODO: Make this value configurable.
static EXPONENTIAL_BACKOFF_FACTOR: uint = 3;

/// A scheduler is responsible for coordinating the execution of Tasks
/// on a single thread. The scheduler runs inside a slightly modified
/// Rust Task. When not running this task is stored in the scheduler
@@ -50,19 +57,21 @@ pub struct Scheduler {
message_queue: msgq::Consumer<SchedMessage>,
/// Producer used to clone sched handles from
message_producer: msgq::Producer<SchedMessage>,
/// A shared list of sleeping schedulers. We'll use this to wake
/// up schedulers when pushing work onto the work queue.
sleeper_list: SleeperList,
/// Indicates that we have previously pushed a handle onto the
/// SleeperList but have not yet received the Wake message.
/// Being `true` does not necessarily mean that the scheduler is
/// not active since there are multiple event sources that may
/// wake the scheduler. It just prevents the scheduler from pushing
/// multiple handles onto the sleeper list.
sleepy: bool,
sleepy: UnsafeArc<AtomicBool>,
/// A flag to indicate we've received the shutdown message and should
/// no longer try to go to sleep, but exit instead.
no_sleep: bool,
/// We only go to sleep when backoff_counter hits EXPONENTIAL_BACKOFF_FACTOR.
backoff_counter: uint,
/// A scheduler only ever tries to wake up its two neighbors in the ring
left_sched: Option<SchedHandle>,
right_sched: Option<SchedHandle>,
stack_pool: StackPool,
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
@@ -124,20 +133,18 @@ impl Scheduler {
event_loop: ~EventLoop,
work_queue: deque::Worker<~GreenTask>,
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList,
state: TaskState)
-> Scheduler {

Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
sleeper_list, true, None, state)
true, None, state)

}

pub fn new_special(pool_id: uint,
event_loop: ~EventLoop,
work_queue: deque::Worker<~GreenTask>,
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>,
state: TaskState)
@@ -146,11 +153,13 @@ impl Scheduler {
let (consumer, producer) = msgq::queue();
let mut sched = Scheduler {
pool_id: pool_id,
sleeper_list: sleeper_list,
message_queue: consumer,
message_producer: producer,
sleepy: false,
sleepy: UnsafeArc::new(AtomicBool::new(false)),
no_sleep: false,
backoff_counter: 0,
left_sched: None,
right_sched: None,
event_loop: event_loop,
work_queue: work_queue,
work_queues: work_queues,
@@ -324,18 +333,24 @@ impl Scheduler {
// If we got here then there was no work to do.
// Generate a SchedHandle and push it to the sleeper list so
// somebody can wake us up later.
if !sched.sleepy && !sched.no_sleep {
rtdebug!("scheduler has no work to do, going to sleep");
sched.sleepy = true;
let handle = sched.make_handle();
sched.sleeper_list.push(handle);
// Since we are sleeping, deactivate the idle callback.
sched.idle_callback.get_mut_ref().pause();
} else {
rtdebug!("not sleeping, already doing so or no_sleep set");
// We may not be sleeping, but we still need to deactivate
// the idle callback.
sched.idle_callback.get_mut_ref().pause();
unsafe {
if !sched.no_sleep && !(*sched.sleepy.get()).load(SeqCst) {
if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR {
sched.backoff_counter = 0;
rtdebug!("scheduler has no work to do, going to sleep");
(*sched.sleepy.get()).store(true, SeqCst);
// Since we are sleeping, deactivate the idle callback.
sched.idle_callback.get_mut_ref().pause();
} else if !sched.event_loop.has_active_io() {
unistd::usleep((1 << sched.backoff_counter) * 1000u as u32);
Review comment (Member):
Can probably be * 1000, or, if that doesn't work * 1000u32.
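
A tiny illustration of that suggestion in present-day Rust; the diff itself is pre-1.0 code, so this only shows the shape of the change, not a drop-in patch:

    fn main() {
        let backoff_counter: u32 = 2;
        // Roughly what the diff does: compute in a wider integer type, then cast.
        let with_cast = ((1u64 << backoff_counter) * 1000) as u32;
        // The suggested cleanup: suffix the literal so the whole expression is
        // already the right type and no `as` cast is needed.
        let with_suffix = (1u32 << backoff_counter) * 1000;
        assert_eq!(with_cast, with_suffix);
    }
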

Review comment (Member):
Additionally, @brson and I were thinking that 1000 is quite a large number to start out with.

Review comment (Member):
Once you get into millisecond territory, I don't think usleep should be used, but rather the event loop itself so I/O can wake up the scheduler.

Review comment (Member):
usleep is deprecated, please don't use it. It has been replaced with nanosleep.
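
For context, a minimal sketch of the backoff idea in present-day Rust, using std::thread::sleep rather than usleep. The constant and the 1 ms base come from the diff; the function itself is hypothetical and deliberately ignores the reviewers' point that the event loop, not a blind sleep, should ultimately drive the wait:

    use std::thread;
    use std::time::Duration;

    const EXPONENTIAL_BACKOFF_FACTOR: u32 = 3;

    /// Returns true once the scheduler should actually go to sleep;
    /// otherwise sleeps 1 ms, 2 ms, 4 ms, ... and keeps spinning.
    fn back_off(backoff_counter: &mut u32) -> bool {
        if *backoff_counter == EXPONENTIAL_BACKOFF_FACTOR {
            *backoff_counter = 0;
            return true;
        }
        thread::sleep(Duration::from_millis(1 << *backoff_counter));
        *backoff_counter += 1;
        false
    }
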

sched.backoff_counter += 1;
}
} else {
rtdebug!("not sleeping, already doing so or no_sleep set");
// We may not be sleeping, but we still need to deactivate
// the idle callback.
sched.idle_callback.get_mut_ref().pause();
}
}
Review comment (Member):
This entire block shouldn't need to be unsafe, and I don't think any more unsafe blocks should be added to the scheduler at all. It would be nice to have a wrapper around the AtomicBool to hide the unsafety (because it's not actually unsafe).
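
A sketch of the sort of wrapper being suggested, written against present-day std; the SleepyFlag name and its methods are invented here for illustration, and in the PR's own code it would wrap the UnsafeArc<AtomicBool> rather than Arc:

    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Shared "this scheduler is sleepy" flag. Clones share the same bool,
    /// so a Scheduler and every SchedHandle pointing at it agree on its
    /// state without any `unsafe` at the call sites.
    #[derive(Clone)]
    struct SleepyFlag(Arc<AtomicBool>);

    impl SleepyFlag {
        fn new() -> SleepyFlag {
            SleepyFlag(Arc::new(AtomicBool::new(false)))
        }
        fn set(&self, sleepy: bool) {
            self.0.store(sleepy, Ordering::SeqCst)
        }
        fn is_sleepy(&self) -> bool {
            self.0.load(Ordering::SeqCst)
        }
    }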


// Finished a cycle without using the Scheduler. Place it back
@@ -397,30 +412,30 @@ impl Scheduler {
(sched, task, true)
}
Some(Wake) => {
self.sleepy = false;
(self, stask, true)
unsafe {
(*self.sleepy.get()).store(false, SeqCst);
(self, stask, true)
}
}
Some(Shutdown) => {
rtdebug!("shutting down");
if self.sleepy {
// There may be an outstanding handle on the
// sleeper list. Pop them all to make sure that's
// not the case.
loop {
match self.sleeper_list.pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake);
}
None => break
}
unsafe {
if (*self.sleepy.get()).load(SeqCst) {
match self.left_sched.take() {
Some(mut sched) => sched.wakeup_if_sleepy(),
None => ()
};
match self.right_sched.take() {
Some(mut sched) => sched.wakeup_if_sleepy(),
None => ()
};
Review comment (Member):
This still isn't quite removing this scheduler from the ring. This needs to notify each neighbor that they are now each other's neighbors.
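
To make the missing step concrete, here is a toy model of the ring in present-day Rust: removing a scheduler has to re-link its two neighbors to each other, which is the notification this branch never sends. The Ring type and the usize ids are purely illustrative:

    use std::collections::HashMap;

    /// id -> (left neighbor id, right neighbor id)
    struct Ring {
        neighbors: HashMap<usize, (usize, usize)>,
    }

    impl Ring {
        /// Splice `id` out of the ring: its left neighbor's right pointer and
        /// its right neighbor's left pointer must both be updated, otherwise
        /// they keep trying to wake a scheduler that has shut down.
        fn remove(&mut self, id: usize) {
            if let Some((left, right)) = self.neighbors.remove(&id) {
                if let Some(entry) = self.neighbors.get_mut(&left) {
                    entry.1 = right;
                }
                if let Some(entry) = self.neighbors.get_mut(&right) {
                    entry.0 = left;
                }
            }
        }
    }
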

}
// No more sleeping. After there are no outstanding
// event loop references we will shut down.
self.no_sleep = true;
(*self.sleepy.get()).store(false, SeqCst);
(self, stask, true)
}
// No more sleeping. After there are no outstanding
// event loop references we will shut down.
self.no_sleep = true;
self.sleepy = false;
(self, stask, true)
}
Some(NewNeighbor(neighbor)) => {
self.work_queues.push(neighbor);
@@ -586,14 +601,9 @@ impl Scheduler {

// We've made work available. Notify a
// sleeping scheduler.

match self.sleeper_list.casual_pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake)
}
None => { (/* pass */) }
};
if !self.left_sched.mutate(|mut sched| { sched.wakeup_if_sleepy(); sched }) {
self.right_sched.mutate(|mut sched| { sched.wakeup_if_sleepy(); sched });
}
}

// * Core Context Switching Functions
@@ -864,6 +874,7 @@ impl Scheduler {

return SchedHandle {
remote: remote,
sleepy: self.sleepy.clone(),
queue: self.message_producer.clone(),
sched_id: self.sched_id()
}
@@ -887,6 +898,9 @@ pub enum SchedMessage {
pub struct SchedHandle {
priv remote: ~RemoteCallback,
priv queue: msgq::Producer<SchedMessage>,
// Under the current design, a scheduler always outlives the handles
// pointing to it, so it's safe to use an unsafe pointer here
priv sleepy: UnsafeArc<AtomicBool>,
sched_id: uint
}

@@ -895,6 +909,14 @@ impl SchedHandle {
self.queue.push(msg);
self.remote.fire();
}

fn wakeup_if_sleepy(&mut self) {
unsafe {
if (*self.sleepy.get()).load(SeqCst) {
self.send(Wake);
}
}
}
}

struct SchedRunner;