Commit

auto merge of #12186 : alexcrichton/rust/no-sleep-2, r=brson
Any single-threaded task benchmark will spend a good chunk of time in `kqueue()` on OS X and `epoll()` on Linux, and the reason is that each time a task terminates it hits the syscall. When a task terminates, it context switches back to the scheduler thread, and the scheduler thread falls out of `run_sched_once` whenever it figures out that it did some work.

If we know that `epoll()` will return nothing, then we can continue to do work locally (only while there's work to be done). We must fall back to `epoll()` whenever there's active I/O in order to check whether it's ready or not, but when there is none (which is largely the case in benchmarks), we can avoid the costly syscall and get a nice speedup.

I've separated the commits into preparation for this change and then the change itself; the last commit message has more details.
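In short: drain local work before touching the kernel. Below is a minimal sketch of that control flow in present-day Rust — not the 2014 runtime code from this diff — with hypothetical names (`MockSched`, `poll_events`, a `Vec` standing in for the run queue) and a much-simplified version of what `run_sched_once` actually does: keep looping while iterations stay productive and no I/O is outstanding, and only then fall back to the blocking `epoll()`/`kqueue()` poll.

```rust
// Hypothetical sketch of the "do local work until idle or I/O is pending"
// idea; names and structure are illustrative, not from the real tree.
struct MockSched {
    run_queue: Vec<&'static str>, // pending "tasks"
    active_io: usize,             // tasks blocked on I/O (cf. modify_blockers)
}

impl MockSched {
    fn has_active_io(&self) -> bool {
        self.active_io > 0
    }

    // Analogue of do_work/interpret_message_queue: true if something ran.
    fn do_work(&mut self) -> bool {
        match self.run_queue.pop() {
            Some(task) => {
                println!("running {task}");
                true
            }
            None => false,
        }
    }

    // Stand-in for falling back to epoll()/kqueue() via the uv event loop.
    fn poll_events(&self) {
        println!("falling back to the event loop (epoll/kqueue)");
    }

    // The general shape of the new run-once logic.
    fn run_once(&mut self) {
        let mut did_work = self.do_work();
        // Keep working locally only while iterations stay productive and no
        // task is blocked on I/O that the event loop needs to check on.
        while did_work && !self.has_active_io() {
            did_work = self.do_work();
        }
        // Out of local work (or I/O is pending): the syscall is now worth it.
        self.poll_events();
    }
}

fn main() {
    let mut sched = MockSched {
        run_queue: vec!["a", "b", "c"],
        active_io: 0,
    };
    sched.run_once(); // runs c, b, a locally, then polls the event loop once
}
```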
bors committed Feb 14, 2014
2 parents 2fe7bfe + 2650b61 commit 03b324f
Showing 12 changed files with 157 additions and 85 deletions.
2 changes: 2 additions & 0 deletions src/libgreen/basic.rs
@@ -158,6 +158,8 @@ impl EventLoop for BasicLoop {
}

fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }

fn has_active_io(&self) -> bool { false }
}

struct BasicRemote {
157 changes: 97 additions & 60 deletions src/libgreen/sched.rs
@@ -252,12 +252,23 @@ impl Scheduler {

// * Execution Functions - Core Loop Logic

// The model for this function is that you continue through it
// until you either use the scheduler while performing a schedule
// action, in which case you give it away and return early, or
// you reach the end and sleep. In the case that a scheduler
// action is performed the loop is evented such that this function
// is called again.
// This function is run from the idle callback on the uv loop, indicating
// that there are no I/O events pending. When this function returns, we will
// fall back to epoll() in the uv event loop, waiting for more things to
// happen. We may come right back off epoll() if the idle callback is still
// active, in which case we're truly just polling to see if I/O events are
// complete.
//
// The model for this function is to execute as much work as possible while
// still fairly considering I/O tasks. Falling back to epoll() frequently is
// often quite expensive, so we attempt to avoid it as much as possible. If
// we have any active I/O on the event loop, then we're forced to fall back
// to epoll() in order to provide fairness, but as long as we're doing work
// and there's no active I/O, we can continue to do work.
//
// If we try really hard to do some work, but no work is available to be
// done, then we fall back to epoll() to block this thread waiting for more
// work (instead of busy waiting).
fn run_sched_once(mut ~self, stask: ~GreenTask) {
// Make sure that we're not lying in that the `stask` argument is indeed
// the scheduler task for this scheduler.
@@ -269,26 +280,46 @@ impl Scheduler {

// First we check for scheduler messages, these are higher
// priority than regular tasks.
let (sched, stask) =
match self.interpret_message_queue(stask, DontTryTooHard) {
Some(pair) => pair,
None => return
};

// This helper will use a randomized work-stealing algorithm
// to find work.
let (sched, stask) = match sched.do_work(stask) {
Some(pair) => pair,
None => return
};
let (mut sched, mut stask, mut did_work) =
self.interpret_message_queue(stask, DontTryTooHard);

// Now, before sleeping we need to find out if there really
// were any messages. Give it your best!
let (mut sched, stask) =
match sched.interpret_message_queue(stask, GiveItYourBest) {
Some(pair) => pair,
None => return
// After processing a message, we consider doing some more work on the
// event loop. The "keep going" condition changes after the first
// iteration because we don't want to spin here infinitely.
//
// Once we start doing work we can keep doing work so long as the
// iteration does something. Note that we don't want to starve the
// message queue here, so each iteration when we're done working we
// check the message queue regardless of whether we did work or not.
let mut keep_going = !did_work || !sched.event_loop.has_active_io();
while keep_going {
let (a, b, c) = match sched.do_work(stask) {
(sched, task, false) => {
sched.interpret_message_queue(task, GiveItYourBest)
}
(sched, task, true) => {
let (sched, task, _) =
sched.interpret_message_queue(task, GiveItYourBest);
(sched, task, true)
}
};
sched = a;
stask = b;
did_work = c;

// We only keep going if we managed to do something productive and
// also don't have any active I/O. If we didn't do anything, we
// should consider going to sleep, and if we have active I/O we need
// to poll for completion.
keep_going = did_work && !sched.event_loop.has_active_io();
}

// If we ever did some work, then we shouldn't put our scheduler
// entirely to sleep just yet. Leave the idle callback active and fall
// back to epoll() to see what's going on.
if did_work {
return stask.put_with_sched(sched);
}

// If we got here then there was no work to do.
// Generate a SchedHandle and push it to the sleeper list so
@@ -318,7 +349,7 @@ impl Scheduler {
// return None.
fn interpret_message_queue(mut ~self, stask: ~GreenTask,
effort: EffortLevel)
-> Option<(~Scheduler, ~GreenTask)>
-> (~Scheduler, ~GreenTask, bool)
{

let msg = if effort == DontTryTooHard {
@@ -349,25 +380,25 @@ impl Scheduler {
Some(PinnedTask(task)) => {
let mut task = task;
task.give_home(HomeSched(self.make_handle()));
self.resume_task_immediately(stask, task).put();
return None;
let (sched, task) = self.resume_task_immediately(stask, task);
(sched, task, true)
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
return None;
let (sched, task) =
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
(sched, task, true)
}
Some(RunOnce(task)) => {
// bypass the process_task logic to force running this task once
// on this home scheduler. This is often used for I/O (homing).
self.resume_task_immediately(stask, task).put();
return None;
let (sched, task) = self.resume_task_immediately(stask, task);
(sched, task, true)
}
Some(Wake) => {
self.sleepy = false;
stask.put_with_sched(self);
return None;
(self, stask, true)
}
Some(Shutdown) => {
rtdebug!("shutting down");
@@ -389,31 +420,30 @@ impl Scheduler {
// event loop references we will shut down.
self.no_sleep = true;
self.sleepy = false;
stask.put_with_sched(self);
return None;
(self, stask, true)
}
Some(NewNeighbor(neighbor)) => {
self.work_queues.push(neighbor);
return Some((self, stask));
}
None => {
return Some((self, stask));
(self, stask, false)
}
None => (self, stask, false)
}
}

fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> {
fn do_work(mut ~self,
stask: ~GreenTask) -> (~Scheduler, ~GreenTask, bool) {
rtdebug!("scheduler calling do work");
match self.find_work() {
Some(task) => {
rtdebug!("found some work! running the task");
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
return None;
let (sched, task) =
self.process_task(stask, task,
Scheduler::resume_task_immediately_cl);
(sched, task, true)
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some((self, stask));
(self, stask, false)
}
}
}
@@ -486,7 +516,8 @@ impl Scheduler {
// place.

fn process_task(mut ~self, cur: ~GreenTask,
mut next: ~GreenTask, schedule_fn: SchedulingFn) {
mut next: ~GreenTask,
schedule_fn: SchedulingFn) -> (~Scheduler, ~GreenTask) {
rtdebug!("processing a task");

match next.take_unwrap_home() {
@@ -495,23 +526,23 @@ impl Scheduler {
rtdebug!("sending task home");
next.give_home(HomeSched(home_handle));
Scheduler::send_task_home(next);
cur.put_with_sched(self);
(self, cur)
} else {
rtdebug!("running task here");
next.give_home(HomeSched(home_handle));
schedule_fn(self, cur, next);
schedule_fn(self, cur, next)
}
}
AnySched if self.run_anything => {
rtdebug!("running anysched task here");
next.give_home(AnySched);
schedule_fn(self, cur, next);
schedule_fn(self, cur, next)
}
AnySched => {
rtdebug!("sending task to friend");
next.give_home(AnySched);
self.send_to_friend(next);
cur.put_with_sched(self);
(self, cur)
}
}
}
@@ -664,18 +695,19 @@ impl Scheduler {
// * Context Swapping Helpers - Here be ugliness!

pub fn resume_task_immediately(~self, cur: ~GreenTask,
next: ~GreenTask) -> ~GreenTask {
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
assert!(cur.is_sched());
self.change_task_context(cur, next, |sched, stask| {
let mut cur = self.change_task_context(cur, next, |sched, stask| {
assert!(sched.sched_task.is_none());
sched.sched_task = Some(stask);
})
});
(cur.sched.take_unwrap(), cur)
}

fn resume_task_immediately_cl(sched: ~Scheduler,
cur: ~GreenTask,
next: ~GreenTask) {
sched.resume_task_immediately(cur, next).put()
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
sched.resume_task_immediately(cur, next)
}

/// Block a running task, context switch to the scheduler, then pass the
@@ -741,15 +773,17 @@ impl Scheduler {
cur.put();
}

fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) {
sched.change_task_context(cur, next, |sched, last_task| {
fn switch_task(sched: ~Scheduler, cur: ~GreenTask,
next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
if last_task.is_sched() {
assert!(sched.sched_task.is_none());
sched.sched_task = Some(last_task);
} else {
sched.enqueue_task(last_task);
}
}).put()
});
(cur.sched.take_unwrap(), cur)
}

// * Task Context Helpers
@@ -769,7 +803,9 @@ impl Scheduler {
}

pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) {
self.process_task(cur, next, Scheduler::switch_task);
let (sched, task) =
self.process_task(cur, next, Scheduler::switch_task);
task.put_with_sched(sched);
}

pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) {
@@ -836,7 +872,8 @@ impl Scheduler {

// Supporting types

type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask);
type SchedulingFn = fn (~Scheduler, ~GreenTask, ~GreenTask)
-> (~Scheduler, ~GreenTask);

pub enum SchedMessage {
Wake,
2 changes: 1 addition & 1 deletion src/librustuv/addrinfo.rs
@@ -86,7 +86,7 @@ impl GetAddrInfoRequest {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };

wait_until_woken_after(&mut cx.slot, || {
wait_until_woken_after(&mut cx.slot, loop_, || {
req.set_data(&cx);
});

3 changes: 2 additions & 1 deletion src/librustuv/file.rs
@@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
0 => {
req.fired = true;
let mut slot = None;
wait_until_woken_after(&mut slot, || {
let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
unsafe { uvll::set_data_for_req(req.req, &slot) }
});
match req.get_result() {
28 changes: 25 additions & 3 deletions src/librustuv/lib.rs
@@ -47,7 +47,7 @@ via `close` and `delete` methods.
use std::cast;
use std::io;
use std::io::IoError;
use std::libc::c_int;
use std::libc::{c_int, c_void};
use std::ptr::null;
use std::ptr;
use std::rt::local::Local;
@@ -95,6 +95,10 @@ pub mod stream;
pub trait UvHandle<T> {
fn uv_handle(&self) -> *T;

fn uv_loop(&self) -> Loop {
Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
}

// FIXME(#8888) dummy self
fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
unsafe {
@@ -136,7 +140,7 @@ pub trait UvHandle<T> {
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());

wait_until_woken_after(&mut slot, || {
wait_until_woken_after(&mut slot, &self.uv_loop(), || {
uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
})
}
@@ -195,16 +199,20 @@ impl Drop for ForbidUnwind {
}
}

fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
loop_: &Loop,
f: ||) {
let _f = ForbidUnwind::new("wait_until_woken_after");
unsafe {
assert!((*slot).is_none());
let task: ~Task = Local::take();
loop_.modify_blockers(1);
task.deschedule(1, |task| {
*slot = Some(task);
f();
Ok(())
});
loop_.modify_blockers(-1);
}
}

@@ -273,6 +281,7 @@ impl Loop {
pub fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
assert!(handle.is_not_null());
unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
Loop::wrap(handle)
}

@@ -285,6 +294,19 @@ impl Loop {
pub fn close(&mut self) {
unsafe { uvll::uv_loop_delete(self.handle) };
}

// The 'data' field of the uv_loop_t is used to count the number of tasks
// that are currently blocked waiting for I/O to complete.
fn modify_blockers(&self, amt: uint) {
unsafe {
let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
}
}

fn get_blockers(&self) -> uint {
unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
}
}

// FIXME: Need to define the error constants like EOF so they can be
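The `modify_blockers` bookkeeping in `src/librustuv/lib.rs` is what lets the scheduler cheaply answer `has_active_io()`: tasks bump a per-loop counter while they are parked waiting on I/O, and a non-zero count means the scheduler must keep falling back to `epoll()`. The uv-side `has_active_io` implementation isn't shown in this excerpt, so the toy below is only an assumption-laden sketch in present-day Rust (hypothetical `ToyLoop`, an atomic counter instead of the `uv_loop_t` data field, a closure standing in for descheduling) of the same pattern.

```rust
// Toy version of the blocker counter: increment while a task is parked on an
// I/O request, decrement on wakeup, and treat a non-zero count as active I/O.
use std::sync::atomic::{AtomicUsize, Ordering};

struct ToyLoop {
    // Stand-in for the count this commit keeps in the uv_loop_t data field.
    blockers: AtomicUsize,
}

impl ToyLoop {
    fn new() -> ToyLoop {
        ToyLoop { blockers: AtomicUsize::new(0) }
    }

    // "Active I/O" simply means somebody is still parked waiting for a wakeup.
    fn has_active_io(&self) -> bool {
        self.blockers.load(Ordering::SeqCst) > 0
    }

    // Mirrors the shape of librustuv's wait_until_woken_after: account for the
    // blocked task for exactly as long as it stays descheduled.
    fn wait_until_woken_after<F: FnOnce()>(&self, blocked: F) {
        self.blockers.fetch_add(1, Ordering::SeqCst);
        blocked(); // stand-in for descheduling until the uv callback fires
        self.blockers.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let uv_loop = ToyLoop::new();
    assert!(!uv_loop.has_active_io());
    uv_loop.wait_until_woken_after(|| {
        // While "blocked", the scheduler sees active I/O and keeps falling
        // back to epoll() instead of spinning on local work forever.
        assert!(uv_loop.has_active_io());
    });
    assert!(!uv_loop.has_active_io());
}
```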
