From 03aae2f7f0d98256dee65554e72c26ba787d30fa Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 17 Mar 2014 11:20:42 -0700 Subject: [PATCH 1/2] comm: Rename my_new to new This opened up after the removal of `Chan::new`. This is just an internal implementation detail. --- src/libstd/comm/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 267140a0089bd..de8a464361f43 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -335,11 +335,11 @@ enum Flavor { /// of `Receiver` and `Sender` to see what's possible with them. pub fn channel() -> (Sender, Receiver) { let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a))) + (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) } impl Sender { - fn my_new(inner: Flavor) -> Sender { + fn new(inner: Flavor) -> Sender { Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare } } @@ -405,7 +405,7 @@ impl Sender { return (*p).send(t); } else { let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::my_new(Stream(b))) { + match (*p).upgrade(Receiver::new(Stream(b))) { oneshot::UpSuccess => { (*a.get()).send(t); (a, true) @@ -425,7 +425,7 @@ impl Sender { }; unsafe { - let mut tmp = Sender::my_new(Stream(new_inner)); + let mut tmp = Sender::new(Stream(new_inner)); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } return ret; @@ -437,31 +437,31 @@ impl Clone for Sender { let (packet, sleeper) = match self.inner { Oneshot(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), oneshot::UpWoke(task) => (b, Some(task)) } } Stream(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { stream::UpSuccess | stream::UpDisconnected => (b, None), stream::UpWoke(task) => (b, Some(task)), } } Shared(ref p) => { unsafe { (*p.get()).clone_chan(); } - return Sender::my_new(Shared(p.clone())); + return Sender::new(Shared(p.clone())); } }; unsafe { (*packet.get()).inherit_blocker(sleeper); - let mut tmp = Sender::my_new(Shared(packet.clone())); + let mut tmp = Sender::new(Shared(packet.clone())); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } - Sender::my_new(Shared(packet)) + Sender::new(Shared(packet)) } } @@ -477,7 +477,7 @@ impl Drop for Sender { } impl Receiver { - fn my_new(inner: Flavor) -> Receiver { + fn new(inner: Flavor) -> Receiver { Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare } } From d17116c2d80b8a40f05445c7f3ad7ecca48c4baf Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 17 Mar 2014 10:56:40 -0700 Subject: [PATCH 2/2] comm: Impose an artificial bound on channel size This commit adds an artificial and "very high" bound on the size of an unbounded asynchronous channel. An unbounded channel runs the risk of memory exhaustions, resulting in a process abort (currently OOM is not a fail!()). This is often a disastrous failure mode, so instead this commit attempts to mitigate some of the worry by imposing a large limit which will theoretically get triggered before hitting OOM. This commit also adds an ability to sidestep these assertions by using the `unchecked_channel` method to create a Sender/Receiver pair. This function is currently marked as `#[experimental]` due to leaking implementation abstractions. --- src/libstd/comm/mod.rs | 58 +++++++++++++++++++++++++++++++++++++-- src/libstd/comm/shared.rs | 13 ++++++++- src/libstd/comm/stream.rs | 11 ++++++-- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index de8a464361f43..b33e2ee7bd3e0 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -238,6 +238,7 @@ use cast; use cell::Cell; use clone::Clone; +use cmp; use iter::Iterator; use kinds::Send; use kinds::marker; @@ -248,6 +249,7 @@ use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; use sync::arc::UnsafeArc; +use uint; pub use comm::select::{Select, Handle}; @@ -330,14 +332,53 @@ enum Flavor { Shared(UnsafeArc>), } -/// Creates a new channel, returning the sender/receiver halves. All data sent -/// on the sender will become available on the receiver. See the documentation -/// of `Receiver` and `Sender` to see what's possible with them. +/// Creates a new asynchronous, unbounded channel. +/// +/// The return value is the sender/receiver pair which can be deconstructed to +/// move ownership separately. All data sent on the sender will become +/// available on the receiver. See the documentation of `Receiver` and +/// `Sender` to see what's possible with them. +/// +/// The `Receiver` returned will always block in `recv` waiting for new values, +/// but the `Sender` will never block when sending values (this is an +/// asynchronous channel). Additionally, the `Sender` will never fail if the +/// receiver has not disconnected, because the channel is unbounded. +/// +/// # Example +/// +/// ``` +/// let (tx, rx) = channel(); +/// +/// spawn(proc() { +/// tx.send(100); +/// }); +/// +/// println!("received: {}", rx.recv()); +/// ``` pub fn channel() -> (Sender, Receiver) { let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) } +/// Creates a new asynchronous, unbounded channel which disables asserts about +/// the size of the channel. +/// +/// The asynchronous channels in Rust will normally assert that the size of the +/// channel is under a certain "very high" threshold, but using this function +/// disables this assertion, enabling a truly unbounded number of sends. +/// +/// It is not recommended to use this method, the `channel()` constructor is +/// likely what you want. +#[experimental] +pub fn unchecked_channel() -> (Sender, Receiver) { + let mut packet = shared::Packet::new(); + packet.inherit_blocker(None); // finish upgrade protocol + packet.bound_checks = false; // disable assertions + + let (a, b) = UnsafeArc::new2(packet); + (Sender::new(Shared(a)), Receiver::new(Shared(b))) +} + impl Sender { fn new(inner: Flavor) -> Sender { Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare } @@ -708,6 +749,17 @@ impl Drop for Receiver { } } +fn assert_sane_bound(amt: uint) { + // Assume that either taking up half the address space with messages or + // having some "very large number" of messages constitues overflowing. + // + // Currently, this "very large number" is around 500 million. This was + // arbitrarily chosen. On OSX, it took about a 80 seconds of sending 1s on a + // channel to reach this limit. + let limit = cmp::min(uint::MAX / 2 / mem::nonzero_size_of::(), 2 << 28); + assert!(amt < limit); +} + #[cfg(test)] mod test { use prelude::*; diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 8c8ae85e4ea20..cc9d95c5c6097 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -58,6 +58,10 @@ pub struct Packet { // this lock protects various portions of this implementation during // select() select_lock: NativeMutex, + + // Flag to disable sanity bound checks. This value does not change after + // construction. + bound_checks: bool, } pub enum Failure { @@ -77,6 +81,7 @@ impl Packet { port_dropped: atomics::AtomicBool::new(false), sender_drain: atomics::AtomicInt::new(0), select_lock: unsafe { NativeMutex::new() }, + bound_checks: true, }; // see comments in inherit_blocker about why we grab this lock unsafe { p.select_lock.lock_noguard() } @@ -210,7 +215,13 @@ impl Packet { } // Can't make any assumptions about this case like in the SPSC case. - _ => {} + // Be sure, however, that we're not going towards exhausting the + // address space. + n => { + if n > 0 && self.bound_checks { + super::assert_sane_bound::(n as uint); + } + } } true diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 5820b13a35f46..0fc6021f3583e 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -132,9 +132,14 @@ impl Packet { } } - // Otherwise we just sent some data on a non-waiting queue, so just - // make sure the world is sane and carry on! - n => { assert!(n >= 0); UpSuccess } + // Otherwise we just sent some data on a non-waiting queue. Be sure + // that we're not moving towards exhausting the address space, and + // then carry on. + n => { + assert!(n >= 0); + super::assert_sane_bound::(n as uint); + UpSuccess + } } }