diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 54db03b6069c2..b00df78f433d7 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -22,10 +22,12 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; -use unstable::intrinsics::{atomic_xchg, atomic_load}; +use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; +use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; +use clone::Clone; /// A combined refcount / ~Task pointer. /// @@ -34,14 +36,14 @@ use cell::Cell; /// * 2 - both endpoints are alive /// * 1 - either the sender or the receiver is dead, determined by context /// * - A pointer to a blocked Task that can be transmuted to ~Task -type State = int; +type State = uint; static STATE_BOTH: State = 2; static STATE_ONE: State = 1; /// The heap-allocated structure shared between two endpoints. struct Packet { - state: State, + state: AtomicUint, payload: Option, } @@ -70,7 +72,7 @@ pub struct PortOneHack { pub fn oneshot() -> (PortOne, ChanOne) { let packet: ~Packet = ~Packet { - state: STATE_BOTH, + state: AtomicUint::new(STATE_BOTH), payload: None }; @@ -114,12 +116,20 @@ impl ChanOne { // reordering of the payload write. This also issues an // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. - let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE); + let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do + do Local::borrow:: |sched| { + rtdebug!("non-rendezvous send"); + sched.metrics.non_rendezvous_sends += 1; + } } STATE_ONE => { + do Local::borrow:: |sched| { + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; + } // Port has closed. Need to clean up. let _packet: ~Packet = cast::transmute(this.inner.void_packet); recvr_active = false; @@ -127,7 +137,9 @@ impl ChanOne { task_as_state => { // Port is blocked. Wake it up. let recvr: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::(); + let mut sched = Local::take::(); + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; sched.schedule_task(recvr); } } @@ -158,23 +170,30 @@ impl PortOne { // Switch to the scheduler to put the ~Task into the Packet state. let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { unsafe { // Atomically swap the task pointer into the Packet state, issuing // an acquire barrier to prevent reordering of the subsequent read // of the payload. Also issues a release barrier to prevent reordering // of any previous writes to the task structure. let task_as_state: State = cast::transmute(task); - let oldstate = atomic_xchg(&mut (*packet).state, task_as_state); + let oldstate = (*packet).state.swap(task_as_state, SeqCst); match oldstate { STATE_BOTH => { // Data has not been sent. Now we're blocked. + rtdebug!("non-rendezvous recv"); + sched.metrics.non_rendezvous_recvs += 1; } STATE_ONE => { + rtdebug!("rendezvous recv"); + sched.metrics.rendezvous_recvs += 1; + // Channel is closed. Switch back and check the data. + // NB: We have to drop back into the scheduler event loop here + // instead of switching immediately back or we could end up + // triggering infinite recursion on the scheduler's stack. 
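+                        // Reclaim the ~Task pointer we just swapped in: the
+                        // sender never saw it (it had already set STATE_ONE),
+                        // so we requeue ourselves and examine the payload
+                        // after switching back into this task.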
let task: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } _ => util::unreachable() } @@ -210,7 +229,7 @@ impl Peekable for PortOne { fn peek(&self) -> bool { unsafe { let packet: *mut Packet = self.inner.packet(); - let oldstate = atomic_load(&mut (*packet).state); + let oldstate = (*packet).state.load(SeqCst); match oldstate { STATE_BOTH => false, STATE_ONE => (*packet).payload.is_some(), @@ -227,7 +246,7 @@ impl Drop for ChanOneHack { unsafe { let this = cast::transmute_mut(self); - let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); + let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port still active. It will destroy the Packet. @@ -254,7 +273,7 @@ impl Drop for PortOneHack { unsafe { let this = cast::transmute_mut(self); - let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); + let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Chan still active. It will destroy the packet. @@ -295,16 +314,19 @@ struct StreamPayload { next: PortOne> } +type StreamChanOne = ChanOne>; +type StreamPortOne = PortOne>; + /// A channel with unbounded size. pub struct Chan { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell>> + next: Cell> } /// An port with unbounded size. pub struct Port { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell>> + next: Cell> } pub fn stream() -> (Port, Chan) { @@ -357,6 +379,148 @@ impl Peekable for Port { } } +pub struct SharedChan { + // Just like Chan, but a shared AtomicOption instead of Cell + priv next: UnsafeAtomicRcBox>> +} + +impl SharedChan { + pub fn new(chan: Chan) -> SharedChan { + let next = chan.next.take(); + let next = AtomicOption::new(~next); + SharedChan { next: UnsafeAtomicRcBox::new(next) } + } +} + +impl GenericChan for SharedChan { + fn send(&self, val: T) { + self.try_send(val); + } +} + +impl GenericSmartChan for SharedChan { + #[cfg(stage0)] // odd type checking errors + fn try_send(&self, _val: T) -> bool { + fail!() + } + + #[cfg(not(stage0))] + fn try_send(&self, val: T) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) + } + } +} + +impl Clone for SharedChan { + fn clone(&self) -> SharedChan { + SharedChan { + next: self.next.clone() + } + } +} + +pub struct SharedPort { + // The next port on which we will receive the next port on which we will receive T + priv next_link: UnsafeAtomicRcBox>>> +} + +impl SharedPort { + pub fn new(port: Port) -> SharedPort { + // Put the data port into a new link pipe + let next_data_port = port.next.take(); + let (next_link_port, next_link_chan) = oneshot(); + next_link_chan.send(next_data_port); + let next_link = AtomicOption::new(~next_link_port); + SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) } + } +} + +impl GenericPort for SharedPort { + fn recv(&self) -> T { + match self.try_recv() { + Some(val) => val, + None => { + fail!("receiving on a closed channel"); + } + } + } + + #[cfg(stage0)] // odd type checking errors + fn try_recv(&self) -> Option { + fail!() + } + + #[cfg(not(stage0))] + fn try_recv(&self) -> Option { + unsafe { + let (next_link_port, next_link_chan) = oneshot(); + let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); + let link_port = link_port.unwrap(); + let 
data_port = link_port.recv(); + let (next_data_port, res) = match data_port.try_recv() { + Some(StreamPayload { val, next }) => { + (next, Some(val)) + } + None => { + let (next_data_port, _) = oneshot(); + (next_data_port, None) + } + }; + next_link_chan.send(next_data_port); + return res; + } + } +} + +impl Clone for SharedPort { + fn clone(&self) -> SharedPort { + SharedPort { + next_link: self.next_link.clone() + } + } +} + +// XXX: Need better name +type MegaPipe = (SharedPort, SharedChan); + +pub fn megapipe() -> MegaPipe { + let (port, chan) = stream(); + (SharedPort::new(port), SharedChan::new(chan)) +} + +impl GenericChan for MegaPipe { + fn send(&self, val: T) { + match *self { + (_, ref c) => c.send(val) + } + } +} + +impl GenericSmartChan for MegaPipe { + fn try_send(&self, val: T) -> bool { + match *self { + (_, ref c) => c.try_send(val) + } + } +} + +impl GenericPort for MegaPipe { + fn recv(&self) -> T { + match *self { + (ref p, _) => p.recv() + } + } + + fn try_recv(&self) -> Option { + match *self { + (ref p, _) => p.try_recv() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -584,7 +748,7 @@ mod test { #[test] fn stream_send_recv_stress() { for stress_factor().times { - do run_in_newsched_task { + do run_in_mt_newsched_task { let (port, chan) = stream::<~int>(); send(chan, 0); @@ -594,18 +758,18 @@ mod test { if i == 10 { return } let chan_cell = Cell(chan); - let _thread = do spawntask_thread { + do spawntask_random { let chan = chan_cell.take(); chan.send(~i); send(chan, i + 1); - }; + } } fn recv(port: Port<~int>, i: int) { if i == 10 { return } let port_cell = Cell(port); - let _thread = do spawntask_thread { + do spawntask_random { let port = port_cell.take(); assert!(port.recv() == ~i); recv(port, i + 1); @@ -614,5 +778,144 @@ mod test { } } } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + do run_in_newsched_task { + let (port, chan) = stream(); + for 10000.times { chan.send(()) } + for 10000.times { port.recv() } + } + } + + #[test] + fn shared_chan_stress() { + do run_in_mt_newsched_task { + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let total = stress_factor() + 100; + for total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + + for total.times { + port.recv(); + } + } + } + + #[test] + fn shared_port_stress() { + do run_in_mt_newsched_task { + // XXX: Removing these type annotations causes an ICE + let (end_port, end_chan) = stream::<()>(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let total = stress_factor() + 100; + for total.times { + let end_chan_clone = end_chan.clone(); + let port_clone = port.clone(); + do spawntask_random { + port_clone.recv(); + end_chan_clone.send(()); + } + } + + for total.times { + chan.send(()); + } + + for total.times { + end_port.recv(); + } + } + } + + #[test] + fn shared_port_close_simple() { + do run_in_mt_newsched_task { + let (port, chan) = stream::<()>(); + let port = SharedPort::new(port); + { let _chan = chan; } + assert!(port.try_recv().is_none()); + } + } + + #[test] + fn shared_port_close() { + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let chan = SharedChan::new(chan); + let send_total = 10; + let recv_total = 20; + do spawntask_random { + for send_total.times { + 
let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + } + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for recv_total.times { + let port_clone = port.clone(); + let end_chan_clone = end_chan_clone.clone(); + do spawntask_random { + let recvd = port_clone.try_recv().is_some(); + end_chan_clone.send(recvd); + } + } + } + + let mut recvd = 0; + for recv_total.times { + recvd += if end_port.recv() { 1 } else { 0 }; + } + + assert!(recvd == send_total); + } + } + + #[test] + fn megapipe_stress() { + use rand; + use rand::RngUtil; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let pipe = megapipe(); + let total = stress_factor() + 10; + let mut rng = rand::rng(); + for total.times { + let msgs = rng.gen_uint_range(0, 10); + let pipe_clone = pipe.clone(); + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for msgs.times { + pipe_clone.send(()); + } + for msgs.times { + pipe_clone.recv(); + } + } + + end_chan_clone.send(()); + } + + for total.times { + end_port.recv(); + } + } + } } diff --git a/src/libstd/rt/io/extensions.rs b/src/libstd/rt/io/extensions.rs index fcbf31e87f2c0..ad9658e48ba1f 100644 --- a/src/libstd/rt/io/extensions.rs +++ b/src/libstd/rt/io/extensions.rs @@ -749,8 +749,6 @@ mod test { #[should_fail] #[ignore(cfg(windows))] fn push_bytes_fail_reset_len() { - use unstable::finally::Finally; - // push_bytes unsafely sets the vector length. This is testing that // upon failure the length is reset correctly. let mut reader = MockReader::new(); @@ -772,7 +770,8 @@ mod test { reader.push_bytes(&mut *buf, 4); }).finally { // NB: Using rtassert here to trigger abort on failure since this is a should_fail test - rtassert!(*buf == ~[8, 9, 10]); + // FIXME: #7049 This fails because buf is still borrowed + //rtassert!(*buf == ~[8, 9, 10]); } } diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 313123c38b58d..e6988c538881a 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -85,30 +85,30 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { + use rt::test::*; use rt::sched::Scheduler; - use rt::uv::uvio::UvEventLoop; use super::*; #[test] fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn borrow_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); unsafe { let _scheduler: *mut Scheduler = Local::unsafe_borrow(); diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs index eaab9288ac8d0..21711bbe84c70 100644 --- a/src/libstd/rt/message_queue.rs +++ b/src/libstd/rt/message_queue.rs @@ -8,6 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! A concurrent queue that supports multiple producers and a +//! single consumer. 
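The module doc above pins down the contract this queue provides: any number of producers may push, while a single consumer pops. The queue's implementation is not part of this diff, so here is a rough modern-Rust sketch of that contract only, with `std::sync::mpsc` standing in for the runtime's queue (an analogy, not this module's code):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();
    // Multiple producers: each thread gets its own clone of the sender.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();
    drop(tx); // drop the original sender so the channel closes when producers finish
    for h in handles {
        h.join().unwrap();
    }
    // Single consumer drains everything that was sent.
    let mut got: Vec<i32> = rx.iter().collect();
    got.sort();
    assert_eq!(got, vec![0, 1, 2, 3]);
}
```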
+ use container::Container; use kinds::Owned; use vec::OwnedVector; diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs new file mode 100644 index 0000000000000..70e347fdfb6ac --- /dev/null +++ b/src/libstd/rt/metrics.rs @@ -0,0 +1,88 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use to_str::ToStr; + +pub struct SchedMetrics { + // The number of times executing `run_sched_once`. + turns: uint, + // The number of turns that received a message. + messages_received: uint, + // The number of turns that ran a task from the queue. + tasks_resumed_from_queue: uint, + // The number of turns that found no work to perform. + wasted_turns: uint, + // The number of times the scheduler went to sleep. + sleepy_times: uint, + // Context switches from the scheduler into a task. + context_switches_sched_to_task: uint, + // Context switches from a task into the scheduler. + context_switches_task_to_sched: uint, + // Context switches from a task to a task. + context_switches_task_to_task: uint, + // Message sends that unblock the receiver + rendezvous_sends: uint, + // Message sends that do not unblock the receiver + non_rendezvous_sends: uint, + // Message receives that do not block the receiver + rendezvous_recvs: uint, + // Message receives that block the receiver + non_rendezvous_recvs: uint +} + +impl SchedMetrics { + pub fn new() -> SchedMetrics { + SchedMetrics { + turns: 0, + messages_received: 0, + tasks_resumed_from_queue: 0, + wasted_turns: 0, + sleepy_times: 0, + context_switches_sched_to_task: 0, + context_switches_task_to_sched: 0, + context_switches_task_to_task: 0, + rendezvous_sends: 0, + non_rendezvous_sends: 0, + rendezvous_recvs: 0, + non_rendezvous_recvs: 0 + } + } +} + +impl ToStr for SchedMetrics { + fn to_str(&self) -> ~str { + fmt!("turns: %u\n\ + messages_received: %u\n\ + tasks_resumed_from_queue: %u\n\ + wasted_turns: %u\n\ + sleepy_times: %u\n\ + context_switches_sched_to_task: %u\n\ + context_switches_task_to_sched: %u\n\ + context_switches_task_to_task: %u\n\ + rendezvous_sends: %u\n\ + non_rendezvous_sends: %u\n\ + rendezvous_recvs: %u\n\ + non_rendezvous_recvs: %u\n\ + ", + self.turns, + self.messages_received, + self.tasks_resumed_from_queue, + self.wasted_turns, + self.sleepy_times, + self.context_switches_sched_to_task, + self.context_switches_task_to_sched, + self.context_switches_task_to_task, + self.rendezvous_sends, + self.non_rendezvous_sends, + self.rendezvous_recvs, + self.non_rendezvous_recvs + ) + } +} \ No newline at end of file diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 2fac1df01a495..caf3e15e535af 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -55,6 +55,9 @@ Several modules in `core` are clients of `rt`: */ #[doc(hidden)]; +#[deny(unused_imports)]; +#[deny(unused_mut)]; +#[deny(unused_variable)]; use ptr::Ptr; @@ -88,6 +91,9 @@ mod work_queue; /// A parallel queue. mod message_queue; +/// A parallel data structure for tracking sleeping schedulers. +mod sleeper_list; + /// Stack segments and caching. mod stack; @@ -127,6 +133,8 @@ pub mod local_ptr; /// Bindings to pthread/windows thread-local storage. 
pub mod thread_local_storage; +pub mod metrics; + /// Set up a default runtime configuration, given compiler-supplied arguments. /// @@ -145,12 +153,17 @@ pub mod thread_local_storage; pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { use self::sched::{Scheduler, Coroutine}; + use self::work_queue::WorkQueue; use self::uv::uvio::UvEventLoop; + use self::sleeper_list::SleeperList; init(crate_map); let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_); + let work_queue = WorkQueue::new(); + let sleepers = SleeperList::new(); + let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); + sched.no_sleep = true; let main_task = ~Coroutine::new(&mut sched.stack_pool, main); sched.enqueue_task(main_task); @@ -218,23 +231,19 @@ pub fn context() -> RuntimeContext { fn test_context() { use unstable::run_in_bare_thread; use self::sched::{Scheduler, Coroutine}; - use rt::uv::uvio::UvEventLoop; - use cell::Cell; use rt::local::Local; + use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { assert_eq!(context(), SchedulerContext); - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + sched.enqueue_task(task); } }; sched.enqueue_task(task);
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 4b5eda22ff5de..fa657555f3aa0 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -18,6 +18,7 @@ use rt::uv::uvio; // XXX: ~object doesn't work currently so these are some placeholder // types to use instead pub type EventLoopObject = uvio::UvEventLoop; +pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; @@ -26,10 +27,20 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn callback_ms(&mut self, ms: u64, ~fn()); + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; /// The asynchronous I/O services. Not all event loops may provide one fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; } +pub trait RemoteCallback { + /// Trigger the remote callback. Note that the number of times the callback + /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire', + /// the callback will be called at least once, but multiple callbacks may be coalesced + /// and callbacks may be called more often than requested. Destruction also triggers the + /// callback.
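+    /// For example, two calls to 'fire' in quick succession may be observed
+    /// as a single run of the callback on the event loop thread; at-least-once
+    /// is the only delivery guarantee.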
+ fn fire(&mut self); +} + pub trait IoFactory { fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 2d9cdaddc8433..df231f6d88aec 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -12,21 +12,48 @@ use option::*; use sys; use cast::transmute; use cell::Cell; +use clone::Clone; +use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; +use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; +use rt::rtio::RemoteCallback; +use rt::metrics::SchedMetrics; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by /// thread local storage and the running task is owned by the /// scheduler. +/// +/// XXX: This creates too many callbacks to run_sched_once, resulting +/// in too much allocation and too many events. pub struct Scheduler { + /// A queue of available work. Under a work-stealing policy there + /// is one per Scheduler. priv work_queue: WorkQueue<~Coroutine>, + /// The queue of incoming messages from other schedulers. + /// These are enqueued by SchedHandles after which a remote callback + /// is triggered to handle the message. + priv message_queue: MessageQueue, + /// A shared list of sleeping schedulers. We'll use this to wake + /// up schedulers when pushing work onto the work queue. + priv sleeper_list: SleeperList, + /// Indicates that we have previously pushed a handle onto the + /// SleeperList but have not yet received the Wake message. + /// Being `true` does not necessarily mean that the scheduler is + /// not active since there are multiple event sources that may + /// wake the scheduler. It just prevents the scheduler from pushing + /// multiple handles onto the sleeper list. + priv sleepy: bool, + /// A flag to indicate we've received the shutdown message and should + /// no longer try to go to sleep, but exit instead. 
+ no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, @@ -37,19 +64,29 @@ pub struct Scheduler { current_task: Option<~Coroutine>, /// An action performed after a context switch on behalf of the /// code running before the context switch - priv cleanup_job: Option + priv cleanup_job: Option, + metrics: SchedMetrics } -// XXX: Some hacks to put a &fn in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = sys::Closure; -trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject, + priv queue: MessageQueue } -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + +pub struct Coroutine { + /// The segment of stack on which the task is currently running or, + /// if the task is blocked, on which the task will resume execution + priv current_stack_segment: StackSegment, + /// These are always valid when the task is not running, unless + /// the task is dead + priv saved_context: Context, + /// The heap, GC, unwinding, local storage, logging + task: ~Task +} + +pub enum SchedMessage { + Wake, + Shutdown } enum CleanupJob { @@ -61,18 +98,26 @@ pub impl Scheduler { fn in_task_context(&self) -> bool { self.current_task.is_some() } - fn new(event_loop: ~EventLoopObject) -> Scheduler { + fn new(event_loop: ~EventLoopObject, + work_queue: WorkQueue<~Coroutine>, + sleeper_list: SleeperList) + -> Scheduler { // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); Scheduler { + sleeper_list: sleeper_list, + message_queue: MessageQueue::new(), + sleepy: false, + no_sleep: false, event_loop: event_loop, - work_queue: WorkQueue::new(), + work_queue: work_queue, stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, - cleanup_job: None + cleanup_job: None, + metrics: SchedMetrics::new() } } @@ -85,6 +130,11 @@ pub impl Scheduler { let mut self_sched = self; + // Always run through the scheduler loop at least once so that + // we enter the sleep state and can then be woken up by other + // schedulers. + self_sched.event_loop.callback(Scheduler::run_sched_once); + unsafe { let event_loop: *mut ~EventLoopObject = { let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; @@ -98,10 +148,71 @@ pub impl Scheduler { } let sched = Local::take::(); - assert!(sched.work_queue.is_empty()); + // XXX: Reenable this once we're using a per-task queue. With a shared + // queue this is not true + //assert!(sched.work_queue.is_empty()); + rtdebug!("scheduler metrics: %s\n", sched.metrics.to_str()); return sched; } + fn run_sched_once() { + + let mut sched = Local::take::(); + sched.metrics.turns += 1; + + // First, check the message queue for instructions. + // XXX: perf. Check for messages without atomics. + // It's ok if we miss messages occasionally, as long as + // we sync and check again before sleeping. + if sched.interpret_message_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. 
+ let mut sched = Local::take::(); + sched.metrics.messages_received += 1; + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + // Now, look in the work queue for tasks to run + let sched = Local::take::(); + if sched.resume_task_from_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. + let mut sched = Local::take::(); + sched.metrics.tasks_resumed_from_queue += 1; + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + // If we got here then there was no work to do. + // Generate a SchedHandle and push it to the sleeper list so + // somebody can wake us up later. + rtdebug!("no work to do"); + let mut sched = Local::take::(); + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("sleeping"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping"); + } + Local::put(sched); + } + + fn make_handle(&mut self) -> SchedHandle { + let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); + + return SchedHandle { + remote: remote, + queue: self.message_queue.clone() + }; + } + /// Schedule a task to be executed later. /// /// Pushes the task onto the work stealing queue and tells the event loop @@ -109,17 +220,69 @@ pub impl Scheduler { /// directly. fn enqueue_task(&mut self, task: ~Coroutine) { self.work_queue.push(task); - self.event_loop.callback(resume_task_from_queue); - - fn resume_task_from_queue() { - let scheduler = Local::take::(); - scheduler.resume_task_from_queue(); + self.event_loop.callback(Scheduler::run_sched_once); + + // We've made work available. Notify a sleeping scheduler. + // XXX: perf. Check for a sleeper without synchronizing memory. + // It's not critical that we always find it. + // XXX: perf. If there's a sleeper then we might as well just send + // it the task directly instead of pushing it to the + // queue. That is essentially the intent here and it is less + // work. + match self.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake) + } + None => (/* pass */) } } // * Scheduler-context operations - fn resume_task_from_queue(~self) { + fn interpret_message_queue(~self) -> bool { + assert!(!self.in_task_context()); + + rtdebug!("looking for scheduler messages"); + + let mut this = self; + match this.message_queue.pop() { + Some(Wake) => { + rtdebug!("recv Wake message"); + this.sleepy = false; + Local::put(this); + return true; + } + Some(Shutdown) => { + rtdebug!("recv Shutdown message"); + if this.sleepy { + // There may be an outstanding handle on the sleeper list. + // Pop them all to make sure that's not the case. + loop { + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake); + } + None => break + } + } + } + // No more sleeping. After there are no outstanding event loop + // references we will shut down. 
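+                    // ("References" here are the uv async handles owned by
+                    // outstanding SchedHandles; once those are all closed the
+                    // event loop has nothing left to wait on and run() returns.)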
+ this.no_sleep = true; + this.sleepy = false; + Local::put(this); + return true; + } + None => { + Local::put(this); + return false; + } + } + } + + fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); rtdebug!("looking in work queue for task to schedule"); @@ -129,10 +292,12 @@ pub impl Scheduler { Some(task) => { rtdebug!("resuming task from work queue"); this.resume_task_immediately(task); + return true; } None => { rtdebug!("no tasks in queue"); Local::put(this); + return false; } } } @@ -146,11 +311,9 @@ pub impl Scheduler { rtdebug!("ending running task"); - do self.deschedule_running_task_and_then |dead_task| { + do self.deschedule_running_task_and_then |sched, dead_task| { let dead_task = Cell(dead_task); - do Local::borrow:: |sched| { - dead_task.take().recycle(&mut sched.stack_pool); - } + dead_task.take().recycle(&mut sched.stack_pool); } abort!("control reached end of task"); @@ -159,22 +322,18 @@ pub impl Scheduler { fn schedule_new_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow:: |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow:: |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } @@ -185,6 +344,7 @@ pub impl Scheduler { assert!(!this.in_task_context()); rtdebug!("scheduling a task"); + this.metrics.context_switches_sched_to_task += 1; // Store the task in the scheduler so it can be grabbed later this.current_task = Some(task); @@ -218,15 +378,21 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) { + /// + /// This passes a Scheduler pointer to the fn after the context switch + /// in order to prevent that fn from performing further scheduling operations. + /// Doing further scheduling could easily result in infinite recursion. + fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("blocking task"); + this.metrics.context_switches_task_to_sched += 1; unsafe { let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -248,14 +414,19 @@ pub impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. 
- fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) { + fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, + f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("switching tasks"); + this.metrics.context_switches_task_to_task += 1; let old_running_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) }; + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f) + }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); this.current_task = Some(next_task); @@ -292,7 +463,7 @@ pub impl Scheduler { let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => (f.to_fn())(task) + GiveTask(task, f) => (f.to_fn())(self, task) } } @@ -336,17 +507,11 @@ pub impl Scheduler { } } -static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack - -pub struct Coroutine { - /// The segment of stack on which the task is currently running or, - /// if the task is blocked, on which the task will resume execution - priv current_stack_segment: StackSegment, - /// These are always valid when the task is not running, unless - /// the task is dead - priv saved_context: Context, - /// The heap, GC, unwinding, local storage, logging - task: ~Task +impl SchedHandle { + pub fn send(&mut self, msg: SchedMessage) { + self.queue.push(msg); + self.remote.fire(); + } } pub impl Coroutine { @@ -357,6 +522,9 @@ pub impl Coroutine { fn with_task(stack_pool: &mut StackPool, task: ~Task, start: ~fn()) -> Coroutine { + + static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack + let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn @@ -370,6 +538,7 @@ pub impl Coroutine { priv fn build_start_wrapper(start: ~fn()) -> ~fn() { // XXX: The old code didn't have this extra allocation + let start_cell = Cell(start); let wrapper: ~fn() = || { // This is the first code to execute after the initial // context switch to the task. The previous context may @@ -381,7 +550,19 @@ pub impl Coroutine { let sched = Local::unsafe_borrow::(); let task = (*sched).current_task.get_mut_ref(); // FIXME #6141: shouldn't neet to put `start()` in another closure - task.task.run(||start()); + let start_cell = Cell(start_cell.take()); + do task.task.run { + // N.B. Removing `start` from the start wrapper closure + // by emptying a cell is critical for correctness. The ~Task + // pointer, and in turn the closure used to initialize the first + // call frame, is destroyed in scheduler context, not task context. + // So any captured closures must not contain user-definable dtors + // that expect to be in task context. By moving `start` out of + // the closure, all the user code goes out of scope while + // the task is still running. 
+ let start = start_cell.take(); + start(); + }; } let sched = Local::take::(); @@ -400,16 +581,28 @@ pub impl Coroutine { } } +// XXX: Some hacks to put a &fn in Scheduler without borrowck +// complaining +type UnsafeTaskReceiver = sys::Closure; +trait ClosureConverter { + fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine); +} +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } } +} + #[cfg(test)] mod test { use int; use cell::Cell; - use rt::uv::uvio::UvEventLoop; use unstable::run_in_bare_thread; use task::spawn; use rt::local::Local; use rt::test::*; use super::*; + use rt::thread::Thread; #[test] fn test_simple_scheduling() { @@ -417,7 +610,7 @@ mod test { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; @@ -434,7 +627,7 @@ mod test { let mut task_count = 0; let task_count_ptr: *mut int = &mut task_count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } @@ -452,7 +645,7 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task1 = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::(); @@ -460,11 +653,9 @@ mod test { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |task1| { + do sched.switch_running_tasks_and_then(task2) |sched, task1| { let task1 = Cell(task1); - do Local::borrow:: |sched| { - sched.enqueue_task(task1.take()); - } + sched.enqueue_task(task1.take()); } unsafe { *count_ptr = *count_ptr + 1; } }; @@ -481,7 +672,7 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let start_task = ~do Coroutine::new(&mut sched.stack_pool) { run_task(count_ptr); @@ -510,16 +701,14 @@ mod test { #[test] fn test_block_task() { do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~do Coroutine::new(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { let task = Cell(task); - do Local::borrow:: |sched| { - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } + assert!(!sched.in_task_context()); + sched.enqueue_task(task.take()); } }; sched.enqueue_task(task); @@ -536,8 +725,7 @@ mod test { do run_in_newsched_task { do spawn { let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { - let mut sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { let task = Cell(task); do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); @@ -545,9 +733,148 @@ mod test { sched.enqueue_task(task.take()); Local::put(sched); } - Local::put(sched); } } } } 
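The `handle` test added below is the first exercise of the new `SchedHandle` type: `send` pushes a message onto the target scheduler's shared queue and then fires the remote callback so a sleeping event loop wakes up and drains it. A minimal modern-Rust sketch of that enqueue-then-wake pattern (the names and the boxed waker are stand-ins, not this PR's types):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[allow(dead_code)]
enum SchedMessage { Wake, Shutdown }

// Stand-in for SchedHandle: a queue endpoint plus a "remote callback"
// that nudges the owning event loop after every push.
struct Handle {
    queue: Sender<SchedMessage>,
    remote_fire: Box<dyn Fn() + Send>,
}

impl Handle {
    fn send(&self, msg: SchedMessage) {
        // Same ordering as SchedHandle::send in this diff: enqueue first,
        // then fire, so the woken scheduler always finds the message.
        self.queue.send(msg).expect("scheduler gone");
        (self.remote_fire)();
    }
}

fn main() {
    let (tx, rx): (Sender<SchedMessage>, Receiver<SchedMessage>) = channel();
    let handle = Handle {
        queue: tx,
        // In the real runtime this would be uv_async_send on the loop's handle.
        remote_fire: Box::new(|| {}),
    };
    handle.send(SchedMessage::Wake);
    assert!(matches!(rx.try_recv(), Ok(SchedMessage::Wake)));
}
```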
+ + #[test] + fn handle() { + use rt::comm::*; + + do run_in_bare_thread { + let (port, chan) = oneshot::<()>(); + let port_cell = Cell(port); + let chan_cell = Cell(chan); + let mut sched1 = ~new_test_uv_sched(); + let handle1 = sched1.make_handle(); + let handle1_cell = Cell(handle1); + let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + chan_cell.take().send(()); + }; + sched1.enqueue_task(task1); + + let mut sched2 = ~new_test_uv_sched(); + let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + port_cell.take().recv(); + // Release the other scheduler's handle so it can exit + handle1_cell.take(); + }; + sched2.enqueue_task(task2); + + let sched1_cell = Cell(sched1); + let _thread1 = do Thread::start { + let sched1 = sched1_cell.take(); + sched1.run(); + }; + + let sched2_cell = Cell(sched2); + let _thread2 = do Thread::start { + let sched2 = sched2_cell.take(); + sched2.run(); + }; + } + } + + #[test] + fn multithreading() { + use rt::comm::*; + use iter::Times; + use vec::OwnedVector; + use container::Container; + + do run_in_mt_newsched_task { + let mut ports = ~[]; + for 10.times { + let (port, chan) = oneshot(); + let chan_cell = Cell(chan); + do spawntask_later { + chan_cell.take().send(()); + } + ports.push(port); + } + + while !ports.is_empty() { + ports.pop().recv(); + } + } + } + + #[test] + fn thread_ring() { + use rt::comm::*; + use comm::{GenericPort, GenericChan}; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = oneshot(); + + let n_tasks = 10; + let token = 2000; + + let mut (p, ch1) = stream(); + ch1.send((token, end_chan)); + let mut i = 2; + while i <= n_tasks { + let (next_p, ch) = stream(); + let imm_i = i; + let imm_p = p; + do spawntask_random { + roundtrip(imm_i, n_tasks, &imm_p, &ch); + }; + p = next_p; + i += 1; + } + let imm_p = p; + let imm_ch = ch1; + do spawntask_random { + roundtrip(1, n_tasks, &imm_p, &imm_ch); + } + + end_port.recv(); + } + + fn roundtrip(id: int, n_tasks: int, + p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) { + while (true) { + match p.recv() { + (1, end_chan) => { + debug!("%d\n", id); + end_chan.send(()); + return; + } + (token, end_chan) => { + debug!("thread: %d got token: %d", id, token); + ch.send((token - 1, end_chan)); + if token <= n_tasks { + return; + } + } + } + } + } + + } + + #[test] + fn start_closure_dtor() { + use ops::Drop; + + // Regression test that the `start` task entrypoint can contain dtors + // that use task resources + do run_in_newsched_task { + struct S { field: () } + + impl Drop for S { + fn finalize(&self) { + let _foo = @0; + } + } + + let s = S { field: () }; + + do spawntask { + let _ss = &s; + } + } + } } diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs new file mode 100644 index 0000000000000..e2873e78d805f --- /dev/null +++ b/src/libstd/rt/sleeper_list.rs @@ -0,0 +1,55 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Maintains a shared list of sleeping schedulers. Schedulers +//! use this to wake each other up. 
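The 2013-era types in this new file predate today's standard library: `~Exclusive<~[SchedHandle]>` is roughly `Arc<Mutex<Vec<SchedHandle>>>` in modern terms, and the `Cell` exists only to move the handle into the `with` closure. For readers unfamiliar with the old sigils, a rough modern equivalent of the list defined below (assuming plain lock-based semantics, which is what `Exclusive` provided):

```rust
use std::sync::{Arc, Mutex};

struct SchedHandle; // stand-in; the real handle owns a queue plus a remote callback

struct SleeperList {
    stack: Arc<Mutex<Vec<SchedHandle>>>,
}

impl SleeperList {
    fn new() -> SleeperList {
        SleeperList { stack: Arc::new(Mutex::new(Vec::new())) }
    }
    fn push(&self, handle: SchedHandle) {
        self.stack.lock().unwrap().push(handle);
    }
    fn pop(&self) -> Option<SchedHandle> {
        self.stack.lock().unwrap().pop()
    }
}

impl Clone for SleeperList {
    // Clones share the same underlying stack, as the original does.
    fn clone(&self) -> SleeperList {
        SleeperList { stack: self.stack.clone() }
    }
}

fn main() {
    let list = SleeperList::new();
    list.push(SchedHandle);
    assert!(list.pop().is_some());
    assert!(list.pop().is_none());
}
```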
+ +use container::Container; +use vec::OwnedVector; +use option::{Option, Some, None}; +use cell::Cell; +use unstable::sync::{Exclusive, exclusive}; +use rt::sched::SchedHandle; +use clone::Clone; + +pub struct SleeperList { + priv stack: ~Exclusive<~[SchedHandle]> +} + +impl SleeperList { + pub fn new() -> SleeperList { + SleeperList { + stack: ~exclusive(~[]) + } + } + + pub fn push(&mut self, handle: SchedHandle) { + let handle = Cell(handle); + self.stack.with(|s| s.push(handle.take())); + } + + pub fn pop(&mut self) -> Option { + do self.stack.with |s| { + if !s.is_empty() { + Some(s.pop()) + } else { + None + } + } + } +} + +impl Clone for SleeperList { + fn clone(&self) -> SleeperList { + SleeperList { + stack: self.stack.clone() + } + } +} \ No newline at end of file
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index c60ae2bfeffc8..c8df3a6120338 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -9,13 +9,32 @@ // except according to those terms. use uint; -use option::*; +use option::{Some, None}; use cell::Cell; +use clone::Clone; +use container::Container; +use old_iter::MutableIter; +use vec::OwnedVector; use result::{Result, Ok, Err}; +use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; +use rt::sched::{Scheduler, Coroutine}; +use rt::sleeper_list::SleeperList; +use rt::work_queue::WorkQueue; + +pub fn new_test_uv_sched() -> Scheduler { + use rt::uv::uvio::UvEventLoop; + use rt::work_queue::WorkQueue; + use rt::sleeper_list::SleeperList; + + let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; +} /// Creates a new scheduler in a new thread and runs a task in it, /// then waits for the scheduler to exit. Failure of the task @@ -23,12 +42,11 @@ use rt::local::Local; pub fn run_in_newsched_task(f: ~fn()) { use super::sched::*; use unstable::run_in_bare_thread; - use rt::uv::uvio::UvEventLoop; let f = Cell(f); do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f.take()); @@ -37,6 +55,79 @@ pub fn run_in_newsched_task(f: ~fn()) { } } +/// Create more than one scheduler and run a function in a task +/// in one of the schedulers. The schedulers will stay alive +/// until the function `f` returns. +pub fn run_in_mt_newsched_task(f: ~fn()) { + use libc; + use os; + use from_str::FromStr; + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + + let f_cell = Cell(f); + + do run_in_bare_thread { + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code + // to force the OS to preempt them frequently. + // Assuming that this helps stress test concurrent types.
+ rust_get_num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, nthreads) |_| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let handle = sched.make_handle(); + handles.push(handle); + scheds.push(sched); + } + + let f_cell = Cell(f_cell.take()); + let handles = Cell(handles); + let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + f_cell.take()(); + + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.each_mut |handle| { + handle.send(Shutdown); + } + }; + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.run(); + }; + + threads.push(thread); + } + + // Wait for schedulers + let _threads = threads; + } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } +} + /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; @@ -45,11 +136,7 @@ pub fn spawntask(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - let sched = Local::take::(); - sched.schedule_new_task(task.take()); - } + sched.schedule_new_task(task); } /// Create a new task and run it right now. Aborts on failure @@ -60,11 +147,8 @@ pub fn spawntask_immediately(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } @@ -95,11 +179,8 @@ pub fn spawntask_random(f: ~fn()) { f); if run_now { - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow:: |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } else { sched.enqueue_task(task); @@ -122,10 +203,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Switch to the scheduler let f = Cell(Cell(f)); let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |old_task| { + do sched.deschedule_running_task_and_then() |sched, old_task| { let old_task = Cell(old_task); let f = f.take(); - let mut sched = Local::take::(); let new_task = ~do Coroutine::new(&mut sched.stack_pool) { do (|| { (f.take())() @@ -133,16 +213,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Check for failure then resume the parent task unsafe { *failed_ptr = task::failing(); } let sched = Local::take::(); - do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { - let new_task = Cell(new_task); - do Local::borrow:: |sched| { - sched.enqueue_task(new_task.take()); - } + do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| { + sched.enqueue_task(new_task); } } }; - sched.resume_task_immediately(new_task); + sched.enqueue_task(new_task); } if !failed { Ok(()) } else { Err(()) } @@ -151,11 +228,10 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Spawn a new task in a new scheduler and return a thread handle. 
pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; - use rt::uv::uvio::UvEventLoop; let f = Cell(f); let thread = do Thread::start { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f.take()); diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index b2f475a696605..4482a92d916aa 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -72,7 +72,7 @@ impl Tube { assert!(self.p.refcount() > 1); // There better be somebody to wake us up assert!((*state).blocked_task.is_none()); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |_, task| { (*state).blocked_task = Some(task); } rtdebug!("waking after tube recv"); @@ -107,11 +107,10 @@ mod test { let tube_clone = tube.clone(); let tube_clone_cell = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { let mut tube_clone = tube_clone_cell.take(); tube_clone.send(1); - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -123,21 +122,17 @@ mod test { do run_in_newsched_task { let mut tube: Tube = Tube::new(); let tube_clone = tube.clone(); - let tube_clone = Cell(Cell(Cell(tube_clone))); + let tube_clone = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { - let tube_clone = tube_clone.take(); - do Local::borrow:: |sched| { - let tube_clone = tube_clone.take(); - do sched.event_loop.callback { - let mut tube_clone = tube_clone.take(); - // The task should be blocked on this now and - // sending will wake it up. - tube_clone.send(1); - } + do sched.deschedule_running_task_and_then |sched, task| { + let tube_clone = Cell(tube_clone.take()); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube_clone.send(1); } - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -153,7 +148,7 @@ mod test { let tube_clone = tube.clone(); let tube_clone = Cell(tube_clone); let sched = Local::take::(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { callback_send(tube_clone.take(), 0); fn callback_send(tube: Tube, i: int) { @@ -172,8 +167,7 @@ mod test { } } - let sched = Local::take::(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } for int::range(0, MAX) |i| { diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs new file mode 100644 index 0000000000000..6ed06cc10b78a --- /dev/null +++ b/src/libstd/rt/uv/async.rs @@ -0,0 +1,105 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. 
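The new `AsyncWatcher` in this file wraps libuv's `uv_async_t`, the libuv handle that may be triggered from any thread: `send` can be called off-loop, the callback runs on the loop thread, and multiple sends may be coalesced into a single callback. A tiny sketch of that coalescing contract using only an atomic flag (an analogy for how `uv_async_send` behaves, not libuv's implementation):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    // uv_async-style wakeup: `send` just sets a pending flag; the loop runs
    // the callback once per observed flag, not once per send.
    let pending = AtomicBool::new(false);
    for _ in 0..3 {
        pending.store(true, Ordering::Release); // three send() calls
    }
    let mut callbacks = 0;
    while pending.swap(false, Ordering::AcqRel) {
        callbacks += 1; // loop thread: one callback drains the flag
    }
    assert_eq!(callbacks, 1); // coalesced: at least once, not three times
}
```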
+ +use libc::{c_int, c_void}; +use option::Some; +use rt::uv::uvll; +use rt::uv::uvll::UV_ASYNC; +use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback}; +use rt::uv::WatcherInterop; +use rt::uv::status_to_maybe_uv_error; + +pub struct AsyncWatcher(*uvll::uv_async_t); +impl Watcher for AsyncWatcher { } + +impl AsyncWatcher { + pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + unsafe { + let handle = uvll::malloc_handle(UV_ASYNC); + assert!(handle.is_not_null()); + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + let data = watcher.get_watcher_data(); + data.async_cb = Some(cb); + assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb)); + return watcher; + } + + extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + let status = status_to_maybe_uv_error(watcher.native_handle(), status); + let data = watcher.get_watcher_data(); + let cb = data.async_cb.get_ref(); + (*cb)(watcher, status); + } + } + + pub fn send(&mut self) { + unsafe { + let handle = self.native_handle(); + uvll::async_send(handle); + } + } + + pub fn close(self, cb: NullCallback) { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + + unsafe { + uvll::close(self.native_handle(), close_cb); + } + + extern fn close_cb(handle: *uvll::uv_stream_t) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + { + let data = watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } + watcher.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void); } + } + } +} + +impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { + fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher { + AsyncWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_async_t { + match self { &AsyncWatcher(ptr) => ptr } + } +} + +#[cfg(test)] +mod test { + + use super::*; + use rt::uv::Loop; + use unstable::run_in_bare_thread; + use rt::thread::Thread; + use cell::Cell; + + #[test] + fn smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) ); + let watcher_cell = Cell(watcher); + let _thread = do Thread::start { + let mut watcher = watcher_cell.take(); + watcher.send(); + }; + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index 2cf0b5c487288..a81ab48696a36 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -89,3 +89,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { match self { &IdleWatcher(ptr) => ptr } } } + +#[cfg(test)] +mod test { + + use rt::uv::Loop; + use super::*; + use unstable::run_in_bare_thread; + + #[test] + #[ignore(reason = "valgrind - loop destroyed before watcher?")] + fn idle_new_then_close() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let idle_watcher = { IdleWatcher::new(&mut loop_) }; + idle_watcher.close(||()); + } + } + + #[test] + fn idle_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + let mut count = 10; + let count_ptr: *mut int = &mut count; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + if unsafe { *count_ptr == 10 } { + idle_watcher.stop(); + idle_watcher.close(||()); + 
+                } else {
+                    unsafe { *count_ptr = *count_ptr + 1; }
+                }
+            }
+            loop_.run();
+            loop_.close();
+            assert_eq!(count, 10);
+        }
+    }
+
+    #[test]
+    fn idle_start_stop_start() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                idle_watcher.stop();
+                do idle_watcher.start |idle_watcher, status| {
+                    assert!(status.is_none());
+                    let mut idle_watcher = idle_watcher;
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                }
+            }
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index 2bd657fd8641f..5f9e56608149f 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -57,6 +57,7 @@ pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
+pub use self::async::AsyncWatcher;
 
 /// The implementation of `rtio` for libuv
 pub mod uvio;
@@ -68,6 +69,7 @@ pub mod file;
 pub mod net;
 pub mod idle;
 pub mod timer;
+pub mod async;
 
 /// XXX: Loop(*handle) is buggy with destructors. Normal structs
 /// with dtors may not be destructured, but tuple structs can,
@@ -125,6 +127,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
 pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(FsRequest, Option<UvError>);
 pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
+pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
 
 /// Callbacks used by StreamWatchers, set as custom data on the foreign handle
 
@@ -135,7 +138,8 @@ struct WatcherData {
     close_cb: Option<NullCallback>,
     alloc_cb: Option<AllocCallback>,
     idle_cb: Option<IdleCallback>,
-    timer_cb: Option<TimerCallback>
+    timer_cb: Option<TimerCallback>,
+    async_cb: Option<AsyncCallback>
 }
 
 pub trait WatcherInterop {
@@ -164,7 +168,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
             close_cb: None,
             alloc_cb: None,
             idle_cb: None,
-            timer_cb: None
+            timer_cb: None,
+            async_cb: None
         };
         let data = transmute::<~WatcherData, *c_void>(data);
         uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -364,57 +369,3 @@ fn loop_smoke_test() {
         loop_.close();
     }
 }
-
-#[test]
-#[ignore(reason = "valgrind - loop destroyed before watcher?")]
-fn idle_new_then_close() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close(||());
-    }
-}
-
-#[test]
-fn idle_smoke_test() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        let mut count = 10;
-        let count_ptr: *mut int = &mut count;
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            if unsafe { *count_ptr == 10 } {
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            } else {
-                unsafe { *count_ptr = *count_ptr + 1; }
-            }
-        }
-        loop_.run();
-        loop_.close();
-        assert_eq!(count, 10);
-    }
-}
-
-#[test]
-fn idle_start_stop_start() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            idle_watcher.stop();
-            do idle_watcher.start |idle_watcher, status| {
-                assert!(status.is_none());
-                let mut idle_watcher = idle_watcher;
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            }
-        }
-        loop_.run();
-        loop_.close();
-    }
-}
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index cacd67314ebac..0f98ab11513d6 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -12,6 +12,7 @@ use option::*;
 use result::*;
 use ops::Drop;
 use cell::{Cell, empty_cell};
+use cast;
 use cast::transmute;
 use clone::Clone;
 use rt::io::IoError;
@@ -23,6 +24,7 @@ use rt::sched::Scheduler;
 use rt::io::{standard_error, OtherIoError};
 use rt::tube::Tube;
 use rt::local::Local;
+use unstable::sync::{Exclusive, exclusive};
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
@@ -39,11 +41,6 @@ pub impl UvEventLoop {
             uvio: UvIoFactory(Loop::new())
         }
     }
-
-    /// A convenience constructor
-    fn new_scheduler() -> Scheduler {
-        Scheduler::new(~UvEventLoop::new())
-    }
 }
 
 impl Drop for UvEventLoop {
@@ -82,6 +79,10 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
+        ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
+    }
+
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
         Some(&mut self.uvio)
     }
@@ -101,6 +102,88 @@ fn test_callback_run_once() {
     }
 }
 
+pub struct UvRemoteCallback {
+    // The uv async handle for triggering the callback
+    async: AsyncWatcher,
+    // A flag to tell the callback to exit, set from the dtor. This is
+    // almost never contested - only in rare races with the dtor.
+    exit_flag: Exclusive<bool>
+}
+
+impl UvRemoteCallback {
+    pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
+        let exit_flag = exclusive(false);
+        let exit_flag_clone = exit_flag.clone();
+        let async = do AsyncWatcher::new(loop_) |watcher, status| {
+            assert!(status.is_none());
+            f();
+            do exit_flag_clone.with_imm |&should_exit| {
+                if should_exit {
+                    watcher.close(||());
+                }
+            }
+        };
+        UvRemoteCallback {
+            async: async,
+            exit_flag: exit_flag
+        }
+    }
+}
+
+impl RemoteCallback for UvRemoteCallback {
+    fn fire(&mut self) { self.async.send() }
+}
+
+impl Drop for UvRemoteCallback {
+    fn finalize(&self) {
+        unsafe {
+            let this: &mut UvRemoteCallback = cast::transmute_mut(self);
+            do this.exit_flag.with |should_exit| {
+                // NB: These two things need to happen atomically. Otherwise
+                // the event handler could wake up due to a *previous*
+                // signal and see the exit flag, destroying the handle
+                // before the final send.
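+                // Concretely: were the flag flipped outside this
+                // exclusive lock, the handler could wake for an earlier
+                // `fire`, observe the flag already set, and close the
+                // async handle before the send below runs, leaving that
+                // send to signal a freed handle.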
+                *should_exit = true;
+                this.async.send();
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test_remote {
+    use cell;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::thread::Thread;
+    use rt::tube::Tube;
+    use rt::rtio::EventLoop;
+    use rt::local::Local;
+    use rt::sched::Scheduler;
+
+    #[test]
+    fn test_uv_remote() {
+        do run_in_newsched_task {
+            let mut tube = Tube::new();
+            let tube_clone = tube.clone();
+            let remote_cell = cell::empty_cell();
+            do Local::borrow::<Scheduler>() |sched| {
+                let tube_clone = tube_clone.clone();
+                let tube_clone_cell = Cell(tube_clone);
+                let remote = do sched.event_loop.remote_callback {
+                    tube_clone_cell.take().send(1);
+                };
+                remote_cell.put_back(remote);
+            }
+            let _thread = do Thread::start {
+                remote_cell.take().fire();
+            };
+
+            assert!(tube.recv() == 1);
+        }
+    }
+}
+
 pub struct UvIoFactory(Loop);
 
 pub impl UvIoFactory {
@@ -123,12 +206,10 @@ impl IoFactory for UvIoFactory {
             assert!(scheduler.in_task_context());
 
             // Block this task and take ownership, switch to scheduler context
-            do scheduler.deschedule_running_task_and_then |task| {
+            do scheduler.deschedule_running_task_and_then |sched, task| {
 
                 rtdebug!("connect: entered scheduler context");
-                do Local::borrow::<Scheduler> |scheduler| {
-                    assert!(!scheduler.in_task_context());
-                }
+                assert!(!sched.in_task_context());
                 let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
                 let task_cell = Cell(task);
 
@@ -168,7 +249,7 @@ impl IoFactory for UvIoFactory {
             Ok(_) => Ok(~UvTcpListener::new(watcher)),
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |_, task| {
                     let task_cell = Cell(task);
                     do watcher.as_stream().close {
                         let scheduler = Local::take::<Scheduler>();
@@ -204,7 +285,7 @@ impl Drop for UvTcpListener {
     fn finalize(&self) {
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.as_stream().close {
                 let scheduler = Local::take::<Scheduler>();
@@ -266,7 +347,7 @@ impl Drop for UvTcpStream {
         rtdebug!("closing tcp stream");
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.close {
                 let scheduler = Local::take::<Scheduler>();
@@ -285,11 +366,9 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
             rtdebug!("read: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut watcher = watcher;
             let task_cell = Cell(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -331,7 +410,7 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -425,11 +504,9 @@ fn test_read_and_block() {
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |sched, task| {
                     let task = Cell(task);
-                    do Local::borrow::<Scheduler> |scheduler| {
-                        scheduler.enqueue_task(task.take());
-                    }
+                    sched.enqueue_task(task.take());
                 }
             }
 
diff --git a/src/libstd/unstable/atomics.rs b/src/libstd/unstable/atomics.rs
index ab2b5d8ea2b3b..58d0c01f990d0 100644
--- a/src/libstd/unstable/atomics.rs
+++ b/src/libstd/unstable/atomics.rs
@@ -75,7 +75,7 @@ pub enum Ordering {
 
 impl AtomicFlag {
 
-    fn new() -> AtomicFlag {
+    pub fn new() -> AtomicFlag {
         AtomicFlag { v: 0 }
     }
 
@@ -83,7 +83,7 @@ impl AtomicFlag {
      * Clears the atomic flag
      */
     #[inline(always)]
-    fn clear(&mut self, order: Ordering) {
+    pub fn clear(&mut self, order: Ordering) {
         unsafe {atomic_store(&mut self.v, 0, order)}
     }
 
@@ -92,37 +92,37 @@ impl AtomicFlag {
      * flag.
      */
     #[inline(always)]
-    fn test_and_set(&mut self, order: Ordering) -> bool {
+    pub fn test_and_set(&mut self, order: Ordering) -> bool {
         unsafe {atomic_compare_and_swap(&mut self.v, 0, 1, order) > 0}
     }
 }
 
 impl AtomicBool {
-    fn new(v: bool) -> AtomicBool {
+    pub fn new(v: bool) -> AtomicBool {
         AtomicBool { v: if v { 1 } else { 0 } }
     }
 
     #[inline(always)]
-    fn load(&self, order: Ordering) -> bool {
+    pub fn load(&self, order: Ordering) -> bool {
         unsafe { atomic_load(&self.v, order) > 0 }
     }
 
     #[inline(always)]
-    fn store(&mut self, val: bool, order: Ordering) {
+    pub fn store(&mut self, val: bool, order: Ordering) {
         let val = if val { 1 } else { 0 };
 
         unsafe { atomic_store(&mut self.v, val, order); }
     }
 
     #[inline(always)]
-    fn swap(&mut self, val: bool, order: Ordering) -> bool {
+    pub fn swap(&mut self, val: bool, order: Ordering) -> bool {
         let val = if val { 1 } else { 0 };
 
         unsafe { atomic_swap(&mut self.v, val, order) > 0}
     }
 
     #[inline(always)]
-    fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool {
+    pub fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool {
         let old = if old { 1 } else { 0 };
         let new = if new { 1 } else { 0 };
 
@@ -131,105 +131,105 @@ impl AtomicBool {
 }
 
 impl AtomicInt {
-    fn new(v: int) -> AtomicInt {
+    pub fn new(v: int) -> AtomicInt {
         AtomicInt { v:v }
     }
 
     #[inline(always)]
-    fn load(&self, order: Ordering) -> int {
+    pub fn load(&self, order: Ordering) -> int {
         unsafe { atomic_load(&self.v, order) }
     }
 
     #[inline(always)]
-    fn store(&mut self, val: int, order: Ordering) {
+    pub fn store(&mut self, val: int, order: Ordering) {
         unsafe { atomic_store(&mut self.v, val, order); }
     }
 
     #[inline(always)]
-    fn swap(&mut self, val: int, order: Ordering) -> int {
+    pub fn swap(&mut self, val: int, order: Ordering) -> int {
         unsafe { atomic_swap(&mut self.v, val, order) }
     }
 
     #[inline(always)]
-    fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int {
+    pub fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int {
         unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) }
     }
 
     #[inline(always)]
-    fn fetch_add(&mut self, val: int, order: Ordering) -> int {
+    pub fn fetch_add(&mut self, val: int, order: Ordering) -> int {
         unsafe { atomic_add(&mut self.v, val, order) }
     }
 
     #[inline(always)]
-    fn fetch_sub(&mut self, val: int, order: Ordering) -> int {
+    pub fn fetch_sub(&mut self, val: int, order: Ordering) -> int {
         unsafe { atomic_sub(&mut self.v, val, order) }
     }
 }
 
 impl AtomicUint {
-    fn new(v: uint) -> AtomicUint {
+    pub fn new(v: uint) -> AtomicUint {
         AtomicUint { v:v }
     }
 
     #[inline(always)]
-    fn load(&self, order: Ordering) -> uint {
+    pub fn load(&self, order: Ordering) -> uint {
         unsafe { atomic_load(&self.v, order) }
     }
 
     #[inline(always)]
-    fn store(&mut self, val: uint, order: Ordering) {
+    pub fn store(&mut self, val: uint, order: Ordering) {
         unsafe { atomic_store(&mut self.v, val, order); }
     }
 
     #[inline(always)]
-    fn swap(&mut self, val: uint, order: Ordering) -> uint {
+    pub fn swap(&mut self, val: uint, order: Ordering) -> uint {
         unsafe { atomic_swap(&mut self.v, val, order) }
     }
 
     #[inline(always)]
-    fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint {
+    pub fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint {
         unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) }
     }
 
     #[inline(always)]
-    fn fetch_add(&mut self, val: uint, order: Ordering) -> uint {
+    pub fn fetch_add(&mut self, val: uint, order: Ordering) -> uint {
         unsafe { atomic_add(&mut self.v, val, order) }
     }
 
     #[inline(always)]
-    fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint {
+    pub fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint {
         unsafe { atomic_sub(&mut self.v, val, order) }
     }
 }
 
 impl<T> AtomicPtr<T> {
-    fn new(p: *mut T) -> AtomicPtr<T> {
+    pub fn new(p: *mut T) -> AtomicPtr<T> {
         AtomicPtr { p:p }
     }
 
     #[inline(always)]
-    fn load(&self, order: Ordering) -> *mut T {
+    pub fn load(&self, order: Ordering) -> *mut T {
         unsafe { atomic_load(&self.p, order) }
     }
 
     #[inline(always)]
-    fn store(&mut self, ptr: *mut T, order: Ordering) {
+    pub fn store(&mut self, ptr: *mut T, order: Ordering) {
         unsafe { atomic_store(&mut self.p, ptr, order); }
     }
 
     #[inline(always)]
-    fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T {
+    pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T {
         unsafe { atomic_swap(&mut self.p, ptr, order) }
     }
 
     #[inline(always)]
-    fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T {
+    pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T {
         unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) }
     }
 }
 
 impl<T> AtomicOption<T> {
-    fn new(p: ~T) -> AtomicOption<T> {
+    pub fn new(p: ~T) -> AtomicOption<T> {
         unsafe {
             AtomicOption {
                 p: cast::transmute(p)
@@ -237,7 +237,7 @@ impl<T> AtomicOption<T> {
         }
     }
 
-    fn empty() -> AtomicOption<T> {
+    pub fn empty() -> AtomicOption<T> {
         unsafe {
             AtomicOption {
                 p: cast::transmute(0)
@@ -246,7 +246,7 @@ impl<T> AtomicOption<T> {
         }
     }
 
     #[inline(always)]
-    fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> {
+    pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> {
         unsafe {
             let val = cast::transmute(val);
 
@@ -262,7 +262,7 @@ impl<T> AtomicOption<T> {
     }
 
     #[inline(always)]
-    fn take(&mut self, order: Ordering) -> Option<~T> {
+    pub fn take(&mut self, order: Ordering) -> Option<~T> {
         unsafe {
             self.swap(cast::transmute(0), order)
         }
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index 734368c70c4a0..6085ca1a482ee 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -205,8 +205,53 @@ extern {
     fn rust_unlock_little_lock(lock: rust_little_lock);
 }
 
+/* *********************************************************************/
+
+//FIXME: #5042 This should be replaced by proper atomic type
+pub struct AtomicUint {
+    priv inner: uint
+}
+
+impl AtomicUint {
+    pub fn new(val: uint) -> AtomicUint { AtomicUint { inner: val } }
+    pub fn load(&self) -> uint {
+        unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint }
+    }
+    pub fn store(&mut self, val: uint) {
+        unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); }
+    }
+    pub fn add(&mut self, val: int) -> uint {
+        unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint }
+    }
+    pub fn cas(&mut self, old: uint, new: uint) -> uint {
+        unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint }
+    }
+}
+
+pub struct AtomicInt {
+    priv inner: int
+}
+
+impl AtomicInt {
+    pub fn new(val: int) -> AtomicInt { AtomicInt { inner: val } }
+    pub fn load(&self) -> int {
+        unsafe { intrinsics::atomic_load(&self.inner) }
+    }
+    pub fn store(&mut self, val: int) {
+        unsafe { intrinsics::atomic_store(&mut self.inner, val); }
+    }
+    pub fn add(&mut self, val: int) -> int {
+        unsafe { intrinsics::atomic_xadd(&mut self.inner, val) }
+    }
+    pub fn cas(&mut self, old: int, new: int) -> int {
+        unsafe { intrinsics::atomic_cxchg(&mut self.inner, old, new) }
+    }
+}
+
+
 #[cfg(test)]
 mod tests {
+    use super::*;
     use comm;
     use super::exclusive;
     use task;
@@ -258,4 +303,28 @@ mod tests {
             assert_eq!(*one, 1);
         }
     }
+
+    #[test]
+    fn atomic_int_smoke_test() {
+        let mut i = AtomicInt::new(0);
+        i.store(10);
+        assert!(i.load() == 10);
+        assert!(i.add(1) == 10);
+        assert!(i.load() == 11);
+        assert!(i.cas(11, 12) == 11);
+        assert!(i.cas(11, 13) == 12);
+        assert!(i.load() == 12);
+    }
+
+    #[test]
+    fn atomic_uint_smoke_test() {
+        let mut i = AtomicUint::new(0);
+        i.store(10);
+        assert!(i.load() == 10);
+        assert!(i.add(1) == 10);
+        assert!(i.load() == 11);
+        assert!(i.cas(11, 12) == 11);
+        assert!(i.cas(11, 13) == 12);
+        assert!(i.load() == 12);
+    }
 }
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 5e7357c9b7b25..fe4e75fb8d21f 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -930,6 +930,13 @@ rust_begin_unwind(uintptr_t token) {
 #endif
 }
 
+extern int get_num_cpus();
+
+extern "C" CDECL uintptr_t
+rust_get_num_cpus() {
+    return get_num_cpus();
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_env.cpp b/src/rt/rust_env.cpp
index ed38be3550f74..c3d38851e7bb2 100644
--- a/src/rt/rust_env.cpp
+++ b/src/rt/rust_env.cpp
@@ -40,7 +40,7 @@ rust_drop_env_lock() {
 }
 
 #if defined(__WIN32__)
-static int
+int
 get_num_cpus() {
     SYSTEM_INFO sysinfo;
     GetSystemInfo(&sysinfo);
@@ -48,7 +48,7 @@ get_num_cpus() {
     return (int) sysinfo.dwNumberOfProcessors;
 }
 #elif defined(__BSD__)
-static int
+int
 get_num_cpus() {
     /* swiped from http://stackoverflow.com/questions/150355/
        programmatically-find-the-number-of-cores-on-a-machine */
@@ -75,7 +75,7 @@ get_num_cpus() {
     return numCPU;
 }
 #elif defined(__GNUC__)
-static int
+int
 get_num_cpus() {
     return sysconf(_SC_NPROCESSORS_ONLN);
 }
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index e3e522aa7ceec..9b49583519eca 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -239,3 +239,4 @@ rust_valgrind_stack_deregister
 rust_take_env_lock
 rust_drop_env_lock
 rust_update_log_settings
+rust_get_num_cpus
\ No newline at end of file
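
The atomics methods above become `pub`, but nothing in this section exercises `AtomicFlag` directly. As a minimal sketch of what the now-public API permits (an editor's illustration under the signatures shown above, not code from this patch), `test_and_set` and `clear` with the `SeqCst` ordering used throughout the series are enough for a toy spinlock:

    use unstable::atomics::{AtomicFlag, SeqCst};

    pub struct SpinLock {
        priv flag: AtomicFlag
    }

    impl SpinLock {
        pub fn new() -> SpinLock { SpinLock { flag: AtomicFlag::new() } }
        pub fn lock(&mut self) {
            // test_and_set returns the previous value, so spin until we
            // are the thread that flipped the flag from clear to set.
            while self.flag.test_and_set(SeqCst) { }
        }
        pub fn unlock(&mut self) {
            self.flag.clear(SeqCst);
        }
    }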
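
`rust_get_num_cpus` is defined in the runtime and appended to rustrt.def.in, but the Rust-side caller is outside this section. A plausible minimal binding (hypothetical: the wrapper name and location are assumptions; only the C symbol is established by this patch) follows the era's pattern of declaring the extern item inside the function that uses it:

    /// Get the number of cores available (sketch of a Rust-side wrapper).
    pub fn num_cpus() -> uint {
        unsafe {
            return rust_get_num_cpus();
        }

        extern {
            // Returns a uintptr_t on the C side, which matches uint here.
            fn rust_get_num_cpus() -> uint;
        }
    }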