Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: Rewrite the sync module #19274

Merged
merged 2 commits into from
Dec 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/etc/licenseck.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
"rt/isaac/randport.cpp", # public domain
"rt/isaac/rand.h", # public domain
"rt/isaac/standard.h", # public domain
"libstd/sync/mpsc_queue.rs", # BSD
"libstd/sync/spsc_queue.rs", # BSD
"libstd/sync/mpmc_bounded_queue.rs", # BSD
"libstd/comm/mpsc_queue.rs", # BSD
"libstd/comm/spsc_queue.rs", # BSD
"test/bench/shootout-binarytrees.rs", # BSD
"test/bench/shootout-chameneos-redux.rs", # BSD
"test/bench/shootout-fannkuch-redux.rs", # BSD
Expand Down
20 changes: 12 additions & 8 deletions src/libstd/comm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ mod select;
mod shared;
mod stream;
mod sync;
mod mpsc_queue;
mod spsc_queue;

/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
Expand Down Expand Up @@ -628,24 +630,26 @@ impl<T: Send> Sender<T> {
#[unstable]
impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper) = match *unsafe { self.inner() } {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
(*a.get()).postinit_lock();
let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
oneshot::UpWoke(task) => (a, Some(task))
oneshot::UpSuccess |
oneshot::UpDisconnected => (a, None, guard),
oneshot::UpWoke(task) => (a, Some(task), guard)
}
}
}
Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
(*a.get()).postinit_lock();
let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
stream::UpSuccess | stream::UpDisconnected => (a, None),
stream::UpWoke(task) => (a, Some(task)),
stream::UpSuccess |
stream::UpDisconnected => (a, None, guard),
stream::UpWoke(task) => (a, Some(task), guard),
}
}
}
Expand All @@ -657,7 +661,7 @@ impl<T: Send> Clone for Sender<T> {
};

unsafe {
(*packet.get()).inherit_blocker(sleeper);
(*packet.get()).inherit_blocker(sleeper, guard);

let tmp = Sender::new(Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,6 @@ impl<T: Send> Queue<T> {
if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
}
}

/// Attempts to pop data from this queue, but doesn't attempt too hard. This
/// will canonicalize inconsistent states to a `None` value.
pub fn casual_pop(&self) -> Option<T> {
match self.pop() {
Data(t) => Some(t),
Empty | Inconsistent => None,
}
}
}

#[unsafe_destructor]
Expand Down
21 changes: 11 additions & 10 deletions src/libstd/comm/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ use alloc::boxed::Box;
use core::cmp;
use core::int;
use rustrt::local::Local;
use rustrt::mutex::NativeMutex;
use rustrt::task::{Task, BlockedTask};
use rustrt::thread::Thread;

use sync::atomic;
use sync::mpsc_queue as mpsc;
use sync::{atomic, Mutex, MutexGuard};
use comm::mpsc_queue as mpsc;

const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;
Expand All @@ -56,7 +55,7 @@ pub struct Packet<T> {

// this lock protects various portions of this implementation during
// select()
select_lock: NativeMutex,
select_lock: Mutex<()>,
}

