Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an efficient mutex in std::sync #11462

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/etc/licenseck.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"libstd/sync/mpsc_queue.rs", # BSD
"libstd/sync/spsc_queue.rs", # BSD
"libstd/sync/mpmc_bounded_queue.rs", # BSD
"libstd/sync/mpsc_intrusive.rs", # BSD
]

def check_license(name, contents):
Expand All @@ -59,4 +60,4 @@ def check_license(name, contents):
if (boilerplate.find(license1) == -1 or boilerplate.find(license2) == -1) and \
(boilerplate.find(license3) == -1 or boilerplate.find(license4) == -1):
return False
return True
return True
51 changes: 28 additions & 23 deletions src/libextra/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@


use std::borrow;
use std::unstable::sync::Exclusive;
use std::cast;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
use std::sync;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;
use std::util;

/****************************************************************************
* Internals
Expand Down Expand Up @@ -52,7 +53,7 @@ impl WaitQueue {
Some(ch) => {
// Send a wakeup signal. If the waiter was killed, its port will
// have closed. Keep trying until we get a live task.
if ch.try_send_deferred(()) {
if ch.try_send(()) {
true
} else {
self.signal()
Expand All @@ -68,7 +69,7 @@ impl WaitQueue {
match self.head.try_recv() {
None => break,
Some(ch) => {
if ch.try_send_deferred(()) {
if ch.try_send(()) {
count += 1;
}
}
Expand All @@ -79,36 +80,44 @@ impl WaitQueue {

fn wait_end(&self) -> WaitEnd {
let (wait_end, signal_end) = Chan::new();
assert!(self.tail.try_send_deferred(signal_end));
assert!(self.tail.try_send(signal_end));
wait_end
}
}

// The building-block used to make semaphores, mutexes, and rwlocks.
#[doc(hidden)]
struct SemInner<Q> {
lock: sync::Mutex,
count: int,
waiters: WaitQueue,
waiters: WaitQueue,
// Can be either unit or another waitqueue. Some sems shouldn't come with
// a condition variable attached, others should.
blocked: Q
blocked: Q
}

#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);
struct Sem<Q>(UnsafeArc<SemInner<Q>>);

#[doc(hidden)]
impl<Q:Send> Sem<Q> {
fn new(count: int, q: Q) -> Sem<Q> {
Sem(Exclusive::new(SemInner {
count: count, waiters: WaitQueue::new(), blocked: q }))
Sem(UnsafeArc::new(SemInner {
count: count,
waiters: WaitQueue::new(),
blocked: q,
lock: sync::Mutex::new(),
}))
}

unsafe fn with(&self, f: |&mut SemInner<Q>|) {
let Sem(ref arc) = *self;
let state = arc.get();
let _g = (*state).lock.lock();
f(cast::transmute(state));
}

pub fn acquire(&self) {
unsafe {
let mut waiter_nobe = None;
let Sem(ref lock) = *self;
lock.with(|state| {
self.with(|state| {
state.count -= 1;
if state.count < 0 {
// Create waiter nobe, enqueue ourself, and tell
Expand All @@ -127,8 +136,7 @@ impl<Q:Send> Sem<Q> {

pub fn release(&self) {
unsafe {
let Sem(ref lock) = *self;
lock.with(|state| {
self.with(|state| {
state.count += 1;
if state.count <= 0 {
state.waiters.signal();
Expand Down Expand Up @@ -208,8 +216,7 @@ impl<'a> Condvar<'a> {
let mut out_of_bounds = None;
// Release lock, 'atomically' enqueuing ourselves in so doing.
unsafe {
let Sem(ref queue) = *self.sem;
queue.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
// Drop the lock.
state.count += 1;
Expand Down Expand Up @@ -251,8 +258,7 @@ impl<'a> Condvar<'a> {
unsafe {
let mut out_of_bounds = None;
let mut result = false;
let Sem(ref lock) = *self.sem;
lock.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
result = state.blocked[condvar_id].signal();
} else {
Expand All @@ -274,8 +280,7 @@ impl<'a> Condvar<'a> {
let mut out_of_bounds = None;
let mut queue = None;
unsafe {
let Sem(ref lock) = *self.sem;
lock.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
// To avoid :broadcast_heavy, we make a new waitqueue,
// swap it out with the old one, and broadcast on the
Expand Down
3 changes: 2 additions & 1 deletion src/libgreen/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Runtime for SimpleTask {
}
Local::put(cur_task);
}
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
fn reawaken(mut ~self, mut to_wake: ~Task) {
let me = &mut *self as *mut SimpleTask;
to_wake.put_runtime(self as ~Runtime);
unsafe {
Expand All @@ -76,6 +76,7 @@ impl Runtime for SimpleTask {
}
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> { None }
fn stack_bounds(&self) -> (uint, uint) { fail!() }
fn can_block(&self) -> bool { true }
fn wrap(~self) -> ~Any { fail!() }
}

Expand Down
13 changes: 5 additions & 8 deletions src/libgreen/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl Runtime for GreenTask {
}
}

fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) {
fn reawaken(mut ~self, to_wake: ~Task) {
self.put_task(to_wake);
assert!(self.sched.is_none());

Expand Down Expand Up @@ -409,15 +409,10 @@ impl Runtime for GreenTask {
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
running_green_task.put_task(running_task);
let mut sched = running_green_task.sched.take_unwrap();
let sched = running_green_task.sched.take_unwrap();

if sched.pool_id == self.pool_id {
if can_resched {
sched.run_task(running_green_task, self);
} else {
sched.enqueue_task(self);
running_green_task.put_with_sched(sched);
}
sched.run_task(running_green_task, self);
} else {
self.reawaken_remotely();

Expand Down Expand Up @@ -462,6 +457,8 @@ impl Runtime for GreenTask {
c.current_stack_segment.end() as uint)
}

fn can_block(&self) -> bool { false }

fn wrap(~self) -> ~Any { self as ~Any }
}

Expand Down
2 changes: 1 addition & 1 deletion src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub fn init() {
}

unsafe {
use std::unstable::mutex::{Once, ONCE_INIT};
use std::sync::{Once, ONCE_INIT};
static mut INIT: Once = ONCE_INIT;
INIT.doit(|| {
let mut data: WSADATA = intrinsics::init();
Expand Down
4 changes: 3 additions & 1 deletion src/libnative/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ impl rt::Runtime for Ops {

fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }

fn can_block(&self) -> bool { true }

// This function gets a little interesting. There are a few safety and
// ownership violations going on here, but this is all done in the name of
// shared state. Additionally, all of the violations are protected with a
Expand Down Expand Up @@ -230,7 +232,7 @@ impl rt::Runtime for Ops {

// See the comments on `deschedule` for why the task is forgotten here, and
// why it's valid to do so.
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
fn reawaken(mut ~self, mut to_wake: ~Task) {
unsafe {
let me = &mut *self as *mut Ops;
to_wake.put_runtime(self as ~rt::Runtime);
Expand Down
2 changes: 1 addition & 1 deletion src/librustc/back/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ pub mod write {
}

unsafe fn configure_llvm(sess: Session) {
use std::unstable::mutex::{Once, ONCE_INIT};
use std::sync::{Once, ONCE_INIT};
static mut INIT: Once = ONCE_INIT;

// Copy what clan does by turning on loop vectorization at O2 and
Expand Down
2 changes: 1 addition & 1 deletion src/librustc/middle/trans/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3295,7 +3295,7 @@ pub fn trans_crate(sess: session::Session,
output: &Path) -> CrateTranslation {
// Before we touch LLVM, make sure that multithreading is enabled.
unsafe {
use std::unstable::mutex::{Once, ONCE_INIT};
use std::sync::{Once, ONCE_INIT};
static mut INIT: Once = ONCE_INIT;
static mut POISONED: bool = false;
INIT.doit(|| {
Expand Down
2 changes: 1 addition & 1 deletion src/librustuv/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {

fn wakeup(slot: &mut Option<BlockedTask>) {
assert!(slot.is_some());
slot.take_unwrap().wake().map(|t| t.reawaken(true));
slot.take_unwrap().wake().map(|t| t.reawaken());
}

pub struct Request {
Expand Down
2 changes: 1 addition & 1 deletion src/librustuv/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
loop {
match state.consumer.pop() {
mpsc::Data(Task(task)) => {
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
mpsc::Data(Increment) => unsafe {
if state.refcnt == 0 {
Expand Down
2 changes: 1 addition & 1 deletion src/librustuv/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {

match timer.action.take_unwrap() {
WakeTask(task) => {
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
SendOnce(chan) => { chan.try_send(()); }
SendMany(chan, id) => {
Expand Down
20 changes: 7 additions & 13 deletions src/libstd/comm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ impl Packet {

// This function must have had at least an acquire fence before it to be
// properly called.
fn wakeup(&mut self, can_resched: bool) {
fn wakeup(&mut self) {
match self.to_wake.take_unwrap().wake() {
Some(task) => task.reawaken(can_resched),
Some(task) => task.reawaken(),
None => {}
}
self.selecting.store(false, Relaxed);
Expand Down Expand Up @@ -496,7 +496,7 @@ impl Packet {
match self.channels.fetch_sub(1, SeqCst) {
1 => {
match self.cnt.swap(DISCONNECTED, SeqCst) {
-1 => { self.wakeup(true); }
-1 => { self.wakeup(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
Expand Down Expand Up @@ -571,20 +571,14 @@ impl<T: Send> Chan<T> {
///
/// Like `send`, this method will never block. If the failure of send cannot
/// be tolerated, then this method should be used instead.
pub fn try_send(&self, t: T) -> bool { self.try(t, true) }

/// This function will not stick around for very long. The purpose of this
/// function is to guarantee that no rescheduling is performed.
pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) }

fn try(&self, t: T, can_resched: bool) -> bool {
pub fn try_send(&self, t: T) -> bool {
unsafe {
let this = cast::transmute_mut(self);
this.queue.push(t);
let packet = this.queue.packet();
match (*packet).increment() {
// As described above, -1 == wakeup
-1 => { (*packet).wakeup(can_resched); true }
-1 => { (*packet).wakeup(); true }
// Also as above, SPSC queues must be >= -2
-2 => true,
// We succeeded if we sent data
Expand All @@ -599,7 +593,7 @@ impl<T: Send> Chan<T> {
// the TLS overhead can be a bit much.
n => {
assert!(n >= 0);
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
if n > 0 && n % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
task.maybe_yield();
}
Expand Down Expand Up @@ -675,7 +669,7 @@ impl<T: Send> SharedChan<T> {

match (*packet).increment() {
DISCONNECTED => {} // oh well, we tried
-1 => { (*packet).wakeup(true); }
-1 => { (*packet).wakeup(); }
n => {
if n > 0 && n % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
Expand Down
3 changes: 2 additions & 1 deletion src/libstd/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,15 @@ pub trait Runtime {
fn maybe_yield(~self, cur_task: ~Task);
fn deschedule(~self, times: uint, cur_task: ~Task,
f: |BlockedTask| -> Result<(), BlockedTask>);
fn reawaken(~self, to_wake: ~Task, can_resched: bool);
fn reawaken(~self, to_wake: ~Task);

// Miscellaneous calls which are very different depending on what context
// you're in.
fn spawn_sibling(~self, cur_task: ~Task, opts: TaskOpts, f: proc());
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>>;
/// The (low, high) edges of the current stack.
fn stack_bounds(&self) -> (uint, uint); // (lo, hi)
fn can_block(&self) -> bool;

// XXX: This is a serious code smell and this should not exist at all.
fn wrap(~self) -> ~Any;
Expand Down
10 changes: 8 additions & 2 deletions src/libstd/rt/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ impl Task {
/// Wakes up a previously blocked task, optionally specifiying whether the
/// current task can accept a change in scheduling. This function can only
/// be called on tasks that were previously blocked in `deschedule`.
pub fn reawaken(mut ~self, can_resched: bool) {
pub fn reawaken(mut ~self) {
let ops = self.imp.take_unwrap();
ops.reawaken(self, can_resched);
ops.reawaken(self);
}

/// Yields control of this task to another task. This function will
Expand Down Expand Up @@ -292,6 +292,12 @@ impl Task {
pub fn stack_bounds(&self) -> (uint, uint) {
self.imp.get_ref().stack_bounds()
}

/// Returns whether it is legal for this task to block the OS thread that it
/// is running on.
pub fn can_block(&self) -> bool {
self.imp.get_ref().can_block()
}
}

impl Drop for Task {
Expand Down
7 changes: 7 additions & 0 deletions src/libstd/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@
//! and/or blocking at all, but rather provide the necessary tools to build
//! other types of concurrent primitives.

pub use self::mutex::{Mutex, StaticMutex, Guard, MUTEX_INIT};
pub use self::one::{Once, ONCE_INIT};

pub mod arc;
pub mod atomics;
pub mod deque;
pub mod mpmc_bounded_queue;
pub mod mpsc_queue;
pub mod spsc_queue;

mod mpsc_intrusive;
mod mutex;
mod one;
Loading