Add Windows keyed event implementation of ThreadParker

Amanieu committed May 22, 2016
1 parent cf64fd9 commit 43abbc9

Showing 5 changed files with 321 additions and 83 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -15,6 +15,10 @@ smallvec = "0.1"
 [target.'cfg(target_os = "linux")'.dependencies]
 libc = "0.2"
 
+[target.'cfg(windows)'.dependencies]
+winapi = "0.2"
+kernel32-sys = "0.2"
+
 [dev-dependencies]
 rand = "0.3"
 lazy_static = "0.2"
10 changes: 9 additions & 1 deletion src/lib.rs
@@ -111,13 +111,21 @@ extern crate lazy_static;
 #[cfg(all(feature = "nightly", target_os = "linux"))]
 extern crate libc;
 
+#[cfg(windows)]
+extern crate winapi;
+#[cfg(windows)]
+extern crate kernel32;
+
 // Spin limit from JikesRVM & Webkit experiments
 const SPIN_LIMIT: usize = 40;
 
 #[cfg(all(feature = "nightly", target_os = "linux"))]
 #[path = "thread_parker/linux.rs"]
 mod thread_parker;
-#[cfg(not(all(feature = "nightly", target_os = "linux")))]
+#[cfg(windows)]
+#[path = "thread_parker/windows.rs"]
+mod thread_parker;
+#[cfg(not(any(windows, all(feature = "nightly", target_os = "linux"))))]
 #[path = "thread_parker/generic.rs"]
 mod thread_parker;
 
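All three thread_parker backends selected above expose the same interface; its shape can be read off the park() call sites in src/parking_lot.rs below (prepare_park, park, park_until, timed_out). For orientation only, here is a rough sketch of a portable parker with that interface, built on std's Mutex and Condvar. It is not the crate's actual generic.rs, which this diff does not show.

// Rough sketch only (not the crate's generic.rs): a portable ThreadParker
// with the interface used by park() below.
use std::sync::{Condvar, Mutex};
use std::time::Instant;

pub struct ThreadParker {
    parked: Mutex<bool>,
    condvar: Condvar,
}

impl ThreadParker {
    pub fn new() -> ThreadParker {
        ThreadParker {
            parked: Mutex::new(false),
            condvar: Condvar::new(),
        }
    }

    // Called while holding the bucket lock, before actually sleeping.
    pub fn prepare_park(&self) {
        *self.parked.lock().unwrap() = true;
    }

    // Sleep until unpark() clears the flag.
    pub fn park(&self) {
        let mut parked = self.parked.lock().unwrap();
        while *parked {
            parked = self.condvar.wait(parked).unwrap();
        }
    }

    // Sleep until unparked or `timeout` passes; true means we were unparked.
    pub fn park_until(&self, timeout: Instant) -> bool {
        let mut parked = self.parked.lock().unwrap();
        while *parked {
            let now = Instant::now();
            if now >= timeout {
                return false;
            }
            let (g, _) = self.condvar.wait_timeout(parked, timeout - now).unwrap();
            parked = g;
        }
        true
    }

    // Precise re-check under the bucket lock: still parked means we really
    // timed out and were not unparked in the meantime.
    pub fn timed_out(&self) -> bool {
        *self.parked.lock().unwrap()
    }

    pub fn unpark(&self) {
        *self.parked.lock().unwrap() = false;
        self.condvar.notify_one();
    }
}

The platform backends exist to avoid exactly this Mutex/Condvar overhead: the Linux backend parks on a futex, and the Windows backend added by this commit parks on a keyed event.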
126 changes: 63 additions & 63 deletions src/parking_lot.rs
@@ -268,81 +268,81 @@ pub unsafe fn park(key: usize,
                    timeout: Option<Instant>)
                    -> bool {
     // Grab our thread data, this also ensures that the hash table exists
-    THREAD_DATA.with(|thread_data| {
-        // Lock the bucket for the given key
-        let bucket = lock_bucket(key).unwrap();
-
-        // If the validation function fails, just return
-        if !validate() {
-            bucket.mutex.unlock();
-            return false;
-        }
-
-        // Append our thread data to the queue and unlock the bucket
-        thread_data.next_in_queue.set(ptr::null());
-        thread_data.key.set(key);
-        thread_data.parker.prepare_park();
-        if !bucket.queue_head.get().is_null() {
-            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
-        } else {
-            bucket.queue_head.set(thread_data);
-        }
-        bucket.queue_tail.set(thread_data);
-        bucket.mutex.unlock();
-
-        // Invoke the pre-sleep callback
-        before_sleep();
-
-        // Park our thread and determine whether we were woken up by an unpark
-        // or by our timeout. Note that this isn't precise: we can still be
-        // unparked since we are still in the queue.
-        let unparked = match timeout {
-            Some(timeout) => thread_data.parker.park_until(timeout),
-            None => {
-                thread_data.parker.park();
-                true
-            }
-        };
-
-        // If we were unparked, return now
-        if unparked {
-            return true;
-        }
-
-        // Lock our bucket again. Note that the hashtable may have been rehashed
-        // in the meantime.
-        let bucket = lock_bucket(key).unwrap();
-
-        // Now we need to check again if we were unparked or timed out. Unlike
-        // the last check this is precise because we hold the bucket lock.
-        if !thread_data.parker.timed_out() {
-            bucket.mutex.unlock();
-            return true;
-        }
-
-        // We timed out, so we now need to remove our thread from the queue
-        let mut link = &bucket.queue_head;
-        let mut current = bucket.queue_head.get();
-        let mut previous = ptr::null();
-        while !current.is_null() {
-            if current == thread_data {
-                let next = (*current).next_in_queue.get();
-                link.set(next);
-                if bucket.queue_tail.get() == current {
-                    bucket.queue_tail.set(previous);
-                }
-                break;
-            } else {
-                link = &(*current).next_in_queue;
-                previous = current;
-                current = link.get();
-            }
-        }
-
-        // Unlock the bucket, we are done
-        bucket.mutex.unlock();
-        false
-    })
+    let thread_data = &*THREAD_DATA.with(|x| x as *const ThreadData);
+
+    // Lock the bucket for the given key
+    let bucket = lock_bucket(key).unwrap();
+
+    // If the validation function fails, just return
+    if !validate() {
+        bucket.mutex.unlock();
+        return false;
+    }
+
+    // Append our thread data to the queue and unlock the bucket
+    thread_data.next_in_queue.set(ptr::null());
+    thread_data.key.set(key);
+    thread_data.parker.prepare_park();
+    if !bucket.queue_head.get().is_null() {
+        (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
+    } else {
+        bucket.queue_head.set(thread_data);
+    }
+    bucket.queue_tail.set(thread_data);
+    bucket.mutex.unlock();
+
+    // Invoke the pre-sleep callback
+    before_sleep();
+
+    // Park our thread and determine whether we were woken up by an unpark or by
+    // our timeout. Note that this isn't precise: we can still be unparked
+    // since we are still in the queue.
+    let unparked = match timeout {
+        Some(timeout) => thread_data.parker.park_until(timeout),
+        None => {
+            thread_data.parker.park();
+            true
+        }
+    };
+
+    // If we were unparked, return now
+    if unparked {
+        return true;
+    }
+
+    // Lock our bucket again. Note that the hashtable may have been rehashed in
+    // the meantime.
+    let bucket = lock_bucket(key).unwrap();
+
+    // Now we need to check again if we were unparked or timed out. Unlike the
+    // last check this is precise because we hold the bucket lock.
+    if !thread_data.parker.timed_out() {
+        bucket.mutex.unlock();
+        return true;
+    }
+
+    // We timed out, so we now need to remove our thread from the queue
+    let mut link = &bucket.queue_head;
+    let mut current = bucket.queue_head.get();
+    let mut previous = ptr::null();
+    while !current.is_null() {
+        if current == thread_data {
+            let next = (*current).next_in_queue.get();
+            link.set(next);
+            if bucket.queue_tail.get() == current {
+                bucket.queue_tail.set(previous);
+            }
+            break;
+        } else {
+            link = &(*current).next_in_queue;
+            previous = current;
+            current = link.get();
+        }
+    }
+
+    // Unlock the bucket, we are done
+    bucket.mutex.unlock();
+    false
 }
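Aside from re-indentation, the functional change in this hunk is its first line: instead of running the whole function inside the THREAD_DATA.with closure, the new code casts the closure's borrow to a raw pointer and re-borrows it outside, so thread_data stays usable for the rest of the function and early returns exit park() directly rather than just the closure. A minimal standalone sketch of that pattern, with hypothetical names:

use std::cell::Cell;

thread_local!(static FLAG: Cell<bool> = Cell::new(false));

fn flag() -> &'static Cell<bool> {
    // Escape the closure-scoped borrow as a raw pointer and re-borrow it.
    // This is sound only because the thread-local outlives the caller's use
    // of the reference and the thread is still running, which is what the
    // park() code above relies on for its ThreadData.
    unsafe { &*FLAG.with(|f| f as *const Cell<bool>) }
}

fn main() {
    flag().set(true);
    assert!(flag().get());
}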

/// Unparks one thread from the queue associated with the given key.
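The doc comment above introduces unpark_one, whose body is truncated below. Together, park and unpark_one are enough to build higher-level primitives. A hedged illustration: a toy one-shot event keyed on a static flag's address. The closure parameters of park and the single-argument unpark_one form are assumptions inferred from the surrounding code, not confirmed by this diff.

// Hedged illustration only: a toy one-shot event built on park/unpark_one.
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};

static SET: AtomicBool = ATOMIC_BOOL_INIT;

fn wait_for_event() {
    let key = &SET as *const _ as usize; // parking key: the flag's address
    while !SET.load(Ordering::Acquire) {
        unsafe {
            park(key,
                 || !SET.load(Ordering::Relaxed), // validate: still unset?
                 || {},                           // before_sleep: nothing to do
                 None);                           // no timeout
        }
    }
}

fn set_event() {
    SET.store(true, Ordering::Release);
    unsafe {
        unpark_one(&SET as *const _ as usize); // assumed signature
    }
}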
[Diff truncated: the rest of src/parking_lot.rs and the remaining changed files, including the new src/thread_parker/windows.rs, are not shown.]
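Since src/thread_parker/windows.rs itself is not shown, here is a rough, unofficial sketch of the NT keyed-event primitives such an implementation rests on. The helper names are invented for illustration; NtCreateKeyedEvent, NtWaitForKeyedEvent, and NtReleaseKeyedEvent are undocumented ntdll exports, resolved here at runtime through the new kernel32-sys dependency.

// Rough sketch only: the keyed-event API a Windows ThreadParker can build on.
extern crate kernel32;
extern crate winapi;

use std::mem;
use std::ptr;
use winapi::{HANDLE, PVOID};

// Shared shape of NtWaitForKeyedEvent / NtReleaseKeyedEvent: the third
// argument is an "alertable" BOOLEAN, the fourth an optional LARGE_INTEGER
// timeout in 100ns units (null = wait forever). Types simplified here.
type KeyedEventFn = unsafe extern "system" fn(HANDLE, PVOID, u8, *mut i64) -> i32;

unsafe fn ntdll_fn(name: &[u8]) -> KeyedEventFn {
    let ntdll = kernel32::GetModuleHandleA(b"ntdll.dll\0".as_ptr() as *const i8);
    let f = kernel32::GetProcAddress(ntdll, name.as_ptr() as *const i8);
    assert!(!f.is_null(), "keyed events unavailable");
    mem::transmute(f)
}

// Parking side: blocks until another thread releases the same key on the
// same keyed-event handle (obtained beforehand via NtCreateKeyedEvent).
// Keys must have their low bit clear, so an aligned address works.
unsafe fn keyed_wait(handle: HANDLE, key: usize) {
    let wait = ntdll_fn(b"NtWaitForKeyedEvent\0");
    wait(handle, key as PVOID, 0, ptr::null_mut());
}

// Unparking side: blocks until a waiter on the same key arrives.
unsafe fn keyed_release(handle: HANDLE, key: usize) {
    let release = ntdll_fn(b"NtReleaseKeyedEvent\0");
    release(handle, key as PVOID, 0, ptr::null_mut());
}

The useful property is the rendezvous: a release blocks until a waiter on the same key shows up, so an unpark racing with a thread that is still entering its wait is never lost.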
