Skip to content

Commit

Permalink
Add SendDeferred trait and use it to fix #8214.
Browse files Browse the repository at this point in the history
  • Loading branch information
bblum committed Aug 2, 2013
1 parent f1c1f92 commit be7738b
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 28 deletions.
9 changes: 5 additions & 4 deletions src/libextra/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use std::borrow;
use std::comm;
use std::comm::SendDeferred;
use std::task;
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
use std::unstable::atomics;
Expand Down Expand Up @@ -49,7 +50,7 @@ impl WaitQueue {
if self.head.peek() {
// Pop and send a wakeup signal. If the waiter was killed, its port
// will have closed. Keep trying until we get a live task.
if comm::try_send_one(self.head.recv(), ()) {
if self.head.recv().try_send_deferred(()) {
true
} else {
self.signal()
Expand All @@ -62,7 +63,7 @@ impl WaitQueue {
fn broadcast(&self) -> uint {
let mut count = 0;
while self.head.peek() {
if comm::try_send_one(self.head.recv(), ()) {
if self.head.recv().try_send_deferred(()) {
count += 1;
}
}
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<Q:Send> Sem<Q> {
// Tell outer scope we need to block.
waiter_nobe = Some(WaitEnd);
// Enqueue ourself.
state.waiters.tail.send(SignalEnd);
state.waiters.tail.send_deferred(SignalEnd);
}
}
// Uncomment if you wish to test for sem races. Not valgrind-friendly.
Expand Down Expand Up @@ -256,7 +257,7 @@ impl<'self> Condvar<'self> {
}
// Enqueue ourself to be woken up by a signaller.
let SignalEnd = SignalEnd.take_unwrap();
state.blocked[condvar_id].tail.send(SignalEnd);
state.blocked[condvar_id].tail.send_deferred(SignalEnd);
} else {
out_of_bounds = Some(state.blocked.len());
}
Expand Down
30 changes: 30 additions & 0 deletions src/libstd/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use either::{Either, Left, Right};
use kinds::Send;
use option::{Option, Some};
use unstable::sync::Exclusive;
pub use rt::comm::SendDeferred;
use rtcomm = rt::comm;
use rt;

Expand Down Expand Up @@ -105,6 +106,21 @@ impl<T: Send> GenericSmartChan<T> for Chan<T> {
}
}

impl<T: Send> SendDeferred<T> for Chan<T> {
fn send_deferred(&self, x: T) {
match self.inner {
Left(ref chan) => chan.send(x),
Right(ref chan) => chan.send_deferred(x)
}
}
fn try_send_deferred(&self, x: T) -> bool {
match self.inner {
Left(ref chan) => chan.try_send(x),
Right(ref chan) => chan.try_send_deferred(x)
}
}
}

impl<T: Send> GenericPort<T> for Port<T> {
fn recv(&self) -> T {
match self.inner {
Expand Down Expand Up @@ -250,6 +266,20 @@ impl<T: Send> ChanOne<T> {
Right(p) => p.try_send(data)
}
}
pub fn send_deferred(self, data: T) {
let ChanOne { inner } = self;
match inner {
Left(p) => p.send(data),
Right(p) => p.send_deferred(data)
}
}
pub fn try_send_deferred(self, data: T) -> bool {
let ChanOne { inner } = self;
match inner {
Left(p) => p.try_send(data),
Right(p) => p.try_send_deferred(data)
}
}
}

pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
Expand Down
151 changes: 127 additions & 24 deletions src/libstd/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
use rt::{context, SchedulerContext};
use tuple::ImmutableTuple;

/// A combined refcount / BlockedTask-as-uint pointer.
///
Expand Down Expand Up @@ -86,12 +87,32 @@ impl<T> ChanOne<T> {
}
}

/// Send a message on the one-shot channel. If a receiver task is blocked
/// waiting for the message, will wake it up and reschedule to it.
pub fn send(self, val: T) {
self.try_send(val);
}

/// As `send`, but also returns whether or not the receiver endpoint is still open.
pub fn try_send(self, val: T) -> bool {
self.try_send_inner(val, true)
}

/// Send a message without immediately rescheduling to a blocked receiver.
/// This can be useful in contexts where rescheduling is forbidden, or to
/// optimize for when the sender expects to still have useful work to do.
pub fn send_deferred(self, val: T) {
self.try_send_deferred(val);
}

/// As `send_deferred` and `try_send` together.
pub fn try_send_deferred(self, val: T) -> bool {
self.try_send_inner(val, false)
}

// 'do_resched' configures whether the scheduler immediately switches to
// the receiving task, or leaves the sending task still running.
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
rtassert!(context() != SchedulerContext);

let mut this = self;
Expand Down Expand Up @@ -130,9 +151,16 @@ impl<T> ChanOne<T> {
task_as_state => {
// Port is blocked. Wake it up.
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
Scheduler::run_task(woken_task);
};
if do_resched {
do recvr.wake().map_consume |woken_task| {
Scheduler::run_task(woken_task);
};
} else {
let recvr = Cell::new(recvr);
do Local::borrow::<Scheduler, ()> |sched| {
sched.enqueue_blocked_task(recvr.take());
}
}
}
}
}
Expand All @@ -152,6 +180,7 @@ impl<T> PortOne<T> {
}
}

/// Wait for a message on the one-shot port. Fails if the send end is closed.
pub fn recv(self) -> T {
match self.try_recv() {
Some(val) => val,
Expand All @@ -161,6 +190,7 @@ impl<T> PortOne<T> {
}
}

/// As `recv`, but returns `None` if the send end is closed rather than failing.
pub fn try_recv(self) -> Option<T> {
let mut this = self;

Expand Down Expand Up @@ -382,6 +412,12 @@ impl<T> Drop for PortOne<T> {
}
}

/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
pub trait SendDeferred<T> {
fn send_deferred(&self, val: T);
fn try_send_deferred(&self, val: T) -> bool;
}

struct StreamPayload<T> {
val: T,
next: PortOne<StreamPayload<T>>
Expand Down Expand Up @@ -409,6 +445,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
return (port, chan);
}

impl<T: Send> Chan<T> {
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
let (next_pone, next_cone) = oneshot();
let cone = self.next.take();
self.next.put_back(next_cone);
cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
}
}

impl<T: Send> GenericChan<T> for Chan<T> {
fn send(&self, val: T) {
self.try_send(val);
Expand All @@ -417,10 +462,16 @@ impl<T: Send> GenericChan<T> for Chan<T> {

impl<T: Send> GenericSmartChan<T> for Chan<T> {
fn try_send(&self, val: T) -> bool {
let (next_pone, next_cone) = oneshot();
let cone = self.next.take();
self.next.put_back(next_cone);
cone.try_send(StreamPayload { val: val, next: next_pone })
self.try_send_inner(val, true)
}
}

impl<T: Send> SendDeferred<T> for Chan<T> {
fn send_deferred(&self, val: T) {
self.try_send_deferred(val);
}
fn try_send_deferred(&self, val: T) -> bool {
self.try_send_inner(val, false)
}
}

Expand Down Expand Up @@ -495,6 +546,17 @@ impl<T> SharedChan<T> {
}
}

impl<T: Send> SharedChan<T> {
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
unsafe {
let (next_pone, next_cone) = oneshot();
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
do_resched)
}
}
}

impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, val: T) {
self.try_send(val);
Expand All @@ -503,11 +565,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> {

impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, val: T) -> bool {
unsafe {
let (next_pone, next_cone) = oneshot();
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
}
self.try_send_inner(val, true)
}
}

