Skip to content

Commit 3d05e7f

Browse files
committed
auto merge of #13688 : alexcrichton/rust/accept-timeout, r=brson
This adds experimental support for timeouts when accepting sockets through `TcpAcceptor::accept`. This does not add a separate `accept_timeout` function, but rather it adds a `set_timeout` function instead. This second function is intended to be used as a hard deadline after which all accepts will never block and fail immediately. This idea was derived from Go's SetDeadline() methods. We do not currently have a robust time abstraction in the standard library, so I opted to have the argument be a relative time in millseconds into the future. I believe a more appropriate argument type is an absolute time, but this concept does not exist yet (this is also why the function is marked #[experimental]). The native support is built on select(), similarly to connect_timeout(), and the green support is based on channel select and a timer. cc #13523
2 parents d910330 + e5d3e51 commit 3d05e7f

File tree

7 files changed

+253
-41
lines changed

7 files changed

+253
-41
lines changed

src/libnative/io/c_win32.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ extern "system" {
5050
pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
5151
argp: *mut libc::c_ulong) -> libc::c_int;
5252
pub fn select(nfds: libc::c_int,
53-
readfds: *mut fd_set,
54-
writefds: *mut fd_set,
55-
exceptfds: *mut fd_set,
53+
readfds: *fd_set,
54+
writefds: *fd_set,
55+
exceptfds: *fd_set,
5656
timeout: *libc::timeval) -> libc::c_int;
5757
pub fn getsockopt(sockfd: libc::SOCKET,
5858
level: libc::c_int,

src/libnative/io/net.rs

+56-27
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::cast;
1313
use std::io::net::ip;
1414
use std::io;
1515
use std::mem;
16+
use std::os;
1617
use std::ptr;
1718
use std::rt::rtio;
1819
use std::sync::arc::UnsafeArc;
@@ -144,6 +145,21 @@ fn last_error() -> io::IoError {
144145
super::last_error()
145146
}
146147

148+
fn ms_to_timeval(ms: u64) -> libc::timeval {
149+
libc::timeval {
150+
tv_sec: (ms / 1000) as libc::time_t,
151+
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
152+
}
153+
}
154+
155+
fn timeout(desc: &'static str) -> io::IoError {
156+
io::IoError {
157+
kind: io::TimedOut,
158+
desc: desc,
159+
detail: None,
160+
}
161+
}
162+
147163
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
148164
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
149165

@@ -271,8 +287,7 @@ impl TcpStream {
271287
fn connect_timeout(fd: sock_t,
272288
addrp: *libc::sockaddr,
273289
len: libc::socklen_t,
274-
timeout: u64) -> IoResult<()> {
275-
use std::os;
290+
timeout_ms: u64) -> IoResult<()> {
276291
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
277292
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
278293
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
@@ -289,12 +304,8 @@ impl TcpStream {
289304
os::errno() as int == WOULDBLOCK as int => {
290305
let mut set: c::fd_set = unsafe { mem::init() };
291306
c::fd_set(&mut set, fd);
292-
match await(fd, &mut set, timeout) {
293-
0 => Err(io::IoError {
294-
kind: io::TimedOut,
295-
desc: "connection timed out",
296-
detail: None,
297-
}),
307+
match await(fd, &mut set, timeout_ms) {
308+
0 => Err(timeout("connection timed out")),
298309
-1 => Err(last_error()),
299310
_ => {
300311
let err: libc::c_int = try!(
@@ -338,22 +349,14 @@ impl TcpStream {
338349
// Recalculate the timeout each iteration (it is generally
339350
// undefined what the value of the 'tv' is after select
340351
// returns EINTR).
341-
let timeout = timeout - (::io::timer::now() - start);
342-
let tv = libc::timeval {
343-
tv_sec: (timeout / 1000) as libc::time_t,
344-
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
345-
};
346-
c::select(fd + 1, ptr::null(), set as *mut _ as *_,
347-
ptr::null(), &tv)
352+
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
353+
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
348354
})
349355
}
350356
#[cfg(windows)]
351357
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
352-
let tv = libc::timeval {
353-
tv_sec: (timeout / 1000) as libc::time_t,
354-
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
355-
};
356-
unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
358+
let tv = ms_to_timeval(timeout);
359+
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
357360
}
358361
}
359362

@@ -467,7 +470,7 @@ impl Drop for Inner {
467470
////////////////////////////////////////////////////////////////////////////////
468471

469472
pub struct TcpListener {
470-
inner: UnsafeArc<Inner>,
473+
inner: Inner,
471474
}
472475

473476
impl TcpListener {
@@ -477,7 +480,7 @@ impl TcpListener {
477480
let (addr, len) = addr_to_sockaddr(addr);
478481
let addrp = &addr as *libc::sockaddr_storage;
479482
let inner = Inner { fd: fd };
480-
let ret = TcpListener { inner: UnsafeArc::new(inner) };
483+
let ret = TcpListener { inner: inner };
481484
// On platforms with Berkeley-derived sockets, this allows
482485
// to quickly rebind a socket, without needing to wait for
483486
// the OS to clean up the previous one.
@@ -498,15 +501,12 @@ impl TcpListener {
498501
}
499502
}
500503

501-
pub fn fd(&self) -> sock_t {
502-
// This is just a read-only arc so the unsafety is fine
503-
unsafe { (*self.inner.get()).fd }
504-
}
504+
pub fn fd(&self) -> sock_t { self.inner.fd }
505505

506506
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
507507
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
508508
-1 => Err(last_error()),
509-
_ => Ok(TcpAcceptor { listener: self })
509+
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
510510
}
511511
}
512512
}
@@ -525,12 +525,16 @@ impl rtio::RtioSocket for TcpListener {
525525

526526
pub struct TcpAcceptor {
527527
listener: TcpListener,
528+
deadline: u64,
528529
}
529530

530531
impl TcpAcceptor {
531532
pub fn fd(&self) -> sock_t { self.listener.fd() }
532533

533534
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
535+
if self.deadline != 0 {
536+
try!(self.accept_deadline());
537+
}
534538
unsafe {
535539
let mut storage: libc::sockaddr_storage = mem::init();
536540
let storagep = &mut storage as *mut libc::sockaddr_storage;
@@ -546,6 +550,25 @@ impl TcpAcceptor {
546550
}
547551
}
548552
}
553+
554+
fn accept_deadline(&mut self) -> IoResult<()> {
555+
let mut set: c::fd_set = unsafe { mem::init() };
556+
c::fd_set(&mut set, self.fd());
557+
558+
match retry(|| {
559+
// If we're past the deadline, then pass a 0 timeout to select() so
560+
// we can poll the status of the socket.
561+
let now = ::io::timer::now();
562+
let ms = if self.deadline > now {0} else {self.deadline - now};
563+
let tv = ms_to_timeval(ms);
564+
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
565+
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
566+
}) {
567+
-1 => Err(last_error()),
568+
0 => Err(timeout("accept timed out")),
569+
_ => return Ok(()),
570+
}
571+
}
549572
}
550573

551574
impl rtio::RtioSocket for TcpAcceptor {
@@ -561,6 +584,12 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
561584

562585
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
563586
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
587+
fn set_timeout(&mut self, timeout: Option<u64>) {
588+
self.deadline = match timeout {
589+
None => 0,
590+
Some(t) => ::io::timer::now() + t,
591+
};
592+
}
564593
}
565594

566595
////////////////////////////////////////////////////////////////////////////////

src/libnative/io/timer_win32.rs

+11
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,17 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
8989
}
9090
}
9191

92+
// returns the current time (in milliseconds)
93+
pub fn now() -> u64 {
94+
let mut ticks_per_s = 0;
95+
assert_eq!(unsafe { libc::QueryPerformanceFrequency(&mut ticks_per_s) }, 1);
96+
let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
97+
let mut ticks = 0;
98+
assert_eq!(unsafe { libc::QueryPerformanceCounter(&mut ticks) }, 1);
99+
100+
return (ticks as u64 * 1000) / (ticks_per_s as u64);
101+
}
102+
92103
impl Timer {
93104
pub fn new() -> IoResult<Timer> {
94105
timer_helper::boot(helper);

src/librustuv/net.rs

+86-2
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ pub struct TcpListener {
174174

175175
pub struct TcpAcceptor {
176176
listener: ~TcpListener,
177+
timer: Option<TimerWatcher>,
178+
timeout_tx: Option<Sender<()>>,
179+
timeout_rx: Option<Receiver<()>>,
177180
}
178181

179182
// TCP watchers (clients/streams)
@@ -459,7 +462,12 @@ impl rtio::RtioSocket for TcpListener {
459462
impl rtio::RtioTcpListener for TcpListener {
460463
fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
461464
// create the acceptor object from ourselves
462-
let mut acceptor = ~TcpAcceptor { listener: self };
465+
let mut acceptor = ~TcpAcceptor {
466+
listener: self,
467+
timer: None,
468+
timeout_tx: None,
469+
timeout_rx: None,
470+
};
463471

464472
let _m = acceptor.fire_homing_missile();
465473
// FIXME: the 128 backlog should be configurable
@@ -509,7 +517,37 @@ impl rtio::RtioSocket for TcpAcceptor {
509517

510518
impl rtio::RtioTcpAcceptor for TcpAcceptor {
511519
fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
512-
self.listener.incoming.recv()
520+
match self.timeout_rx {
521+
None => self.listener.incoming.recv(),
522+
Some(ref rx) => {
523+
use std::comm::Select;
524+
525+
// Poll the incoming channel first (don't rely on the order of
526+
// select just yet). If someone's pending then we should return
527+
// them immediately.
528+
match self.listener.incoming.try_recv() {
529+
Ok(data) => return data,
530+
Err(..) => {}
531+
}
532+
533+
// Use select to figure out which channel gets ready first. We
534+
// do some custom handling of select to ensure that we never
535+
// actually drain the timeout channel (we'll keep seeing the
536+
// timeout message in the future).
537+
let s = Select::new();
538+
let mut timeout = s.handle(rx);
539+
let mut data = s.handle(&self.listener.incoming);
540+
unsafe {
541+
timeout.add();
542+
data.add();
543+
}
544+
if s.wait() == timeout.id() {
545+
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
546+
} else {
547+
self.listener.incoming.recv()
548+
}
549+
}
550+
}
513551
}
514552

515553
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
@@ -525,6 +563,52 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
525563
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
526564
})
527565
}
566+
567+
fn set_timeout(&mut self, ms: Option<u64>) {
568+
// First, if the timeout is none, clear any previous timeout by dropping
569+
// the timer and transmission channels
570+
let ms = match ms {
571+
None => {
572+
return drop((self.timer.take(),
573+
self.timeout_tx.take(),
574+
self.timeout_rx.take()))
575+
}
576+
Some(ms) => ms,
577+
};
578+
579+
// If we have a timeout, lazily initialize the timer which will be used
580+
// to fire when the timeout runs out.
581+
if self.timer.is_none() {
582+
let _m = self.fire_homing_missile();
583+
let loop_ = Loop::wrap(unsafe {
584+
uvll::get_loop_for_uv_handle(self.listener.handle)
585+
});
586+
let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
587+
unsafe {
588+
timer.set_data(self as *mut _ as *TcpAcceptor);
589+
}
590+
self.timer = Some(timer);
591+
}
592+
593+
// Once we've got a timer, stop any previous timeout, reset it for the
594+
// current one, and install some new channels to send/receive data on
595+
let timer = self.timer.get_mut_ref();
596+
timer.stop();
597+
timer.start(timer_cb, ms, 0);
598+
let (tx, rx) = channel();
599+
self.timeout_tx = Some(tx);
600+
self.timeout_rx = Some(rx);
601+
602+
extern fn timer_cb(timer: *uvll::uv_timer_t, status: c_int) {
603+
assert_eq!(status, 0);
604+
let acceptor: &mut TcpAcceptor = unsafe {
605+
&mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
606+
};
607+
// This send can never fail because if this timer is active then the
608+
// receiving channel is guaranteed to be alive
609+
acceptor.timeout_tx.get_ref().send(());
610+
}
611+
}
528612
}
529613

530614
////////////////////////////////////////////////////////////////////////////////

src/librustuv/timer.rs

+11-8
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::rt::rtio::RtioTimer;
1414
use std::rt::task::BlockedTask;
1515

1616
use homing::{HomeHandle, HomingIO};
17-
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
17+
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after, Loop};
1818
use uvio::UvIoFactory;
1919
use uvll;
2020

@@ -34,18 +34,21 @@ pub enum NextAction {
3434

3535
impl TimerWatcher {
3636
pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
37+
let handle = io.make_handle();
38+
let me = ~TimerWatcher::new_home(&io.loop_, handle);
39+
me.install()
40+
}
41+
42+
pub fn new_home(loop_: &Loop, home: HomeHandle) -> TimerWatcher {
3743
let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
38-
assert_eq!(unsafe {
39-
uvll::uv_timer_init(io.uv_loop(), handle)
40-
}, 0);
41-
let me = ~TimerWatcher {
44+
assert_eq!(unsafe { uvll::uv_timer_init(loop_.handle, handle) }, 0);
45+
TimerWatcher {
4246
handle: handle,
4347
action: None,
4448
blocker: None,
45-
home: io.make_handle(),
49+
home: home,
4650
id: 0,
47-
};
48-
return me.install();
51+
}
4952
}
5053

5154
pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) {

0 commit comments

Comments
 (0)