Skip to content

comm: Impose an artificial bound on channel size #12981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 65 additions & 13 deletions src/libstd/comm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@
use cast;
use cell::Cell;
use clone::Clone;
use cmp;
use iter::Iterator;
use kinds::Send;
use kinds::marker;
Expand All @@ -248,6 +249,7 @@ use result::{Ok, Err, Result};
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use sync::arc::UnsafeArc;
use uint;

pub use comm::select::{Select, Handle};

Expand Down Expand Up @@ -330,16 +332,55 @@ enum Flavor<T> {
Shared(UnsafeArc<shared::Packet<T>>),
}

/// Creates a new channel, returning the sender/receiver halves. All data sent
/// on the sender will become available on the receiver. See the documentation
/// of `Receiver` and `Sender` to see what's possible with them.
/// Creates a new asynchronous, unbounded channel.
///
/// The return value is the sender/receiver pair which can be deconstructed to
/// move ownership separately. All data sent on the sender will become
/// available on the receiver. See the documentation of `Receiver` and
/// `Sender` to see what's possible with them.
///
/// The `Receiver` returned will always block in `recv` waiting for new values,
/// but the `Sender` will never block when sending values (this is an
/// asynchronous channel). Additionally, the `Sender` will never fail if the
/// receiver has not disconnected, because the channel is unbounded.
///
/// # Example
///
/// ```
/// let (tx, rx) = channel();
///
/// spawn(proc() {
/// tx.send(100);
/// });
///
/// println!("received: {}", rx.recv());
/// ```
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
(Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
(Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
}

/// Creates a new asynchronous, unbounded channel which disables asserts about
/// the size of the channel.
///
/// The asynchronous channels in Rust will normally assert that the size of the
/// channel is under a certain "very high" threshold, but using this function
/// disables this assertion, enabling a truly unbounded number of sends.
///
/// It is not recommended to use this method, the `channel()` constructor is
/// likely what you want.
#[experimental]
pub fn unchecked_channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let mut packet = shared::Packet::new();
packet.inherit_blocker(None); // finish upgrade protocol
packet.bound_checks = false; // disable assertions

let (a, b) = UnsafeArc::new2(packet);
(Sender::new(Shared(a)), Receiver::new(Shared(b)))
}

impl<T: Send> Sender<T> {
fn my_new(inner: Flavor<T>) -> Sender<T> {
fn new(inner: Flavor<T>) -> Sender<T> {
Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
}

Expand Down Expand Up @@ -405,7 +446,7 @@ impl<T: Send> Sender<T> {
return (*p).send(t);
} else {
let (a, b) = UnsafeArc::new2(stream::Packet::new());
match (*p).upgrade(Receiver::my_new(Stream(b))) {
match (*p).upgrade(Receiver::new(Stream(b))) {
oneshot::UpSuccess => {
(*a.get()).send(t);
(a, true)
Expand All @@ -425,7 +466,7 @@ impl<T: Send> Sender<T> {
};

unsafe {
let mut tmp = Sender::my_new(Stream(new_inner));
let mut tmp = Sender::new(Stream(new_inner));
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
return ret;
Expand All @@ -437,31 +478,31 @@ impl<T: Send> Clone for Sender<T> {
let (packet, sleeper) = match self.inner {
Oneshot(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
oneshot::UpWoke(task) => (b, Some(task))
}
}
Stream(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
stream::UpSuccess | stream::UpDisconnected => (b, None),
stream::UpWoke(task) => (b, Some(task)),
}
}
Shared(ref p) => {
unsafe { (*p.get()).clone_chan(); }
return Sender::my_new(Shared(p.clone()));
return Sender::new(Shared(p.clone()));
}
};

unsafe {
(*packet.get()).inherit_blocker(sleeper);

let mut tmp = Sender::my_new(Shared(packet.clone()));
let mut tmp = Sender::new(Shared(packet.clone()));
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
Sender::my_new(Shared(packet))
Sender::new(Shared(packet))
}
}

Expand All @@ -477,7 +518,7 @@ impl<T: Send> Drop for Sender<T> {
}

impl<T: Send> Receiver<T> {
fn my_new(inner: Flavor<T>) -> Receiver<T> {
fn new(inner: Flavor<T>) -> Receiver<T> {
Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
}

Expand Down Expand Up @@ -708,6 +749,17 @@ impl<T: Send> Drop for Receiver<T> {
}
}

fn assert_sane_bound<T>(amt: uint) {
// Assume that either taking up half the address space with messages or
// having some "very large number" of messages constitues overflowing.
//
// Currently, this "very large number" is around 500 million. This was
// arbitrarily chosen. On OSX, it took about a 80 seconds of sending 1s on a
// channel to reach this limit.
let limit = cmp::min(uint::MAX / 2 / mem::nonzero_size_of::<T>(), 2 << 28);
assert!(amt < limit);
}

#[cfg(test)]
mod test {
use prelude::*;
Expand Down
13 changes: 12 additions & 1 deletion src/libstd/comm/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ pub struct Packet<T> {
// this lock protects various portions of this implementation during
// select()
select_lock: NativeMutex,

// Flag to disable sanity bound checks. This value does not change after
// construction.
bound_checks: bool,
}

pub enum Failure {
Expand All @@ -77,6 +81,7 @@ impl<T: Send> Packet<T> {
port_dropped: atomics::AtomicBool::new(false),
sender_drain: atomics::AtomicInt::new(0),
select_lock: unsafe { NativeMutex::new() },
bound_checks: true,
};
// see comments in inherit_blocker about why we grab this lock
unsafe { p.select_lock.lock_noguard() }
Expand Down Expand Up @@ -210,7 +215,13 @@ impl<T: Send> Packet<T> {
}

// Can't make any assumptions about this case like in the SPSC case.
_ => {}
// Be sure, however, that we're not going towards exhausting the
// address space.
n => {
if n > 0 && self.bound_checks {
super::assert_sane_bound::<T>(n as uint);
}
}
}

true
Expand Down
11 changes: 8 additions & 3 deletions src/libstd/comm/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,14 @@ impl<T: Send> Packet<T> {
}
}

// Otherwise we just sent some data on a non-waiting queue, so just
// make sure the world is sane and carry on!
n => { assert!(n >= 0); UpSuccess }
// Otherwise we just sent some data on a non-waiting queue. Be sure
// that we're not moving towards exhausting the address space, and
// then carry on.
n => {
assert!(n >= 0);
super::assert_sane_bound::<T>(n as uint);
UpSuccess
}
}
}

Expand Down