From fea60daeadbb091c71e09bed1e7c1da1670d5c64 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 9 Jan 2014 11:57:05 -0800 Subject: [PATCH 1/2] std: Introduce a standard Mutex type Much of the justification can be found in the large implementation comment found in the module itself. --- src/etc/licenseck.py | 3 +- src/libgreen/simple.rs | 1 + src/libgreen/task.rs | 2 + src/libnative/io/net.rs | 2 +- src/libnative/task.rs | 2 + src/librustc/back/link.rs | 2 +- src/librustc/middle/trans/base.rs | 2 +- src/libstd/rt/mod.rs | 1 + src/libstd/rt/task.rs | 6 + src/libstd/sync/mod.rs | 7 + src/libstd/sync/mpsc_intrusive.rs | 142 ++++++++ src/libstd/sync/mutex.rs | 520 ++++++++++++++++++++++++++++++ src/libstd/sync/one.rs | 170 ++++++++++ src/libstd/unstable/mutex.rs | 518 ++++++++++++----------------- src/rt/rust_builtin.c | 20 -- 15 files changed, 1065 insertions(+), 333 deletions(-) create mode 100644 src/libstd/sync/mpsc_intrusive.rs create mode 100644 src/libstd/sync/mutex.rs create mode 100644 src/libstd/sync/one.rs diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py index b5a721c03ff09..a14c5e8adf38e 100644 --- a/src/etc/licenseck.py +++ b/src/etc/licenseck.py @@ -41,6 +41,7 @@ "libstd/sync/mpsc_queue.rs", # BSD "libstd/sync/spsc_queue.rs", # BSD "libstd/sync/mpmc_bounded_queue.rs", # BSD + "libstd/sync/mpsc_intrusive.rs", # BSD ] def check_license(name, contents): @@ -59,4 +60,4 @@ def check_license(name, contents): if (boilerplate.find(license1) == -1 or boilerplate.find(license2) == -1) and \ (boilerplate.find(license3) == -1 or boilerplate.find(license4) == -1): return False - return True \ No newline at end of file + return True diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 4a0523fe47a7a..43c7095ae17c0 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -76,6 +76,7 @@ impl Runtime for SimpleTask { } fn local_io<'a>(&'a mut self) -> Option> { None } fn stack_bounds(&self) -> (uint, uint) { fail!() } + fn can_block(&self) -> bool { true } fn wrap(~self) -> ~Any { fail!() } } diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 31752941231cb..3e4b8662eacd1 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -462,6 +462,8 @@ impl Runtime for GreenTask { c.current_stack_segment.end() as uint) } + fn can_block(&self) -> bool { false } + fn wrap(~self) -> ~Any { self as ~Any } } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index adcd21f0ac4c5..a60034a7170ca 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -201,7 +201,7 @@ pub fn init() { } unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; INIT.doit(|| { let mut data: WSADATA = intrinsics::init(); diff --git a/src/libnative/task.rs b/src/libnative/task.rs index e827b495852a4..8eb429553a811 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -142,6 +142,8 @@ impl rt::Runtime for Ops { fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds } + fn can_block(&self) -> bool { true } + // This function gets a little interesting. There are a few safety and // ownership violations going on here, but this is all done in the name of // shared state. 
Additionally, all of the violations are protected with a diff --git a/src/librustc/back/link.rs b/src/librustc/back/link.rs index ffb9cce033ed7..04c4c9ce99f2b 100644 --- a/src/librustc/back/link.rs +++ b/src/librustc/back/link.rs @@ -311,7 +311,7 @@ pub mod write { } unsafe fn configure_llvm(sess: Session) { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; // Copy what clan does by turning on loop vectorization at O2 and diff --git a/src/librustc/middle/trans/base.rs b/src/librustc/middle/trans/base.rs index aaa8d071aff5d..f2c9425293236 100644 --- a/src/librustc/middle/trans/base.rs +++ b/src/librustc/middle/trans/base.rs @@ -3295,7 +3295,7 @@ pub fn trans_crate(sess: session::Session, output: &Path) -> CrateTranslation { // Before we touch LLVM, make sure that multithreading is enabled. unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; static mut POISONED: bool = false; INIT.doit(|| { diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index e7adb5ad7ddaf..df93d42f3cc04 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -156,6 +156,7 @@ pub trait Runtime { fn local_io<'a>(&'a mut self) -> Option>; /// The (low, high) edges of the current stack. fn stack_bounds(&self) -> (uint, uint); // (lo, hi) + fn can_block(&self) -> bool; // XXX: This is a serious code smell and this should not exist at all. fn wrap(~self) -> ~Any; diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index b4ead4252ca41..9e65974816686 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -292,6 +292,12 @@ impl Task { pub fn stack_bounds(&self) -> (uint, uint) { self.imp.get_ref().stack_bounds() } + + /// Returns whether it is legal for this task to block the OS thread that it + /// is running on. + pub fn can_block(&self) -> bool { + self.imp.get_ref().can_block() + } } impl Drop for Task { diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152c6..e206ba6129f16 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,9 +15,16 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. +pub use self::mutex::{Mutex, StaticMutex, Guard, MUTEX_INIT}; +pub use self::one::{Once, ONCE_INIT}; + pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; pub mod mpsc_queue; pub mod spsc_queue; + +mod mpsc_intrusive; +mod mutex; +mod one; diff --git a/src/libstd/sync/mpsc_intrusive.rs b/src/libstd/sync/mpsc_intrusive.rs new file mode 100644 index 0000000000000..374cb8010b4a7 --- /dev/null +++ b/src/libstd/sync/mpsc_intrusive.rs @@ -0,0 +1,142 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO + * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module implements an intrusive MPSC queue. This queue is incredibly +//! unsafe (due to use of unsafe pointers for nodes), and hence is not public. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/intrusive-mpsc-node-based-queue + +use cast; +use kinds::Send; +use option::{Option, Some, None}; +use ptr::RawPtr; +use sync::atomics; + +// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static +// initialization. + +pub struct Node { + next: atomics::AtomicUint, + data: T, +} + +pub struct DummyNode { + next: atomics::AtomicUint, +} + +pub struct Queue { + head: atomics::AtomicUint, + tail: *mut Node, + stub: DummyNode, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + head: atomics::AtomicUint::new(0), + tail: 0 as *mut Node, + stub: DummyNode { + next: atomics::AtomicUint::new(0), + }, + } + } + + pub unsafe fn push(&mut self, node: *mut Node) { + (*node).next.store(0, atomics::Release); + let prev = self.head.swap(node as uint, atomics::AcqRel); + + // Note that this code is slightly modified to allow static + // initialization of these queues with rust's flavor of static + // initialization. + if prev == 0 { + self.stub.next.store(node as uint, atomics::Release); + } else { + let prev = prev as *mut Node; + (*prev).next.store(node as uint, atomics::Release); + } + } + + /// You'll note that the other MPSC queue in std::sync is non-intrusive and + /// returns a `PopResult` here to indicate when the queue is inconsistent. + /// An "inconsistent state" in the other queue means that a pusher has + /// pushed, but it hasn't finished linking the rest of the chain. + /// + /// This queue also suffers from this problem, but I currently haven't been + /// able to detangle when this actually happens. This code is translated + /// verbatim from the website above, and is more complicated than the + /// non-intrusive version. + /// + /// Right now consumers of this queue must be ready for this fact. Just + /// because `pop` returns `None` does not mean that there is not data + /// on the queue. 
+ pub unsafe fn pop(&mut self) -> Option<*mut Node> { + let tail = self.tail; + let mut tail = if !tail.is_null() {tail} else { + cast::transmute(&self.stub) + }; + let mut next = (*tail).next(atomics::Relaxed); + if tail as uint == &self.stub as *DummyNode as uint { + if next.is_null() { + return None; + } + self.tail = next; + tail = next; + next = (*next).next(atomics::Relaxed); + } + if !next.is_null() { + self.tail = next; + return Some(tail); + } + let head = self.head.load(atomics::Acquire) as *mut Node; + if tail != head { + return None; + } + let stub = cast::transmute(&self.stub); + self.push(stub); + next = (*tail).next(atomics::Relaxed); + if !next.is_null() { + self.tail = next; + return Some(tail); + } + return None + } +} + +impl Node { + pub fn new(t: T) -> Node { + Node { + data: t, + next: atomics::AtomicUint::new(0), + } + } + pub unsafe fn next(&mut self, ord: atomics::Ordering) -> *mut Node { + cast::transmute::>(self.next.load(ord)) + } +} diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs new file mode 100644 index 0000000000000..c97568472e384 --- /dev/null +++ b/src/libstd/sync/mutex.rs @@ -0,0 +1,520 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A proper mutex implementation regardless of the "flavor of task" which is +//! acquiring the lock. + +// # The implementation of Rust's mutexes +// +// As hinted in the doc-comment above, the fundamental problem of implementing a +// mutex for rust is that you can't "just use pthreads". Green tasks are not +// allowed to block on a pthread mutex, because this can very easily lead to +// deadlock. Otherwise, there are other properties that we would want out of an +// "official mutex": +// +// * Any flavor of task can acquire the mutex, green or native +// * Any mixing of flavors of tasks can acquire the mutex. It should be possible +// for green and native threads to contend over acquiring the mutex +// * This mutex should be "just as fast" as pthreads +// * Mutexes should be statically initializeable +// * Mutexes should really not need to have destructors (see static +// initialization) +// +// Some properties which have been deemed not critical +// +// * Enforcing bounded waiting among all tasks acquiring the mutex. Mixing +// green/native tasks is predicted to be a fairly rare case. +// +// ## Mutexes, take 1 +// +// Within these constraints, the primitives we have available to us for blocking +// a task are the `deschedule` and `reawaken` methods on the `rt::Runtime` +// trait. These are the obvious choices to use first because they're "what we +// havel already" and should certainly be efficient. +// +// The sketch behind this mutex would be to use an intrusive (to avoid +// allocations) MPSC queue (the consumer is the lock holder) with some +// sprinkling of atomics to wake threads up. Each `BlockedTask` would be stored +// in the nodes of the queue. +// +// This implementation is all fine and dandy for green threads (user space +// context switching is fast), but when implemented, it was found that this +// implementation was about 50x slower than pthreads for native threads. 
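[editor's note] The push half of the 1024cores algorithm referenced above is short enough to show on its own. The sketch below is written in today's Rust (this patch predates Rust 1.0) and uses `AtomicPtr` links instead of the `AtomicUint` links the module needs for static initialization; the window between the `swap` and the final `store` is exactly the "inconsistent state" that the `pop` documentation warns about. The single-threaded `main` exists only to show the resulting link order.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

pub struct Node {
    next: AtomicPtr<Node>,
    pub data: usize,
}

pub struct Queue {
    head: AtomicPtr<Node>, // most recently pushed node; producers contend here
    stub: Node,            // permanent dummy node so the list is never empty
}

impl Queue {
    pub fn new() -> Queue {
        Queue {
            head: AtomicPtr::new(ptr::null_mut()),
            stub: Node { next: AtomicPtr::new(ptr::null_mut()), data: 0 },
        }
    }

    /// Multi-producer push: publish `node` as the new head, then link it in
    /// behind the previous head. A consumer walking `next` pointers between
    /// those two steps sees a broken chain and must retry later.
    pub unsafe fn push(&self, node: *mut Node) {
        (*node).next.store(ptr::null_mut(), Ordering::Release);
        let prev = self.head.swap(node, Ordering::AcqRel);
        let prev = if prev.is_null() {
            &self.stub as *const Node as *mut Node
        } else {
            prev
        };
        (*prev).next.store(node, Ordering::Release);
    }
}

fn main() {
    let q = Queue::new();
    let a = Box::into_raw(Box::new(Node { next: AtomicPtr::new(ptr::null_mut()), data: 1 }));
    let b = Box::into_raw(Box::new(Node { next: AtomicPtr::new(ptr::null_mut()), data: 2 }));
    unsafe {
        q.push(a);
        q.push(b);
        // Single-threaded check: walk the chain starting at the stub.
        let mut cur = q.stub.next.load(Ordering::Acquire);
        while !cur.is_null() {
            println!("node {}", (*cur).data);
            cur = (*cur).next.load(Ordering::Acquire);
        }
        drop(Box::from_raw(a));
        drop(Box::from_raw(b));
    }
}
```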
+// +// Upon profiling, nearly all time was spent in cvar signal/wait (that's how +// native threads implement deschedule/reawaken). The problem was never tracked +// down with 100% certainty, but it was able discovered that this huge slowdown +// was only on a multicore system, not a single core system. With this knowledge +// in hand, plus some idea of how pthread mutexes are implemented, it was +// deduced that the kernel essentially knows what's going on when everyone's +// contended on the same mutex (as in the pthreads case). The kernel can +// cleverly schedule threads to *not* wake up on remote cores because all the +// work needs to happen on the same core (that's the whole point of a mutex). +// The deschedule/reawaken methods put threads to sleep on localized cvars, so +// the kernel had no idea that all our threads were contending *on the same +// mutex*. +// +// With this information in mind, it was concluded that it's impossible to +// create a pthreads-competitive mutex with the deschedule/reawaken primitives. +// We simply have no way of instructing the kernel that all native threads are +// contended on one object and should therefore *not* be spread out on many +// cores. +// +// ## Mutexes, take 2 +// +// Back do the drawing board, the key idea was to actually have this mutex be a +// wrapper around a pthreads mutex. This would clearly solve the native threads +// problem (we'd be "just as fast" as pthreads), but the green problem comes +// back into play (you can't just grab the lock). +// +// The solution found (and the current implementation) ended up having a hybrid +// solution of queues/mutexes. The key idea is that green threads only ever +// *trylock* and use an internal queue to keep track of who's waiting, and +// native threads will simply just call *lock*. +// +// With this scheme, we get all the benefits of both worlds: +// +// * Any flavor of task (even mixed) can grab a mutex, pthreads arbitrates among +// all native and the first green tasks, and then green tasks use atomics to +// arbitrate among themselves. +// * We're just as fast as pthreads (within a small percentage of course) +// * Native mutexes are statically initializeable, and some clever usage of +// atomics can make the green halves of the mutex also statically +// initializeable. +// * No destructors are necessary (there is no memory allocation). The caveat +// here is that windows doesn't have statically initialized mutexes, but it is +// predicted that statically initialized mutexes won't be *too* common. Plus, +// the "free" happens at program end when cleaning up doesn't matter *that* +// much. +// +// As you'll find out in the implementation, this approach cannot be fair to +// native and green threads. In order to soundly drain the internal queue of +// green threads, they *must* be favored over native threads. It was an explicit +// non-goal of these mutexes to be completely fair to everyone, so this has been +// deemed acceptable. +// +// This is the high-level implementation of the mutexes, but the nitty gritty +// details can be found in the code below. + +use cast; +use ops::Drop; +use option::{Option, Some, None}; +use q = sync::mpsc_intrusive; +use result::{Err, Ok}; +use rt::local::Local; +use rt::task::{BlockedTask, Task}; +use rt::thread::Thread; +use sync::atomics; +use unstable::mutex; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex is an implementation of a lock for all flavors of tasks which may +/// be grabbing. 
A common problem with green threads is that they cannot grab +/// locks (if they reschedule during the lock a contender could deadlock the +/// system), but this mutex does *not* suffer this problem. +/// +/// This mutex will properly block tasks waiting for the lock to become +/// available. The mutex can also be statically initialized or created via a +/// `new` constructor. +/// +/// # Example +/// +/// ```rust +/// use std::sync::Mutex; +/// +/// let mut m = Mutex::new(); +/// let guard = m.lock(); +/// // do some work +/// drop(guard); // unlock the lock +/// +/// { +/// let _g = m.lock(); +/// // do some work in a scope +/// } +/// +/// // now the mutex is unlocked +/// ``` +pub struct Mutex { + priv lock: StaticMutex, +} + +/// The static mutex type is provided to allow for static allocation of mutexes. +/// +/// Note that this is a separate type because using a Mutex correctly means that +/// it needs to have a destructor run. In Rust, statics are not allowed to have +/// destructors. As a result, a `StaticMutex` has one extra method when compared +/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and +/// documentation can be found directly on the method. +/// +/// # Example +/// +/// ```rust +/// use std::sync::{StaticMutex, MUTEX_INIT}; +/// +/// static mut LOCK: StaticMutex = MUTEX_INIT; +/// +/// unsafe { +/// let _g = LOCK.lock(); +/// // do some productive work +/// } +/// // lock is unlocked here. +/// ``` +pub struct StaticMutex { + /// The OS mutex (pthreads/windows equivalent) that we're wrapping. + priv lock: mutex::Mutex, + /// Internal queue that all green threads will be blocked on. + priv q: q::Queue, + /// Dubious flag about whether this mutex is held or not. You might be + /// thinking "this is impossible to manage atomically," and you would be + /// correct! Keep on reading! + priv held: atomics::AtomicBool, +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +pub struct Guard<'a> { + priv lock: &'a mut StaticMutex, +} + +/// Static initialization of a mutex. This constant can be used to initialize +/// other mutex constants. +pub static MUTEX_INIT: StaticMutex = StaticMutex { + lock: mutex::MUTEX_INIT, + held: atomics::INIT_ATOMIC_BOOL, + q: q::Queue { + head: atomics::INIT_ATOMIC_UINT, + tail: 0 as *mut q::Node, + stub: q::DummyNode { + next: atomics::INIT_ATOMIC_UINT, + } + } +}; + +impl StaticMutex { + /// Attempts to grab this lock, see `Mutex::try_lock` + pub fn try_lock<'a>(&'a mut self) -> Option> { + if unsafe { self.lock.trylock() } { + self.held.store(true, atomics::Relaxed); // see below + Some(Guard{ lock: self }) + } else { + None + } + } + + /// Acquires this lock, see `Mutex::lock` + pub fn lock<'a>(&'a mut self) -> Guard<'a> { + // Remember that an explicit goal of these mutexes is to be "just as + // fast" as pthreads. Note that at some point our implementation + // requires an answer to the question "can we block" and implies a hit + // to OS TLS. In attempt to avoid this hit and to maintain efficiency in + // the uncontended case (very important) we start off by hitting a + // trylock on the OS mutex. If we succeed, then we're lucky! + if unsafe { self.lock.trylock() } { + self.held.store(true, atomics::Relaxed); // see below + return Guard{ lock: self } + } + + let t: ~Task = Local::take(); + if t.can_block() { + // Tasks which can block are super easy. 
These tasks just accept the + // TLS hit we just made, and then call the blocking `lock()` + // function. Turns out the TLS hit is essentially 0 on contention. + Local::put(t); + unsafe { self.lock.lock(); } + self.held.store(true, atomics::Relaxed); // see below + } else { + // And here's where we come to the "fun part" of this + // implementation. Contention with a green task is fairly difficult + // to resolve. The goal here is to push ourselves onto the internal + // queue, but still be able to "cancel" our enqueue in case the lock + // was dropped while we were doing our business. + // + // The pseudocode for this is: + // + // let mut node = ...; + // push(node) + // if trylock() { + // wakeup(pop()) + // } else { + // node.sleep() + // } + // + // And the pseudocode for the wakeup protocol is: + // + // match pop() { + // Some(node) => node.wakeup(), + // None => lock.unlock() + // } + // + // Note that a contended green thread does *not* re-acquire the + // mutex because ownership was silently transferred to it. You'll + // note a fairly large race condition here, which is that whenever + // the OS mutex is unlocked, "just before" it's unlocked a green + // thread can fly in and block itself. This turns out to be a + // fundamental problem with any sort of attempt to arbitrate among + // the unlocker and a locking green thread. + // + // One possible solution for this is to attempt to re-acquire the + // lock during the unlock procedure. This is less than ideal, + // however, because it means that the memory of a mutex must be + // guaranteed to be valid until *all unlocks* have returned. That's + // normally the job of the mutex itself, so it can be seen that + // touching a mutex again after it has been unlocked is an unwise + // decision. + // + // Another alternative solution (and the one implemented) is more + // distasteful, but functional. You'll notice that the struct + // definition has a `held` flag, which is impossible to maintain + // atomically. For our usage, the flag is set to `true` immediately + // after a mutex is acquired and set to `false` at the *beginning* + // of an unlock. + // + // By doing this, we're essentially instructing green threads to + // "please spin" while another thread is in the middle of performing + // the unlock procedure. Again, this is distasteful, but within the + // constraints that we're working in I found it difficult to think + // of other courses of action. + let mut node = q::Node::new(atomics::AtomicUint::new(0)); + t.deschedule(1, |task| unsafe { + self.q.push(&mut node); + let mut stolen = false; + // Spinloop attempting to grab a mutex while someone's unlocking + // the mutex. While it's not held and we fail the trylock, the + // best thing we can do is hope that our yield will run the + // unlocker before us (note that bounded waiting is shattered + // here for green threads). + while !self.held.load(atomics::Relaxed) { + if self.lock.trylock() { + self.held.store(true, atomics::Relaxed); + stolen = true; + break + } else { + Thread::yield_now(); + } + } + + // If we managed to steal the lock, then we need to wake up a + // thread. Note that we may not have acquired the mutex for + // ourselves (we're not guaranteed to be the head of the queue). + // The good news is that we *are* guaranteed to have a non-empty + // queue. This is because if we acquired the mutex no one could + // have transferred it to us (hence our own node must still be + // on the queue). 
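[editor's note] The node-blocking dance described in the surrounding comments (where a sentinel value of `1` means "the wakeup already happened, cancel the sleep") can be illustrated in isolation. The toy program below is a sketch of that two-party swap protocol only, in current Rust, with an `AtomicUsize` slot and prints standing in for `BlockedTask` handles and real descheduling; it is not part of the mutex itself.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // 0 = empty, 1 = "wakeup already happened, cancel the sleep",
    // anything else = handle of the blocked task (a fake token here).
    let slot = Arc::new(AtomicUsize::new(0));

    let waker = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            // Waker: swap in the sentinel. If the sleeper registered first we
            // get its handle back and are now responsible for waking it.
            match slot.swap(1, Ordering::SeqCst) {
                0 => println!("sleeper not registered yet; it will see the 1"),
                h => println!("woke the task registered as {:#x}", h),
            }
        })
    };

    // Sleeper: register our handle. If the waker already left its sentinel,
    // the wakeup has effectively happened and we must not go to sleep.
    match slot.swap(0xdead, Ordering::SeqCst) {
        0 => println!("registered; would block here until woken"),
        1 => println!("wakeup already happened; skip blocking"),
        _ => unreachable!(),
    }

    waker.join().unwrap();
}
```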
+ // + // The queue itself can return `None` from a pop when there's + // data on the queue (a known limitation of the queue), so here + // you'll find the second spin loop (which is in theory even + // rarer than the one above). + // + // If we popped ourselves, then we just unblock. If it's someone + // else, we block ourselves (nonatomically b/c we hold the lock) + // and then wake up the true owner. + // + // Note that the blocking procedure on a node requires a little + // dance of some atomic swaps just for normal atomicity with + // someone popping our node and waking us up. A sentinel value + // of `1` is used for "cancel the sleep". + if stolen { + let locker; + loop { + match self.q.pop() { + Some(t) => { locker = t; break } + None => Thread::yield_now() + } + } + + if locker == &mut node as *mut q::Node { + Err(task) + } else { + node.data.store(task.cast_to_uint(), atomics::Relaxed); + match (*locker).data.swap(1, atomics::SeqCst) { + 0 => Ok(()), + n => Err(BlockedTask::cast_from_uint(n)) + } + } + } else { + let n = task.cast_to_uint(); + match node.data.swap(n, atomics::SeqCst) { + 0 => Ok(()), + 1 => Err(BlockedTask::cast_from_uint(n)), + _ => unreachable!(), + } + } + }); + assert!(self.held.load(atomics::Relaxed)); + } + + Guard { lock: self } + } + + fn unlock(&mut self) { + // As documented above, we *initially* flag our mutex as unlocked in + // order to allow green threads just starting to block to realize that + // they shouldn't completely block. + assert!(self.held.load(atomics::Relaxed)); + self.held.store(false, atomics::Relaxed); + + // Remember that the queues we are using may return None when there is + // indeed data on the queue. In this case, we can just safely ignore it. + // The reason for this ignorance is that a value of `None` with data on + // the queue means that the "head popper" hasn't finished yet. We've + // already flagged our mutex as acquire-able, so the "head popper" will + // see this and attempt to grab the mutex (or someone else will steal it + // and this whole process will begin anew). + match unsafe { self.q.pop() } { + Some(t) => { + self.held.store(true, atomics::Relaxed); + match unsafe { (*t).data.swap(1, atomics::SeqCst) } { + 0 => {} + n => { + let t = unsafe { BlockedTask::cast_from_uint(n) }; + t.wake().map(|t| t.reawaken(true)); + } + } + } + None => unsafe { self.lock.unlock() } + } + } + + /// Deallocates resources associated with this static mutex. + /// + /// This method is unsafe because it provides no guarantees that there are + /// no active users of this mutex, and safety is not guaranteed if there are + /// active users of this mutex. + /// + /// This method is required to ensure that there are no memory leaks on + /// *all* platforms. It may be the case that some platforms do not leak + /// memory if this method is not called, but this is not guaranteed to be + /// true on all platforms. + pub unsafe fn destroy(&mut self) { + self.lock.destroy() + } +} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + pub fn new() -> Mutex { + Mutex { + lock: StaticMutex { + held: atomics::AtomicBool::new(false), + q: q::Queue::new(), + lock: unsafe { mutex::Mutex::new() }, + } + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then `None` is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. 
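[editor's note] A small usage sketch of the non-blocking path may help here. It is written against today's `std::sync::Mutex`, where `try_lock` returns a `Result`, rather than the `Option`-returning method defined just below; the behavior being demonstrated is the same.

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0u32);

    // Non-blocking attempt; the guard releases the lock when it goes out of
    // scope, exactly like the RAII `Guard` in this patch.
    if let Ok(mut g) = m.try_lock() {
        *g += 1;
    }

    // While a guard is alive, a second try_lock reports failure instead of
    // blocking.
    let g = m.lock().unwrap();
    assert!(m.try_lock().is_err());
    assert_eq!(*g, 1);
}
```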
+ pub fn try_lock<'a>(&'a mut self) -> Option> { + self.lock.try_lock() + } + + /// Acquires a mutex, blocking the current task until it is able to do so. + /// + /// This function will block the local task until it is availble to acquire + /// the mutex. Upon returning, the task is the only task with the mutex + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + pub fn lock<'a>(&'a mut self) -> Guard<'a> { self.lock.lock() } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + #[inline] + fn drop(&mut self) { + self.lock.unlock(); + } +} + +impl Drop for Mutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.lock.destroy() } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::{Mutex, StaticMutex, MUTEX_INIT}; + use native; + + #[test] + fn smoke() { + let mut m = Mutex::new(); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn smoke_static() { + static mut m: StaticMutex = MUTEX_INIT; + unsafe { + drop(m.lock()); + drop(m.lock()); + m.destroy(); + } + } + + #[test] + fn lots_and_lots() { + static mut m: StaticMutex = MUTEX_INIT; + static mut CNT: uint = 0; + static M: uint = 10000; + static N: uint = 3; + + fn inc() { + for _ in range(0, M) { + unsafe { + let _g = m.lock(); + CNT += 1; + } + } + } + + let (p, c) = SharedChan::new(); + for _ in range(0, N) { + let c = c.clone(); + do native::task::spawn { inc(); c.send(()); } + let c = c.clone(); + do spawn { inc(); c.send(()); } + } + + drop(c); + for _ in range(0, 2 * N) { + p.recv(); + } + assert_eq!(unsafe {CNT}, M * N); + unsafe { + m.destroy(); + } + } + + #[test] + fn trylock() { + let mut m = Mutex::new(); + assert!(m.try_lock().is_some()); + } + + #[test] #[should_fail] + fn double_lock() { + static mut m: StaticMutex = MUTEX_INIT; + let _g = m.lock(); + m.lock(); + } +} diff --git a/src/libstd/sync/one.rs b/src/libstd/sync/one.rs new file mode 100644 index 0000000000000..1c395b9cb08ab --- /dev/null +++ b/src/libstd/sync/one.rs @@ -0,0 +1,170 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +use int; +use sync::atomics; +use sync::{StaticMutex, MUTEX_INIT}; + +/// A type which can be used to run a one-time global initialization. This type +/// is *unsafe* to use because it is built on top of the `Mutex` in this module. +/// It does not know whether the currently running task is in a green or native +/// context, and a blocking mutex should *not* be used under normal +/// circumstances on a green task. +/// +/// Despite its unsafety, it is often useful to have a one-time initialization +/// routine run for FFI bindings or related external functionality. This type +/// can only be statically constructed with the `ONCE_INIT` value. 
+/// +/// # Example +/// +/// ```rust +/// use std::unstable::mutex::{Once, ONCE_INIT}; +/// +/// static mut START: Once = ONCE_INIT; +/// unsafe { +/// START.doit(|| { +/// // run initialization here +/// }); +/// } +/// ``` +pub struct Once { + priv mutex: StaticMutex, + priv cnt: atomics::AtomicInt, + priv lock_cnt: atomics::AtomicInt, +} + +/// Initialization value for static `Once` values. +pub static ONCE_INIT: Once = Once { + mutex: MUTEX_INIT, + cnt: atomics::INIT_ATOMIC_INT, + lock_cnt: atomics::INIT_ATOMIC_INT, +}; + +impl Once { + /// Perform an initialization routine once and only once. The given closure + /// will be executed if this is the first time `doit` has been called, and + /// otherwise the routine will *not* be invoked. + /// + /// This method will block the calling *os thread* if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). + pub fn doit(&mut self, f: ||) { + // Implementation-wise, this would seem like a fairly trivial primitive. + // The stickler part is where our mutexes currently require an + // allocation, and usage of a `Once` should't leak this allocation. + // + // This means that there must be a deterministic destroyer of the mutex + // contained within (because it's not needed after the initialization + // has run). + // + // The general scheme here is to gate all future threads once + // initialization has completed with a "very negative" count, and to + // allow through threads to lock the mutex if they see a non negative + // count. For all threads grabbing the mutex, exactly one of them should + // be responsible for unlocking the mutex, and this should only be done + // once everyone else is done with the mutex. + // + // This atomicity is achieved by swapping a very negative value into the + // shared count when the initialization routine has completed. This will + // read the number of threads which will at some point attempt to + // acquire the mutex. This count is then squirreled away in a separate + // variable, and the last person on the way out of the mutex is then + // responsible for destroying the mutex. + // + // It is crucial that the negative value is swapped in *after* the + // initialization routine has completed because otherwise new threads + // calling `doit` will return immediately before the initialization has + // completed. + + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + if prev < 0 { + // Make sure we never overflow, we'll never have int::min_value + // simultaneous calls to `doit` to make this value go back to 0 + self.cnt.store(int::min_value, atomics::SeqCst); + return + } + + // If the count is negative, then someone else finished the job, + // otherwise we run the job and record how many people will try to grab + // this lock + { + let _guard = self.mutex.lock(); + if self.cnt.load(atomics::SeqCst) > 0 { + f(); + let prev = self.cnt.swap(int::min_value, atomics::SeqCst); + self.lock_cnt.store(prev, atomics::SeqCst); + } + } + + // Last one out cleans up after everyone else, no leaks! 
+ if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { + unsafe { self.mutex.destroy() } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + + use super::{ONCE_INIT, Once}; + use task; + + #[test] + fn smoke_once() { + static mut o: Once = ONCE_INIT; + let mut a = 0; + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static mut o: Once = ONCE_INIT; + static mut run: bool = false; + + let (p, c) = SharedChan::new(); + for _ in range(0, 10) { + let c = c.clone(); + do spawn { + for _ in range(0, 4) { task::deschedule() } + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + c.send(()); + } + } + + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in range(0, 10) { + p.recv(); + } + } +} diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4d12435e01a90..3cd119281bea3 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -47,215 +47,284 @@ #[allow(non_camel_case_types)]; -use int; -use libc::c_void; -use sync::atomics; - pub struct Mutex { - // pointers for the lock/cond handles, atomically updated - priv lock: atomics::AtomicUint, - priv cond: atomics::AtomicUint, + priv inner: imp::Mutex, } pub static MUTEX_INIT: Mutex = Mutex { - lock: atomics::INIT_ATOMIC_UINT, - cond: atomics::INIT_ATOMIC_UINT, + inner: imp::MUTEX_INIT, }; impl Mutex { - /// Creates a new mutex, with the lock/condition variable pre-initialized + /// Creates a new mutex pub unsafe fn new() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(imp::init_lock()), - cond: atomics::AtomicUint::new(imp::init_cond()), - } - } - - /// Creates a new mutex, with the lock/condition variable not initialized. - /// This is the same as initializing from the MUTEX_INIT static. - pub unsafe fn empty() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(0), - cond: atomics::AtomicUint::new(0), - } - } - - /// Creates a new copy of this mutex. This is an unsafe operation because - /// there is no reference counting performed on this type. - /// - /// This function may only be called on mutexes which have had both the - /// internal condition variable and lock initialized. This means that the - /// mutex must have been created via `new`, or usage of it has already - /// initialized the internal handles. - /// - /// This is a dangerous function to call as both this mutex and the returned - /// mutex will share the same handles to the underlying mutex/condition - /// variable. Care must be taken to ensure that deallocation happens - /// accordingly. - pub unsafe fn clone(&self) -> Mutex { - let lock = self.lock.load(atomics::Relaxed); - let cond = self.cond.load(atomics::Relaxed); - assert!(lock != 0); - assert!(cond != 0); - Mutex { - lock: atomics::AtomicUint::new(lock), - cond: atomics::AtomicUint::new(cond), - } + Mutex { inner: imp::Mutex::new() } } /// Acquires this lock. This assumes that the current thread does not /// already hold the lock. - pub unsafe fn lock(&mut self) { imp::lock(self.getlock()) } + pub unsafe fn lock(&mut self) { self.inner.lock() } /// Attempts to acquire the lock. The value returned is whether the lock was /// acquired or not - pub unsafe fn trylock(&mut self) -> bool { imp::trylock(self.getlock()) } + pub unsafe fn trylock(&mut self) -> bool { self.inner.trylock() } /// Unlocks the lock. This assumes that the current thread already holds the /// lock. 
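[editor's note] For comparison with the `Once` added above in `src/libstd/sync/one.rs`: under current `std` the same one-time-initialization pattern is spelled with `Once::new()` and `call_once` (a sketch of today's API, not the `doit`/`ONCE_INIT` interface in this patch), and the mutex-destruction bookkeeping described above is handled internally.

```rust
use std::sync::Once;

static INIT: Once = Once::new();

fn ensure_initialized() {
    // call_once plays the role of `doit`: the closure runs at most once, and
    // every caller blocks until that single run has completed.
    INIT.call_once(|| {
        // e.g. perform an FFI setup call (WSAStartup, LLVM init, ...) here
    });
}

fn main() {
    ensure_initialized();
    ensure_initialized(); // second call is a no-op
}
```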
- pub unsafe fn unlock(&mut self) { imp::unlock(self.getlock()) } + pub unsafe fn unlock(&mut self) { self.inner.unlock() } /// Block on the internal condition variable. /// /// This function assumes that the lock is already held - pub unsafe fn wait(&mut self) { imp::wait(self.getcond(), self.getlock()) } + pub unsafe fn wait(&mut self) { self.inner.wait() } /// Signals a thread in `wait` to wake up - pub unsafe fn signal(&mut self) { imp::signal(self.getcond()) } + pub unsafe fn signal(&mut self) { self.inner.signal() } /// This function is especially unsafe because there are no guarantees made /// that no other thread is currently holding the lock or waiting on the /// condition variable contained inside. - pub unsafe fn destroy(&mut self) { - let lock = self.lock.swap(0, atomics::Relaxed); - let cond = self.cond.swap(0, atomics::Relaxed); - if lock != 0 { imp::free_lock(lock) } - if cond != 0 { imp::free_cond(cond) } - } - - unsafe fn getlock(&mut self) -> *c_void { - match self.lock.load(atomics::Relaxed) { - 0 => {} - n => return n as *c_void - } - let lock = imp::init_lock(); - match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { - 0 => return lock as *c_void, - _ => {} - } - imp::free_lock(lock); - return self.lock.load(atomics::Relaxed) as *c_void; - } - - unsafe fn getcond(&mut self) -> *c_void { - match self.cond.load(atomics::Relaxed) { - 0 => {} - n => return n as *c_void - } - let cond = imp::init_cond(); - match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { - 0 => return cond as *c_void, - _ => {} - } - imp::free_cond(cond); - return self.cond.load(atomics::Relaxed) as *c_void; - } + pub unsafe fn destroy(&mut self) { self.inner.destroy() } } #[cfg(unix)] mod imp { - use libc::c_void; use libc; - use ptr; - use ptr::RawPtr; + use self::os::{PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, + pthread_mutex_t, pthread_cond_t}; + use unstable::intrinsics; - type pthread_mutex_t = libc::c_void; type pthread_mutexattr_t = libc::c_void; - type pthread_cond_t = libc::c_void; type pthread_condattr_t = libc::c_void; - pub unsafe fn init_lock() -> uint { - let block = libc::malloc(rust_pthread_mutex_t_size() as libc::size_t); - assert!(!block.is_null()); - let n = pthread_mutex_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } - - pub unsafe fn init_cond() -> uint { - let block = libc::malloc(rust_pthread_cond_t_size() as libc::size_t); - assert!(!block.is_null()); - let n = pthread_cond_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } + #[cfg(target_os = "freebsd")] + mod os { + use libc; + + pub type pthread_mutex_t = *libc::c_void; + pub type pthread_cond_t = *libc::c_void; + + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = + 0 as pthread_mutex_t; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = + 0 as pthread_cond_t; + } + + #[cfg(target_os = "macos")] + mod os { + use libc; + + #[cfg(target_arch = "x86_64")] + static __PTHREAD_MUTEX_SIZE__: uint = 56; + #[cfg(target_arch = "x86_64")] + static __PTHREAD_COND_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_MUTEX_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_COND_SIZE__: uint = 24; + static _PTHREAD_MUTEX_SIG_init: libc::c_long = 0x32AAABA7; + static _PTHREAD_COND_SIG_init: libc::c_long = 0x3CB0B1BB; + + pub struct pthread_mutex_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_MUTEX_SIZE__], + } + pub struct pthread_cond_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_COND_SIZE__], + } - pub 
unsafe fn free_lock(h: uint) { - let block = h as *c_void; - assert_eq!(pthread_mutex_destroy(block), 0); - libc::free(block); - } + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __sig: _PTHREAD_MUTEX_SIG_init, + __opaque: [0, ..__PTHREAD_MUTEX_SIZE__], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __sig: _PTHREAD_COND_SIG_init, + __opaque: [0, ..__PTHREAD_COND_SIZE__], + }; + } + + #[cfg(target_os = "linux")] + #[cfg(target_os = "android")] + mod os { + use libc; + + // minus 8 because we have an 'align' field + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 40 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 24 - 8; + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + + pub struct pthread_mutex_t { + __align: libc::c_long, + size: [u8, ..__SIZEOF_PTHREAD_MUTEX_T], + } + pub struct pthread_cond_t { + __align: libc::c_longlong, + size: [u8, ..__SIZEOF_PTHREAD_COND_T], + } - pub unsafe fn free_cond(h: uint) { - let block = h as *c_void; - assert_eq!(pthread_cond_destroy(block), 0); - libc::free(block); + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_MUTEX_T], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_COND_T], + }; } - pub unsafe fn lock(l: *pthread_mutex_t) { - assert_eq!(pthread_mutex_lock(l), 0); + pub struct Mutex { + priv lock: pthread_mutex_t, + priv cond: pthread_cond_t, } - pub unsafe fn trylock(l: *c_void) -> bool { - pthread_mutex_trylock(l) == 0 - } + pub static MUTEX_INIT: Mutex = Mutex { + lock: PTHREAD_MUTEX_INITIALIZER, + cond: PTHREAD_COND_INITIALIZER, + }; - pub unsafe fn unlock(l: *pthread_mutex_t) { - assert_eq!(pthread_mutex_unlock(l), 0); - } + impl Mutex { + pub unsafe fn new() -> Mutex { + let m = Mutex { + lock: intrinsics::init(), + cond: intrinsics::init(), + }; - pub unsafe fn wait(cond: *pthread_cond_t, m: *pthread_mutex_t) { - assert_eq!(pthread_cond_wait(cond, m), 0); - } + pthread_mutex_init(&m.lock, 0 as *libc::c_void); + pthread_cond_init(&m.cond, 0 as *libc::c_void); - pub unsafe fn signal(cond: *pthread_cond_t) { - assert_eq!(pthread_cond_signal(cond), 0); - } + return m; + } - extern { - fn rust_pthread_mutex_t_size() -> libc::c_int; - fn rust_pthread_cond_t_size() -> libc::c_int; + pub unsafe fn lock(&mut self) { pthread_mutex_lock(&self.lock); } + pub unsafe fn unlock(&mut self) { pthread_mutex_unlock(&self.lock); } + pub unsafe fn signal(&mut self) { pthread_cond_signal(&self.cond); } + pub unsafe fn wait(&mut self) { + pthread_cond_wait(&self.cond, &self.lock); + } + pub unsafe fn trylock(&mut self) -> bool { + pthread_mutex_trylock(&self.lock) == 0 + } + pub unsafe fn destroy(&mut self) { + pthread_mutex_destroy(&self.lock); + pthread_cond_destroy(&self.cond); + } } extern { fn pthread_mutex_init(lock: *pthread_mutex_t, - attr: *pthread_mutexattr_t) -> libc::c_int; - fn pthread_mutex_destroy(lock: *pthread_mutex_t) -> libc::c_int; - fn pthread_cond_init(cond: *pthread_cond_t, - attr: *pthread_condattr_t) -> libc::c_int; - fn pthread_cond_destroy(cond: *pthread_cond_t) -> libc::c_int; + attr: *libc::c_void) -> libc::c_int; fn pthread_mutex_lock(lock: *pthread_mutex_t) -> libc::c_int; fn pthread_mutex_trylock(lock: *pthread_mutex_t) -> libc::c_int; fn pthread_mutex_unlock(lock: 
*pthread_mutex_t) -> libc::c_int; + fn pthread_cond_init(cond: *pthread_cond_t, + attr: *libc::c_void) -> libc::c_int; fn pthread_cond_wait(cond: *pthread_cond_t, lock: *pthread_mutex_t) -> libc::c_int; fn pthread_cond_signal(cond: *pthread_cond_t) -> libc::c_int; + fn pthread_mutex_destroy(lock: *pthread_mutex_t) -> libc::c_int; + fn pthread_cond_destroy(lock: *pthread_cond_t) -> libc::c_int; } } #[cfg(windows)] mod imp { - use libc; use libc::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES, c_void, DWORD, LPCSTR}; - use ptr; + use libc; use ptr::RawPtr; + use ptr; + use sync::atomics; type LPCRITICAL_SECTION = *c_void; static SPIN_COUNT: DWORD = 4000; + #[cfg(target_arch = "x86")] + static CRIT_SECTION_SIZE: uint = 24; + + pub struct Mutex { + // pointers for the lock/cond handles, atomically updated + priv lock: atomics::AtomicUint, + priv cond: atomics::AtomicUint, + } + + pub static MUTEX_INIT: Mutex = Mutex { + lock: atomics::INIT_ATOMIC_UINT, + cond: atomics::INIT_ATOMIC_UINT, + }; + + impl Mutex { + pub unsafe fn new() -> Mutex { + Mutex { + lock: atomics::AtomicUint::new(init_lock()), + cond: atomics::AtomicUint::new(init_cond()), + } + } + pub unsafe fn lock(&mut self) { + EnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + pub unsafe fn trylock(&mut self) -> bool { + TryEnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) != 0 + } + pub unsafe fn unlock(&mut self) { + LeaveCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + + pub unsafe fn wait(&mut self) { + self.unlock(); + WaitForSingleObject(self.getcond() as HANDLE, libc::INFINITE); + self.lock(); + } + + pub unsafe fn signal(&mut self) { + assert!(SetEvent(self.getcond() as HANDLE) != 0); + } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. 
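[editor's note] The unix `imp` module above obtains static initialization by spelling out the `pthread_mutex_t`/`pthread_cond_t` layouts by hand for each platform. A rough modern sketch of the same idea, assuming the `libc` crate as a dependency (not available to this patch) and a unix target:

```rust
use std::cell::UnsafeCell;

use libc::{pthread_mutex_lock, pthread_mutex_t, pthread_mutex_unlock,
           PTHREAD_MUTEX_INITIALIZER};

struct RawMutex(UnsafeCell<pthread_mutex_t>);

// Shared access is sound only because every use goes through pthread calls.
unsafe impl Sync for RawMutex {}

// Statically initialized, no destructor required at program exit.
static LOCK: RawMutex = RawMutex(UnsafeCell::new(PTHREAD_MUTEX_INITIALIZER));

fn main() {
    unsafe {
        pthread_mutex_lock(LOCK.0.get());
        // critical section
        pthread_mutex_unlock(LOCK.0.get());
    }
}
```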
+ pub unsafe fn destroy(&mut self) { + let lock = self.lock.swap(0, atomics::Relaxed); + let cond = self.cond.swap(0, atomics::Relaxed); + if lock != 0 { free_lock(lock) } + if cond != 0 { free_cond(cond) } + } + + unsafe fn getlock(&mut self) -> *c_void { + match self.lock.load(atomics::Relaxed) { + 0 => {} + n => return n as *c_void + } + let lock = init_lock(); + match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { + 0 => return lock as *c_void, + _ => {} + } + free_lock(lock); + return self.lock.load(atomics::Relaxed) as *c_void; + } + + unsafe fn getcond(&mut self) -> *c_void { + match self.cond.load(atomics::Relaxed) { + 0 => {} + n => return n as *c_void + } + let cond = init_cond(); + match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { + 0 => return cond as *c_void, + _ => {} + } + free_cond(cond); + return self.cond.load(atomics::Relaxed) as *c_void; + } + } pub unsafe fn init_lock() -> uint { - let block = libc::malloc(rust_crit_section_size() as libc::size_t); + let block = libc::malloc(CRIT_SECTION_SIZE as libc::size_t); assert!(!block.is_null()); InitializeCriticalSectionAndSpinCount(block, SPIN_COUNT); return block as uint; @@ -276,32 +345,6 @@ mod imp { libc::CloseHandle(block); } - pub unsafe fn lock(l: *c_void) { - EnterCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn trylock(l: *c_void) -> bool { - TryEnterCriticalSection(l as LPCRITICAL_SECTION) != 0 - } - - pub unsafe fn unlock(l: *c_void) { - LeaveCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn wait(cond: *c_void, m: *c_void) { - unlock(m); - WaitForSingleObject(cond as HANDLE, libc::INFINITE); - lock(m); - } - - pub unsafe fn signal(cond: *c_void) { - assert!(SetEvent(cond as HANDLE) != 0); - } - - extern { - fn rust_crit_section_size() -> libc::c_int; - } - extern "system" { fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, bManualReset: BOOL, @@ -319,157 +362,14 @@ mod imp { } } -/// A type which can be used to run a one-time global initialization. This type -/// is *unsafe* to use because it is built on top of the `Mutex` in this module. -/// It does not know whether the currently running task is in a green or native -/// context, and a blocking mutex should *not* be used under normal -/// circumstances on a green task. -/// -/// Despite its unsafety, it is often useful to have a one-time initialization -/// routine run for FFI bindings or related external functionality. This type -/// can only be statically constructed with the `ONCE_INIT` value. -/// -/// # Example -/// -/// ```rust -/// use std::unstable::mutex::{Once, ONCE_INIT}; -/// -/// static mut START: Once = ONCE_INIT; -/// unsafe { -/// START.doit(|| { -/// // run initialization here -/// }); -/// } -/// ``` -pub struct Once { - priv mutex: Mutex, - priv cnt: atomics::AtomicInt, - priv lock_cnt: atomics::AtomicInt, -} - -/// Initialization value for static `Once` values. -pub static ONCE_INIT: Once = Once { - mutex: MUTEX_INIT, - cnt: atomics::INIT_ATOMIC_INT, - lock_cnt: atomics::INIT_ATOMIC_INT, -}; - -impl Once { - /// Perform an initialization routine once and only once. The given closure - /// will be executed if this is the first time `doit` has been called, and - /// otherwise the routine will *not* be invoked. - /// - /// This method will block the calling *os thread* if another initialization - /// routine is currently running. - /// - /// When this function returns, it is guaranteed that some initialization - /// has run and completed (it may not be the closure specified). 
- pub fn doit(&mut self, f: ||) { - // Implementation-wise, this would seem like a fairly trivial primitive. - // The stickler part is where our mutexes currently require an - // allocation, and usage of a `Once` should't leak this allocation. - // - // This means that there must be a deterministic destroyer of the mutex - // contained within (because it's not needed after the initialization - // has run). - // - // The general scheme here is to gate all future threads once - // initialization has completed with a "very negative" count, and to - // allow through threads to lock the mutex if they see a non negative - // count. For all threads grabbing the mutex, exactly one of them should - // be responsible for unlocking the mutex, and this should only be done - // once everyone else is done with the mutex. - // - // This atomicity is achieved by swapping a very negative value into the - // shared count when the initialization routine has completed. This will - // read the number of threads which will at some point attempt to - // acquire the mutex. This count is then squirreled away in a separate - // variable, and the last person on the way out of the mutex is then - // responsible for destroying the mutex. - // - // It is crucial that the negative value is swapped in *after* the - // initialization routine has completed because otherwise new threads - // calling `doit` will return immediately before the initialization has - // completed. - - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - if prev < 0 { - // Make sure we never overflow, we'll never have int::min_value - // simultaneous calls to `doit` to make this value go back to 0 - self.cnt.store(int::min_value, atomics::SeqCst); - return - } - - // If the count is negative, then someone else finished the job, - // otherwise we run the job and record how many people will try to grab - // this lock - unsafe { self.mutex.lock() } - if self.cnt.load(atomics::SeqCst) > 0 { - f(); - let prev = self.cnt.swap(int::min_value, atomics::SeqCst); - self.lock_cnt.store(prev, atomics::SeqCst); - } - unsafe { self.mutex.unlock() } - - // Last one out cleans up after everyone else, no leaks! 
- if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { - unsafe { self.mutex.destroy() } - } - } -} - #[cfg(test)] mod test { use prelude::*; + use super::{Mutex, MUTEX_INIT}; use rt::thread::Thread; - use super::{ONCE_INIT, Once, Mutex, MUTEX_INIT}; use task; - #[test] - fn smoke_once() { - static mut o: Once = ONCE_INIT; - let mut a = 0; - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - } - - #[test] - fn stampede_once() { - static mut o: Once = ONCE_INIT; - static mut run: bool = false; - - let (p, c) = SharedChan::new(); - for _ in range(0, 10) { - let c = c.clone(); - do spawn { - for _ in range(0, 4) { task::deschedule() } - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - c.send(()); - } - } - - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - - for _ in range(0, 10) { - p.recv(); - } - } - #[test] fn somke_lock() { static mut lock: Mutex = MUTEX_INIT; diff --git a/src/rt/rust_builtin.c b/src/rt/rust_builtin.c index 6de5f80829003..81eba2984dad0 100644 --- a/src/rt/rust_builtin.c +++ b/src/rt/rust_builtin.c @@ -437,26 +437,6 @@ rust_win32_rand_release() { #endif -#if defined(__WIN32__) - -int -rust_crit_section_size() { return sizeof(CRITICAL_SECTION); } -int -rust_pthread_mutex_t_size() { return 0; } -int -rust_pthread_cond_t_size() { return 0; } - -#else - -int -rust_crit_section_size() { return 0; } -int -rust_pthread_mutex_t_size() { return sizeof(pthread_mutex_t); } -int -rust_pthread_cond_t_size() { return sizeof(pthread_cond_t); } - -#endif - // // Local Variables: // mode: C++ From 4219fd3e07d80cb26cd27acfdcbe59dc6c9966be Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 10 Jan 2014 14:20:56 -0800 Subject: [PATCH 2/2] Remove try_send_deferred plus fallout This removes all usage of `try_send_deferred` and all related functionality. Primarily, this builds all extra::sync primitives on top of `sync::Mutex` instead of `unstable::sync::Mutex`. --- src/libextra/sync.rs | 51 +++++++++++++++++-------------- src/libgreen/simple.rs | 2 +- src/libgreen/task.rs | 11 ++----- src/libnative/task.rs | 2 +- src/librustuv/lib.rs | 2 +- src/librustuv/queue.rs | 2 +- src/librustuv/timer.rs | 2 +- src/libstd/comm/mod.rs | 20 +++++------- src/libstd/rt/mod.rs | 2 +- src/libstd/rt/task.rs | 4 +-- src/libstd/sync/mpsc_intrusive.rs | 6 ++-- src/libstd/sync/mutex.rs | 12 ++++---- src/libstd/unstable/mutex.rs | 2 +- 13 files changed, 56 insertions(+), 62 deletions(-) diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 12566ac85515f..14cb7b9709ed2 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -19,12 +19,13 @@ use std::borrow; -use std::unstable::sync::Exclusive; +use std::cast; use std::sync::arc::UnsafeArc; use std::sync::atomics; +use std::sync; use std::unstable::finally::Finally; -use std::util; use std::util::NonCopyable; +use std::util; /**************************************************************************** * Internals @@ -52,7 +53,7 @@ impl WaitQueue { Some(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. 
- if ch.try_send_deferred(()) { + if ch.try_send(()) { true } else { self.signal() @@ -68,7 +69,7 @@ impl WaitQueue { match self.head.try_recv() { None => break, Some(ch) => { - if ch.try_send_deferred(()) { + if ch.try_send(()) { count += 1; } } @@ -79,36 +80,44 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (wait_end, signal_end) = Chan::new(); - assert!(self.tail.try_send_deferred(signal_end)); + assert!(self.tail.try_send(signal_end)); wait_end } } // The building-block used to make semaphores, mutexes, and rwlocks. -#[doc(hidden)] struct SemInner { + lock: sync::Mutex, count: int, - waiters: WaitQueue, + waiters: WaitQueue, // Can be either unit or another waitqueue. Some sems shouldn't come with // a condition variable attached, others should. - blocked: Q + blocked: Q } -#[doc(hidden)] -struct Sem(Exclusive>); +struct Sem(UnsafeArc>); -#[doc(hidden)] impl Sem { fn new(count: int, q: Q) -> Sem { - Sem(Exclusive::new(SemInner { - count: count, waiters: WaitQueue::new(), blocked: q })) + Sem(UnsafeArc::new(SemInner { + count: count, + waiters: WaitQueue::new(), + blocked: q, + lock: sync::Mutex::new(), + })) + } + + unsafe fn with(&self, f: |&mut SemInner|) { + let Sem(ref arc) = *self; + let state = arc.get(); + let _g = (*state).lock.lock(); + f(cast::transmute(state)); } pub fn acquire(&self) { unsafe { let mut waiter_nobe = None; - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count -= 1; if state.count < 0 { // Create waiter nobe, enqueue ourself, and tell @@ -127,8 +136,7 @@ impl Sem { pub fn release(&self) { unsafe { - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count += 1; if state.count <= 0 { state.waiters.signal(); @@ -208,8 +216,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; // Release lock, 'atomically' enqueuing ourselves in so doing. unsafe { - let Sem(ref queue) = *self.sem; - queue.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // Drop the lock. 
state.count += 1; @@ -251,8 +258,7 @@ impl<'a> Condvar<'a> { unsafe { let mut out_of_bounds = None; let mut result = false; - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { result = state.blocked[condvar_id].signal(); } else { @@ -274,8 +280,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; let mut queue = None; unsafe { - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // To avoid :broadcast_heavy, we make a new waitqueue, // swap it out with the old one, and broadcast on the diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 43c7095ae17c0..8db95f55d18db 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -54,7 +54,7 @@ impl Runtime for SimpleTask { } Local::put(cur_task); } - fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { let me = &mut *self as *mut SimpleTask; to_wake.put_runtime(self as ~Runtime); unsafe { diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 3e4b8662eacd1..1c451435844e6 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -376,7 +376,7 @@ impl Runtime for GreenTask { } } - fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) { + fn reawaken(mut ~self, to_wake: ~Task) { self.put_task(to_wake); assert!(self.sched.is_none()); @@ -409,15 +409,10 @@ impl Runtime for GreenTask { match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); - let mut sched = running_green_task.sched.take_unwrap(); + let sched = running_green_task.sched.take_unwrap(); if sched.pool_id == self.pool_id { - if can_resched { - sched.run_task(running_green_task, self); - } else { - sched.enqueue_task(self); - running_green_task.put_with_sched(sched); - } + sched.run_task(running_green_task, self); } else { self.reawaken_remotely(); diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 8eb429553a811..e8644974b2d32 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -232,7 +232,7 @@ impl rt::Runtime for Ops { // See the comments on `deschedule` for why the task is forgotten here, and // why it's valid to do so. 
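The broadcast path above sidesteps the ":broadcast_heavy" case by building a fresh WaitQueue, swapping it with the old one while the semaphore's state is held, and then waking the swapped-out waiters. The shape of that swap-then-wake trick, sketched with current std types (Mutex, VecDeque, mpsc) rather than anything defined in the patch:

    use std::collections::VecDeque;
    use std::mem;
    use std::sync::mpsc::{channel, Sender};
    use std::sync::Mutex;

    struct Broadcast {
        waiters: Mutex<VecDeque<Sender<()>>>,
    }

    impl Broadcast {
        fn notify_all(&self) -> usize {
            // Swap the whole queue out while holding the lock...
            let woken = {
                let mut q = self.waiters.lock().unwrap();
                mem::replace(&mut *q, VecDeque::new())
            };
            // ...then wake everyone after the lock is released, so a slow
            // or dead waiter never holds the lock hostage.
            let n = woken.len();
            for tx in woken {
                // If the receiving side is already gone, the error is
                // simply ignored.
                let _ = tx.send(());
            }
            n
        }
    }

    fn main() {
        let b = Broadcast { waiters: Mutex::new(VecDeque::new()) };
        let (tx, rx) = channel();
        b.waiters.lock().unwrap().push_back(tx);
        assert_eq!(b.notify_all(), 1);
        assert!(rx.recv().is_ok());
    }
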
- fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { unsafe { let me = &mut *self as *mut Ops; to_wake.put_runtime(self as ~rt::Runtime); diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 675e852ebaef0..e366c97e17bb4 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option, f: ||) { fn wakeup(slot: &mut Option) { assert!(slot.is_some()); - slot.take_unwrap().wake().map(|t| t.reawaken(true)); + slot.take_unwrap().wake().map(|t| t.reawaken()); } pub struct Request { diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 32f8d8532a209..4eb198340d8f3 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { loop { match state.consumer.pop() { mpsc::Data(Task(task)) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } mpsc::Data(Increment) => unsafe { if state.refcnt == 0 { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 4a0ad44d31147..8eda598c0ce2c 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { match timer.action.take_unwrap() { WakeTask(task) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } SendOnce(chan) => { chan.try_send(()); } SendMany(chan, id) => { diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index bf37e5fca6a5f..1f045c20268d9 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -420,9 +420,9 @@ impl Packet { // This function must have had at least an acquire fence before it to be // properly called. - fn wakeup(&mut self, can_resched: bool) { + fn wakeup(&mut self) { match self.to_wake.take_unwrap().wake() { - Some(task) => task.reawaken(can_resched), + Some(task) => task.reawaken(), None => {} } self.selecting.store(false, Relaxed); @@ -496,7 +496,7 @@ impl Packet { match self.channels.fetch_sub(1, SeqCst) { 1 => { match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(true); } + -1 => { self.wakeup(); } DISCONNECTED => {} n => { assert!(n >= 0); } } @@ -571,20 +571,14 @@ impl Chan { /// /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. - pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - - /// This function will not stick around for very long. The purpose of this - /// function is to guarantee that no rescheduling is performed. - pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } - - fn try(&self, t: T, can_resched: bool) -> bool { + pub fn try_send(&self, t: T) -> bool { unsafe { let this = cast::transmute_mut(self); this.queue.push(t); let packet = this.queue.packet(); match (*packet).increment() { // As described above, -1 == wakeup - -1 => { (*packet).wakeup(can_resched); true } + -1 => { (*packet).wakeup(); true } // Also as above, SPSC queues must be >= -2 -2 => true, // We succeeded if we sent data @@ -599,7 +593,7 @@ impl Chan { // the TLS overhead can be a bit much. 
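With try_send_deferred gone, try_send is the single non-blocking send path, and it is now allowed to reschedule the sender. For comparison, the same "never blocks" split in today's std::sync::mpsc, where the unbounded Sender cannot block and the bounded SyncSender exposes an explicit try_send; this is a sketch of the modern API, not the Chan/SharedChan types in this patch.

    use std::sync::mpsc::{channel, sync_channel, TrySendError};

    fn main() {
        // The unbounded channel's send never blocks; it only fails when
        // the receiving side has been dropped.
        let (tx, rx) = channel::<i32>();
        assert!(tx.send(1).is_ok());
        assert_eq!(rx.recv().unwrap(), 1);

        // A bounded channel offers try_send, which reports a full buffer
        // instead of blocking the sender.
        let (stx, _srx) = sync_channel::<i32>(1);
        assert!(stx.try_send(2).is_ok());
        match stx.try_send(3) {
            Err(TrySendError::Full(v)) => assert_eq!(v, 3),
            other => panic!("expected a full buffer, got {:?}", other),
        }
    }
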
n => { assert!(n >= 0); - if can_resched && n > 0 && n % RESCHED_FREQ == 0 { + if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); task.maybe_yield(); } @@ -675,7 +669,7 @@ impl SharedChan { match (*packet).increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(true); } + -1 => { (*packet).wakeup(); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index df93d42f3cc04..69d3ff39d4696 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -148,7 +148,7 @@ pub trait Runtime { fn maybe_yield(~self, cur_task: ~Task); fn deschedule(~self, times: uint, cur_task: ~Task, f: |BlockedTask| -> Result<(), BlockedTask>); - fn reawaken(~self, to_wake: ~Task, can_resched: bool); + fn reawaken(~self, to_wake: ~Task); // Miscellaneous calls which are very different depending on what context // you're in. diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 9e65974816686..af06541b43945 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -259,9 +259,9 @@ impl Task { /// Wakes up a previously blocked task, optionally specifiying whether the /// current task can accept a change in scheduling. This function can only /// be called on tasks that were previously blocked in `deschedule`. - pub fn reawaken(mut ~self, can_resched: bool) { + pub fn reawaken(mut ~self) { let ops = self.imp.take_unwrap(); - ops.reawaken(self, can_resched); + ops.reawaken(self); } /// Yields control of this task to another task. This function will diff --git a/src/libstd/sync/mpsc_intrusive.rs b/src/libstd/sync/mpsc_intrusive.rs index 374cb8010b4a7..acc83e58967b5 100644 --- a/src/libstd/sync/mpsc_intrusive.rs +++ b/src/libstd/sync/mpsc_intrusive.rs @@ -101,14 +101,14 @@ impl Queue { let mut tail = if !tail.is_null() {tail} else { cast::transmute(&self.stub) }; - let mut next = (*tail).next(atomics::Relaxed); + let mut next = (*tail).next(atomics::Acquire); if tail as uint == &self.stub as *DummyNode as uint { if next.is_null() { return None; } self.tail = next; tail = next; - next = (*next).next(atomics::Relaxed); + next = (*next).next(atomics::Acquire); } if !next.is_null() { self.tail = next; @@ -120,7 +120,7 @@ impl Queue { } let stub = cast::transmute(&self.stub); self.push(stub); - next = (*tail).next(atomics::Relaxed); + next = (*tail).next(atomics::Acquire); if !next.is_null() { self.tail = next; return Some(tail); diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs index c97568472e384..75b89814b9f29 100644 --- a/src/libstd/sync/mutex.rs +++ b/src/libstd/sync/mutex.rs @@ -376,7 +376,7 @@ impl StaticMutex { 0 => {} n => { let t = unsafe { BlockedTask::cast_from_uint(n) }; - t.wake().map(|t| t.reawaken(true)); + t.wake().map(|t| t.reawaken()); } } } @@ -489,17 +489,17 @@ mod test { let (p, c) = SharedChan::new(); for _ in range(0, N) { - let c = c.clone(); - do native::task::spawn { inc(); c.send(()); } - let c = c.clone(); - do spawn { inc(); c.send(()); } + let c2 = c.clone(); + do native::task::spawn { inc(); c2.send(()); } + let c2 = c.clone(); + do spawn { inc(); c2.send(()); } } drop(c); for _ in range(0, 2 * N) { p.recv(); } - assert_eq!(unsafe {CNT}, M * N); + assert_eq!(unsafe {CNT}, M * N * 2); unsafe { m.destroy(); } diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 3cd119281bea3..c4fe5a03d637e 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -398,7 +398,7 @@ mod test { #[test] fn 
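The mpsc_intrusive change above strengthens the consumer's loads of next from Relaxed to Acquire so that they pair with the release-ordered publication on the producer side. A minimal illustration of why that pairing matters, using a single AtomicPtr slot instead of the intrusive queue itself; the names below are invented for the example.

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};

    struct Node {
        value: u32,
    }

    // The producer publishes a node with a Release store; the consumer must
    // read the pointer with Acquire (not Relaxed) so that the fields written
    // before the store are guaranteed to be visible after the load.
    fn publish(slot: &AtomicPtr<Node>, node: Box<Node>) {
        slot.store(Box::into_raw(node), Ordering::Release);
    }

    fn consume(slot: &AtomicPtr<Node>) -> Option<u32> {
        let p = slot.swap(ptr::null_mut(), Ordering::Acquire);
        if p.is_null() {
            None
        } else {
            // The swap gave us exclusive ownership of the node, so it is
            // sound to reconstruct the Box and drop it here.
            let node = unsafe { Box::from_raw(p) };
            Some(node.value)
        }
    }

    fn main() {
        let slot = AtomicPtr::new(ptr::null_mut());
        publish(&slot, Box::new(Node { value: 7 }));
        assert_eq!(consume(&slot), Some(7));
    }
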
destroy_immediately() {
         unsafe {
-            let mut m = Mutex::empty();
+            let mut m = Mutex::new();
             m.destroy();
         }
     }
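Taken together, the series moves callers onto guard-based locking: the value returned by lock() releases the mutex when it falls out of scope, as the _g binding in the new Sem::with shows. The same discipline with today's std::sync::Mutex, where the lock also owns the data it protects; this is a sketch against the current API, not the Mutex/StaticMutex added by this series.

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        // The guard unlocks when it goes out of scope, so there is no
        // explicit unlock (or destroy) call on this path.
        let counter = Arc::new(Mutex::new(0u32));

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        *counter.lock().unwrap() += 1;
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(*counter.lock().unwrap(), 4000);
    }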