diff --git a/src/doc/guide-tasks.md b/src/doc/guide-tasks.md index 387f481025df7..9ff712df02152 100644 --- a/src/doc/guide-tasks.md +++ b/src/doc/guide-tasks.md @@ -232,7 +232,7 @@ Instead we can use a `SharedChan`, a type that allows a single ~~~ # use std::task::spawn; -let (port, chan) = SharedChan::new(); +let (port, chan) = Chan::new(); for init_val in range(0u, 3) { // Create a new channel handle to distribute to the child task diff --git a/src/libextra/test.rs b/src/libextra/test.rs index d207bd2298be8..9ebd91bdfb6cd 100644 --- a/src/libextra/test.rs +++ b/src/libextra/test.rs @@ -767,7 +767,7 @@ fn run_tests(opts: &TestOpts, remaining.reverse(); let mut pending = 0; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); while pending > 0 || !remaining.is_empty() { while pending < concurrency && !remaining.is_empty() { @@ -878,7 +878,7 @@ pub fn filter_tests( pub fn run_test(force_ignore: bool, test: TestDescAndFn, - monitor_ch: SharedChan) { + monitor_ch: Chan) { let TestDescAndFn {desc, testfn} = test; @@ -888,7 +888,7 @@ pub fn run_test(force_ignore: bool, } fn run_test_inner(desc: TestDesc, - monitor_ch: SharedChan, + monitor_ch: Chan, testfn: proc()) { spawn(proc() { let mut task = task::task(); @@ -1260,7 +1260,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res != TrOk); @@ -1277,7 +1277,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrIgnored); @@ -1294,7 +1294,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrOk); @@ -1311,7 +1311,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrFailed); diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index b6738f25c68be..834bf7951ef1c 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -193,6 +193,7 @@ use task::GreenTask; mod macros; mod simple; +mod message_queue; pub mod basic; pub mod context; @@ -314,7 +315,7 @@ pub struct SchedPool { #[deriving(Clone)] struct TaskState { cnt: UnsafeArc, - done: SharedChan<()>, + done: Chan<()>, } impl SchedPool { @@ -468,7 +469,7 @@ impl SchedPool { impl TaskState { fn new() -> (Port<()>, TaskState) { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); (p, TaskState { cnt: UnsafeArc::new(AtomicUint::new(0)), done: c, diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs new file mode 100644 index 0000000000000..3a118476affb7 --- /dev/null +++ b/src/libgreen/message_queue.rs @@ -0,0 +1,61 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use mpsc = std::sync::mpsc_queue; +use std::sync::arc::UnsafeArc; + +pub enum PopResult { + Inconsistent, + Empty, + Data(T), +} + +pub fn queue() -> (Consumer, Producer) { + let (a, b) = UnsafeArc::new2(mpsc::Queue::new()); + (Consumer { inner: a }, Producer { inner: b }) +} + +pub struct Producer { + priv inner: UnsafeArc>, +} + +pub struct Consumer { + priv inner: UnsafeArc>, +} + +impl Consumer { + pub fn pop(&mut self) -> PopResult { + match unsafe { (*self.inner.get()).pop() } { + mpsc::Inconsistent => Inconsistent, + mpsc::Empty => Empty, + mpsc::Data(t) => Data(t), + } + } + + pub fn casual_pop(&mut self) -> Option { + match unsafe { (*self.inner.get()).pop() } { + mpsc::Inconsistent => None, + mpsc::Empty => None, + mpsc::Data(t) => Some(t), + } + } +} + +impl Producer { + pub fn push(&mut self, t: T) { + unsafe { (*self.inner.get()).push(t); } + } +} + +impl Clone for Producer { + fn clone(&self) -> Producer { + Producer { inner: self.inner.clone() } + } +} diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 2b090ac94d785..bf6e0c3430e9c 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -17,7 +17,6 @@ use std::rt::task::Task; use std::sync::deque; use std::unstable::mutex::Mutex; use std::unstable::raw; -use mpsc = std::sync::mpsc_queue; use TaskState; use context::Context; @@ -25,6 +24,7 @@ use coroutine::Coroutine; use sleeper_list::SleeperList; use stack::StackPool; use task::{TypeSched, GreenTask, HomeSched, AnySched}; +use msgq = message_queue; /// A scheduler is responsible for coordinating the execution of Tasks /// on a single thread. The scheduler runs inside a slightly modified @@ -47,9 +47,9 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - message_queue: mpsc::Consumer, + message_queue: msgq::Consumer, /// Producer used to clone sched handles from - message_producer: mpsc::Producer, + message_producer: msgq::Producer, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -143,7 +143,7 @@ impl Scheduler { state: TaskState) -> Scheduler { - let (consumer, producer) = mpsc::queue(()); + let (consumer, producer) = msgq::queue(); let mut sched = Scheduler { pool_id: pool_id, sleeper_list: sleeper_list, @@ -215,7 +215,7 @@ impl Scheduler { // Should not have any messages let message = stask.sched.get_mut_ref().message_queue.pop(); - rtassert!(match message { mpsc::Empty => true, _ => false }); + rtassert!(match message { msgq::Empty => true, _ => false }); stask.task.get_mut_ref().destroyed = true; } @@ -340,8 +340,8 @@ impl Scheduler { // // I have chosen to take route #2. match self.message_queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty | mpsc::Inconsistent => None + msgq::Data(t) => Some(t), + msgq::Empty | msgq::Inconsistent => None } }; @@ -849,7 +849,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: mpsc::Producer, + priv queue: msgq::Producer, sched_id: uint } diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index c94554f510e46..69ef10ac11bea 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -22,7 +22,6 @@ //! that you would find on the respective platform. use std::c_str::CString; -use std::comm::SharedChan; use std::io; use std::io::IoError; use std::io::net::ip::SocketAddr; @@ -289,7 +288,7 @@ impl rtio::IoFactory for IoFactory { }) } } - fn signal(&mut self, _signal: Signum, _channel: SharedChan) + fn signal(&mut self, _signal: Signum, _channel: Chan) -> IoResult<~RtioSignal> { Err(unimpl()) } diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs index c00b0efadb5ee..2c976e67d25b3 100644 --- a/src/libnative/io/timer_helper.rs +++ b/src/libnative/io/timer_helper.rs @@ -33,7 +33,7 @@ use task; // only torn down after everything else has exited. This means that these // variables are read-only during use (after initialization) and both of which // are safe to use concurrently. -static mut HELPER_CHAN: *mut SharedChan = 0 as *mut SharedChan; +static mut HELPER_CHAN: *mut Chan = 0 as *mut Chan; static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal; pub fn boot(helper: fn(imp::signal, Port)) { @@ -43,7 +43,9 @@ pub fn boot(helper: fn(imp::signal, Port)) { unsafe { LOCK.lock(); if !INITIALIZED { - let (msgp, msgc) = SharedChan::new(); + let (msgp, msgc) = Chan::new(); + // promote this to a shared channel + drop(msgc.clone()); HELPER_CHAN = cast::transmute(~msgc); let (receive, send) = imp::new(); HELPER_SIGNAL = send; @@ -84,8 +86,8 @@ fn shutdown() { // Clean up after ther helper thread unsafe { imp::close(HELPER_SIGNAL); - let _chan: ~SharedChan = cast::transmute(HELPER_CHAN); - HELPER_CHAN = 0 as *mut SharedChan; + let _chan: ~Chan = cast::transmute(HELPER_CHAN); + HELPER_CHAN = 0 as *mut Chan; HELPER_SIGNAL = 0 as imp::signal; } } diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 358582d436b32..5b697e0d73d08 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -24,6 +24,7 @@ use std::cast; use std::libc::{c_void, c_int}; use std::rt::task::BlockedTask; use std::unstable::sync::LittleLock; +use std::sync::arc::UnsafeArc; use mpsc = std::sync::mpsc_queue; use async::AsyncWatcher; @@ -39,46 +40,46 @@ enum Message { struct State { handle: *uvll::uv_async_t, lock: LittleLock, // see comments in async_cb for why this is needed + queue: mpsc::Queue, } /// This structure is intended to be stored next to the event loop, and it is /// used to create new `Queue` structures. pub struct QueuePool { - priv producer: mpsc::Producer, - priv consumer: mpsc::Consumer, + priv queue: UnsafeArc, priv refcnt: uint, } /// This type is used to send messages back to the original event loop. pub struct Queue { - priv queue: mpsc::Producer, + priv queue: UnsafeArc, } extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { assert_eq!(status, 0); - let state: &mut QueuePool = unsafe { + let pool: &mut QueuePool = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; - let packet = unsafe { state.consumer.packet() }; + let state: &mut State = unsafe { cast::transmute(pool.queue.get()) }; // Remember that there is no guarantee about how many times an async // callback is called with relation to the number of sends, so process the // entire queue in a loop. loop { - match state.consumer.pop() { + match state.queue.pop() { mpsc::Data(Task(task)) => { let _ = task.wake().map(|t| t.reawaken()); } mpsc::Data(Increment) => unsafe { - if state.refcnt == 0 { - uvll::uv_ref((*packet).handle); + if pool.refcnt == 0 { + uvll::uv_ref(state.handle); } - state.refcnt += 1; + pool.refcnt += 1; }, mpsc::Data(Decrement) => unsafe { - state.refcnt -= 1; - if state.refcnt == 0 { - uvll::uv_unref((*packet).handle); + pool.refcnt -= 1; + if pool.refcnt == 0 { + uvll::uv_unref(state.handle); } }, mpsc::Empty | mpsc::Inconsistent => break @@ -99,9 +100,9 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { // If we acquire the mutex here, then we are guaranteed that there are no // longer any senders which are holding on to their handles, so we can // safely allow the event loop to exit. - if state.refcnt == 0 { + if pool.refcnt == 0 { unsafe { - let _l = (*packet).lock.lock(); + let _l = state.lock.lock(); } } } @@ -109,14 +110,14 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { impl QueuePool { pub fn new(loop_: &mut Loop) -> ~QueuePool { let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); - let (c, p) = mpsc::queue(State { + let state = UnsafeArc::new(State { handle: handle, lock: LittleLock::new(), + queue: mpsc::Queue::new(), }); let q = ~QueuePool { - producer: p, - consumer: c, refcnt: 0, + queue: state, }; unsafe { @@ -132,23 +133,23 @@ impl QueuePool { pub fn queue(&mut self) -> Queue { unsafe { if self.refcnt == 0 { - uvll::uv_ref((*self.producer.packet()).handle); + uvll::uv_ref((*self.queue.get()).handle); } self.refcnt += 1; } - Queue { queue: self.producer.clone() } + Queue { queue: self.queue.clone() } } pub fn handle(&self) -> *uvll::uv_async_t { - unsafe { (*self.producer.packet()).handle } + unsafe { (*self.queue.get()).handle } } } impl Queue { pub fn push(&mut self, task: BlockedTask) { - self.queue.push(Task(task)); unsafe { - uvll::uv_async_send((*self.queue.packet()).handle); + (*self.queue.get()).queue.push(Task(task)); + uvll::uv_async_send((*self.queue.get()).handle); } } } @@ -161,7 +162,7 @@ impl Clone for Queue { // and if the queue is dropped later on it'll see the increment for the // decrement anyway. unsafe { - cast::transmute_mut(self).queue.push(Increment); + (*self.queue.get()).queue.push(Increment); } Queue { queue: self.queue.clone() } } @@ -172,9 +173,9 @@ impl Drop for Queue { // See the comments in the async_cb function for why there is a lock // that is acquired only on a drop. unsafe { - let state = self.queue.packet(); + let state = self.queue.get(); let _l = (*state).lock.lock(); - self.queue.push(Decrement); + (*state).queue.push(Decrement); uvll::uv_async_send((*state).handle); } } diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index 2fcc61be79bdf..0a66c3445ee42 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -10,7 +10,6 @@ use std::libc::c_int; use std::io::signal::Signum; -use std::comm::SharedChan; use std::rt::rtio::RtioSignal; use homing::{HomingIO, HomeHandle}; @@ -22,13 +21,13 @@ pub struct SignalWatcher { handle: *uvll::uv_signal_t, home: HomeHandle, - channel: SharedChan, + channel: Chan, signal: Signum, } impl SignalWatcher { pub fn new(io: &mut UvIoFactory, signum: Signum, - channel: SharedChan) -> Result<~SignalWatcher, UvError> { + channel: Chan) -> Result<~SignalWatcher, UvError> { let s = ~SignalWatcher { handle: UvHandle::alloc(None::, uvll::UV_SIGNAL), home: io.make_handle(), @@ -81,7 +80,7 @@ mod test { #[test] fn closing_channel_during_drop_doesnt_kill_everything() { // see issue #10375, relates to timers as well. - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let _signal = SignalWatcher::new(local_loop(), signal::Interrupt, chan); diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 8a8ef4a41ec4e..54db4b4d3d13f 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -10,7 +10,6 @@ use std::c_str::CString; use std::cast; -use std::comm::SharedChan; use std::io::IoError; use std::io::net::ip::SocketAddr; use std::io::process::ProcessConfig; @@ -304,7 +303,7 @@ impl IoFactory for UvIoFactory { } } - fn signal(&mut self, signum: Signum, channel: SharedChan) + fn signal(&mut self, signum: Signum, channel: Chan) -> Result<~rtio::RtioSignal, IoError> { match SignalWatcher::new(self, signum, channel) { Ok(s) => Ok(s as ~rtio::RtioSignal), diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 34bf83ac49101..3487a92d8491c 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -15,17 +15,16 @@ //! communication between concurrent tasks. The primitives defined in this //! module are the building blocks for synchronization in rust. //! -//! This module currently provides three main types: +//! This module currently provides two types: //! //! * `Chan` //! * `Port` -//! * `SharedChan` //! -//! The `Chan` and `SharedChan` types are used to send data to a `Port`. A -//! `SharedChan` is clone-able such that many tasks can send simultaneously to -//! one receiving port. These communication primitives are *task blocking*, not -//! *thread blocking*. This means that if one task is blocked on a channel, -//! other tasks can continue to make progress. +//! `Chan` is used to send data to a `Port`. A `Chan` is clone-able such that +//! many tasks can send simultaneously to one receiving port. These +//! communication primitives are *task blocking*, not *thread blocking*. This +//! means that if one task is blocked on a channel, other tasks can continue to +//! make progress. //! //! Rust channels can be used as if they have an infinite internal buffer. What //! this means is that the `send` operation will never block. `Port`s, on the @@ -39,8 +38,8 @@ //! next operation `fail!`. The purpose of this is to allow propagation of //! failure among tasks that are linked to one another via channels. //! -//! There are methods on all of `Chan`, `SharedChan`, and `Port` to perform -//! their respective operations without failing, however. +//! There are methods on both of `Chan` and `Port` to perform their respective +//! operations without failing, however. //! //! ## Outside the Runtime //! @@ -66,7 +65,7 @@ //! assert_eq!(port.recv(), 10); //! //! // Create a shared channel which can be sent along from many tasks -//! let (port, chan) = SharedChan::new(); +//! let (port, chan) = Chan::new(); //! for i in range(0, 10) { //! let chan = chan.clone(); //! spawn(proc() { @@ -100,10 +99,22 @@ // // ## Flavors of channels // -// Rust channels come in two flavors: streams and shared channels. A stream has -// one sender and one receiver while a shared channel could have multiple -// senders. This choice heavily influences the design of the protocol set -// forth for both senders/receivers. +// From the perspective of a consumer of this library, there is only one flavor +// of channel. This channel can be used as a stream and cloned to allow multiple +// senders. Under the hood, however, there are actually three flavors of +// channels in play. +// +// * Oneshots - these channels are highly optimized for the one-send use case. +// They contain as few atomics as possible and involve one and +// exactly one allocation. +// * Streams - these channels are optimized for the non-shared use case. They +// use a different concurrent queue which is more tailored for this +// use case. The initial allocation of this flavor of channel is not +// optimized. +// * Shared - this is the most general form of channel that this module offers, +// a channel with multiple senders. This type is as optimized as it +// can be, but the previous two types mentioned are much faster for +// their use-cases. // // ## Concurrent queues // @@ -226,25 +237,20 @@ // here's the code for you to find some more! use cast; +use cell::Cell; use clone::Clone; -use container::Container; -use int; use iter::Iterator; -use kinds::marker; use kinds::Send; +use kinds::marker; +use mem; use ops::Drop; -use option::{Option, Some, None}; -use result::{Ok, Err}; +use option::{Some, None, Option}; +use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; -use rt::thread::Thread; -use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed}; -use vec::OwnedVector; - -use spsc = sync::spsc_queue; -use mpsc = sync::mpsc_queue; +use sync::arc::UnsafeArc; -pub use self::select::{Select, Handle}; +pub use comm::select::{Select, Handle}; macro_rules! test ( { fn $name:ident() $b:block $($a:attr)*} => ( @@ -272,34 +278,19 @@ macro_rules! test ( ) mod select; +mod oneshot; +mod stream; +mod shared; -/////////////////////////////////////////////////////////////////////////////// -// Helper type to abstract ports for channels and shared channels -/////////////////////////////////////////////////////////////////////////////// - -enum Consumer { - SPSC(spsc::Consumer), - MPSC(mpsc::Consumer), -} - -impl Consumer{ - unsafe fn packet(&self) -> *mut Packet { - match *self { - SPSC(ref c) => c.packet(), - MPSC(ref c) => c.packet(), - } - } -} - -/////////////////////////////////////////////////////////////////////////////// -// Public structs -/////////////////////////////////////////////////////////////////////////////// +// Use a power of 2 to allow LLVM to optimize to something that's not a +// division, this is hit pretty regularly. +static RESCHED_FREQ: int = 256; /// The receiving-half of Rust's channel type. This half can only be owned by /// one task pub struct Port { - priv queue: Consumer, - + priv inner: Flavor, + priv receives: Cell, // can't share in an arc priv marker: marker::NoFreeze, } @@ -314,23 +305,12 @@ pub struct Messages<'a, T> { /// The sending-half of Rust's channel type. This half can only be owned by one /// task pub struct Chan { - priv queue: spsc::Producer, - + priv inner: Flavor, + priv sends: Cell, // can't share in an arc priv marker: marker::NoFreeze, } -/// The sending-half of Rust's channel type. This half can be shared among many -/// tasks by creating copies of itself through the `clone` method. -pub struct SharedChan { - priv queue: mpsc::Producer, - - // can't share in an arc -- technically this implementation is - // shareable, but it shouldn't be required to be shareable in an - // arc - priv marker: marker::NoFreeze, -} - /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. #[deriving(Eq, Clone)] @@ -345,202 +325,10 @@ pub enum TryRecvResult { Data(T), } -/////////////////////////////////////////////////////////////////////////////// -// Internal struct definitions -/////////////////////////////////////////////////////////////////////////////// - -struct Packet { - cnt: AtomicInt, // How many items are on this channel - steals: int, // How many times has a port received without blocking? - to_wake: Option, // Task to wake up - - // This lock is used to wake up native threads blocked in select. The - // `lock` field is not used because the thread blocking in select must - // block on only one mutex. - //selection_lock: Option>, - - // The number of channels which are currently using this packet. This is - // used to reference count shared channels. - channels: AtomicInt, - - selecting: AtomicBool, - selection_id: uint, - select_next: *mut Packet, - select_prev: *mut Packet, - recv_cnt: int, -} - -/////////////////////////////////////////////////////////////////////////////// -// All implementations -- the fun part -/////////////////////////////////////////////////////////////////////////////// - -static DISCONNECTED: int = int::MIN; -static RESCHED_FREQ: int = 200; - -impl Packet { - fn new() -> Packet { - Packet { - cnt: AtomicInt::new(0), - steals: 0, - to_wake: None, - channels: AtomicInt::new(1), - - selecting: AtomicBool::new(false), - selection_id: 0, - select_next: 0 as *mut Packet, - select_prev: 0 as *mut Packet, - recv_cnt: 0, - } - } - - // Increments the channel size count, preserving the disconnected state if - // the other end has disconnected. - fn increment(&mut self) -> int { - match self.cnt.fetch_add(1, SeqCst) { - DISCONNECTED => { - // see the comment in 'try' for a shared channel for why this - // window of "not disconnected" is "ok". - self.cnt.store(DISCONNECTED, SeqCst); - DISCONNECTED - } - n => n - } - } - - // Decrements the reference count of the channel, returning whether the task - // should block or not. This assumes that the task is ready to sleep in that - // the `to_wake` field has already been filled in. Once this decrement - // happens, the task could wake up on the other end. - // - // From an implementation perspective, this is also when our "steal count" - // gets merged into the "channel count". Our steal count is reset to 0 after - // this function completes. - // - // As with increment(), this preserves the disconnected state if the - // channel is disconnected. - fn decrement(&mut self) -> bool { - let steals = self.steals; - self.steals = 0; - match self.cnt.fetch_sub(1 + steals, SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, SeqCst); - false - } - n => { - assert!(n >= 0); - n - steals <= 0 - } - } - } - - // Helper function for select, tests whether this port can receive without - // blocking (obviously not an atomic decision). - fn can_recv(&self) -> bool { - let cnt = self.cnt.load(SeqCst); - cnt == DISCONNECTED || cnt - self.steals > 0 - } - - // This function must have had at least an acquire fence before it to be - // properly called. - fn wakeup(&mut self) { - match self.to_wake.take_unwrap().wake() { - Some(task) => task.reawaken(), - None => {} - } - self.selecting.store(false, Relaxed); - } - - // Aborts the selection process for a port. This happens as part of select() - // once the task has reawoken. This will place the channel back into a - // consistent state which is ready to be received from again. - // - // The method of doing this is a little subtle. These channels have the - // invariant that if -1 is seen, then to_wake is always Some(..) and should - // be woken up. This aborting process at least needs to add 1 to the - // reference count, but that is not guaranteed to make the count positive - // (our steal count subtraction could mean that after the addition the - // channel count is still negative). - // - // In order to get around this, we force our channel count to go above 0 by - // adding a large number >= 1 to it. This way no sender will see -1 unless - // we are indeed blocking. This "extra lump" we took out of the channel - // becomes our steal count (which will get re-factored into the count on the - // next blocking recv) - // - // The return value of this method is whether there is data on this channel - // to receive or not. - fn abort_selection(&mut self, take_to_wake: bool) -> bool { - // make sure steals + 1 makes the count go non-negative - let steals = { - let cnt = self.cnt.load(SeqCst); - if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} - }; - let prev = self.cnt.fetch_add(steals + 1, SeqCst); - - // If we were previously disconnected, then we know for sure that there - // is no task in to_wake, so just keep going - if prev == DISCONNECTED { - assert!(self.to_wake.is_none()); - self.cnt.store(DISCONNECTED, SeqCst); - self.selecting.store(false, SeqCst); - true // there is data, that data is that we're disconnected - } else { - let cur = prev + steals + 1; - assert!(cur >= 0); - - // If the previous count was negative, then we just made things go - // positive, hence we passed the -1 boundary and we're responsible - // for removing the to_wake() field and trashing it. - if prev < 0 { - if take_to_wake { - self.to_wake.take_unwrap().trash(); - } else { - assert!(self.to_wake.is_none()); - } - - // We woke ourselves up, we're responsible for cancelling - assert!(self.selecting.load(Relaxed)); - self.selecting.store(false, Relaxed); - } - assert_eq!(self.steals, 0); - self.steals = steals; - - // if we were previously positive, then there's surely data to - // receive - prev >= 0 - } - } - - // Decrement the reference count on a channel. This is called whenever a - // Chan is dropped and may end up waking up a receiver. It's the receiver's - // responsibility on the other end to figure out that we've disconnected. - unsafe fn drop_chan(&mut self) { - match self.channels.fetch_sub(1, SeqCst) { - 1 => { - match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(); } - DISCONNECTED => {} - n => { assert!(n >= 0); } - } - } - n if n > 1 => {}, - n => fail!("bad number of channels left {}", n), - } - } -} - -impl Drop for Packet { - fn drop(&mut self) { - unsafe { - // Note that this load is not only an assert for correctness about - // disconnection, but also a proper fence before the read of - // `to_wake`, so this assert cannot be removed with also removing - // the `to_wake` assert. - assert_eq!(self.cnt.load(SeqCst), DISCONNECTED); - assert!(self.to_wake.is_none()); - assert_eq!(self.channels.load(SeqCst), 0); - } - } +enum Flavor { + Oneshot(UnsafeArc>), + Stream(UnsafeArc>), + Shared(UnsafeArc>), } impl Chan { @@ -548,12 +336,12 @@ impl Chan { /// will become available on the port as well. See the documentation of /// `Port` and `Chan` to see what's possible with them. pub fn new() -> (Port, Chan) { - // arbitrary 128 size cache -- this is just a max cache size, not a - // maximum buffer size - let (c, p) = spsc::queue(128, Packet::new()); - let c = SPSC(c); - (Port { queue: c, marker: marker::NoFreeze }, - Chan { queue: p, marker: marker::NoFreeze }) + let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); + (Port::my_new(Oneshot(a)), Chan::my_new(Oneshot(b))) + } + + fn my_new(inner: Flavor) -> Chan { + Chan { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze } } /// Sends a value along this channel to be received by the corresponding @@ -595,132 +383,105 @@ impl Chan { /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. pub fn try_send(&self, t: T) -> bool { - unsafe { - let this = cast::transmute_mut(self); - this.queue.push(t); - let packet = this.queue.packet(); - match (*packet).increment() { - // As described above, -1 == wakeup - -1 => { (*packet).wakeup(); true } - // Also as above, SPSC queues must be >= -2 - -2 => true, - // We succeeded if we sent data - DISCONNECTED => this.queue.is_empty(), - // In order to prevent starvation of other tasks in situations - // where a task sends repeatedly without ever receiving, we - // occassionally yield instead of doing a send immediately. - // Only doing this if we're doing a rescheduling send, otherwise - // the caller is expecting not to context switch. - // - // Note that we don't unconditionally attempt to yield because - // the TLS overhead can be a bit much. - n => { - assert!(n >= 0); - if n > 0 && n % RESCHED_FREQ == 0 { - let task: ~Task = Local::take(); - task.maybe_yield(); + // In order to prevent starvation of other tasks in situations where + // a task sends repeatedly without ever receiving, we occassionally + // yield instead of doing a send immediately. Only doing this if + // we're doing a rescheduling send, otherwise the caller is + // expecting not to context switch. + // + // Note that we don't unconditionally attempt to yield because the + // TLS overhead can be a bit much. + let cnt = self.sends.get() + 1; + self.sends.set(cnt); + if cnt % (RESCHED_FREQ as uint) == 0 { + let task: ~Task = Local::take(); + task.maybe_yield(); + } + + let (new_inner, ret) = match self.inner { + Oneshot(ref p) => { + let p = p.get(); + unsafe { + if !(*p).sent() { + return (*p).send(t); + } else { + let (a, b) = UnsafeArc::new2(stream::Packet::new()); + match (*p).upgrade(Port::my_new(Stream(b))) { + oneshot::UpSuccess => { + (*a.get()).send(t); + (a, true) + } + oneshot::UpDisconnected => (a, false), + oneshot::UpWoke(task) => { + (*a.get()).send(t); + task.wake().map(|t| t.reawaken()); + (a, true) + } + } } - true } } - } - } -} - -#[unsafe_destructor] -impl Drop for Chan { - fn drop(&mut self) { - unsafe { (*self.queue.packet()).drop_chan(); } - } -} - -impl SharedChan { - /// Creates a new shared channel and port pair. The purpose of a shared - /// channel is to be cloneable such that many tasks can send data at the - /// same time. All data sent on any channel will become available on the - /// provided port as well. - pub fn new() -> (Port, SharedChan) { - let (c, p) = mpsc::queue(Packet::new()); - let c = MPSC(c); - (Port { queue: c, marker: marker::NoFreeze }, - SharedChan { queue: p, marker: marker::NoFreeze }) - } + Stream(ref p) => return unsafe { (*p.get()).send(t) }, + Shared(ref p) => return unsafe { (*p.get()).send(t) }, + }; - /// Equivalent method to `send` on the `Chan` type (using the same - /// semantics) - pub fn send(&self, t: T) { - if !self.try_send(t) { - fail!("sending on a closed channel"); + unsafe { + let mut tmp = Chan::my_new(Stream(new_inner)); + mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } + return ret; } +} - /// Equivalent method to `try_send` on the `Chan` type (using the same - /// semantics) - pub fn try_send(&self, t: T) -> bool { - unsafe { - // Note that the multiple sender case is a little tricker - // semantically than the single sender case. The logic for - // incrementing is "add and if disconnected store disconnected". - // This could end up leading some senders to believe that there - // wasn't a disconnect if in fact there was a disconnect. This means - // that while one thread is attempting to re-store the disconnected - // states, other threads could walk through merrily incrementing - // this very-negative disconnected count. To prevent senders from - // spuriously attempting to send when the channels is actually - // disconnected, the count has a ranged check here. - // - // This is also done for another reason. Remember that the return - // value of this function is: - // - // `true` == the data *may* be received, this essentially has no - // meaning - // `false` == the data will *never* be received, this has a lot of - // meaning - // - // In the SPSC case, we have a check of 'queue.is_empty()' to see - // whether the data was actually received, but this same condition - // means nothing in a multi-producer context. As a result, this - // preflight check serves as the definitive "this will never be - // received". Once we get beyond this check, we have permanently - // entered the realm of "this may be received" - let packet = self.queue.packet(); - if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 { - return false +impl Clone for Chan { + fn clone(&self) -> Chan { + let (packet, sleeper) = match self.inner { + Oneshot(ref p) => { + let (a, b) = UnsafeArc::new2(shared::Packet::new()); + match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } { + oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), + oneshot::UpWoke(task) => (b, Some(task)) + } } - - let this = cast::transmute_mut(self); - this.queue.push(t); - - match (*packet).increment() { - DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(); } - n => { - if n > 0 && n % RESCHED_FREQ == 0 { - let task: ~Task = Local::take(); - task.maybe_yield(); - } + Stream(ref p) => { + let (a, b) = UnsafeArc::new2(shared::Packet::new()); + match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } { + stream::UpSuccess | stream::UpDisconnected => (b, None), + stream::UpWoke(task) => (b, Some(task)), } } - true - } - } -} + Shared(ref p) => { + unsafe { (*p.get()).clone_chan(); } + return Chan::my_new(Shared(p.clone())); + } + }; + + unsafe { + (*packet.get()).inherit_blocker(sleeper); -impl Clone for SharedChan { - fn clone(&self) -> SharedChan { - unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); } - SharedChan { queue: self.queue.clone(), marker: marker::NoFreeze } + let mut tmp = Chan::my_new(Shared(packet.clone())); + mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); + } + Chan::my_new(Shared(packet)) } } #[unsafe_destructor] -impl Drop for SharedChan { +impl Drop for Chan { fn drop(&mut self) { - unsafe { (*self.queue.packet()).drop_chan(); } + match self.inner { + Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + } } } impl Port { + fn my_new(inner: Flavor) -> Port { + Port { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze } + } + /// Blocks waiting for a value on this port /// /// This function will block if necessary to wait for a corresponding send @@ -758,100 +519,45 @@ impl Port { /// /// This function cannot fail. pub fn try_recv(&self) -> TryRecvResult { - self.try_recv_inc(true) - } - - fn try_recv_inc(&self, increment: bool) -> TryRecvResult { - // This is a "best effort" situation, so if a queue is inconsistent just - // don't worry about it. - let this = unsafe { cast::transmute_mut(self) }; - - // See the comment about yielding on sends, but the same applies here. - // If a thread is spinning in try_recv we should try - unsafe { - let packet = this.queue.packet(); - (*packet).recv_cnt += 1; - if (*packet).recv_cnt % RESCHED_FREQ == 0 { - let task: ~Task = Local::take(); - task.maybe_yield(); - } + // If a thread is spinning in try_recv, we should take the opportunity + // to reschedule things occasionally. See notes above in scheduling on + // sends for why this doesn't always hit TLS. + let cnt = self.receives.get() + 1; + self.receives.set(cnt); + if cnt % (RESCHED_FREQ as uint) == 0 { + let task: ~Task = Local::take(); + task.maybe_yield(); } - let ret = match this.queue { - SPSC(ref mut queue) => queue.pop(), - MPSC(ref mut queue) => match queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty => None, - - // This is a bit of an interesting case. The channel is - // reported as having data available, but our pop() has - // failed due to the queue being in an inconsistent state. - // This means that there is some pusher somewhere which has - // yet to complete, but we are guaranteed that a pop will - // eventually succeed. In this case, we spin in a yield loop - // because the remote sender should finish their enqueue - // operation "very quickly". - // - // Note that this yield loop does *not* attempt to do a green - // yield (regardless of the context), but *always* performs an - // OS-thread yield. The reasoning for this is that the pusher in - // question which is causing the inconsistent state is - // guaranteed to *not* be a blocked task (green tasks can't get - // pre-empted), so it must be on a different OS thread. Also, - // `try_recv` is normally a "guaranteed no rescheduling" context - // in a green-thread situation. By yielding control of the - // thread, we will hopefully allow time for the remote task on - // the other OS thread to make progress. - // - // Avoiding this yield loop would require a different queue - // abstraction which provides the guarantee that after M - // pushes have succeeded, at least M pops will succeed. The - // current queues guarantee that if there are N active - // pushes, you can pop N times once all N have finished. - mpsc::Inconsistent => { - let data; - loop { - Thread::yield_now(); - match queue.pop() { - mpsc::Data(t) => { data = t; break } - mpsc::Empty => fail!("inconsistent => empty"), - mpsc::Inconsistent => {} - } + loop { + let mut new_port = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(oneshot::Empty) => return Empty, + Err(oneshot::Disconnected) => return Disconnected, + Err(oneshot::Upgraded(port)) => port, } - Some(data) } - } - }; - if increment && ret.is_some() { - unsafe { (*this.queue.packet()).steals += 1; } - } - match ret { - Some(t) => Data(t), - None => { - // It's possible that between the time that we saw the queue was - // empty and here the other side disconnected. It's also - // possible for us to see the disconnection here while there is - // data in the queue. It's pretty backwards-thinking to return - // Disconnected when there's actually data on the queue, so if - // we see a disconnected state be sure to check again to be 100% - // sure that there's no data in the queue. - let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) }; - if cnt != DISCONNECTED { return Empty } - - let ret = match this.queue { - SPSC(ref mut queue) => queue.pop(), - MPSC(ref mut queue) => match queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty => None, - mpsc::Inconsistent => { - fail!("inconsistent with no senders?!"); - } + Stream(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(stream::Empty) => return Empty, + Err(stream::Disconnected) => return Disconnected, + Err(stream::Upgraded(port)) => port, + } + } + Shared(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(shared::Empty) => return Empty, + Err(shared::Disconnected) => return Disconnected, } - }; - match ret { - Some(data) => Data(data), - None => Disconnected, } + }; + unsafe { + mem::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); } } } @@ -869,34 +575,36 @@ impl Port { /// If the channel has hung up, then `None` is returned. Otherwise `Some` of /// the value found on the port is returned. pub fn recv_opt(&self) -> Option { - // optimistic preflight check (scheduling is expensive) - match self.try_recv() { - Empty => {}, - Disconnected => return None, - Data(t) => return Some(t), - } - - let packet; - let this; - unsafe { - this = cast::transmute_mut(self); - packet = this.queue.packet(); - let task: ~Task = Local::take(); - task.deschedule(1, |task| { - assert!((*packet).to_wake.is_none()); - (*packet).to_wake = Some(task); - if (*packet).decrement() { - Ok(()) - } else { - Err((*packet).to_wake.take_unwrap()) + loop { + let mut new_port = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Some(t), + Err(oneshot::Empty) => return unreachable!(), + Err(oneshot::Disconnected) => return None, + Err(oneshot::Upgraded(port)) => port, + } } - }); - } - - match self.try_recv_inc(false) { - Data(t) => Some(t), - Empty => fail!("bug: woke up too soon"), - Disconnected => None, + Stream(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Some(t), + Err(stream::Empty) => return unreachable!(), + Err(stream::Disconnected) => return None, + Err(stream::Upgraded(port)) => port, + } + } + Shared(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Some(t), + Err(shared::Empty) => return unreachable!(), + Err(shared::Disconnected) => return None, + } + } + }; + unsafe { + mem::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } } } @@ -907,6 +615,84 @@ impl Port { } } +impl select::Packet for Port { + fn can_recv(&self) -> bool { + loop { + let mut new_port = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Stream(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Shared(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } + }; + unsafe { + mem::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } + } + } + + fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{ + loop { + let (t, mut new_port) = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).start_selection(task) } { + oneshot::SelSuccess => return Ok(()), + oneshot::SelCanceled(task) => return Err(task), + oneshot::SelUpgraded(t, port) => (t, port), + } + } + Stream(ref p) => { + match unsafe { (*p.get()).start_selection(task) } { + stream::SelSuccess => return Ok(()), + stream::SelCanceled(task) => return Err(task), + stream::SelUpgraded(t, port) => (t, port), + } + } + Shared(ref p) => { + return unsafe { (*p.get()).start_selection(task) }; + } + }; + task = t; + unsafe { + mem::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } + } + } + + fn abort_selection(&self) -> bool { + let mut was_upgrade = false; + loop { + let result = match self.inner { + Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, + Stream(ref p) => unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + Shared(ref p) => return unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + }; + let mut new_port = match result { Ok(b) => return b, Err(p) => p }; + was_upgrade = true; + unsafe { + mem::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } + } + } +} + impl<'a, T: Send> Iterator for Messages<'a, T> { fn next(&mut self) -> Option { self.port.recv_opt() } } @@ -914,11 +700,10 @@ impl<'a, T: Send> Iterator for Messages<'a, T> { #[unsafe_destructor] impl Drop for Port { fn drop(&mut self) { - // All we need to do is store that we're disconnected. If the channel - // half has already disconnected, then we'll just deallocate everything - // when the shared packet is deallocated. - unsafe { - (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst); + match self.inner { + Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, } } } @@ -950,12 +735,12 @@ mod test { }) test!(fn drop_full_shared() { - let (_p, c) = SharedChan::new(); + let (_p, c) = Chan::new(); c.send(~1); }) test!(fn smoke_shared() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); c.send(1); assert_eq!(p.recv(), 1); let c = c.clone(); @@ -978,13 +763,13 @@ mod test { } #[should_fail]) test!(fn smoke_shared_port_gone() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); drop(p); c.send(1); } #[should_fail]) test!(fn smoke_shared_port_gone2() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); drop(p); let c2 = c.clone(); drop(c); @@ -1000,7 +785,7 @@ mod test { } #[should_fail]) test!(fn port_gone_concurrent_shared() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); let c1 = c.clone(); spawn(proc() { p.recv(); @@ -1018,7 +803,7 @@ mod test { } #[should_fail]) test!(fn smoke_chan_gone_shared() { - let (p, c) = SharedChan::<()>::new(); + let (p, c) = Chan::<()>::new(); let c2 = c.clone(); drop(c); drop(c2); @@ -1047,7 +832,7 @@ mod test { test!(fn stress_shared() { static AMT: uint = 10000; static NTHREADS: uint = 8; - let (p, c) = SharedChan::::new(); + let (p, c) = Chan::::new(); let (p1, c1) = Chan::new(); spawn(proc() { @@ -1074,7 +859,7 @@ mod test { fn send_from_outside_runtime() { let (p, c) = Chan::::new(); let (p1, c1) = Chan::new(); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let chan2 = chan.clone(); spawn(proc() { c1.send(()); @@ -1114,7 +899,7 @@ mod test { fn no_runtime() { let (p1, c1) = Chan::::new(); let (p2, c2) = Chan::::new(); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let chan2 = chan.clone(); native::task::spawn(proc() { assert_eq!(p1.recv(), 1); @@ -1317,7 +1102,7 @@ mod test { }) test!(fn shared_chan_stress() { - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let total = stress_factor() + 100; for _ in range(0, total) { let chan_clone = chan.clone(); @@ -1396,4 +1181,26 @@ mod test { p2.recv(); assert_eq!(p.try_recv(), Disconnected); }) + + // This bug used to end up in a livelock inside of the Port destructor + // because the internal state of the Shared port was corrupted + test!(fn destroy_upgraded_shared_port_when_sender_still_active() { + let (p, c) = Chan::new(); + let (p1, c2) = Chan::new(); + spawn(proc() { + p.recv(); // wait on a oneshot port + drop(p); // destroy a shared port + c2.send(()); + }); + // make sure the other task has gone to sleep + for _ in range(0, 5000) { task::deschedule(); } + + // upgrade to a shared chan and send a message + let t = c.clone(); + drop(c); + t.send(()); + + // wait for the child task to exit before we exit + p1.recv(); + }) } diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs new file mode 100644 index 0000000000000..9deccfeb87566 --- /dev/null +++ b/src/libstd/comm/oneshot.rs @@ -0,0 +1,369 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Oneshot channels/ports +/// +/// This is the initial flavor of channels/ports used for comm module. This is +/// an optimization for the one-use case of a channel. The major optimization of +/// this type is to have one and exactly one allocation when the chan/port pair +/// is created. +/// +/// Another possible optimization would be to not use an UnsafeArc box because +/// in theory we know when the shared packet can be deallocated (no real need +/// for the atomic reference counting), but I was having trouble how to destroy +/// the data early in a drop of a Port. +/// +/// # Implementation +/// +/// Oneshots are implemented around one atomic uint variable. This variable +/// indicates both the state of the port/chan but also contains any tasks +/// blocked on the port. All atomic operations happen on this one word. +/// +/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect +/// on behalf of the channel side of things (it can be mentally thought of as +/// consuming the port). This upgrade is then also stored in the shared packet. +/// The one caveat to consider is that when a port sees a disconnected channel +/// it must check for data because there is no "data plus upgrade" state. + +use comm::Port; +use kinds::Send; +use mem; +use ops::Drop; +use option::{Some, None, Option}; +use result::{Result, Ok, Err}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use sync::atomics; + +// Various states you can find a port in. +static EMPTY: uint = 0; +static DATA: uint = 1; +static DISCONNECTED: uint = 2; + +pub struct Packet { + // Internal state of the chan/port pair (stores the blocked task as well) + state: atomics::AtomicUint, + // One-shot data slot location + data: Option, + // when used for the second time, a oneshot channel must be upgraded, and + // this contains the slot for the upgrade + upgrade: MyUpgrade, +} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Port), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(BlockedTask), +} + +pub enum SelectionResult { + SelCanceled(BlockedTask), + SelUpgraded(BlockedTask, Port), + SelSuccess, +} + +enum MyUpgrade { + NothingSent, + SendUsed, + GoUp(Port), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + data: None, + upgrade: NothingSent, + state: atomics::AtomicUint::new(EMPTY), + } + } + + pub fn send(&mut self, t: T) -> bool { + // Sanity check + match self.upgrade { + NothingSent => {} + _ => fail!("sending on a oneshot that's already sent on "), + } + assert!(self.data.is_none()); + self.data = Some(t); + self.upgrade = SendUsed; + + match self.state.swap(DATA, atomics::SeqCst) { + // Sent the data, no one was waiting + EMPTY => true, + + // Couldn't send the data, the port hung up first. We need to be + // sure to deallocate the sent data (to not leave it stuck in the + // queue) + DISCONNECTED => { + self.data.take_unwrap(); + false + } + + // Not possible, these are one-use channels + DATA => unreachable!(), + + // Anything else means that there was a task waiting on the other + // end. We leave the 'DATA' state inside so it'll pick it up on the + // other end. + n => unsafe { + let t = BlockedTask::cast_from_uint(n); + t.wake().map(|t| t.reawaken()); + true + } + } + } + + // Just tests whether this channel has been sent on or not, this is only + // safe to use from the sender. + pub fn sent(&self) -> bool { + match self.upgrade { + NothingSent => false, + _ => true, + } + } + + pub fn recv(&mut self) -> Result> { + // Attempt to not block the task (it's a little expensive). If it looks + // like we're not empty, then immediately go through to `try_recv`. + if self.state.load(atomics::SeqCst) == EMPTY { + let t: ~Task = Local::take(); + t.deschedule(1, |task| { + let n = unsafe { task.cast_to_uint() }; + match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) { + // Nothing on the channel, we legitimately block + EMPTY => Ok(()), + + // If there's data or it's a disconnected channel, then we + // failed the cmpxchg, so we just wake ourselves back up + DATA | DISCONNECTED => { + unsafe { Err(BlockedTask::cast_from_uint(n)) } + } + + // Only one thread is allowed to sleep on this port + _ => unreachable!() + } + }); + } + + self.try_recv() + } + + pub fn try_recv(&mut self) -> Result> { + match self.state.load(atomics::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + self.state.compare_and_swap(DATA, EMPTY, atomics::SeqCst); + match self.data.take() { + Some(data) => Ok(data), + None => unreachable!(), + } + } + + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => { + match self.data.take() { + Some(data) => Ok(data), + None => { + match mem::replace(&mut self.upgrade, SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)) + } + } + } + } + _ => unreachable!() + } + } + + // Returns whether the upgrade was completed. If the upgrade wasn't + // completed, then the port couldn't get sent to the other half (it will + // never receive it). + pub fn upgrade(&mut self, up: Port) -> UpgradeResult { + let prev = match self.upgrade { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => fail!("upgrading again"), + }; + self.upgrade = GoUp(up); + + match self.state.swap(DISCONNECTED, atomics::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. + DISCONNECTED => { self.upgrade = prev; UpDisconnected } + + // If someone's waiting, we gotta wake them up + n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) }) + } + } + + pub fn drop_chan(&mut self) { + match self.state.swap(DISCONNECTED, atomics::SeqCst) { + DATA | DISCONNECTED | EMPTY => {} + + // If someone's waiting, we gotta wake them up + n => unsafe { + let t = BlockedTask::cast_from_uint(n); + t.wake().map(|t| t.reawaken()); + } + } + } + + pub fn drop_port(&mut self) { + match self.state.swap(DISCONNECTED, atomics::SeqCst) { + // An empty channel has nothing to do, and a remotely disconnected + // channel also has nothing to do b/c we're about to run the drop + // glue + DISCONNECTED | EMPTY => {} + + // There's data on the channel, so make sure we destroy it promptly. + // This is why not using an arc is a little difficult (need the box + // to stay valid while we take the data). + DATA => { self.data.take_unwrap(); } + + // We're the only ones that can block on this port + _ => unreachable!() + } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // If Ok, the value is whether this port has data, if Err, then the upgraded + // port needs to be checked instead of this one. + pub fn can_recv(&mut self) -> Result> { + match self.state.load(atomics::SeqCst) { + EMPTY => Ok(false), // Welp, we tried + DATA => Ok(true), // we have some un-acquired data + DISCONNECTED if self.data.is_some() => Ok(true), // we have data + DISCONNECTED => { + match mem::replace(&mut self.upgrade, SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => Err(upgrade), + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { self.upgrade = up; Ok(true) } + } + } + _ => unreachable!(), // we're the "one blocker" + } + } + + // Attempts to start selection on this port. This can either succeed, fail + // because there is data, or fail because there is an upgrade pending. + pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult { + let n = unsafe { task.cast_to_uint() }; + match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) { + EMPTY => SelSuccess, + DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }), + DISCONNECTED if self.data.is_some() => { + SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) + } + DISCONNECTED => { + match mem::replace(&mut self.upgrade, SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => { + SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) }, + upgrade) + } + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { + self.upgrade = up; + SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) + } + } + } + _ => unreachable!(), // we're the "one blocker" + } + } + + // Remove a previous selecting task from this port. This ensures that the + // blocked task will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&mut self) -> Result> { + let state = match self.state.load(atomics::SeqCst) { + // Each of these states means that no further activity will happen + // with regard to abortion selection + s @ EMPTY | + s @ DATA | + s @ DISCONNECTED => s, + + // If we've got a blocked task, then use an atomic to gain ownership + // of it (may fail) + n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst) + }; + + // Now that we've got ownership of our state, figure out what to do + // about it. + match state { + EMPTY => unreachable!(), + // our task used for select was stolen + DATA => Ok(true), + + // If the other end has hung up, then we have complete ownership + // of the port. We need to check to see if there was an upgrade + // requested, and if so, the other end needs to have its selection + // aborted. + DISCONNECTED => { + assert!(self.data.is_none()); + match mem::replace(&mut self.upgrade, SendUsed) { + GoUp(port) => Err(port), + _ => Ok(true), + } + } + + // We woke ourselves up from select. Assert that the task should be + // trashed and returne that we don't have any data. + n => { + let t = unsafe { BlockedTask::cast_from_uint(n) }; + t.trash(); + Ok(false) + } + } + } +} + +#[unsafe_destructor] +impl Drop for Packet { + fn drop(&mut self) { + assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED); + } +} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index a369ecba86b3a..b6b35ccc35790 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -45,19 +45,17 @@ #[allow(dead_code)]; use cast; -use comm; +use cell::Cell; use iter::Iterator; use kinds::marker; use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use result::{Ok, Err}; +use result::{Ok, Err, Result}; use rt::local::Local; -use rt::task::Task; -use super::{Packet, Port}; -use sync::atomics::{Relaxed, SeqCst}; -use task; +use rt::task::{Task, BlockedTask}; +use super::Port; use uint; macro_rules! select { @@ -67,8 +65,12 @@ macro_rules! select { ) => ({ use std::comm::Select; let sel = Select::new(); - let mut $port1 = sel.add(&mut $port1); - $( let mut $port = sel.add(&mut $port); )* + let mut $port1 = sel.handle(&$port1); + $( let mut $port = sel.handle(&$port); )* + unsafe { + $port1.add(); + $( $port.add(); )* + } let ret = sel.wait(); if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 } $( else if ret == $port.id { let $name = $port.$meth(); $code } )* @@ -79,9 +81,9 @@ macro_rules! select { /// The "port set" of the select interface. This structure is used to manage a /// set of ports which are being selected over. pub struct Select { - priv head: *mut Packet, - priv tail: *mut Packet, - priv next_id: uint, + priv head: *mut Handle<'static, ()>, + priv tail: *mut Handle<'static, ()>, + priv next_id: Cell, priv marker1: marker::NoSend, priv marker2: marker::NoFreeze, } @@ -90,13 +92,28 @@ pub struct Select { /// This handle is used to keep the port in the set as well as interact with the /// underlying port. pub struct Handle<'port, T> { - /// A unique ID for this Handle. + /// The ID of this handle, used to compare against the return value of + /// `Select::wait()` id: uint, priv selector: &'port Select, - priv port: &'port mut Port, + priv next: *mut Handle<'static, ()>, + priv prev: *mut Handle<'static, ()>, + priv added: bool, + priv packet: &'port Packet, + + // due to our fun transmutes, we be sure to place this at the end. (nothing + // previous relies on T) + priv port: &'port Port, } -struct Packets { cur: *mut Packet } +struct Packets { cur: *mut Handle<'static, ()> } + +#[doc(hidden)] +pub trait Packet { + fn can_recv(&self) -> bool; + fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>; + fn abort_selection(&self) -> bool; +} impl Select { /// Creates a new selection structure. This set is initially empty and @@ -106,45 +123,29 @@ impl Select { /// rather much easier through the `select!` macro. pub fn new() -> Select { Select { - head: 0 as *mut Packet, - tail: 0 as *mut Packet, - next_id: 1, marker1: marker::NoSend, marker2: marker::NoFreeze, + head: 0 as *mut Handle<'static, ()>, + tail: 0 as *mut Handle<'static, ()>, + next_id: Cell::new(1), } } - /// Adds a new port to this set, returning a handle which is then used to - /// receive on the port. - /// - /// Note that this port parameter takes `&mut Port` instead of `&Port`. None - /// of the methods of receiving on a port require `&mut self`, but `&mut` is - /// used here in order to have the compiler guarantee that the same port is - /// not added to this set more than once. - /// - /// When the returned handle falls out of scope, the port will be removed - /// from this set. While the handle is in this set, usage of the port can be - /// done through the `Handle`'s receiving methods. - pub fn add<'a, T: Send>(&'a self, port: &'a mut Port) -> Handle<'a, T> { - let this = unsafe { cast::transmute_mut(self) }; - let id = this.next_id; - this.next_id += 1; - unsafe { - let packet = port.queue.packet(); - assert!(!(*packet).selecting.load(Relaxed)); - assert_eq!((*packet).selection_id, 0); - (*packet).selection_id = id; - if this.head.is_null() { - this.head = packet; - this.tail = packet; - } else { - (*packet).select_prev = this.tail; - assert!((*packet).select_next.is_null()); - (*this.tail).select_next = packet; - this.tail = packet; - } + /// Creates a new handle into this port set for a new port. Note that this + /// does *not* add the port to the port set, for that you must call the + /// `add` method on the handle itself. + pub fn handle<'a, T: Send>(&'a self, port: &'a Port) -> Handle<'a, T> { + let id = self.next_id.get(); + self.next_id.set(id + 1); + Handle { + id: id, + selector: self, + next: 0 as *mut Handle<'static, ()>, + prev: 0 as *mut Handle<'static, ()>, + added: false, + port: port, + packet: port, } - Handle { id: id, selector: this, port: port } } /// Waits for an event on this port set. The returned valus is *not* and @@ -177,10 +178,9 @@ impl Select { unsafe { let mut amt = 0; for p in self.iter() { - assert!(!(*p).selecting.load(Relaxed)); amt += 1; - if (*p).can_recv() { - return (*p).selection_id; + if (*p).packet.can_recv() { + return (*p).id; } } assert!(amt > 0); @@ -195,22 +195,14 @@ impl Select { let task: ~Task = Local::take(); task.deschedule(amt, |task| { // Prepare for the block - let (i, packet) = iter.next().unwrap(); - assert!((*packet).to_wake.is_none()); - (*packet).to_wake = Some(task); - (*packet).selecting.store(true, SeqCst); - - if (*packet).decrement() { - Ok(()) - } else { - // Empty to_wake first to avoid tripping an assertion in - // abort_selection in the disconnected case. - let task = (*packet).to_wake.take_unwrap(); - (*packet).abort_selection(false); - (*packet).selecting.store(false, SeqCst); - ready_index = i; - ready_id = (*packet).selection_id; - Err(task) + let (i, handle) = iter.next().unwrap(); + match (*handle).packet.start_selection(task) { + Ok(()) => Ok(()), + Err(task) => { + ready_index = i; + ready_id = (*handle).id; + Err(task) + } } }); @@ -235,45 +227,17 @@ impl Select { // A rewrite should focus on avoiding a yield loop, and for now this // implementation is tying us over to a more efficient "don't // iterate over everything every time" implementation. - for packet in self.iter().take(ready_index) { - if (*packet).abort_selection(true) { - ready_id = (*packet).selection_id; - while (*packet).selecting.load(Relaxed) { - task::deschedule(); - } + for handle in self.iter().take(ready_index) { + if (*handle).packet.abort_selection() { + ready_id = (*handle).id; } } - // Sanity check for now to make sure that everyone is turned off. - for packet in self.iter() { - assert!(!(*packet).selecting.load(Relaxed)); - } - assert!(ready_id != uint::MAX); return ready_id; } } - unsafe fn remove(&self, packet: *mut Packet) { - let this = cast::transmute_mut(self); - assert!(!(*packet).selecting.load(Relaxed)); - if (*packet).select_prev.is_null() { - assert_eq!(packet, this.head); - this.head = (*packet).select_next; - } else { - (*(*packet).select_prev).select_next = (*packet).select_next; - } - if (*packet).select_next.is_null() { - assert_eq!(packet, this.tail); - this.tail = (*packet).select_prev; - } else { - (*(*packet).select_next).select_prev = (*packet).select_prev; - } - (*packet).select_next = 0 as *mut Packet; - (*packet).select_prev = 0 as *mut Packet; - (*packet).selection_id = 0; - } - fn iter(&self) -> Packets { Packets { cur: self.head } } } @@ -285,10 +249,56 @@ impl<'port, T: Send> Handle<'port, T> { /// success or `None` if the channel disconnects. This function has the same /// semantics as `Port.recv_opt` pub fn recv_opt(&mut self) -> Option { self.port.recv_opt() } - /// Immediately attempt to receive a value on a port, this function will - /// never block. Has the same semantics as `Port.try_recv`. - pub fn try_recv(&mut self) -> comm::TryRecvResult { - self.port.try_recv() + + /// Adds this handle to the port set that the handle was created from. This + /// method can be called multiple times, but it has no effect if `add` was + /// called previously. + /// + /// This method is unsafe because it requires that the `Handle` is not moved + /// while it is added to the `Select` set. + pub unsafe fn add(&mut self) { + if self.added { return } + let selector: &mut Select = cast::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = cast::transmute(&*self); + + if selector.head.is_null() { + selector.head = me; + selector.tail = me; + } else { + (*me).prev = selector.tail; + assert!((*me).next.is_null()); + (*selector.tail).next = me; + selector.tail = me; + } + self.added = true; + } + + /// Removes this handle from the `Select` set. This method is unsafe because + /// it has no guarantee that the `Handle` was not moved since `add` was + /// called. + pub unsafe fn remove(&mut self) { + if !self.added { return } + + let selector: &mut Select = cast::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = cast::transmute(&*self); + + if self.prev.is_null() { + assert_eq!(selector.head, me); + selector.head = self.next; + } else { + (*self.prev).next = self.next; + } + if self.next.is_null() { + assert_eq!(selector.tail, me); + selector.tail = self.prev; + } else { + (*self.next).prev = self.prev; + } + + self.next = 0 as *mut Handle<'static, ()>; + self.prev = 0 as *mut Handle<'static, ()>; + + self.added = false; } } @@ -303,17 +313,17 @@ impl Drop for Select { #[unsafe_destructor] impl<'port, T: Send> Drop for Handle<'port, T> { fn drop(&mut self) { - unsafe { self.selector.remove(self.port.queue.packet()) } + unsafe { self.remove() } } } -impl Iterator<*mut Packet> for Packets { - fn next(&mut self) -> Option<*mut Packet> { +impl Iterator<*mut Handle<'static, ()>> for Packets { + fn next(&mut self) -> Option<*mut Handle<'static, ()>> { if self.cur.is_null() { None } else { let ret = Some(self.cur); - unsafe { self.cur = (*self.cur).select_next; } + unsafe { self.cur = (*self.cur).next; } ret } } @@ -326,8 +336,8 @@ mod test { use prelude::*; test!(fn smoke() { - let (mut p1, c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); c1.send(1); select! ( foo = p1.recv() => { assert_eq!(foo, 1); }, @@ -350,11 +360,11 @@ mod test { }) test!(fn smoke2() { - let (mut p1, _c1) = Chan::::new(); - let (mut p2, _c2) = Chan::::new(); - let (mut p3, _c3) = Chan::::new(); - let (mut p4, _c4) = Chan::::new(); - let (mut p5, c5) = Chan::::new(); + let (p1, _c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); + let (p3, _c3) = Chan::::new(); + let (p4, _c4) = Chan::::new(); + let (p5, c5) = Chan::::new(); c5.send(4); select! ( _foo = p1.recv() => { fail!("1") }, @@ -366,8 +376,8 @@ mod test { }) test!(fn closed() { - let (mut p1, _c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, _c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); drop(c2); select! ( @@ -377,8 +387,8 @@ mod test { }) test!(fn unblocks() { - let (mut p1, c1) = Chan::::new(); - let (mut p2, _c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); let (p3, c3) = Chan::::new(); spawn(proc() { @@ -400,8 +410,8 @@ mod test { }) test!(fn both_ready() { - let (mut p1, c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); let (p3, c3) = Chan::<()>::new(); spawn(proc() { @@ -426,8 +436,8 @@ mod test { test!(fn stress() { static AMT: int = 10000; - let (mut p1, c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); let (p3, c3) = Chan::<()>::new(); spawn(proc() { @@ -449,4 +459,66 @@ mod test { c3.send(()); } }) + + test!(fn cloning() { + let (p1, c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); + let (p3, c3) = Chan::<()>::new(); + + spawn(proc() { + p3.recv(); + c1.clone(); + assert_eq!(p3.try_recv(), Empty); + c1.send(2); + p3.recv(); + }); + + c3.send(()); + select!( + _i1 = p1.recv() => {}, + _i2 = p2.recv() => fail!() + ) + c3.send(()); + }) + + test!(fn cloning2() { + let (p1, c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); + let (p3, c3) = Chan::<()>::new(); + + spawn(proc() { + p3.recv(); + c1.clone(); + assert_eq!(p3.try_recv(), Empty); + c1.send(2); + p3.recv(); + }); + + c3.send(()); + select!( + _i1 = p1.recv() => {}, + _i2 = p2.recv() => fail!() + ) + c3.send(()); + }) + + test!(fn cloning3() { + let (p1, c1) = Chan::<()>::new(); + let (p2, c2) = Chan::<()>::new(); + let (p, c) = Chan::new(); + spawn(proc() { + let mut s = Select::new(); + let mut h1 = s.handle(&p1); + let mut h2 = s.handle(&p2); + unsafe { h2.add(); } + unsafe { h1.add(); } + assert_eq!(s.wait(), h2.id); + c.send(()); + }); + + for _ in range(0, 1000) { task::deschedule(); } + drop(c1.clone()); + c2.send(()); + p.recv(); + }) } diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs new file mode 100644 index 0000000000000..30e061bb7b916 --- /dev/null +++ b/src/libstd/comm/shared.rs @@ -0,0 +1,483 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Shared channels +/// +/// This is the flavor of channels which are not necessarily optimized for any +/// particular use case, but are the most general in how they are used. Shared +/// channels are cloneable allowing for multiple senders. +/// +/// High level implementation details can be found in the comment of the parent +/// module. You'll also note that the implementation of the shared and stream +/// channels are quite similar, and this is no coincidence! + +use int; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use result::{Ok, Err, Result}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use rt::thread::Thread; +use sync::atomics; +use unstable::mutex::Mutex; +use vec::OwnedVector; + +use mpsc = sync::mpsc_queue; + +static DISCONNECTED: int = int::MIN; +static FUDGE: int = 1024; +static MAX_STEALS: int = 1 << 20; + +pub struct Packet { + queue: mpsc::Queue, + cnt: atomics::AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? + to_wake: atomics::AtomicUint, // Task to wake up + + // The number of channels which are currently using this packet. + channels: atomics::AtomicInt, + + // See the discussion in Port::drop and the channel send methods for what + // these are used for + port_dropped: atomics::AtomicBool, + sender_drain: atomics::AtomicInt, + + // this lock protects various portions of this implementation during + // select() + select_lock: Mutex, +} + +pub enum Failure { + Empty, + Disconnected, +} + +impl Packet { + // Creation of a packet *must* be followed by a call to inherit_blocker + pub fn new() -> Packet { + let mut p = Packet { + queue: mpsc::Queue::new(), + cnt: atomics::AtomicInt::new(0), + steals: 0, + to_wake: atomics::AtomicUint::new(0), + channels: atomics::AtomicInt::new(2), + port_dropped: atomics::AtomicBool::new(false), + sender_drain: atomics::AtomicInt::new(0), + select_lock: unsafe { Mutex::new() }, + }; + // see comments in inherit_blocker about why we grab this lock + unsafe { p.select_lock.lock() } + return p; + } + + // This function is used at the creation of a shared packet to inherit a + // previously blocked task. This is done to prevent spurious wakeups of + // tasks in select(). + // + // This can only be called at channel-creation time + pub fn inherit_blocker(&mut self, task: Option) { + match task { + Some(task) => { + assert_eq!(self.cnt.load(atomics::SeqCst), 0); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + self.to_wake.store(unsafe { task.cast_to_uint() }, + atomics::SeqCst); + self.cnt.store(-1, atomics::SeqCst); + + // This store is a little sketchy. What's happening here is + // that we're transferring a blocker from a oneshot or stream + // channel to this shared channel. In doing so, we never + // spuriously wake them up and rather only wake them up at the + // appropriate time. This implementation of shared channels + // assumes that any blocking recv() will undo the increment of + // steals performed in try_recv() once the recv is complete. + // This thread that we're inheriting, however, is not in the + // middle of recv. Hence, the first time we wake them up, + // they're going to wake up from their old port, move on to the + // upgraded port, and then call the block recv() function. + // + // When calling this function, they'll find there's data + // immediately available, counting it as a steal. This in fact + // wasn't a steal because we appropriately blocked them waiting + // for data. + // + // To offset this bad increment, we initially set the steal + // count to -1. You'll find some special code in + // abort_selection() as well to ensure that this -1 steal count + // doesn't escape too far. + self.steals = -1; + } + None => {} + } + + // When the shared packet is constructed, we grabbed this lock. The + // purpose of this lock is to ensure that abort_selection() doesn't + // interfere with this method. After we unlock this lock, we're + // signifying that we're done modifying self.cnt and self.to_wake and + // the port is ready for the world to continue using it. + unsafe { self.select_lock.unlock() } + } + + pub fn send(&mut self, t: T) -> bool { + // See Port::drop for what's going on + if self.port_dropped.load(atomics::SeqCst) { return false } + + // Note that the multiple sender case is a little tricker + // semantically than the single sender case. The logic for + // incrementing is "add and if disconnected store disconnected". + // This could end up leading some senders to believe that there + // wasn't a disconnect if in fact there was a disconnect. This means + // that while one thread is attempting to re-store the disconnected + // states, other threads could walk through merrily incrementing + // this very-negative disconnected count. To prevent senders from + // spuriously attempting to send when the channels is actually + // disconnected, the count has a ranged check here. + // + // This is also done for another reason. Remember that the return + // value of this function is: + // + // `true` == the data *may* be received, this essentially has no + // meaning + // `false` == the data will *never* be received, this has a lot of + // meaning + // + // In the SPSC case, we have a check of 'queue.is_empty()' to see + // whether the data was actually received, but this same condition + // means nothing in a multi-producer context. As a result, this + // preflight check serves as the definitive "this will never be + // received". Once we get beyond this check, we have permanently + // entered the realm of "this may be received" + if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE { + return false + } + + self.queue.push(t); + match self.cnt.fetch_add(1, atomics::SeqCst) { + -1 => { + self.take_to_wake().wake().map(|t| t.reawaken()); + } + + // In this case, we have possibly failed to send our data, and + // we need to consider re-popping the data in order to fully + // destroy it. We must arbitrate among the multiple senders, + // however, because the queues that we're using are + // single-consumer queues. In order to do this, all exiting + // pushers will use an atomic count in order to count those + // flowing through. Pushers who see 0 are required to drain as + // much as possible, and then can only exit when they are the + // only pusher (otherwise they must try again). + n if n < DISCONNECTED + FUDGE => { + // see the comment in 'try' for a shared channel for why this + // window of "not disconnected" is ok. + self.cnt.store(DISCONNECTED, atomics::SeqCst); + + if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 { + loop { + // drain the queue, for info on the thread yield see the + // discussion in try_recv + loop { + match self.queue.pop() { + mpsc::Data(..) => {} + mpsc::Empty => break, + mpsc::Inconsistent => Thread::yield_now(), + } + } + // maybe we're done, if we're not the last ones + // here, then we need to go try again. + if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 { + break + } + } + + // At this point, there may still be data on the queue, + // but only if the count hasn't been incremented and + // some other sender hasn't finished pushing data just + // yet. That sender in question will drain its own data. + } + } + + // Can't make any assumptions about this case like in the SPSC case. + _ => {} + } + + true + } + + pub fn recv(&mut self) -> Result { + // This code is essentially the exact same as that found in the stream + // case (see stream.rs) + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + let task: ~Task = Local::take(); + task.deschedule(1, |task| { + self.decrement(task) + }); + + match self.try_recv() { + data @ Ok(..) => { self.steals -= 1; data } + data => data, + } + } + + // Essentially the exact same thing as the stream decrement function. + fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + let n = unsafe { task.cast_to_uint() }; + self.to_wake.store(n, atomics::SeqCst); + + let steals = self.steals; + self.steals = 0; + + match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Ok(()) } + } + } + + self.to_wake.store(0, atomics::SeqCst); + Err(unsafe { BlockedTask::cast_from_uint(n) }) + } + + pub fn try_recv(&mut self) -> Result { + let ret = match self.queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + + // This is a bit of an interesting case. The channel is + // reported as having data available, but our pop() has + // failed due to the queue being in an inconsistent state. + // This means that there is some pusher somewhere which has + // yet to complete, but we are guaranteed that a pop will + // eventually succeed. In this case, we spin in a yield loop + // because the remote sender should finish their enqueue + // operation "very quickly". + // + // Note that this yield loop does *not* attempt to do a green + // yield (regardless of the context), but *always* performs an + // OS-thread yield. The reasoning for this is that the pusher in + // question which is causing the inconsistent state is + // guaranteed to *not* be a blocked task (green tasks can't get + // pre-empted), so it must be on a different OS thread. Also, + // `try_recv` is normally a "guaranteed no rescheduling" context + // in a green-thread situation. By yielding control of the + // thread, we will hopefully allow time for the remote task on + // the other OS thread to make progress. + // + // Avoiding this yield loop would require a different queue + // abstraction which provides the guarantee that after M + // pushes have succeeded, at least M pops will succeed. The + // current queues guarantee that if there are N active + // pushes, you can pop N times once all N have finished. + mpsc::Inconsistent => { + let data; + loop { + Thread::yield_now(); + match self.queue.pop() { + mpsc::Data(t) => { data = t; break } + mpsc::Empty => fail!("inconsistent => empty"), + mpsc::Inconsistent => {} + } + } + Some(data) + } + }; + match ret { + // See the discussion in the stream implementation for why we we + // might decrement steals. + Some(data) => { + self.steals += 1; + if self.steals > MAX_STEALS { + match self.cnt.swap(0, atomics::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + } + n => { self.steals -= n; } + } + assert!(self.steals >= 0); + } + Ok(data) + } + + // See the discussion in the stream implementation for why we try + // again. + None => { + match self.cnt.load(atomics::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + _ => { + match self.queue.pop() { + mpsc::Data(t) => Ok(t), + mpsc::Empty => Err(Disconnected), + // with no senders, an inconsistency is impossible. + mpsc::Inconsistent => unreachable!(), + } + } + } + } + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&mut self) { + self.channels.fetch_add(1, atomics::SeqCst); + } + + // Decrement the reference count on a channel. This is called whenever a + // Chan is dropped and may end up waking up a receiver. It's the receiver's + // responsibility on the other end to figure out that we've disconnected. + pub fn drop_chan(&mut self) { + match self.channels.fetch_sub(1, atomics::SeqCst) { + 1 => {} + n if n > 1 => return, + n => fail!("bad number of channels left {}", n), + } + + match self.cnt.swap(DISCONNECTED, atomics::SeqCst) { + -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + + // See the long discussion inside of stream.rs for why the queue is drained, + // and why it is done in this fashion. + pub fn drop_port(&mut self) { + self.port_dropped.store(true, atomics::SeqCst); + let mut steals = self.steals; + while { + let cnt = self.cnt.compare_and_swap( + steals, DISCONNECTED, atomics::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + // See the discussion in 'try_recv' for why we yield + // control of this thread. + loop { + match self.queue.pop() { + mpsc::Data(..) => { steals += 1; } + mpsc::Empty | mpsc::Inconsistent => break, + } + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&mut self) -> BlockedTask { + let task = self.to_wake.load(atomics::SeqCst); + self.to_wake.store(0, atomics::SeqCst); + assert!(task != 0); + unsafe { BlockedTask::cast_from_uint(task) } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Helper function for select, tests whether this port can receive without + // blocking (obviously not an atomic decision). + // + // This is different than the stream version because there's no need to peek + // at the queue, we can just look at the local count. + pub fn can_recv(&mut self) -> bool { + let cnt = self.cnt.load(atomics::SeqCst); + cnt == DISCONNECTED || cnt - self.steals > 0 + } + + // Inserts the blocked task for selection on this port, returning it back if + // the port already has data on it. + // + // The code here is the same as in stream.rs, except that it doesn't need to + // peek at the channel to see if an upgrade is pending. + pub fn start_selection(&mut self, + task: BlockedTask) -> Result<(), BlockedTask> { + match self.decrement(task) { + Ok(()) => Ok(()), + Err(task) => { + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + assert!(prev >= 0); + return Err(task); + } + } + } + + // Cancels a previous task waiting on this port, returning whether there's + // data on the port. + // + // This is similar to the stream implementation (hence fewer comments), but + // uses a different value for the "steals" variable. + pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { + // Before we do anything else, we bounce on this lock. The reason for + // doing this is to ensure that any upgrade-in-progress is gone and + // done with. Without this bounce, we can race with inherit_blocker + // about looking at and dealing with to_wake. Once we have acquired the + // lock, we are guaranteed that inherit_blocker is done. + unsafe { + self.select_lock.lock(); + self.select_lock.unlock(); + } + + // Like the stream implementation, we want to make sure that the count + // on the channel goes non-negative. We don't know how negative the + // stream currently is, so instead of using a steal value of 1, we load + // the channel count and figure out what we should do to make it + // positive. + let steals = { + let cnt = self.cnt.load(atomics::SeqCst); + if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} + }; + let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); + + if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + self.cnt.store(DISCONNECTED, atomics::SeqCst); + true + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + if prev < 0 { + self.take_to_wake().trash(); + } else { + while self.to_wake.load(atomics::SeqCst) != 0 { + Thread::yield_now(); + } + } + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + assert!(self.steals == 0 || self.steals == -1); + self.steals = steals; + prev >= 0 + } + } +} + +#[unsafe_destructor] +impl Drop for Packet { + fn drop(&mut self) { + unsafe { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + assert_eq!(self.channels.load(atomics::SeqCst), 0); + self.select_lock.destroy(); + } + } +} diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs new file mode 100644 index 0000000000000..0e249a55f8707 --- /dev/null +++ b/src/libstd/comm/stream.rs @@ -0,0 +1,460 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Stream channels +/// +/// This is the flavor of channels which are optimized for one sender and one +/// receiver. The sender will be upgraded to a shared channel if the channel is +/// cloned. +/// +/// High level implementation details can be found in the comment of the parent +/// module. + +use comm::Port; +use int; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Some, None}; +use result::{Ok, Err, Result}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use rt::thread::Thread; +use spsc = sync::spsc_queue; +use sync::atomics; +use vec::OwnedVector; + +static DISCONNECTED: int = int::MIN; +static MAX_STEALS: int = 1 << 20; + +pub struct Packet { + queue: spsc::Queue>, // internal queue for all message + + cnt: atomics::AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? + to_wake: atomics::AtomicUint, // Task to wake up + + port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed. +} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Port), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(BlockedTask), +} + +pub enum SelectionResult { + SelSuccess, + SelCanceled(BlockedTask), + SelUpgraded(BlockedTask, Port), +} + +// Any message could contain an "upgrade request" to a new shared port, so the +// internal queue it's a queue of T, but rather Message +enum Message { + Data(T), + GoUp(Port), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + queue: spsc::Queue::new(128), + + cnt: atomics::AtomicInt::new(0), + steals: 0, + to_wake: atomics::AtomicUint::new(0), + + port_dropped: atomics::AtomicBool::new(false), + } + } + + + pub fn send(&mut self, t: T) -> bool { + match self.do_send(Data(t)) { + UpSuccess => true, + UpDisconnected => false, + UpWoke(task) => { + task.wake().map(|t| t.reawaken()); + true + } + } + } + pub fn upgrade(&mut self, up: Port) -> UpgradeResult { + self.do_send(GoUp(up)) + } + + fn do_send(&mut self, t: Message) -> UpgradeResult { + // Use an acquire/release ordering to maintain the same position with + // respect to the atomic loads below + if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } + + self.queue.push(t); + match self.cnt.fetch_add(1, atomics::SeqCst) { + // As described in the mod's doc comment, -1 == wakeup + -1 => UpWoke(self.take_to_wake()), + // As as described before, SPSC queues must be >= -2 + -2 => UpSuccess, + + // Be sure to preserve the disconnected state, and the return value + // in this case is going to be whether our data was received or not. + // This manifests itself on whether we have an empty queue or not. + // + // Primarily, are required to drain the queue here because the port + // will never remove this data. We can only have at most one item to + // drain (the port drains the rest). + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + let first = self.queue.pop(); + let second = self.queue.pop(); + assert!(second.is_none()); + + match first { + Some(..) => UpSuccess, // we failed to send the data + None => UpDisconnected, // we successfully sent data + } + } + + // Otherwise we just sent some data on a non-waiting queue, so just + // make sure the world is sane and carry on! + n => { assert!(n >= 0); UpSuccess } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&mut self) -> BlockedTask { + let task = self.to_wake.load(atomics::SeqCst); + self.to_wake.store(0, atomics::SeqCst); + assert!(task != 0); + unsafe { BlockedTask::cast_from_uint(task) } + } + + // Decrements the count on the channel for a sleeper, returning the sleeper + // back if it shouldn't sleep. Note that this is the location where we take + // steals into account. + fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + let n = unsafe { task.cast_to_uint() }; + self.to_wake.store(n, atomics::SeqCst); + + let steals = self.steals; + self.steals = 0; + + match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Ok(()) } + } + } + + self.to_wake.store(0, atomics::SeqCst); + Err(unsafe { BlockedTask::cast_from_uint(n) }) + } + + pub fn recv(&mut self) -> Result> { + // Optimistic preflight check (scheduling is expensive). + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + // Welp, our channel has no data. Deschedule the current task and + // initiate the blocking protocol. + let task: ~Task = Local::take(); + task.deschedule(1, |task| { + self.decrement(task) + }); + + match self.try_recv() { + // Messages which actually popped from the queue shouldn't count as + // a steal, so offset the decrement here (we already have our + // "steal" factored into the channel count above). + data @ Ok(..) | + data @ Err(Upgraded(..)) => { + self.steals -= 1; + data + } + + data => data, + } + } + + pub fn try_recv(&mut self) -> Result> { + match self.queue.pop() { + // If we stole some data, record to that effect (this will be + // factored into cnt later on). Note that we don't allow steals to + // grow without bound in order to prevent eventual overflow of + // either steals or cnt as an overflow would have catastrophic + // results. Also note that we don't unconditionally set steals to 0 + // because it can be true that steals > cnt. + Some(data) => { + self.steals += 1; + if self.steals > MAX_STEALS { + match self.cnt.swap(0, atomics::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + } + n => { self.steals -= n; } + } + assert!(self.steals >= 0); + } + match data { + Data(t) => Ok(t), + GoUp(up) => Err(Upgraded(up)), + } + } + + None => { + match self.cnt.load(atomics::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + + // This is a little bit of a tricky case. We failed to pop + // data above, and then we have viewed that the channel is + // disconnected. In this window more data could have been + // sent on the channel. It doesn't really make sense to + // return that the channel is disconnected when there's + // actually data on it, so be extra sure there's no data by + // popping one more time. + // + // We can ignore steals because the other end is + // disconnected and we'll never need to really factor in our + // steals again. + _ => { + match self.queue.pop() { + Some(Data(t)) => Ok(t), + Some(GoUp(up)) => Err(Upgraded(up)), + None => Err(Disconnected), + } + } + } + } + } + } + + pub fn drop_chan(&mut self) { + // Dropping a channel is pretty simple, we just flag it as disconnected + // and then wakeup a blocker if there is one. + match self.cnt.swap(DISCONNECTED, atomics::SeqCst) { + -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + + pub fn drop_port(&mut self) { + // Dropping a port seems like a fairly trivial thing. In theory all we + // need to do is flag that we're disconnected and then everything else + // can take over (we don't have anyone to wake up). + // + // The catch for Ports is that we want to drop the entire contents of + // the queue. There are multiple reasons for having this property, the + // largest of which is that if another chan is waiting in this channel + // (but not received yet), then waiting on that port will cause a + // deadlock. + // + // So if we accept that we must now destroy the entire contents of the + // queue, this code may make a bit more sense. The tricky part is that + // we can't let any in-flight sends go un-dropped, we have to make sure + // *everything* is dropped and nothing new will come onto the channel. + + // The first thing we do is set a flag saying that we're done for. All + // sends are gated on this flag, so we're immediately guaranteed that + // there are a bounded number of active sends that we'll have to deal + // with. + self.port_dropped.store(true, atomics::SeqCst); + + // Now that we're guaranteed to deal with a bounded number of senders, + // we need to drain the queue. This draining process happens atomically + // with respect to the "count" of the channel. If the count is nonzero + // (with steals taken into account), then there must be data on the + // channel. In this case we drain everything and then try again. We will + // continue to fail while active senders send data while we're dropping + // data, but eventually we're guaranteed to break out of this loop + // (because there is a bounded number of senders). + let mut steals = self.steals; + while { + let cnt = self.cnt.compare_and_swap( + steals, DISCONNECTED, atomics::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + loop { + match self.queue.pop() { + Some(..) => { steals += 1; } + None => break + } + } + } + + // At this point in time, we have gated all future senders from sending, + // and we have flagged the channel as being disconnected. The senders + // still have some responsibility, however, because some sends may not + // complete until after we flag the disconnection. There are more + // details in the sending methods that see DISCONNECTED + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Tests to see whether this port can receive without blocking. If Ok is + // returned, then that's the answer. If Err is returned, then the returned + // port needs to be queried instead (an upgrade happened) + pub fn can_recv(&mut self) -> Result> { + // We peek at the queue to see if there's anything on it, and we use + // this return value to determine if we should pop from the queue and + // upgrade this channel immediately. If it looks like we've got an + // upgrade pending, then go through the whole recv rigamarole to update + // the internal state. + match self.queue.peek() { + Some(&GoUp(..)) => { + match self.recv() { + Err(Upgraded(port)) => Err(port), + _ => unreachable!(), + } + } + Some(..) => Ok(true), + None => Ok(false) + } + } + + // Attempts to start selecting on this port. Like a oneshot, this can fail + // immediately because of an upgrade. + pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult { + match self.decrement(task) { + Ok(()) => SelSuccess, + Err(task) => { + let ret = match self.queue.peek() { + Some(&GoUp(..)) => { + match self.queue.pop() { + Some(GoUp(port)) => SelUpgraded(task, port), + _ => unreachable!(), + } + } + Some(..) => SelCanceled(task), + None => SelCanceled(task), + }; + // Undo our decrement above, and we should be guaranteed that the + // previous value is positive because we're not going to sleep + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + assert!(prev >= 0); + return ret; + } + } + } + + // Removes a previous task from being blocked in this port + pub fn abort_selection(&mut self, + was_upgrade: bool) -> Result> { + // If we're aborting selection after upgrading from a oneshot, then + // we're guarantee that no one is waiting. The only way that we could + // have seen the upgrade is if data was actually sent on the channel + // half again. For us, this means that there is guaranteed to be data on + // this channel. Furthermore, we're guaranteed that there was no + // start_selection previously, so there's no need to modify `self.cnt` + // at all. + // + // Hence, because of these invariants, we immediately return `Ok(true)`. + // Note that the data may not actually be sent on the channel just yet. + // The other end could have flagged the upgrade but not sent data to + // this end. This is fine because we know it's a small bounded windows + // of time until the data is actually sent. + if was_upgrade { + assert_eq!(self.steals, 0); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + return Ok(true) + } + + // We want to make sure that the count on the channel goes non-negative, + // and in the stream case we can have at most one steal, so just assume + // that we had one steal. + let steals = 1; + let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); + + // If we were previously disconnected, then we know for sure that there + // is no task in to_wake, so just keep going + let has_data = if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + self.cnt.store(DISCONNECTED, atomics::SeqCst); + true // there is data, that data is that we're disconnected + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + + // If the previous count was negative, then we just made things go + // positive, hence we passed the -1 boundary and we're responsible + // for removing the to_wake() field and trashing it. + // + // If the previous count was positive then we're in a tougher + // situation. A possible race is that a sender just incremented + // through -1 (meaning it's going to try to wake a task up), but it + // hasn't yet read the to_wake. In order to prevent a future recv() + // from waking up too early (this sender picking up the plastered + // over to_wake), we spin loop here waiting for to_wake to be 0. + // Note that this entire select() implementation needs an overhaul, + // and this is *not* the worst part of it, so this is not done as a + // final solution but rather out of necessity for now to get + // something working. + if prev < 0 { + self.take_to_wake().trash(); + } else { + while self.to_wake.load(atomics::SeqCst) != 0 { + Thread::yield_now(); + } + } + assert_eq!(self.steals, 0); + self.steals = steals; + + // if we were previously positive, then there's surely data to + // receive + prev >= 0 + }; + + // Now that we've determined that this queue "has data", we peek at the + // queue to see if the data is an upgrade or not. If it's an upgrade, + // then we need to destroy this port and abort selection on the + // upgraded port. + if has_data { + match self.queue.peek() { + Some(&GoUp(..)) => { + match self.queue.pop() { + Some(GoUp(port)) => Err(port), + _ => unreachable!(), + } + } + _ => Ok(true), + } + } else { + Ok(false) + } + } +} + +#[unsafe_destructor] +impl Drop for Packet { + fn drop(&mut self) { + unsafe { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + } + } +} diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 66ceb03082f4b..53129f3df9b6f 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -698,7 +698,7 @@ mod test { iotest!(fn tcp_clone_two_read() { let addr = next_test_ip6(); let mut acceptor = TcpListener::bind(addr).listen(); - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); let c2 = c.clone(); spawn(proc() { diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index aeec36a932c46..f779d80976f6b 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -301,7 +301,7 @@ mod test { let addr2 = next_test_ip4(); let mut sock1 = UdpSocket::bind(addr1).unwrap(); let sock2 = UdpSocket::bind(addr2).unwrap(); - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); let c2 = c.clone(); spawn(proc() { @@ -335,7 +335,7 @@ mod test { let mut sock1 = UdpSocket::bind(addr1).unwrap(); let sock2 = UdpSocket::bind(addr2).unwrap(); - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); let (serv_port, serv_chan) = Chan::new(); spawn(proc() { diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 3c7db9c868618..6ce4a6fdc8758 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -270,7 +270,7 @@ mod tests { fn unix_clone_two_read() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); let c2 = c.clone(); spawn(proc() { diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 75804c40c5806..46c106234db75 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -21,7 +21,7 @@ definitions for a number of signals. use clone::Clone; use result::{Ok, Err}; -use comm::{Port, SharedChan}; +use comm::{Port, Chan}; use container::{Map, MutableMap}; use hashmap; use io; @@ -81,7 +81,7 @@ pub struct Listener { priv handles: hashmap::HashMap, /// chan is where all the handles send signums, which are received by /// the clients from port. - priv chan: SharedChan, + priv chan: Chan, /// Clients of Listener can `recv()` from this port. This is exposed to /// allow selection over this port as well as manipulation of the port @@ -93,7 +93,7 @@ impl Listener { /// Creates a new listener for signals. Once created, signals are bound via /// the `register` method (otherwise nothing will ever be received) pub fn new() -> Listener { - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); Listener { chan: chan, port: port, diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs index 471ec050192b0..4849b83037f09 100644 --- a/src/libstd/prelude.rs +++ b/src/libstd/prelude.rs @@ -80,7 +80,7 @@ pub use vec::{MutableVector, MutableTotalOrdVector}; pub use vec::{Vector, VectorVector, CloneableVector, ImmutableVector}; // Reexported runtime types -pub use comm::{Port, Chan, SharedChan}; +pub use comm::{Port, Chan}; pub use task::spawn; // Reexported statics diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 376d685c8ac3c..b751c57c0fa31 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -73,6 +73,7 @@ pub use self::unwind::{begin_unwind, begin_unwind_raw, begin_unwind_fmt}; // FIXME: these probably shouldn't be public... #[doc(hidden)] pub mod shouldnt_be_public { + #[cfg(not(test))] pub use super::local_ptr::native::maybe_tls_key; #[cfg(not(windows), not(target_os = "android"))] pub use super::local_ptr::compiled::RT_TLS_PTR; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 8d02048d55cf0..39623e329eae9 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -10,7 +10,7 @@ use c_str::CString; use cast; -use comm::{SharedChan, Port}; +use comm::{Chan, Port}; use libc::c_int; use libc; use ops::Drop; @@ -181,7 +181,7 @@ pub trait IoFactory { fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>; fn tty_open(&mut self, fd: c_int, readable: bool) -> Result<~RtioTTY, IoError>; - fn signal(&mut self, signal: Signum, channel: SharedChan) + fn signal(&mut self, signal: Signum, channel: Chan) -> Result<~RtioSignal, IoError>; } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index a7648dd2d19df..e2b94e655e870 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -449,7 +449,7 @@ mod test { #[test] fn comm_shared_chan() { - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); chan.send(10); assert!(port.recv() == 10); } diff --git a/src/libstd/rt/unwind.rs b/src/libstd/rt/unwind.rs index 2f4e705735ea7..9f89becaef9c9 100644 --- a/src/libstd/rt/unwind.rs +++ b/src/libstd/rt/unwind.rs @@ -73,6 +73,7 @@ use unstable::intrinsics; use uw = self::libunwind; +#[allow(dead_code)] mod libunwind { //! Unwind library interface diff --git a/src/libstd/run.rs b/src/libstd/run.rs index fdd26c6c3839c..6f684f23d4760 100644 --- a/src/libstd/run.rs +++ b/src/libstd/run.rs @@ -13,7 +13,7 @@ #[allow(missing_doc)]; #[deny(unused_must_use)]; -use comm::SharedChan; +use comm::Chan; use io::Reader; use io::process::ProcessExit; use io::process; @@ -225,7 +225,7 @@ impl Process { // in parallel so we don't deadlock while blocking on one // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let ch_clone = ch.clone(); spawn(proc() { diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 74f3a6f6918da..44825a1ef945b 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -172,7 +172,7 @@ mod tests { let nmsgs = 1000u; let mut q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); for _ in range(0, nthreads) { let q = q.clone(); diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 1ec8ac5d83e05..b5a55f3f8c973 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -39,12 +39,10 @@ // /queues/non-intrusive-mpsc-node-based-queue use cast; -use clone::Clone; use kinds::Send; use ops::Drop; use option::{Option, None, Some}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; /// A result of the `pop` function. @@ -65,40 +63,12 @@ struct Node { value: Option, } -struct State { - head: AtomicPtr>, - tail: *mut Node, - packet: P, -} - -/// The consumer half of this concurrent queue. This half is used to receive -/// data from the producers. -pub struct Consumer { - priv state: UnsafeArc>, -} - -/// The production half of the concurrent queue. This handle may be cloned in -/// order to make handles for new producers. -pub struct Producer { - priv state: UnsafeArc>, -} - -impl Clone for Producer { - fn clone(&self) -> Producer { - Producer { state: self.state.clone() } - } -} - -/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet" -/// of information which will be shared by the consumer and the producer which -/// can be re-acquired via the `packet` function. This is helpful when extra -/// state is shared between the producer and consumer, but note that there is no -/// synchronization performed of this data. -pub fn queue(p: P) -> (Consumer, Producer) { - unsafe { - let (a, b) = UnsafeArc::new2(State::new(p)); - (Consumer { state: a }, Producer { state: b }) - } +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue { + priv head: AtomicPtr>, + priv tail: *mut Node, } impl Node { @@ -110,67 +80,26 @@ impl Node { } } -impl State { - unsafe fn new(p: P) -> State { - let stub = Node::new(None); - State { +impl Queue { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: stub, - packet: p, } } - unsafe fn push(&mut self, t: T) { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - - unsafe fn pop(&mut self) -> PopResult { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - self.tail = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: ~Node = cast::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } -} - -#[unsafe_destructor] -impl Drop for State { - fn drop(&mut self) { + /// Pushes a new value onto this queue. + pub fn push(&mut self, t: T) { unsafe { - let mut cur = self.tail; - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: ~Node = cast::transmute(cur); - cur = next; - } + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); } } -} - -impl Producer { - /// Pushes a new value onto this queue. - pub fn push(&mut self, value: T) { - unsafe { (*self.state.get()).push(value) } - } - /// Gets an unsafe pointer to the user-defined packet shared by the - /// producers and the consumer. Note that care must be taken to ensure that - /// the lifetime of the queue outlives the usage of the returned pointer. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} -impl Consumer { /// Pops some data from this queue. /// /// Note that the current implementation means that this function cannot @@ -182,8 +111,23 @@ impl Consumer { /// This inconsistent state means that this queue does indeed have data, but /// it does not currently have access to it at this time. pub fn pop(&mut self) -> PopResult { - unsafe { (*self.state.get()).pop() } + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node = cast::transmute(tail); + return Data(ret); + } + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } } + /// Attempts to pop data from this queue, but doesn't attempt too hard. This /// will canonicalize inconsistent states to a `None` value. pub fn casual_pop(&mut self) -> Option { @@ -192,10 +136,19 @@ impl Consumer { Empty | Inconsistent => None, } } - /// Gets an unsafe pointer to the underlying user-defined packet. See - /// `Producer.packet` for more information. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +} + +#[unsafe_destructor] +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + let mut cur = self.tail; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: ~Node = cast::transmute(cur); + cur = next; + } + } } } @@ -203,34 +156,35 @@ impl Consumer { mod tests { use prelude::*; - use super::{queue, Data, Empty, Inconsistent}; use native; + use super::{Queue, Data, Empty, Inconsistent}; + use sync::arc::UnsafeArc; #[test] fn test_full() { - let (_, mut p) = queue(()); - p.push(~1); - p.push(~2); + let mut q = Queue::new(); + q.push(~1); + q.push(~2); } #[test] fn test() { let nthreads = 8u; let nmsgs = 1000u; - let (mut c, p) = queue(()); - match c.pop() { + let mut q = Queue::new(); + match q.pop() { Empty => {} Inconsistent | Data(..) => fail!() } - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); + let q = UnsafeArc::new(q); for _ in range(0, nthreads) { - let q = p.clone(); let chan = chan.clone(); + let q = q.clone(); native::task::spawn(proc() { - let mut q = q; for i in range(0, nmsgs) { - q.push(i); + unsafe { (*q.get()).push(i); } } chan.send(()); }); @@ -238,11 +192,12 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match c.pop() { + match unsafe { (*q.get()).pop() } { Empty | Inconsistent => {}, Data(_) => { i += 1 } } } + drop(chan); for _ in range(0, nthreads) { port.recv(); } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 35a5846f11aba..a2c61a2b13579 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -38,7 +38,6 @@ use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; // Node within the linked list queue of messages to send @@ -50,75 +49,25 @@ struct Node { next: AtomicPtr>, // next node in the queue } -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State { +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue { // consumer fields - tail: *mut Node, // where to pop from - tail_prev: AtomicPtr>, // where to pop from + priv tail: *mut Node, // where to pop from + priv tail_prev: AtomicPtr>, // where to pop from // producer fields - head: *mut Node, // where to push to - first: *mut Node, // where to get new nodes from - tail_copy: *mut Node, // between first/tail + priv head: *mut Node, // where to push to + priv first: *mut Node, // where to get new nodes from + priv tail_copy: *mut Node, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -/// Producer half of this queue. This handle is used to push data to the -/// consumer. -pub struct Producer { - priv state: UnsafeArc>, -} - -/// Consumer half of this queue. This handle is used to receive data from the -/// producer. -pub struct Consumer { - priv state: UnsafeArc>, -} - -/// Creates a new queue. The producer returned is connected to the consumer to -/// push all data to the consumer. -/// -/// # Arguments -/// -/// * `bound` - This queue implementation is implemented with a linked list, -/// and this means that a push is always a malloc. In order to -/// amortize this cost, an internal cache of nodes is maintained -/// to prevent a malloc from always being necessary. This bound is -/// the limit on the size of the cache (if desired). If the value -/// is 0, then the cache has no bound. Otherwise, the cache will -/// never grow larger than `bound` (although the queue itself -/// could be much larger. -/// -/// * `p` - This is the user-defined packet of data which will also be shared -/// between the producer and consumer. -pub fn queue(bound: uint, - p: P) -> (Consumer, Producer) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) + priv cache_bound: uint, + priv cache_additions: AtomicUint, + priv cache_subtractions: AtomicUint, } impl Node { @@ -132,49 +81,49 @@ impl Node { } } -impl Producer { - /// Pushes data onto the queue - pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - /// Tests whether the queue is empty. Note that if this function returns - /// `false`, the return value is significant, but if the return value is - /// `true` then almost no meaning can be attached to the return value. - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - /// Acquires an unsafe pointer to the underlying user-defined packet. Note - /// that care must be taken to ensure that the queue outlives the usage of - /// the packet (because it is an unsafe pointer). - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl Consumer { - /// Pops some data from this queue, returning `None` when the queue is - /// empty. - pub fn pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } - } - /// Same function as the producer's `packet` method. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +impl Queue { + /// Creates a new queue. The producer returned is connected to the consumer + /// to push all data to the consumer. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub fn new(bound: uint) -> Queue { + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + Queue { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + } } -} -impl State { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&mut self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } } unsafe fn alloc(&mut self) -> *mut Node { @@ -208,50 +157,59 @@ impl State { Node::new() } - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&mut self) -> Option { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); - if size < self.cache_bound { + self.tail = next; + if self.cache_bound == 0 { self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node = cast::transmute(tail); + // FIXME: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: ~Node = cast::transmute(tail); + } } + return ret; } - return ret; } - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// has no data currently + pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + // This is essentially the same as above with all the popping bits + // stripped out. + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + return (*next).value.as_mut(); + } } } #[unsafe_destructor] -impl Drop for State { +impl Drop for Queue { fn drop(&mut self) { unsafe { let mut cur = self.first; @@ -267,44 +225,45 @@ impl Drop for State { #[cfg(test)] mod test { use prelude::*; - use super::queue; use native; + use super::Queue; + use sync::arc::UnsafeArc; #[test] fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(0); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); + let mut q = Queue::new(0); + q.push(~1); + q.push(~2); } #[test] fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(1); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] @@ -313,13 +272,12 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); + let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); native::task::spawn(proc() { - let mut c = c; for _ in range(0, 100000) { loop { - match c.pop() { + match unsafe { (*b.get()).pop() } { Some(1) => break, Some(_) => fail!(), None => {} @@ -329,7 +287,7 @@ mod test { chan.send(()); }); for _ in range(0, 100000) { - p.push(1); + unsafe { (*a.get()).push(1); } } port.recv(); } diff --git a/src/libstd/task.rs b/src/libstd/task.rs index 921d0feaa8b19..5d8c4a87b3935 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -65,7 +65,6 @@ use rt::task::Task; use str::{Str, SendStr, IntoMaybeOwned}; #[cfg(test)] use any::{AnyOwnExt, AnyRefExt}; -#[cfg(test)] use comm::SharedChan; #[cfg(test)] use ptr; #[cfg(test)] use result; @@ -474,9 +473,9 @@ fn test_try_fail() { fn test_spawn_sched() { use clone::Clone; - let (po, ch) = SharedChan::new(); + let (po, ch) = Chan::new(); - fn f(i: int, ch: SharedChan<()>) { + fn f(i: int, ch: Chan<()>) { let ch = ch.clone(); spawn(proc() { if i == 0 { diff --git a/src/libsync/sync/mod.rs b/src/libsync/sync/mod.rs index 0ac385ea1d129..7078f01945e96 100644 --- a/src/libsync/sync/mod.rs +++ b/src/libsync/sync/mod.rs @@ -764,7 +764,7 @@ mod tests { use std::cast; use std::result; use std::task; - use std::comm::{SharedChan, Empty}; + use std::comm::Empty; /************************************************************************ * Semaphore tests @@ -1393,7 +1393,7 @@ mod tests { #[test] fn test_barrier() { let barrier = Barrier::new(10); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); for _ in range(0, 9) { let c = barrier.clone(); diff --git a/src/libsync/sync/mutex.rs b/src/libsync/sync/mutex.rs index f1a81d65c1da5..3726528a5e9c8 100644 --- a/src/libsync/sync/mutex.rs +++ b/src/libsync/sync/mutex.rs @@ -531,7 +531,7 @@ mod test { } } - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); for _ in range(0, N) { let c2 = c.clone(); native::task::spawn(proc() { inc(); c2.send(()); }); diff --git a/src/libsync/sync/one.rs b/src/libsync/sync/one.rs index 93d818b704dc9..a651f3b9d4c3e 100644 --- a/src/libsync/sync/one.rs +++ b/src/libsync/sync/one.rs @@ -137,7 +137,7 @@ mod test { static mut o: Once = ONCE_INIT; static mut run: bool = false; - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); for _ in range(0, 10) { let c = c.clone(); spawn(proc() { diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index aa4e0f1ae5809..b766be88d2309 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -53,7 +53,7 @@ fn server(requests: &Port, responses: &Chan) { fn run(args: &[~str]) { let (from_child, to_parent) = Chan::new(); - let (from_parent, to_child) = SharedChan::new(); + let (from_parent, to_child) = Chan::new(); let size = from_str::(args[1]).unwrap(); let workers = from_str::(args[2]).unwrap(); diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 6ce0f9de8d0a2..89e0bcf332638 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -67,7 +67,7 @@ fn run(args: &[~str]) { }); from_parent } else { - let (from_parent, to_child) = SharedChan::new(); + let (from_parent, to_child) = Chan::new(); for _ in range(0u, workers) { let to_child = to_child.clone(); let mut builder = task::task(); diff --git a/src/test/bench/shootout-chameneos-redux.rs b/src/test/bench/shootout-chameneos-redux.rs index 7281667e6769a..5c237b306fbd8 100644 --- a/src/test/bench/shootout-chameneos-redux.rs +++ b/src/test/bench/shootout-chameneos-redux.rs @@ -100,8 +100,8 @@ fn creature( name: uint, color: color, from_rendezvous: Port>, - to_rendezvous: SharedChan, - to_rendezvous_log: SharedChan<~str> + to_rendezvous: Chan, + to_rendezvous_log: Chan<~str> ) { let mut color = color; let mut creatures_met = 0; @@ -137,8 +137,8 @@ fn creature( fn rendezvous(nn: uint, set: ~[color]) { // these ports will allow us to hear from the creatures - let (from_creatures, to_rendezvous) = SharedChan::::new(); - let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new(); + let (from_creatures, to_rendezvous) = Chan::::new(); + let (from_creatures_log, to_rendezvous_log) = Chan::<~str>::new(); // these channels will be passed to the creatures so they can talk to us diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 86a2043527e21..7f4fd3cf94ce1 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -28,13 +28,13 @@ use std::task; use std::uint; fn fib(n: int) -> int { - fn pfib(c: &SharedChan, n: int) { + fn pfib(c: &Chan, n: int) { if n == 0 { c.send(0); } else if n <= 2 { c.send(1); } else { - let (pp, cc) = SharedChan::new(); + let (pp, cc) = Chan::new(); let ch = cc.clone(); task::spawn(proc() pfib(&ch, n - 1)); let ch = cc.clone(); @@ -43,7 +43,7 @@ fn fib(n: int) -> int { } } - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let _t = task::spawn(proc() pfib(&ch, n) ); p.recv() } diff --git a/src/test/bench/task-perf-linked-failure.rs b/src/test/bench/task-perf-linked-failure.rs index 2a012ef19fa63..189a3ac74480e 100644 --- a/src/test/bench/task-perf-linked-failure.rs +++ b/src/test/bench/task-perf-linked-failure.rs @@ -33,7 +33,7 @@ // Creates in the background 'num_tasks' tasks, all blocked forever. // Doesn't return until all such tasks are ready, but doesn't block forever itself. -use std::comm::{stream, SharedChan}; +use std::comm::{stream, Chan}; use std::os; use std::result; use std::task; @@ -41,7 +41,7 @@ use std::uint; fn grandchild_group(num_tasks: uint) { let (po, ch) = stream(); - let ch = SharedChan::new(ch); + let ch = Chan::new(ch); for _ in range(0, num_tasks) { let ch = ch.clone(); diff --git a/src/test/compile-fail/comm-not-freeze.rs b/src/test/compile-fail/comm-not-freeze.rs index 2b85068d4708b..ef5bd21f91394 100644 --- a/src/test/compile-fail/comm-not-freeze.rs +++ b/src/test/compile-fail/comm-not-freeze.rs @@ -13,5 +13,5 @@ fn test() {} fn main() { test::>(); //~ ERROR: does not fulfill `Freeze` test::>(); //~ ERROR: does not fulfill `Freeze` - test::>(); //~ ERROR: does not fulfill `Freeze` + test::>(); //~ ERROR: does not fulfill `Freeze` } diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs index b15c3dca85536..9c05dae46bd28 100644 --- a/src/test/run-pass/hashmap-memory.rs +++ b/src/test/run-pass/hashmap-memory.rs @@ -31,7 +31,7 @@ mod map_reduce { enum ctrl_proto { find_reducer(~[u8], Chan), mapper_done, } - fn start_mappers(ctrl: SharedChan, inputs: ~[~str]) { + fn start_mappers(ctrl: Chan, inputs: ~[~str]) { for i in inputs.iter() { let ctrl = ctrl.clone(); let i = i.clone(); @@ -39,11 +39,11 @@ mod map_reduce { } } - fn map_task(ctrl: SharedChan, input: ~str) { + fn map_task(ctrl: Chan, input: ~str) { let mut intermediates = HashMap::new(); fn emit(im: &mut HashMap<~str, int>, - ctrl: SharedChan, key: ~str, + ctrl: Chan, key: ~str, _val: ~str) { if im.contains_key(&key) { return; @@ -63,7 +63,7 @@ mod map_reduce { } pub fn map_reduce(inputs: ~[~str]) { - let (ctrl_port, ctrl_chan) = SharedChan::new(); + let (ctrl_port, ctrl_chan) = Chan::new(); // This task becomes the master control task. It spawns others // to do the rest. diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index b51f626f3c2ab..0403284e55f5e 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -13,7 +13,7 @@ use std::task; pub fn main() { - let (po, ch) = SharedChan::new(); + let (po, ch) = Chan::new(); // Spawn 10 tasks each sending us back one int. let mut i = 10; @@ -37,7 +37,7 @@ pub fn main() { info!("main thread exiting"); } -fn child(x: int, ch: &SharedChan) { +fn child(x: int, ch: &Chan) { info!("{}", x); ch.send(x); } diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 049f2d7194628..f5374e7df05a9 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -16,7 +16,7 @@ use std::task; pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); } -fn test00_start(ch: &SharedChan, message: int, count: int) { +fn test00_start(ch: &Chan, message: int, count: int) { info!("Starting test00_start"); let mut i: int = 0; while i < count { @@ -33,7 +33,7 @@ fn test00() { info!("Creating tasks"); - let (po, ch) = SharedChan::new(); + let (po, ch) = Chan::new(); let mut i: int = 0; diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index 45994e78d9405..c63bf8bc85659 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -15,7 +15,7 @@ pub fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let mut c0 = ch.clone(); let mut c1 = ch.clone(); let mut c2 = ch.clone(); diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index 159962e1857bc..ff43a80adaca1 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -18,7 +18,7 @@ use std::task; pub fn main() { test00(); } -fn test00_start(c: &SharedChan, start: int, +fn test00_start(c: &Chan, start: int, number_of_messages: int) { let mut i: int = 0; while i < number_of_messages { c.send(start + i); i += 1; } @@ -27,7 +27,7 @@ fn test00_start(c: &SharedChan, start: int, fn test00() { let mut r: int = 0; let mut sum: int = 0; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let number_of_messages: int = 10; let c = ch.clone(); diff --git a/src/test/run-pass/unique-send-2.rs b/src/test/run-pass/unique-send-2.rs index d1c45a336fa7a..299fed735ab18 100644 --- a/src/test/run-pass/unique-send-2.rs +++ b/src/test/run-pass/unique-send-2.rs @@ -10,12 +10,12 @@ use std::task; -fn child(c: &SharedChan<~uint>, i: uint) { +fn child(c: &Chan<~uint>, i: uint) { c.send(~i); } pub fn main() { - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let n = 100u; let mut expected = 0u; for i in range(0u, n) { diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index 4679f65c43cd8..e643a20436e0c 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -15,7 +15,7 @@ extern mod extra; use std::task; struct complainer { - c: SharedChan, + c: Chan, } impl Drop for complainer { @@ -26,20 +26,20 @@ impl Drop for complainer { } } -fn complainer(c: SharedChan) -> complainer { +fn complainer(c: Chan) -> complainer { error!("Hello!"); complainer { c: c } } -fn f(c: SharedChan) { +fn f(c: Chan) { let _c = complainer(c); fail!(); } pub fn main() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); task::spawn(proc() f(c.clone())); error!("hiiiiiiiii"); assert!(p.recv());