From 545d4718c8e1b9e69474165a1cb38d873627183d Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 10 Apr 2014 10:53:49 -0700 Subject: [PATCH] std: Make std::comm return types consistent There are currently a number of return values from the std::comm methods, not all of which are necessarily completely expressive: Sender::try_send(t: T) -> bool This method currently doesn't transmit back the data `t` if the send fails due to the other end having disconnected. Additionally, this shares the name of the synchronous try_send method, but it differs in semantics in that it only has one failure case, not two (the buffer can never be full). SyncSender::try_send(t: T) -> TrySendResult This method accurately conveys all possible information, but it uses a custom type to the std::comm module with no convenience methods on it. Additionally, if you want to inspect the result you're forced to import something from `std::comm`. SyncSender::send_opt(t: T) -> Option This method uses Some(T) as an "error value" and None as a "success value", but almost all other uses of Option have Some/None the other way Receiver::try_recv(t: T) -> TryRecvResult Similarly to the synchronous try_send, this custom return type is lacking in terms of usability (no convenience methods). With this number of drawbacks in mind, I believed it was time to re-work the return types of these methods. The new API for the comm module is: Sender::send(t: T) -> () Sender::send_opt(t: T) -> Result<(), T> SyncSender::send(t: T) -> () SyncSender::send_opt(t: T) -> Result<(), T> SyncSender::try_send(t: T) -> Result<(), TrySendError> Receiver::recv() -> T Receiver::recv_opt() -> Result Receiver::try_recv() -> Result The notable changes made are: * Sender::try_send => Sender::send_opt. This renaming brings the semantics in line with the SyncSender::send_opt method. An asychronous send only has one failure case, unlike the synchronous try_send method which has two failure cases (full/disconnected). * Sender::send_opt returns the data back to the caller if the send is guaranteed to fail. This method previously returned `bool`, but then it was unable to retrieve the data if the data was guaranteed to fail to send. There is still a race such that when `Ok(())` is returned the data could still fail to be received, but that's inherent to an asynchronous channel. * Result is now the basis of all return values. This not only adds lots of convenience methods to all return values for free, but it also means that you can inspect the return values with no extra imports (Ok/Err are in the prelude). Additionally, it's now self documenting when something failed or not because the return value has "Err" in the name. Things I'm a little uneasy about: * The methods send_opt and recv_opt are not returning options, but rather results. I felt more strongly that Option was the wrong return type than the _opt prefix was wrong, and I coudn't think of a much better name for these methods. One possible way to think about them is to read the _opt suffix as "optionally". * Result is often better expressed as Option. This is only applicable to the recv_opt() method, but I thought it would be more consistent for everything to return Result rather than one method returning an Option. Despite my two reasons to feel uneasy, I feel much better about the consistency in return values at this point, and I think the only real open question is if there's a better suffix for {send,recv}_opt. Closes #11527 --- src/libgreen/sched.rs | 7 +- src/libgreen/task.rs | 2 +- src/libnative/io/timer_other.rs | 9 +- src/libnative/io/timer_timerfd.rs | 9 +- src/libnative/io/timer_win32.rs | 9 +- src/libnative/task.rs | 2 +- src/librustuv/net.rs | 2 +- src/librustuv/signal.rs | 2 +- src/librustuv/timer.rs | 12 +- src/libstd/comm/mod.rs | 239 +++++++++--------- src/libstd/comm/oneshot.rs | 14 +- src/libstd/comm/select.rs | 18 +- src/libstd/comm/shared.rs | 8 +- src/libstd/comm/stream.rs | 24 +- src/libstd/comm/sync.rs | 18 +- src/libstd/io/comm_adapters.rs | 12 +- src/libstd/io/net/udp.rs | 4 +- src/libstd/io/signal.rs | 3 +- src/libstd/io/timer.rs | 22 +- src/libstd/rt/task.rs | 2 +- src/libsync/comm.rs | 8 +- src/libsync/lock.rs | 2 +- src/libsync/raw.rs | 11 +- src/test/bench/msgsend-pipes-shared.rs | 6 +- src/test/bench/msgsend-pipes.rs | 6 +- .../bench/task-perf-jargon-metal-smoke.rs | 2 +- src/test/run-pass/issue-9396.rs | 6 +- 27 files changed, 232 insertions(+), 227 deletions(-) diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 9971dfee82815..e214797d4f84b 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -1011,7 +1011,6 @@ fn new_sched_rng() -> XorShiftRng { mod test { use rustuv; - use std::comm; use std::task::TaskOpts; use std::rt::task::Task; use std::rt::local::Local; @@ -1428,7 +1427,7 @@ mod test { // This task should not be able to starve the sender; // The sender should get stolen to another thread. spawn(proc() { - while rx.try_recv() != comm::Data(()) { } + while rx.try_recv().is_err() { } }); tx.send(()); @@ -1445,7 +1444,7 @@ mod test { // This task should not be able to starve the other task. // The sends should eventually yield. spawn(proc() { - while rx1.try_recv() != comm::Data(()) { + while rx1.try_recv().is_err() { tx2.send(()); } }); @@ -1499,7 +1498,7 @@ mod test { let mut val = 20; while val > 0 { val = po.recv(); - ch.try_send(val - 1); + let _ = ch.send_opt(val - 1); } } diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 6fa40c0e42b64..534e9f8401e9d 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -515,7 +515,7 @@ mod tests { let _tx = tx; fail!() }); - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); } #[test] diff --git a/src/libnative/io/timer_other.rs b/src/libnative/io/timer_other.rs index 569b4cbb258e0..0bf97d58ffdff 100644 --- a/src/libnative/io/timer_other.rs +++ b/src/libnative/io/timer_other.rs @@ -46,7 +46,6 @@ //! //! Note that all time units in this file are in *milliseconds*. -use std::comm::Data; use libc; use std::mem; use std::os; @@ -119,7 +118,7 @@ fn helper(input: libc::c_int, messages: Receiver) { Some(timer) => timer, None => return }; let tx = timer.tx.take_unwrap(); - if tx.try_send(()) && timer.repeat { + if tx.send_opt(()).is_ok() && timer.repeat { timer.tx = Some(tx); timer.target += timer.interval; insert(timer, active); @@ -162,14 +161,14 @@ fn helper(input: libc::c_int, messages: Receiver) { 1 => { loop { match messages.try_recv() { - Data(Shutdown) => { + Ok(Shutdown) => { assert!(active.len() == 0); break 'outer; } - Data(NewTimer(timer)) => insert(timer, &mut active), + Ok(NewTimer(timer)) => insert(timer, &mut active), - Data(RemoveTimer(id, ack)) => { + Ok(RemoveTimer(id, ack)) => { match dead.iter().position(|&(i, _)| id == i) { Some(i) => { let (_, i) = dead.remove(i).unwrap(); diff --git a/src/libnative/io/timer_timerfd.rs b/src/libnative/io/timer_timerfd.rs index d37a39fc30e8d..3fd61dc1da5d0 100644 --- a/src/libnative/io/timer_timerfd.rs +++ b/src/libnative/io/timer_timerfd.rs @@ -28,7 +28,6 @@ //! //! As with timer_other, all units in this file are in units of millseconds. -use std::comm::Data; use libc; use std::ptr; use std::os; @@ -107,7 +106,7 @@ fn helper(input: libc::c_int, messages: Receiver) { match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) { Some(i) => { let (_, ref c, oneshot) = *list.get(i); - (!c.try_send(()) || oneshot, i) + (c.send_opt(()).is_err() || oneshot, i) } None => fail!("fd not active: {}", fd), } @@ -121,7 +120,7 @@ fn helper(input: libc::c_int, messages: Receiver) { while incoming { match messages.try_recv() { - Data(NewTimer(fd, chan, one, timeval)) => { + Ok(NewTimer(fd, chan, one, timeval)) => { // acknowledge we have the new channel, we will never send // another message to the old channel chan.send(()); @@ -149,7 +148,7 @@ fn helper(input: libc::c_int, messages: Receiver) { assert_eq!(ret, 0); } - Data(RemoveTimer(fd, chan)) => { + Ok(RemoveTimer(fd, chan)) => { match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) { Some(i) => { drop(list.remove(i)); @@ -160,7 +159,7 @@ fn helper(input: libc::c_int, messages: Receiver) { chan.send(()); } - Data(Shutdown) => { + Ok(Shutdown) => { assert!(list.len() == 0); break 'outer; } diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs index 8b7592783da04..a15898feb92b7 100644 --- a/src/libnative/io/timer_win32.rs +++ b/src/libnative/io/timer_win32.rs @@ -20,7 +20,6 @@ //! Other than that, the implementation is pretty straightforward in terms of //! the other two implementations of timers with nothing *that* new showing up. -use std::comm::Data; use libc; use std::ptr; use std::rt::rtio; @@ -54,11 +53,11 @@ fn helper(input: libc::HANDLE, messages: Receiver) { if idx == 0 { loop { match messages.try_recv() { - Data(NewTimer(obj, c, one)) => { + Ok(NewTimer(obj, c, one)) => { objs.push(obj); chans.push((c, one)); } - Data(RemoveTimer(obj, c)) => { + Ok(RemoveTimer(obj, c)) => { c.send(()); match objs.iter().position(|&o| o == obj) { Some(i) => { @@ -68,7 +67,7 @@ fn helper(input: libc::HANDLE, messages: Receiver) { None => {} } } - Data(Shutdown) => { + Ok(Shutdown) => { assert_eq!(objs.len(), 1); assert_eq!(chans.len(), 0); break 'outer; @@ -79,7 +78,7 @@ fn helper(input: libc::HANDLE, messages: Receiver) { } else { let remove = { match chans.get(idx as uint - 1) { - &(ref c, oneshot) => !c.try_send(()) || oneshot + &(ref c, oneshot) => c.send_opt(()).is_err() || oneshot } }; if remove { diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 871fc94bde46a..ddfd46ecad9b5 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -274,7 +274,7 @@ mod tests { let _tx = tx; fail!() }); - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); } #[test] diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 4d4b62dddd4a0..b893f5f693fa7 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -1065,7 +1065,7 @@ mod test { } reads += 1; - tx2.try_send(()); + let _ = tx2.send_opt(()); } // Make sure we had multiple reads diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index c38b4fdd96faf..2dcf2de681c39 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -51,7 +51,7 @@ impl SignalWatcher { extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; assert_eq!(signum as int, s.signal as int); - s.channel.try_send(s.signal); + let _ = s.channel.send_opt(s.signal); } impl HomingIO for SignalWatcher { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 3d323382ad536..58008002837d1 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -140,9 +140,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { let task = timer.blocker.take_unwrap(); let _ = task.wake().map(|t| t.reawaken()); } - SendOnce(chan) => { let _ = chan.try_send(()); } + SendOnce(chan) => { let _ = chan.send_opt(()); } SendMany(chan, id) => { - let _ = chan.try_send(()); + let _ = chan.send_opt(()); // Note that the above operation could have performed some form of // scheduling. This means that the timer may have decided to insert @@ -196,8 +196,8 @@ mod test { let oport = timer.oneshot(1); let pport = timer.period(1); timer.sleep(1); - assert_eq!(oport.recv_opt(), None); - assert_eq!(pport.recv_opt(), None); + assert_eq!(oport.recv_opt(), Err(())); + assert_eq!(pport.recv_opt(), Err(())); timer.oneshot(1).recv(); } @@ -284,7 +284,7 @@ mod test { let mut timer = TimerWatcher::new(local_loop()); timer.oneshot(1000) }; - assert_eq!(port.recv_opt(), None); + assert_eq!(port.recv_opt(), Err(())); } #[test] @@ -293,7 +293,7 @@ mod test { let mut timer = TimerWatcher::new(local_loop()); timer.period(1000) }; - assert_eq!(port.recv_opt(), None); + assert_eq!(port.recv_opt(), Err(())); } #[test] diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index f210bfc88bc35..58781c01d662e 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -322,25 +322,19 @@ pub struct SyncSender { /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. #[deriving(Eq, Clone, Show)] -pub enum TryRecvResult { +pub enum TryRecvError { /// This channel is currently empty, but the sender(s) have not yet /// disconnected, so data may yet become available. Empty, /// This channel's sending half has become disconnected, and there will /// never be any more data received on this channel Disconnected, - /// The channel had some data and we successfully popped it - Data(T), } -/// This enumeration is the list of the possible outcomes for the +/// This enumeration is the list of the possible error outcomes for the /// `SyncSender::try_send` method. #[deriving(Eq, Clone, Show)] -pub enum TrySendResult { - /// The data was successfully sent along the channel. This either means that - /// it was buffered in the channel, or handed off to a receiver. In either - /// case, the callee no longer has ownership of the data. - Sent, +pub enum TrySendError { /// The data could not be sent on the channel because it would require that /// the callee block to send the data. /// @@ -365,7 +359,7 @@ enum Flavor { /// of `Receiver` and `Sender` to see what's possible with them. pub fn channel() -> (Sender, Receiver) { let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a))) + (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) } /// Creates a new synchronous, bounded channel. @@ -401,7 +395,7 @@ pub fn channel() -> (Sender, Receiver) { /// ``` pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); - (SyncSender::new(a), Receiver::my_new(Sync(b))) + (SyncSender::new(a), Receiver::new(Sync(b))) } //////////////////////////////////////////////////////////////////////////////// @@ -409,7 +403,7 @@ pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { //////////////////////////////////////////////////////////////////////////////// impl Sender { - fn my_new(inner: Flavor) -> Sender { + fn new(inner: Flavor) -> Sender { Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare } } @@ -433,25 +427,42 @@ impl Sender { /// The purpose of this functionality is to propagate failure among tasks. /// If failure is not desired, then consider using the `try_send` method pub fn send(&self, t: T) { - if !self.try_send(t) { + if self.send_opt(t).is_err() { fail!("sending on a closed channel"); } } - /// Attempts to send a value on this channel, returning whether it was - /// successfully sent. + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. /// /// A successful send occurs when it is determined that the other end of /// the channel has not hung up already. An unsuccessful send would be one /// where the corresponding receiver has already been deallocated. Note - /// that a return value of `false` means that the data will never be - /// received, but a return value of `true` does *not* mean that the data + /// that a return value of `Err` means that the data will never be + /// received, but a return value of `Ok` does *not* mean that the data /// will be received. It is possible for the corresponding receiver to - /// hang up immediately after this function returns `true`. + /// hang up immediately after this function returns `Ok`. /// - /// Like `send`, this method will never block. If the failure of send cannot - /// be tolerated, then this method should be used instead. - pub fn try_send(&self, t: T) -> bool { + /// Like `send`, this method will never block. + /// + /// # Failure + /// + /// This method will never fail, it will return the message back to the + /// caller if the other end is disconnected + /// + /// # Example + /// + /// ``` + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// assert_eq!(tx.send_opt(1), Ok(())); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send_opt(1), Err(1)); + /// ``` + pub fn send_opt(&self, t: T) -> Result<(), T> { // In order to prevent starvation of other tasks in situations where // a task sends repeatedly without ever receiving, we occassionally // yield instead of doing a send immediately. @@ -475,16 +486,19 @@ impl Sender { return (*p).send(t); } else { let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::my_new(Stream(b))) { + match (*p).upgrade(Receiver::new(Stream(b))) { oneshot::UpSuccess => { - (*a.get()).send(t); - (a, true) + let ret = (*a.get()).send(t); + (a, ret) } - oneshot::UpDisconnected => (a, false), + oneshot::UpDisconnected => (a, Err(t)), oneshot::UpWoke(task) => { - (*a.get()).send(t); + // This send cannot fail because the task is + // asleep (we're looking at it), so the receiver + // can't go away. + (*a.get()).send(t).unwrap(); task.wake().map(|t| t.reawaken()); - (a, true) + (a, Ok(())) } } } @@ -496,7 +510,7 @@ impl Sender { }; unsafe { - let mut tmp = Sender::my_new(Stream(new_inner)); + let mut tmp = Sender::new(Stream(new_inner)); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } return ret; @@ -508,21 +522,21 @@ impl Clone for Sender { let (packet, sleeper) = match self.inner { Oneshot(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), oneshot::UpWoke(task) => (b, Some(task)) } } Stream(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { stream::UpSuccess | stream::UpDisconnected => (b, None), stream::UpWoke(task) => (b, Some(task)), } } Shared(ref p) => { unsafe { (*p.get()).clone_chan(); } - return Sender::my_new(Shared(p.clone())); + return Sender::new(Shared(p.clone())); } Sync(..) => unreachable!(), }; @@ -530,10 +544,10 @@ impl Clone for Sender { unsafe { (*packet.get()).inherit_blocker(sleeper); - let mut tmp = Sender::my_new(Shared(packet.clone())); + let mut tmp = Sender::new(Shared(packet.clone())); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } - Sender::my_new(Shared(packet)) + Sender::new(Shared(packet)) } } @@ -579,7 +593,7 @@ impl SyncSender { /// `SyncSender::send_opt` method which will not fail if the receiver /// disconnects. pub fn send(&self, t: T) { - if self.send_opt(t).is_some() { + if self.send_opt(t).is_err() { fail!("sending on a closed channel"); } } @@ -595,11 +609,8 @@ impl SyncSender { /// # Failure /// /// This function cannot fail. - pub fn send_opt(&self, t: T) -> Option { - match unsafe { (*self.inner.get()).send(t) } { - Ok(()) => None, - Err(t) => Some(t), - } + pub fn send_opt(&self, t: T) -> Result<(), T> { + unsafe { (*self.inner.get()).send(t) } } /// Attempts to send a value on this channel without blocking. @@ -615,7 +626,7 @@ impl SyncSender { /// # Failure /// /// This function cannot fail - pub fn try_send(&self, t: T) -> TrySendResult { + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { unsafe { (*self.inner.get()).try_send(t) } } } @@ -639,7 +650,7 @@ impl Drop for SyncSender { //////////////////////////////////////////////////////////////////////////////// impl Receiver { - fn my_new(inner: Flavor) -> Receiver { + fn new(inner: Flavor) -> Receiver { Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare } } @@ -664,8 +675,8 @@ impl Receiver { /// peek at a value on this receiver. pub fn recv(&self) -> T { match self.recv_opt() { - Some(t) => t, - None => fail!("receiving on a closed channel"), + Ok(t) => t, + Err(()) => fail!("receiving on a closed channel"), } } @@ -679,7 +690,7 @@ impl Receiver { /// block on a receiver. /// /// This function cannot fail. - pub fn try_recv(&self) -> TryRecvResult { + pub fn try_recv(&self) -> Result { // If a thread is spinning in try_recv, we should take the opportunity // to reschedule things occasionally. See notes above in scheduling on // sends for why this doesn't always hit TLS, and also for why this uses @@ -695,32 +706,32 @@ impl Receiver { let mut new_port = match self.inner { Oneshot(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(oneshot::Empty) => return Empty, - Err(oneshot::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(Empty), + Err(oneshot::Disconnected) => return Err(Disconnected), Err(oneshot::Upgraded(rx)) => rx, } } Stream(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(stream::Empty) => return Empty, - Err(stream::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(Empty), + Err(stream::Disconnected) => return Err(Disconnected), Err(stream::Upgraded(rx)) => rx, } } Shared(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(shared::Empty) => return Empty, - Err(shared::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(Empty), + Err(shared::Disconnected) => return Err(Disconnected), } } Sync(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(sync::Empty) => return Empty, - Err(sync::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(Empty), + Err(sync::Disconnected) => return Err(Disconnected), } } }; @@ -741,32 +752,32 @@ impl Receiver { /// In other words, this function has the same semantics as the `recv` /// method except for the failure aspect. /// - /// If the channel has hung up, then `None` is returned. Otherwise `Some` of + /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of /// the value found on the receiver is returned. - pub fn recv_opt(&self) -> Option { + pub fn recv_opt(&self) -> Result { loop { let mut new_port = match self.inner { Oneshot(ref p) => { match unsafe { (*p.get()).recv() } { - Ok(t) => return Some(t), + Ok(t) => return Ok(t), Err(oneshot::Empty) => return unreachable!(), - Err(oneshot::Disconnected) => return None, + Err(oneshot::Disconnected) => return Err(()), Err(oneshot::Upgraded(rx)) => rx, } } Stream(ref p) => { match unsafe { (*p.get()).recv() } { - Ok(t) => return Some(t), + Ok(t) => return Ok(t), Err(stream::Empty) => return unreachable!(), - Err(stream::Disconnected) => return None, + Err(stream::Disconnected) => return Err(()), Err(stream::Upgraded(rx)) => rx, } } Shared(ref p) => { match unsafe { (*p.get()).recv() } { - Ok(t) => return Some(t), + Ok(t) => return Ok(t), Err(shared::Empty) => return unreachable!(), - Err(shared::Disconnected) => return None, + Err(shared::Disconnected) => return Err(()), } } Sync(ref p) => return unsafe { (*p.get()).recv() } @@ -873,7 +884,7 @@ impl select::Packet for Receiver { } impl<'a, T: Send> Iterator for Messages<'a, T> { - fn next(&mut self) -> Option { self.rx.recv_opt() } + fn next(&mut self) -> Option { self.rx.recv_opt().ok() } } #[unsafe_destructor] @@ -1022,7 +1033,7 @@ mod test { assert_eq!(rx.recv(), 1); } match rx.try_recv() { - Data(..) => fail!(), + Ok(..) => fail!(), _ => {} } dtx.send(()); @@ -1136,45 +1147,45 @@ mod test { test!(fn oneshot_single_thread_try_send_open() { let (tx, rx) = channel::(); - assert!(tx.try_send(10)); + assert!(tx.send_opt(10).is_ok()); assert!(rx.recv() == 10); }) test!(fn oneshot_single_thread_try_send_closed() { let (tx, rx) = channel::(); drop(rx); - assert!(!tx.try_send(10)); + assert!(tx.send_opt(10).is_err()); }) test!(fn oneshot_single_thread_try_recv_open() { let (tx, rx) = channel::(); tx.send(10); - assert!(rx.recv_opt() == Some(10)); + assert!(rx.recv_opt() == Ok(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = channel::(); drop(tx); - assert!(rx.recv_opt() == None); + assert!(rx.recv_opt() == Err(())); }) test!(fn oneshot_single_thread_peek_data() { let (tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Empty) + assert_eq!(rx.try_recv(), Err(Empty)) tx.send(10); - assert_eq!(rx.try_recv(), Data(10)); + assert_eq!(rx.try_recv(), Ok(10)); }) test!(fn oneshot_single_thread_peek_close() { let (tx, rx) = channel::(); drop(tx); - assert_eq!(rx.try_recv(), Disconnected); - assert_eq!(rx.try_recv(), Disconnected); + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); }) test!(fn oneshot_single_thread_peek_open() { let (_tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Empty); + assert_eq!(rx.try_recv(), Err(Empty)); }) test!(fn oneshot_multi_task_recv_then_send() { @@ -1335,7 +1346,7 @@ mod test { tx.send(2); tx.send(2); tx.send(2); - tx.try_send(2); + let _ = tx.send_opt(2); drop(tx); assert_eq!(count_rx.recv(), 4); }) @@ -1353,14 +1364,14 @@ mod test { tx3.send(()); }); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Data(1)); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Disconnected); + assert_eq!(rx1.try_recv(), Err(Disconnected)); }) // This bug used to end up in a livelock inside of the Receiver destructor @@ -1409,9 +1420,9 @@ mod test { let mut hits = 0; while hits < 10 { match rx.try_recv() { - Data(()) => { hits += 1; } - Empty => { Thread::yield_now(); } - Disconnected => return, + Ok(()) => { hits += 1; } + Err(Empty) => { Thread::yield_now(); } + Err(Disconnected) => return, } } cdone.send(()); @@ -1542,7 +1553,7 @@ mod sync_tests { assert_eq!(rx.recv(), 1); } match rx.try_recv() { - Data(..) => fail!(), + Ok(..) => fail!(), _ => {} } dtx.send(()); @@ -1596,50 +1607,50 @@ mod sync_tests { test!(fn oneshot_single_thread_try_send_open() { let (tx, rx) = sync_channel::(1); - assert_eq!(tx.try_send(10), Sent); + assert_eq!(tx.try_send(10), Ok(())); assert!(rx.recv() == 10); }) test!(fn oneshot_single_thread_try_send_closed() { let (tx, rx) = sync_channel::(0); drop(rx); - assert_eq!(tx.try_send(10), RecvDisconnected(10)); + assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); }) test!(fn oneshot_single_thread_try_send_closed2() { let (tx, _rx) = sync_channel::(0); - assert_eq!(tx.try_send(10), Full(10)); + assert_eq!(tx.try_send(10), Err(Full(10))); }) test!(fn oneshot_single_thread_try_recv_open() { let (tx, rx) = sync_channel::(1); tx.send(10); - assert!(rx.recv_opt() == Some(10)); + assert!(rx.recv_opt() == Ok(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = sync_channel::(0); drop(tx); - assert!(rx.recv_opt() == None); + assert!(rx.recv_opt() == Err(())); }) test!(fn oneshot_single_thread_peek_data() { let (tx, rx) = sync_channel::(1); - assert_eq!(rx.try_recv(), Empty) + assert_eq!(rx.try_recv(), Err(Empty)) tx.send(10); - assert_eq!(rx.try_recv(), Data(10)); + assert_eq!(rx.try_recv(), Ok(10)); }) test!(fn oneshot_single_thread_peek_close() { let (tx, rx) = sync_channel::(0); drop(tx); - assert_eq!(rx.try_recv(), Disconnected); - assert_eq!(rx.try_recv(), Disconnected); + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); }) test!(fn oneshot_single_thread_peek_open() { let (_tx, rx) = sync_channel::(0); - assert_eq!(rx.try_recv(), Empty); + assert_eq!(rx.try_recv(), Err(Empty)); }) test!(fn oneshot_multi_task_recv_then_send() { @@ -1800,7 +1811,7 @@ mod sync_tests { tx.send(2); tx.send(2); tx.send(2); - tx.try_send(2); + let _ = tx.try_send(2); drop(tx); assert_eq!(count_rx.recv(), 4); }) @@ -1818,14 +1829,14 @@ mod sync_tests { tx3.send(()); }); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Data(1)); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Disconnected); + assert_eq!(rx1.try_recv(), Err(Disconnected)); }) // This bug used to end up in a livelock inside of the Receiver destructor @@ -1859,9 +1870,9 @@ mod sync_tests { let mut hits = 0; while hits < 10 { match rx.try_recv() { - Data(()) => { hits += 1; } - Empty => { Thread::yield_now(); } - Disconnected => return, + Ok(()) => { hits += 1; } + Err(Empty) => { Thread::yield_now(); } + Err(Disconnected) => return, } } cdone.send(()); @@ -1876,20 +1887,20 @@ mod sync_tests { test!(fn send_opt1() { let (tx, rx) = sync_channel(0); spawn(proc() { rx.recv(); }); - assert_eq!(tx.send_opt(1), None); + assert_eq!(tx.send_opt(1), Ok(())); }) test!(fn send_opt2() { let (tx, rx) = sync_channel(0); spawn(proc() { drop(rx); }); - assert_eq!(tx.send_opt(1), Some(1)); + assert_eq!(tx.send_opt(1), Err(1)); }) test!(fn send_opt3() { let (tx, rx) = sync_channel(1); - assert_eq!(tx.send_opt(1), None); + assert_eq!(tx.send_opt(1), Ok(())); spawn(proc() { drop(rx); }); - assert_eq!(tx.send_opt(1), Some(1)); + assert_eq!(tx.send_opt(1), Err(1)); }) test!(fn send_opt4() { @@ -1898,11 +1909,11 @@ mod sync_tests { let (done, donerx) = channel(); let done2 = done.clone(); spawn(proc() { - assert_eq!(tx.send_opt(1), Some(1)); + assert_eq!(tx.send_opt(1), Err(1)); done.send(()); }); spawn(proc() { - assert_eq!(tx2.send_opt(2), Some(2)); + assert_eq!(tx2.send_opt(2), Err(2)); done2.send(()); }); drop(rx); @@ -1912,27 +1923,27 @@ mod sync_tests { test!(fn try_send1() { let (tx, _rx) = sync_channel(0); - assert_eq!(tx.try_send(1), Full(1)); + assert_eq!(tx.try_send(1), Err(Full(1))); }) test!(fn try_send2() { let (tx, _rx) = sync_channel(1); - assert_eq!(tx.try_send(1), Sent); - assert_eq!(tx.try_send(1), Full(1)); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(Full(1))); }) test!(fn try_send3() { let (tx, rx) = sync_channel(1); - assert_eq!(tx.try_send(1), Sent); + assert_eq!(tx.try_send(1), Ok(())); drop(rx); - assert_eq!(tx.try_send(1), RecvDisconnected(1)); + assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); }) test!(fn try_send4() { let (tx, rx) = sync_channel(0); spawn(proc() { for _ in range(0, 1000) { task::deschedule(); } - assert_eq!(tx.try_send(1), Sent); + assert_eq!(tx.try_send(1), Ok(())); }); assert_eq!(rx.recv(), 1); } #[ignore(reason = "flaky on libnative")]) diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index 1bc7349a70d0b..e92b5cb272a80 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -90,7 +90,7 @@ impl Packet { } } - pub fn send(&mut self, t: T) -> bool { + pub fn send(&mut self, t: T) -> Result<(), T> { // Sanity check match self.upgrade { NothingSent => {} @@ -102,14 +102,12 @@ impl Packet { match self.state.swap(DATA, atomics::SeqCst) { // Sent the data, no one was waiting - EMPTY => true, + EMPTY => Ok(()), - // Couldn't send the data, the port hung up first. We need to be - // sure to deallocate the sent data (to not leave it stuck in the - // queue) + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. DISCONNECTED => { - self.data.take_unwrap(); - false + Err(self.data.take_unwrap()) } // Not possible, these are one-use channels @@ -121,7 +119,7 @@ impl Packet { n => unsafe { let t = BlockedTask::cast_from_uint(n); t.wake().map(|t| t.reawaken()); - true + Ok(()) } } } diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 84191ed6b28c5..c286fd8484934 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -236,7 +236,7 @@ impl<'rx, T: Send> Handle<'rx, T> { /// Block to receive a value on the underlying receiver, returning `Some` on /// success or `None` if the channel disconnects. This function has the same /// semantics as `Receiver.recv_opt` - pub fn recv_opt(&mut self) -> Option { self.rx.recv_opt() } + pub fn recv_opt(&mut self) -> Result { self.rx.recv_opt() } /// Adds this handle to the receiver set that the handle was created from. This /// method can be called multiple times, but it has no effect if `add` was @@ -338,12 +338,12 @@ mod test { ) drop(tx1); select! ( - foo = rx1.recv_opt() => { assert_eq!(foo, None); }, + foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); }, _bar = rx2.recv() => { fail!() } ) drop(tx2); select! ( - bar = rx2.recv_opt() => { assert_eq!(bar, None); } + bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } ) }) @@ -370,7 +370,7 @@ mod test { select! ( _a1 = rx1.recv_opt() => { fail!() }, - a2 = rx2.recv_opt() => { assert_eq!(a2, None); } + a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } ) }) @@ -392,7 +392,7 @@ mod test { ) tx3.send(1); select! ( - a = rx1.recv_opt() => { assert_eq!(a, None); }, + a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, _b = rx2.recv() => { fail!() } ) }) @@ -417,8 +417,8 @@ mod test { a = rx1.recv() => { assert_eq!(a, 1); }, a = rx2.recv() => { assert_eq!(a, 2); } ) - assert_eq!(rx1.try_recv(), Empty); - assert_eq!(rx2.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Err(Empty)); + assert_eq!(rx2.try_recv(), Err(Empty)); tx3.send(()); }) @@ -456,7 +456,7 @@ mod test { spawn(proc() { rx3.recv(); tx1.clone(); - assert_eq!(rx3.try_recv(), Empty); + assert_eq!(rx3.try_recv(), Err(Empty)); tx1.send(2); rx3.recv(); }); @@ -477,7 +477,7 @@ mod test { spawn(proc() { rx3.recv(); tx1.clone(); - assert_eq!(rx3.try_recv(), Empty); + assert_eq!(rx3.try_recv(), Err(Empty)); tx1.send(2); rx3.recv(); }); diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index e8ba9d6e62809..525786f5d1e76 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -131,9 +131,9 @@ impl Packet { unsafe { self.select_lock.unlock_noguard() } } - pub fn send(&mut self, t: T) -> bool { + pub fn send(&mut self, t: T) -> Result<(), T> { // See Port::drop for what's going on - if self.port_dropped.load(atomics::SeqCst) { return false } + if self.port_dropped.load(atomics::SeqCst) { return Err(t) } // Note that the multiple sender case is a little tricker // semantically than the single sender case. The logic for @@ -161,7 +161,7 @@ impl Packet { // received". Once we get beyond this check, we have permanently // entered the realm of "this may be received" if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE { - return false + return Err(t) } self.queue.push(t); @@ -213,7 +213,7 @@ impl Packet { _ => {} } - true + Ok(()) } pub fn recv(&mut self) -> Result { diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 5820b13a35f46..6c9280e0abc69 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -87,25 +87,27 @@ impl Packet { } - pub fn send(&mut self, t: T) -> bool { + pub fn send(&mut self, t: T) -> Result<(), T> { + // If the other port has deterministically gone away, then definitely + // must return the data back up the stack. Otherwise, the data is + // considered as being sent. + if self.port_dropped.load(atomics::SeqCst) { return Err(t) } + match self.do_send(Data(t)) { - UpSuccess => true, - UpDisconnected => false, - UpWoke(task) => { - task.wake().map(|t| t.reawaken()); - true - } + UpSuccess | UpDisconnected => {}, + UpWoke(task) => { task.wake().map(|t| t.reawaken()); } } + Ok(()) } pub fn upgrade(&mut self, up: Receiver) -> UpgradeResult { + // If the port has gone away, then there's no need to proceed any + // further. + if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } + self.do_send(GoUp(up)) } fn do_send(&mut self, t: Message) -> UpgradeResult { - // Use an acquire/release ordering to maintain the same position with - // respect to the atomic loads below - if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } - self.queue.push(t); match self.cnt.fetch_add(1, atomics::SeqCst) { // As described in the mod's doc comment, -1 == wakeup diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index b3591dad274b2..6228c4c682b06 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -201,22 +201,22 @@ impl Packet { } } - pub fn try_send(&self, t: T) -> super::TrySendResult { + pub fn try_send(&self, t: T) -> Result<(), super::TrySendError> { let (guard, state) = self.lock(); if state.disconnected { - super::RecvDisconnected(t) + Err(super::RecvDisconnected(t)) } else if state.buf.size() == state.buf.cap() { - super::Full(t) + Err(super::Full(t)) } else if state.cap == 0 { // With capacity 0, even though we have buffer space we can't // transfer the data unless there's a receiver waiting. match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => super::Full(t), + NoneBlocked => Err(super::Full(t)), BlockedSender(..) => unreachable!(), BlockedReceiver(task) => { state.buf.enqueue(t); wakeup(task, guard); - super::Sent + Ok(()) } } } else { @@ -224,7 +224,7 @@ impl Packet { // just enqueue the data for later retrieval. assert!(state.buf.size() < state.buf.cap()); state.buf.enqueue(t); - super::Sent + Ok(()) } } @@ -232,7 +232,7 @@ impl Packet { // // When reading this, remember that there can only ever be one receiver at // time. - pub fn recv(&self) -> Option { + pub fn recv(&self) -> Result { let (guard, state) = self.lock(); // Wait for the buffer to have something in it. No need for a while loop @@ -242,13 +242,13 @@ impl Packet { wait(&mut state.blocker, BlockedReceiver, &self.lock); waited = true; } - if state.disconnected && state.buf.size() == 0 { return None } + if state.disconnected && state.buf.size() == 0 { return Err(()) } // Pick up the data, wake up our neighbors, and carry on assert!(state.buf.size() > 0); let ret = state.buf.dequeue(); self.wakeup_senders(waited, guard, state); - return Some(ret); + return Ok(ret); } pub fn try_recv(&self) -> Result { diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index 06e020721358b..aa7371944daf7 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -73,7 +73,7 @@ impl Reader for ChanReader { break; } self.pos = 0; - self.buf = self.rx.recv_opt(); + self.buf = self.rx.recv_opt().ok(); self.closed = self.buf.is_none(); } if self.closed && num_read == 0 { @@ -116,15 +116,13 @@ impl Clone for ChanWriter { impl Writer for ChanWriter { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - if !self.tx.try_send(buf.to_owned()) { - Err(io::IoError { + self.tx.send_opt(buf.to_owned()).map_err(|_| { + io::IoError { kind: io::BrokenPipe, desc: "Pipe closed", detail: None - }) - } else { - Ok(()) - } + } + }) } } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 8dd59e859b877..cd2c81d284ad1 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -422,13 +422,13 @@ mod test { spawn(proc() { let mut sock3 = sock3; match sock3.sendto([1], addr2) { - Ok(..) => { let _ = tx2.try_send(()); } + Ok(..) => { let _ = tx2.send_opt(()); } Err(..) => {} } done.send(()); }); match sock1.sendto([2], addr2) { - Ok(..) => { let _ = tx.try_send(()); } + Ok(..) => { let _ = tx.send_opt(()); } Err(..) => {} } drop(tx); diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 00b2e4f2307d8..d3f3d888b87b9 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -149,6 +149,7 @@ impl Listener { #[cfg(test, unix)] mod test_unix { + use prelude::*; use libc; use comm::Empty; use io::timer; @@ -199,7 +200,7 @@ mod test_unix { s2.unregister(Interrupt); sigint(); timer::sleep(10); - assert_eq!(s2.rx.try_recv(), Empty); + assert_eq!(s2.rx.try_recv(), Err(Empty)); } } diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 839fcab8f86a4..1ca36df968cce 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -137,7 +137,7 @@ mod test { let rx1 = timer.oneshot(10000); let rx = timer.oneshot(1); rx.recv(); - assert_eq!(rx1.recv_opt(), None); + assert_eq!(rx1.recv_opt(), Err(())); }) iotest!(fn test_io_timer_oneshot_then_sleep() { @@ -145,7 +145,7 @@ mod test { let rx = timer.oneshot(100000000000); timer.sleep(1); // this should inalidate rx - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); }) iotest!(fn test_io_timer_sleep_periodic() { @@ -170,11 +170,11 @@ mod test { let rx = timer.oneshot(1); rx.recv(); - assert!(rx.recv_opt().is_none()); + assert!(rx.recv_opt().is_err()); let rx = timer.oneshot(1); rx.recv(); - assert!(rx.recv_opt().is_none()); + assert!(rx.recv_opt().is_err()); }) iotest!(fn override() { @@ -182,8 +182,8 @@ mod test { let orx = timer.oneshot(100); let prx = timer.periodic(100); timer.sleep(1); - assert_eq!(orx.recv_opt(), None); - assert_eq!(prx.recv_opt(), None); + assert_eq!(orx.recv_opt(), Err(())); + assert_eq!(prx.recv_opt(), Err(())); timer.oneshot(1).recv(); }) @@ -226,7 +226,7 @@ mod test { let timer_rx = timer.periodic(1000); spawn(proc() { - timer_rx.recv_opt(); + let _ = timer_rx.recv_opt(); }); // when we drop the TimerWatcher we're going to destroy the channel, @@ -239,7 +239,7 @@ mod test { let timer_rx = timer.periodic(1000); spawn(proc() { - timer_rx.recv_opt(); + let _ = timer_rx.recv_opt(); }); timer.oneshot(1); @@ -251,7 +251,7 @@ mod test { let timer_rx = timer.periodic(1000); spawn(proc() { - timer_rx.recv_opt(); + let _ = timer_rx.recv_opt(); }); timer.sleep(1); @@ -262,7 +262,7 @@ mod test { let mut timer = Timer::new().unwrap(); timer.oneshot(1000) }; - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); }) iotest!(fn sender_goes_away_period() { @@ -270,7 +270,7 @@ mod test { let mut timer = Timer::new().unwrap(); timer.periodic(1000) }; - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); }) iotest!(fn receiver_goes_away_oneshot() { diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index bae20d3bb9b90..a112ed77f094f 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -385,7 +385,7 @@ impl Death { pub fn collect_failure(&mut self, result: TaskResult) { match self.on_exit.take() { Some(Execute(f)) => f(result), - Some(SendMessage(ch)) => { ch.try_send(result); } + Some(SendMessage(ch)) => { let _ = ch.send_opt(result); } None => {} } } diff --git a/src/libsync/comm.rs b/src/libsync/comm.rs index 9e01b16ee9ba9..13e075501d9f5 100644 --- a/src/libsync/comm.rs +++ b/src/libsync/comm.rs @@ -37,16 +37,16 @@ impl DuplexStream { pub fn send(&self, x: S) { self.tx.send(x) } - pub fn try_send(&self, x: S) -> bool { - self.tx.try_send(x) + pub fn send_opt(&self, x: S) -> Result<(), S> { + self.tx.send_opt(x) } pub fn recv(&self) -> R { self.rx.recv() } - pub fn try_recv(&self) -> comm::TryRecvResult { + pub fn try_recv(&self) -> Result { self.rx.try_recv() } - pub fn recv_opt(&self) -> Option { + pub fn recv_opt(&self) -> Result { self.rx.recv_opt() } } diff --git a/src/libsync/lock.rs b/src/libsync/lock.rs index b83bdf9df299e..911cd1d2eb1cc 100644 --- a/src/libsync/lock.rs +++ b/src/libsync/lock.rs @@ -800,7 +800,7 @@ mod tests { // At this point, all spawned tasks should be blocked, // so we shouldn't get anything from the port assert!(match rx.try_recv() { - Empty => true, + Err(Empty) => true, _ => false, }); diff --git a/src/libsync/raw.rs b/src/libsync/raw.rs index 9bb7a81a2ff00..eb90797395edf 100644 --- a/src/libsync/raw.rs +++ b/src/libsync/raw.rs @@ -16,7 +16,6 @@ //! containing data. use std::cast; -use std::comm; use std::kinds::marker; use std::mem::replace; use std::sync::atomics; @@ -46,10 +45,10 @@ impl WaitQueue { // Signals one live task from the queue. fn signal(&self) -> bool { match self.head.try_recv() { - comm::Data(ch) => { + Ok(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. - if ch.try_send(()) { + if ch.send_opt(()).is_ok() { true } else { self.signal() @@ -63,8 +62,8 @@ impl WaitQueue { let mut count = 0; loop { match self.head.try_recv() { - comm::Data(ch) => { - if ch.try_send(()) { + Ok(ch) => { + if ch.send_opt(()).is_ok() { count += 1; } } @@ -76,7 +75,7 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (signal_end, wait_end) = channel(); - assert!(self.tail.try_send(signal_end)); + self.tail.send(signal_end); wait_end } } diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index 629b4cbfeea2b..d2e08cfccf890 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -38,12 +38,12 @@ fn server(requests: &Receiver, responses: &Sender) { let mut done = false; while !done { match requests.recv_opt() { - Some(get_count) => { responses.send(count.clone()); } - Some(bytes(b)) => { + Ok(get_count) => { responses.send(count.clone()); } + Ok(bytes(b)) => { //println!("server: received {:?} bytes", b); count += b; } - None => { done = true; } + Err(..) => { done = true; } _ => { } } } diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 49d9c5d3a2e31..dc9b3561bb168 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -33,12 +33,12 @@ fn server(requests: &Receiver, responses: &Sender) { let mut done = false; while !done { match requests.recv_opt() { - Some(get_count) => { responses.send(count.clone()); } - Some(bytes(b)) => { + Ok(get_count) => { responses.send(count.clone()); } + Ok(bytes(b)) => { //println!("server: received {:?} bytes", b); count += b; } - None => { done = true; } + Err(..) => { done = true; } _ => { } } } diff --git a/src/test/bench/task-perf-jargon-metal-smoke.rs b/src/test/bench/task-perf-jargon-metal-smoke.rs index f5711d91447d9..a45f8c61be50f 100644 --- a/src/test/bench/task-perf-jargon-metal-smoke.rs +++ b/src/test/bench/task-perf-jargon-metal-smoke.rs @@ -50,7 +50,7 @@ fn main() { let (tx, rx) = channel(); child_generation(from_str::(*args.get(1)).unwrap(), tx); - if rx.recv_opt().is_none() { + if rx.recv_opt().is_err() { fail!("it happened when we slumbered"); } } diff --git a/src/test/run-pass/issue-9396.rs b/src/test/run-pass/issue-9396.rs index 2630057c988a0..9f08f1db41057 100644 --- a/src/test/run-pass/issue-9396.rs +++ b/src/test/run-pass/issue-9396.rs @@ -20,9 +20,9 @@ pub fn main() { }); loop { match rx.try_recv() { - comm::Data(()) => break, - comm::Empty => {} - comm::Disconnected => unreachable!() + Ok(()) => break, + Err(comm::Empty) => {} + Err(comm::Disconnected) => unreachable!() } } }