diff --git a/src/libgreen/basic.rs b/src/libgreen/basic.rs index 10a56b2b2253a..5bccf05f7b3f6 100644 --- a/src/libgreen/basic.rs +++ b/src/libgreen/basic.rs @@ -158,6 +158,8 @@ impl EventLoop for BasicLoop { } fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None } + + fn has_active_io(&self) -> bool { false } } struct BasicRemote { diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index b224b0cabf365..4b1c4e3b425d4 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -252,12 +252,23 @@ impl Scheduler { // * Execution Functions - Core Loop Logic - // The model for this function is that you continue through it - // until you either use the scheduler while performing a schedule - // action, in which case you give it away and return early, or - // you reach the end and sleep. In the case that a scheduler - // action is performed the loop is evented such that this function - // is called again. + // This function is run from the idle callback on the uv loop, indicating + // that there are no I/O events pending. When this function returns, we will + // fall back to epoll() in the uv event loop, waiting for more things to + // happen. We may come right back off epoll() if the idle callback is still + // active, in which case we're truly just polling to see if I/O events are + // complete. + // + // The model for this function is to execute as much work as possible while + // still fairly considering I/O tasks. Falling back to epoll() frequently is + // often quite expensive, so we attempt to avoid it as much as possible. If + // we have any active I/O on the event loop, then we're forced to fall back + // to epoll() in order to provide fairness, but as long as we're doing work + // and there's no active I/O, we can continue to do work. + // + // If we try really hard to do some work, but no work is available to be + // done, then we fall back to epoll() to block this thread waiting for more + // work (instead of busy waiting). fn run_sched_once(mut ~self, stask: ~GreenTask) { // Make sure that we're not lying in that the `stask` argument is indeed // the scheduler task for this scheduler. @@ -269,26 +280,46 @@ impl Scheduler { // First we check for scheduler messages, these are higher // priority than regular tasks. - let (sched, stask) = - match self.interpret_message_queue(stask, DontTryTooHard) { - Some(pair) => pair, - None => return - }; - - // This helper will use a randomized work-stealing algorithm - // to find work. - let (sched, stask) = match sched.do_work(stask) { - Some(pair) => pair, - None => return - }; + let (mut sched, mut stask, mut did_work) = + self.interpret_message_queue(stask, DontTryTooHard); - // Now, before sleeping we need to find out if there really - // were any messages. Give it your best! - let (mut sched, stask) = - match sched.interpret_message_queue(stask, GiveItYourBest) { - Some(pair) => pair, - None => return + // After processing a message, we consider doing some more work on the + // event loop. The "keep going" condition changes after the first + // iteration becase we don't want to spin here infinitely. + // + // Once we start doing work we can keep doing work so long as the + // iteration does something. Note that we don't want to starve the + // message queue here, so each iteration when we're done working we + // check the message queue regardless of whether we did work or not. + let mut keep_going = !did_work || !sched.event_loop.has_active_io(); + while keep_going { + let (a, b, c) = match sched.do_work(stask) { + (sched, task, false) => { + sched.interpret_message_queue(task, GiveItYourBest) + } + (sched, task, true) => { + let (sched, task, _) = + sched.interpret_message_queue(task, GiveItYourBest); + (sched, task, true) + } }; + sched = a; + stask = b; + did_work = c; + + // We only keep going if we managed to do something productive and + // also don't have any active I/O. If we didn't do anything, we + // should consider going to sleep, and if we have active I/O we need + // to poll for completion. + keep_going = did_work && !sched.event_loop.has_active_io(); + } + + // If we ever did some work, then we shouldn't put our scheduler + // entirely to sleep just yet. Leave the idle callback active and fall + // back to epoll() to see what's going on. + if did_work { + return stask.put_with_sched(sched); + } // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so @@ -318,7 +349,7 @@ impl Scheduler { // return None. fn interpret_message_queue(mut ~self, stask: ~GreenTask, effort: EffortLevel) - -> Option<(~Scheduler, ~GreenTask)> + -> (~Scheduler, ~GreenTask, bool) { let msg = if effort == DontTryTooHard { @@ -349,25 +380,25 @@ impl Scheduler { Some(PinnedTask(task)) => { let mut task = task; task.give_home(HomeSched(self.make_handle())); - self.resume_task_immediately(stask, task).put(); - return None; + let (sched, task) = self.resume_task_immediately(stask, task); + (sched, task, true) } Some(TaskFromFriend(task)) => { rtdebug!("got a task from a friend. lovely!"); - self.process_task(stask, task, - Scheduler::resume_task_immediately_cl); - return None; + let (sched, task) = + self.process_task(stask, task, + Scheduler::resume_task_immediately_cl); + (sched, task, true) } Some(RunOnce(task)) => { // bypass the process_task logic to force running this task once // on this home scheduler. This is often used for I/O (homing). - self.resume_task_immediately(stask, task).put(); - return None; + let (sched, task) = self.resume_task_immediately(stask, task); + (sched, task, true) } Some(Wake) => { self.sleepy = false; - stask.put_with_sched(self); - return None; + (self, stask, true) } Some(Shutdown) => { rtdebug!("shutting down"); @@ -389,31 +420,30 @@ impl Scheduler { // event loop references we will shut down. self.no_sleep = true; self.sleepy = false; - stask.put_with_sched(self); - return None; + (self, stask, true) } Some(NewNeighbor(neighbor)) => { self.work_queues.push(neighbor); - return Some((self, stask)); - } - None => { - return Some((self, stask)); + (self, stask, false) } + None => (self, stask, false) } } - fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> { + fn do_work(mut ~self, + stask: ~GreenTask) -> (~Scheduler, ~GreenTask, bool) { rtdebug!("scheduler calling do work"); match self.find_work() { Some(task) => { rtdebug!("found some work! running the task"); - self.process_task(stask, task, - Scheduler::resume_task_immediately_cl); - return None; + let (sched, task) = + self.process_task(stask, task, + Scheduler::resume_task_immediately_cl); + (sched, task, true) } None => { rtdebug!("no work was found, returning the scheduler struct"); - return Some((self, stask)); + (self, stask, false) } } } @@ -486,7 +516,8 @@ impl Scheduler { // place. fn process_task(mut ~self, cur: ~GreenTask, - mut next: ~GreenTask, schedule_fn: SchedulingFn) { + mut next: ~GreenTask, + schedule_fn: SchedulingFn) -> (~Scheduler, ~GreenTask) { rtdebug!("processing a task"); match next.take_unwrap_home() { @@ -495,23 +526,23 @@ impl Scheduler { rtdebug!("sending task home"); next.give_home(HomeSched(home_handle)); Scheduler::send_task_home(next); - cur.put_with_sched(self); + (self, cur) } else { rtdebug!("running task here"); next.give_home(HomeSched(home_handle)); - schedule_fn(self, cur, next); + schedule_fn(self, cur, next) } } AnySched if self.run_anything => { rtdebug!("running anysched task here"); next.give_home(AnySched); - schedule_fn(self, cur, next); + schedule_fn(self, cur, next) } AnySched => { rtdebug!("sending task to friend"); next.give_home(AnySched); self.send_to_friend(next); - cur.put_with_sched(self); + (self, cur) } } } @@ -664,18 +695,19 @@ impl Scheduler { // * Context Swapping Helpers - Here be ugliness! pub fn resume_task_immediately(~self, cur: ~GreenTask, - next: ~GreenTask) -> ~GreenTask { + next: ~GreenTask) -> (~Scheduler, ~GreenTask) { assert!(cur.is_sched()); - self.change_task_context(cur, next, |sched, stask| { + let mut cur = self.change_task_context(cur, next, |sched, stask| { assert!(sched.sched_task.is_none()); sched.sched_task = Some(stask); - }) + }); + (cur.sched.take_unwrap(), cur) } fn resume_task_immediately_cl(sched: ~Scheduler, cur: ~GreenTask, - next: ~GreenTask) { - sched.resume_task_immediately(cur, next).put() + next: ~GreenTask) -> (~Scheduler, ~GreenTask) { + sched.resume_task_immediately(cur, next) } /// Block a running task, context switch to the scheduler, then pass the @@ -741,15 +773,17 @@ impl Scheduler { cur.put(); } - fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) { - sched.change_task_context(cur, next, |sched, last_task| { + fn switch_task(sched: ~Scheduler, cur: ~GreenTask, + next: ~GreenTask) -> (~Scheduler, ~GreenTask) { + let mut cur = sched.change_task_context(cur, next, |sched, last_task| { if last_task.is_sched() { assert!(sched.sched_task.is_none()); sched.sched_task = Some(last_task); } else { sched.enqueue_task(last_task); } - }).put() + }); + (cur.sched.take_unwrap(), cur) } // * Task Context Helpers @@ -769,7 +803,9 @@ impl Scheduler { } pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) { - self.process_task(cur, next, Scheduler::switch_task); + let (sched, task) = + self.process_task(cur, next, Scheduler::switch_task); + task.put_with_sched(sched); } pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) { @@ -836,7 +872,8 @@ impl Scheduler { // Supporting types -type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask); +type SchedulingFn = fn (~Scheduler, ~GreenTask, ~GreenTask) + -> (~Scheduler, ~GreenTask); pub enum SchedMessage { Wake, diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index 2740671c00d60..5d6af2969b8b3 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -86,7 +86,7 @@ impl GetAddrInfoRequest { req.defuse(); // uv callback now owns this request let mut cx = Ctx { slot: None, status: 0, addrinfo: None }; - wait_until_woken_after(&mut cx.slot, || { + wait_until_woken_after(&mut cx.slot, loop_, || { req.set_data(&cx); }); diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index 2cef2664c2fc4..e66452041a531 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int) 0 => { req.fired = true; let mut slot = None; - wait_until_woken_after(&mut slot, || { + let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) }; + wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || { unsafe { uvll::set_data_for_req(req.req, &slot) } }); match req.get_result() { diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index b71dbe05ad2b6..8c263c5e5f738 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -47,7 +47,7 @@ via `close` and `delete` methods. use std::cast; use std::io; use std::io::IoError; -use std::libc::c_int; +use std::libc::{c_int, c_void}; use std::ptr::null; use std::ptr; use std::rt::local::Local; @@ -95,6 +95,10 @@ pub mod stream; pub trait UvHandle { fn uv_handle(&self) -> *T; + fn uv_loop(&self) -> Loop { + Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) }) + } + // FIXME(#8888) dummy self fn alloc(_: Option, ty: uvll::uv_handle_type) -> *T { unsafe { @@ -136,7 +140,7 @@ pub trait UvHandle { uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb); uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>()); - wait_until_woken_after(&mut slot, || { + wait_until_woken_after(&mut slot, &self.uv_loop(), || { uvll::set_data_for_uv_handle(self.uv_handle(), &slot); }) } @@ -195,16 +199,20 @@ impl Drop for ForbidUnwind { } } -fn wait_until_woken_after(slot: *mut Option, f: ||) { +fn wait_until_woken_after(slot: *mut Option, + loop_: &Loop, + f: ||) { let _f = ForbidUnwind::new("wait_until_woken_after"); unsafe { assert!((*slot).is_none()); let task: ~Task = Local::take(); + loop_.modify_blockers(1); task.deschedule(1, |task| { *slot = Some(task); f(); Ok(()) }); + loop_.modify_blockers(-1); } } @@ -273,6 +281,7 @@ impl Loop { pub fn new() -> Loop { let handle = unsafe { uvll::loop_new() }; assert!(handle.is_not_null()); + unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) } Loop::wrap(handle) } @@ -285,6 +294,19 @@ impl Loop { pub fn close(&mut self) { unsafe { uvll::uv_loop_delete(self.handle) }; } + + // The 'data' field of the uv_loop_t is used to count the number of tasks + // that are currently blocked waiting for I/O to complete. + fn modify_blockers(&self, amt: uint) { + unsafe { + let cur = uvll::get_data_for_uv_loop(self.handle) as uint; + uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void) + } + } + + fn get_blockers(&self) -> uint { + unsafe { uvll::get_data_for_uv_loop(self.handle) as uint } + } } // FIXME: Need to define the error constants like EOF so they can be diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 551e2c9faf74f..a091829f297e8 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -216,7 +216,7 @@ impl TcpWatcher { 0 => { req.defuse(); // uv callback now owns this request let mut cx = Ctx { status: 0, task: None }; - wait_until_woken_after(&mut cx.task, || { + wait_until_woken_after(&mut cx.task, &io.loop_, || { req.set_data(&cx); }); match cx.status { @@ -498,6 +498,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { buf: Option, result: Option<(ssize_t, Option)>, } + let loop_ = self.uv_loop(); let m = self.fire_homing_missile(); let _g = self.read_access.grant(m); @@ -511,7 +512,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { result: None, }; let handle = self.handle; - wait_until_woken_after(&mut cx.task, || { + wait_until_woken_after(&mut cx.task, &loop_, || { unsafe { uvll::set_data_for_uv_handle(handle, &cx) } }); match cx.result.take_unwrap() { @@ -571,6 +572,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { struct Ctx { task: Option, result: c_int } let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); let _g = self.write_access.grant(m); let mut req = Request::new(uvll::UV_UDP_SEND); @@ -586,7 +588,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { 0 => { req.defuse(); // uv callback now owns this request let mut cx = Ctx { task: None, result: 0 }; - wait_until_woken_after(&mut cx.task, || { + wait_until_woken_after(&mut cx.task, &loop_, || { req.set_data(&cx); }); match cx.result { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index c312f112d28b4..24ac17700cc27 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -92,7 +92,7 @@ impl PipeWatcher { let mut req = Request::new(uvll::UV_CONNECT); let pipe = PipeWatcher::new(io, false); - wait_until_woken_after(&mut cx.task, || { + wait_until_woken_after(&mut cx.task, &io.loop_, || { unsafe { uvll::uv_pipe_connect(req.handle, pipe.handle(), diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index 7b7a16d7084e5..e1f94d8c4df5a 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -211,7 +211,7 @@ impl RtioProcess for Process { // If there's no exit code previously listed, then the // process's exit callback has yet to be invoked. We just // need to deschedule ourselves and wait to be reawoken. - wait_until_woken_after(&mut self.to_wake, || {}); + wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {}); assert!(self.exit_status.is_some()); } } diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index 262952f8890cb..f7bf2f051eb90 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -13,6 +13,7 @@ use std::libc::{c_int, size_t, ssize_t}; use std::ptr; use std::rt::task::BlockedTask; +use Loop; use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, ForbidUnwind, wakeup}; use uvll; @@ -87,7 +88,8 @@ impl StreamWatcher { uvll::uv_read_start(self.handle, alloc_cb, read_cb) } { 0 => { - wait_until_woken_after(&mut rcx.task, || {}); + let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) }; + wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {}); match rcx.result { n if n < 0 => Err(UvError(n as c_int)), n => Ok(n as uint), @@ -121,7 +123,8 @@ impl StreamWatcher { let mut wcx = WriteContext { result: 0, task: None, }; req.defuse(); // uv callback now owns this request - wait_until_woken_after(&mut wcx.task, || { + let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) }; + wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || { req.set_data(&wcx); }); self.last_write_req = Some(Request::wrap(req.handle)); diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 0ce2501d2cc7f..8c80cc9914504 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -9,13 +9,12 @@ // except according to those terms. use std::libc::c_int; -use std::mem::replace; -use std::rt::local::Local; +use std::mem; use std::rt::rtio::RtioTimer; -use std::rt::task::{BlockedTask, Task}; +use std::rt::task::BlockedTask; use homing::{HomeHandle, HomingIO}; -use super::{UvHandle, ForbidUnwind, ForbidSwitch}; +use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after}; use uvio::UvIoFactory; use uvll; @@ -23,11 +22,12 @@ pub struct TimerWatcher { handle: *uvll::uv_timer_t, home: HomeHandle, action: Option, + blocker: Option, id: uint, // see comments in timer_cb } pub enum NextAction { - WakeTask(BlockedTask), + WakeTask, SendOnce(Chan<()>), SendMany(Chan<()>, uint), } @@ -41,6 +41,7 @@ impl TimerWatcher { let me = ~TimerWatcher { handle: handle, action: None, + blocker: None, home: io.make_handle(), id: 0, }; @@ -76,7 +77,7 @@ impl RtioTimer for TimerWatcher { let missile = self.fire_homing_missile(); self.id += 1; self.stop(); - let _missile = match replace(&mut self.action, None) { + let _missile = match mem::replace(&mut self.action, None) { None => missile, // no need to do a homing dance Some(action) => { drop(missile); // un-home ourself @@ -89,11 +90,9 @@ impl RtioTimer for TimerWatcher { // started, then we need to call stop on the timer. let _f = ForbidUnwind::new("timer"); - let task: ~Task = Local::take(); - task.deschedule(1, |task| { - self.action = Some(WakeTask(task)); + self.action = Some(WakeTask); + wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || { self.start(msecs, 0); - Ok(()) }); self.stop(); } @@ -108,7 +107,7 @@ impl RtioTimer for TimerWatcher { self.id += 1; self.stop(); self.start(msecs, 0); - replace(&mut self.action, Some(SendOnce(chan))) + mem::replace(&mut self.action, Some(SendOnce(chan))) }; return port; @@ -124,7 +123,7 @@ impl RtioTimer for TimerWatcher { self.id += 1; self.stop(); self.start(msecs, msecs); - replace(&mut self.action, Some(SendMany(chan, self.id))) + mem::replace(&mut self.action, Some(SendMany(chan, self.id))) }; return port; @@ -137,7 +136,8 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; match timer.action.take_unwrap() { - WakeTask(task) => { + WakeTask => { + let task = timer.blocker.take_unwrap(); let _ = task.wake().map(|t| t.reawaken()); } SendOnce(chan) => { let _ = chan.try_send(()); } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 54db4b4d3d13f..14406cb2a6a01 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -99,6 +99,10 @@ impl rtio::EventLoop for UvEventLoop { let factory = &mut self.uvio as &mut rtio::IoFactory; Some(factory) } + + fn has_active_io(&self) -> bool { + self.uvio.loop_.get_blockers() > 0 + } } #[cfg(not(test))] diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 39623e329eae9..5573f8ec02eb3 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -41,6 +41,7 @@ pub trait EventLoop { /// The asynchronous I/O services. Not all event loops may provide one. fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>; + fn has_active_io(&self) -> bool; } pub trait RemoteCallback {