Skip to content

Implement recv_timeout for std::comm *Port implementations #9194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/libstd/rt/io/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use comm;
use kinds::Send;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::{io_error};
Expand Down Expand Up @@ -48,10 +50,65 @@ impl Timer {
}
}

trait TimedPort<T: Send> {

/**
* This implementation adds the required
* API for recv_timeout with an approximate
* but not safe behavior.
*
* Current implementations of this Trait for
* both PortOne and Port poll on the port every
* second to check for new messages. A correct
* implementation for recv_timeout should implement
* `SelectInner` and Select for UvTimer and re-write
* the sleep method around that.
*
* FIXME: (flaper87) #9195
*/
fn recv_timeout(self, msecs: u64) -> Option<T>;
}

impl<T: Send> TimedPort<T> for comm::PortOne<T> {

fn recv_timeout(self, msecs: u64) -> Option<T> {
let mut tout = msecs;
let mut timer = Timer::new().unwrap();

while tout > 0 {
if self.peek() { return Some(self.recv()); }
timer.sleep(1000);
tout -= 1000;
}

None
}
}


impl<T: Send> TimedPort<T> for comm::Port<T> {

fn recv_timeout(self, msecs: u64) -> Option<T> {
let mut tout = msecs;
let mut timer = Timer::new().unwrap();

while tout > 0 {
if self.peek() { return Some(self.recv()); }
timer.sleep(1000);
tout -= 1000;
}

None
}
}

#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
use task;
use comm;

#[test]
fn test_io_timer_sleep_simple() {
do run_in_mt_newsched_task {
Expand All @@ -66,4 +123,32 @@ mod test {
sleep(1)
}
}

#[test]
fn test_recv_timeout() {
do run_in_newsched_task {
let (p, c) = comm::stream::<int>();
do task::spawn {
let mut t = Timer::new().unwrap();
t.sleep(1000);
c.send(1);
}

assert!(p.recv_timeout(2000).unwrap() == 1);
}
}

#[test]
fn test_recv_timeout_expire() {
do run_in_newsched_task {
let (p, c) = comm::stream::<int>();
do task::spawn {
let mut t = Timer::new().unwrap();
t.sleep(3000);
c.send(1);
}

assert!(p.recv_timeout(1000).is_none());
}
}
}