impl<T: Send> SendDeferred<T> for SharedChan<T> {
fn send_deferred(&self, val: T) {
self.try_send_deferred(val);
}
fn try_send_deferred(&self, val: T) -> bool {
self.try_send_inner(val, false)
}
}

Expand Down Expand Up @@ -584,31 +651,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> {

impl<T: Send> GenericChan<T> for MegaPipe<T> {
fn send(&self, val: T) {
match *self {
(_, ref c) => c.send(val)
}
self.second_ref().send(val)
}
}

impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
fn try_send(&self, val: T) -> bool {
match *self {
(_, ref c) => c.try_send(val)
}
self.second_ref().try_send(val)
}
}

impl<T: Send> GenericPort<T> for MegaPipe<T> {
fn recv(&self) -> T {
match *self {
(ref p, _) => p.recv()
}
self.first_ref().recv()
}

fn try_recv(&self) -> Option<T> {
match *self {
(ref p, _) => p.try_recv()
}
self.first_ref().try_recv()
}
}

impl<T: Send> SendDeferred<T> for MegaPipe<T> {
fn send_deferred(&self, val: T) {
self.second_ref().send_deferred(val)
}
fn try_send_deferred(&self, val: T) -> bool {
self.second_ref().try_send_deferred(val)
}
}

Expand Down Expand Up @@ -1017,4 +1085,39 @@ mod test {
}
}
}

#[test]
fn send_deferred() {
use unstable::sync::atomically;

// Tests no-rescheduling of send_deferred on all types of channels.
do run_in_newsched_task {
let (pone, cone) = oneshot();
let (pstream, cstream) = stream();
let (pshared, cshared) = stream();
let cshared = SharedChan::new(cshared);
let mp = megapipe();

let pone = Cell::new(pone);
do spawntask { pone.take().recv(); }
let pstream = Cell::new(pstream);
do spawntask { pstream.take().recv(); }
let pshared = Cell::new(pshared);
do spawntask { pshared.take().recv(); }
let p_mp = Cell::new(mp.clone());
do spawntask { p_mp.take().recv(); }

let cs = Cell::new((cone, cstream, cshared, mp));
unsafe {
do atomically {
let (cone, cstream, cshared, mp) = cs.take();
cone.send_deferred(());
cstream.send_deferred(());
cshared.send_deferred(());
mp.send_deferred(());
}
}
}
}

}

0 comments on commit be7738b

Please sign in to comment.