Skip to content

Allow more "error" values in try_recv() #11112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/libextra/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Higher level communication abstractions.

#[allow(missing_doc)];

use std::comm;

/// An extension of `pipes::stream` that allows both sending and receiving.
pub struct DuplexStream<T, U> {
priv chan: Chan<T>,
Expand All @@ -40,7 +42,7 @@ impl<T:Send,U:Send> DuplexStream<T, U> {
pub fn recv(&self) -> U {
self.port.recv()
}
pub fn try_recv(&self) -> Option<U> {
pub fn try_recv(&self) -> comm::TryRecvResult<U> {
self.port.try_recv()
}
pub fn recv_opt(&self) -> Option<U> {
Expand Down Expand Up @@ -77,11 +79,11 @@ impl<T: Send> SyncPort<T> {
})
}

pub fn try_recv(&self) -> Option<T> {
self.duplex_stream.try_recv().map(|val| {
self.duplex_stream.try_send(());
val
})
pub fn try_recv(&self) -> comm::TryRecvResult<T> {
match self.duplex_stream.try_recv() {
comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
state => state,
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/libextra/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


use std::borrow;
use std::comm;
use std::unstable::sync::Exclusive;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
Expand Down Expand Up @@ -49,7 +50,7 @@ impl WaitQueue {
// Signals one live task from the queue.
fn signal(&self) -> bool {
match self.head.try_recv() {
Some(ch) => {
comm::Data(ch) => {
// Send a wakeup signal. If the waiter was killed, its port will
// have closed. Keep trying until we get a live task.
if ch.try_send_deferred(()) {
Expand All @@ -58,20 +59,20 @@ impl WaitQueue {
self.signal()
}
}
None => false
_ => false
}
}

fn broadcast(&self) -> uint {
let mut count = 0;
loop {
match self.head.try_recv() {
None => break,
Some(ch) => {
comm::Data(ch) => {
if ch.try_send_deferred(()) {
count += 1;
}
}
_ => break
}
}
count
Expand Down
5 changes: 3 additions & 2 deletions src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ fn new_sched_rng() -> XorShiftRng {

#[cfg(test)]
mod test {
use std::comm;
use std::task::TaskOpts;
use std::rt::Runtime;
use std::rt::task::Task;
Expand Down Expand Up @@ -1376,7 +1377,7 @@ mod test {
// This task should not be able to starve the sender;
// The sender should get stolen to another thread.
do spawn {
while port.try_recv().is_none() { }
while port.try_recv() != comm::Data(()) { }
}

chan.send(());
Expand All @@ -1393,7 +1394,7 @@ mod test {
// This task should not be able to starve the other task.
// The sends should eventually yield.
do spawn {
while port.try_recv().is_none() {
while port.try_recv() != comm::Data(()) {
chan2.send(());
}
}
Expand Down
104 changes: 88 additions & 16 deletions src/libstd/comm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ macro_rules! test (
#[allow(unused_imports)];

use native;
use comm::*;
use prelude::*;
use super::*;
use super::super::*;
Expand Down Expand Up @@ -323,6 +324,20 @@ pub struct SharedChan<T> {
priv queue: mpsc::Producer<T, Packet>,
}

/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone)]
pub enum TryRecvResult<T> {
/// This channel is currently empty, but the sender(s) have not yet
/// disconnected, so data may yet become available.
Empty,
/// This channel's sending half has become disconnected, and there will
/// never be any more data received on this channel
Disconnected,
/// The channel had some data and we successfully popped it
Data(T),
}

///////////////////////////////////////////////////////////////////////////////
// Internal struct definitions
///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -739,11 +754,11 @@ impl<T: Send> Port<T> {
/// block on a port.
///
/// This function cannot fail.
pub fn try_recv(&self) -> Option<T> {
pub fn try_recv(&self) -> TryRecvResult<T> {
self.try_recv_inc(true)
}

fn try_recv_inc(&self, increment: bool) -> Option<T> {
fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
// This is a "best effort" situation, so if a queue is inconsistent just
// don't worry about it.
let this = unsafe { cast::transmute_mut(self) };
Expand Down Expand Up @@ -807,7 +822,35 @@ impl<T: Send> Port<T> {
if increment && ret.is_some() {
unsafe { (*this.queue.packet()).steals += 1; }
}
return ret;
match ret {
Some(t) => Data(t),
None => {
// It's possible that between the time that we saw the queue was
// empty and here the other side disconnected. It's also
// possible for us to see the disconnection here while there is
// data in the queue. It's pretty backwards-thinking to return
// Disconnected when there's actually data on the queue, so if
// we see a disconnected state be sure to check again to be 100%
// sure that there's no data in the queue.
let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
if cnt != DISCONNECTED { return Empty }

let ret = match this.queue {
SPSC(ref mut queue) => queue.pop(),
MPSC(ref mut queue) => match queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
mpsc::Inconsistent => {
fail!("inconsistent with no senders?!");
}
}
};
match ret {
Some(data) => Data(data),
None => Disconnected,
}
}
}
}

/// Attempt to wait for a value on this port, but does not fail if the
Expand All @@ -824,7 +867,11 @@ impl<T: Send> Port<T> {
/// the value found on the port is returned.
pub fn recv_opt(&self) -> Option<T> {
// optimistic preflight check (scheduling is expensive)
match self.try_recv() { None => {}, data => return data }
match self.try_recv() {
Empty => {},
Disconnected => return None,
Data(t) => return Some(t),
}

let packet;
let this;
Expand All @@ -843,12 +890,11 @@ impl<T: Send> Port<T> {
});
}

let data = self.try_recv_inc(false);
if data.is_none() &&
unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
match self.try_recv_inc(false) {
Data(t) => Some(t),
Empty => fail!("bug: woke up too soon"),
Disconnected => None,
}
return data;
}

/// Returns an iterator which will block waiting for messages, but never
Expand Down Expand Up @@ -1005,7 +1051,10 @@ mod test {
for _ in range(0, AMT * NTHREADS) {
assert_eq!(p.recv(), 1);
}
assert_eq!(p.try_recv(), None);
match p.try_recv() {
Data(..) => fail!(),
_ => {}
}
c1.send(());
}

Expand Down Expand Up @@ -1129,7 +1178,7 @@ mod test {
test!(fn oneshot_single_thread_try_recv_open() {
let (port, chan) = Chan::<int>::new();
chan.send(10);
assert!(port.try_recv() == Some(10));
assert!(port.recv_opt() == Some(10));
})

test!(fn oneshot_single_thread_try_recv_closed() {
Expand All @@ -1140,21 +1189,21 @@ mod test {

test!(fn oneshot_single_thread_peek_data() {
let (port, chan) = Chan::<int>::new();
assert!(port.try_recv().is_none());
assert_eq!(port.try_recv(), Empty)
chan.send(10);
assert!(port.try_recv().is_some());
assert_eq!(port.try_recv(), Data(10));
})

test!(fn oneshot_single_thread_peek_close() {
let (port, chan) = Chan::<int>::new();
{ let _c = chan; }
assert!(port.try_recv().is_none());
assert!(port.try_recv().is_none());
assert_eq!(port.try_recv(), Disconnected);
assert_eq!(port.try_recv(), Disconnected);
})

test!(fn oneshot_single_thread_peek_open() {
let (port, _) = Chan::<int>::new();
assert!(port.try_recv().is_none());
assert_eq!(port.try_recv(), Empty);
})

test!(fn oneshot_multi_task_recv_then_send() {
Expand Down Expand Up @@ -1321,4 +1370,27 @@ mod test {
drop(chan);
assert_eq!(count_port.recv(), 4);
})

test!(fn try_recv_states() {
let (p, c) = Chan::<int>::new();
let (p1, c1) = Chan::<()>::new();
let (p2, c2) = Chan::<()>::new();
do spawn {
p1.recv();
c.send(1);
c2.send(());
p1.recv();
drop(c);
c2.send(());
}

assert_eq!(p.try_recv(), Empty);
c1.send(());
p2.recv();
assert_eq!(p.try_recv(), Data(1));
assert_eq!(p.try_recv(), Empty);
c1.send(());
p2.recv();
assert_eq!(p.try_recv(), Disconnected);
})
}
9 changes: 6 additions & 3 deletions src/libstd/comm/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#[allow(dead_code)];

use cast;
use comm;
use iter::Iterator;
use kinds::Send;
use ops::Drop;
Expand Down Expand Up @@ -279,7 +280,9 @@ impl<'port, T: Send> Handle<'port, T> {
pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
/// Immediately attempt to receive a value on a port, this function will
/// never block. Has the same semantics as `Port.try_recv`.
pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
self.port.try_recv()
}
}

#[unsafe_destructor]
Expand Down Expand Up @@ -409,8 +412,8 @@ mod test {
a = p1.recv() => { assert_eq!(a, 1); },
a = p2.recv() => { assert_eq!(a, 2); }
)
assert_eq!(p1.try_recv(), None);
assert_eq!(p2.try_recv(), None);
assert_eq!(p1.try_recv(), Empty);
assert_eq!(p2.try_recv(), Empty);
c3.send(());
})

Expand Down
3 changes: 2 additions & 1 deletion src/libstd/io/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Listener {
#[cfg(test)]
mod test {
use libc;
use comm::Empty;
use io::timer;
use super::{Listener, Interrupt};

Expand Down Expand Up @@ -194,7 +195,7 @@ mod test {
s2.unregister(Interrupt);
sigint();
timer::sleep(10);
assert!(s2.port.try_recv().is_none());
assert_eq!(s2.port.try_recv(), Empty);
}

#[cfg(windows)]
Expand Down
5 changes: 2 additions & 3 deletions src/libstd/io/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,15 @@ mod test {
let port1 = timer.oneshot(10000);
let port = timer.oneshot(1);
port.recv();
assert_eq!(port1.try_recv(), None);
assert!(port1.recv_opt().is_none());
}

#[test]
fn test_io_timer_oneshot_then_sleep() {
let mut timer = Timer::new().unwrap();
let port = timer.oneshot(100000000000);
timer.sleep(1); // this should invalidate the port

assert_eq!(port.try_recv(), None);
assert!(port.recv_opt().is_none());
}

#[test]
Expand Down