pub enum Failure {
Expand All @@ -76,7 +75,7 @@ impl<T: Send> Packet<T> {
channels: atomic::AtomicInt::new(2),
port_dropped: atomic::AtomicBool::new(false),
sender_drain: atomic::AtomicInt::new(0),
select_lock: unsafe { NativeMutex::new() },
select_lock: Mutex::new(()),
};
return p;
}
Expand All @@ -86,16 +85,18 @@ impl<T: Send> Packet<T> {
// In other case mutex data will be duplicated while cloning
// and that could cause problems on platforms where it is
// represented by opaque data structure
pub fn postinit_lock(&mut self) {
unsafe { self.select_lock.lock_noguard() }
pub fn postinit_lock(&self) -> MutexGuard<()> {
self.select_lock.lock()
}

// This function is used at the creation of a shared packet to inherit a
// previously blocked task. This is done to prevent spurious wakeups of
// tasks in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
pub fn inherit_blocker(&mut self,
task: Option<BlockedTask>,
guard: MutexGuard<()>) {
match task {
Some(task) => {
assert_eq!(self.cnt.load(atomic::SeqCst), 0);
Expand Down Expand Up @@ -135,7 +136,7 @@ impl<T: Send> Packet<T> {
// interfere with this method. After we unlock this lock, we're
// signifying that we're done modifying self.cnt and self.to_wake and
// the port is ready for the world to continue using it.
unsafe { self.select_lock.unlock_noguard() }
drop(guard);
}

pub fn send(&mut self, t: T) -> Result<(), T> {
Expand Down Expand Up @@ -441,7 +442,7 @@ impl<T: Send> Packet<T> {
// done with. Without this bounce, we can race with inherit_blocker
// about looking at and dealing with to_wake. Once we have acquired the
// lock, we are guaranteed that inherit_blocker is done.
unsafe {
{
let _guard = self.select_lock.lock();
}

Expand Down
160 changes: 56 additions & 104 deletions src/libstd/sync/spsc_queue.rs → src/libstd/comm/spsc_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use core::prelude::*;
use alloc::boxed::Box;
use core::mem;
use core::cell::UnsafeCell;
use alloc::arc::Arc;

use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};

Expand Down Expand Up @@ -74,39 +73,6 @@ pub struct Queue<T> {
cache_subtractions: AtomicUint,
}

/// A safe abstraction for the consumer in a single-producer single-consumer
/// queue.
pub struct Consumer<T> {
inner: Arc<Queue<T>>
}

impl<T: Send> Consumer<T> {
/// Attempts to pop the value from the head of the queue, returning `None`
/// if the queue is empty.
pub fn pop(&mut self) -> Option<T> {
self.inner.pop()
}

/// Attempts to peek at the head of the queue, returning `None` if the queue
/// is empty.
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
self.inner.peek()
}
}

/// A safe abstraction for the producer in a single-producer single-consumer
/// queue.
pub struct Producer<T> {
inner: Arc<Queue<T>>
}

impl<T: Send> Producer<T> {
/// Pushes a new value onto the queue.
pub fn push(&mut self, t: T) {
self.inner.push(t)
}
}

impl<T: Send> Node<T> {
fn new() -> *mut Node<T> {
unsafe {
Expand All @@ -118,30 +84,6 @@ impl<T: Send> Node<T> {
}
}

/// Creates a new queue with a consumer-producer pair.
///
/// The producer returned is connected to the consumer to push all data to
/// the consumer.
///
/// # Arguments
///
/// * `bound` - This queue implementation is implemented with a linked
/// list, and this means that a push is always a malloc. In
/// order to amortize this cost, an internal cache of nodes is
/// maintained to prevent a malloc from always being
/// necessary. This bound is the limit on the size of the
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger.
pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
let q = unsafe { Queue::new(bound) };
let arc = Arc::new(q);
let consumer = Consumer { inner: arc.clone() };
let producer = Producer { inner: arc };

(consumer, producer)
}

impl<T: Send> Queue<T> {
/// Creates a new queue.
///
Expand Down Expand Up @@ -296,78 +238,88 @@ impl<T: Send> Drop for Queue<T> {
mod test {
use prelude::*;

use super::{queue};
use sync::Arc;
use super::Queue;

#[test]
fn smoke() {
let (mut consumer, mut producer) = queue(0);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1i));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
unsafe {
let queue = Queue::new(0);
queue.push(1i);
queue.push(2);
assert_eq!(queue.pop(), Some(1i));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), None);
queue.push(3);
queue.push(4);
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), Some(4));
assert_eq!(queue.pop(), None);
}
}

#[test]
fn peek() {
let (mut consumer, mut producer) = queue(0);
producer.push(vec![1i]);
unsafe {
let queue = Queue::new(0);
queue.push(vec![1i]);

// Ensure the borrowchecker works
match queue.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
}

// Ensure the borrowchecker works
match consumer.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
queue.pop();
}

consumer.pop();
}

#[test]
fn drop_full() {
let (_, mut producer) = queue(0);
producer.push(box 1i);
producer.push(box 2i);
unsafe {
let q = Queue::new(0);
q.push(box 1i);
q.push(box 2i);
}
}

#[test]
fn smoke_bound() {
let (mut consumer, mut producer) = queue(1);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
unsafe {
let q = Queue::new(0);
q.push(1i);
q.push(2);
assert_eq!(q.pop(), Some(1));
assert_eq!(q.pop(), Some(2));
assert_eq!(q.pop(), None);
q.push(3);
q.push(4);
assert_eq!(q.pop(), Some(3));
assert_eq!(q.pop(), Some(4));
assert_eq!(q.pop(), None);
}
}

#[test]
fn stress() {
stress_bound(0);
stress_bound(1);
unsafe {
stress_bound(0);
stress_bound(1);
}

fn stress_bound(bound: uint) {
let (consumer, mut producer) = queue(bound);
unsafe fn stress_bound(bound: uint) {
let q = Arc::new(Queue::new(bound));

let (tx, rx) = channel();
let q2 = q.clone();
spawn(proc() {
// Move the consumer to a local mutable slot
let mut consumer = consumer;
for _ in range(0u, 100000) {
loop {
match consumer.pop() {
match q2.pop() {
Some(1i) => break,
Some(_) => panic!(),
None => {}
Expand All @@ -377,7 +329,7 @@ mod test {
tx.send(());
});
for _ in range(0i, 100000) {
producer.push(1);
q.push(1);
}
rx.recv();
}
Expand Down
2 changes: 1 addition & 1 deletion src/libstd/comm/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use rustrt::task::{Task, BlockedTask};
use rustrt::thread::Thread;

use sync::atomic;
use sync::spsc_queue as spsc;
use comm::spsc_queue as spsc;
use comm::Receiver;

const DISCONNECTED: int = int::MIN;
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/dynamic_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ pub mod dl {
}

pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> {
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
use sync::{StaticMutex, MUTEX_INIT};
static LOCK: StaticMutex = MUTEX_INIT;
unsafe {
// dlerror isn't thread safe, so we need to lock around this entire
// sequence
Expand Down
Loading