From 97555e865c55f99684595471fa1d5440b2e6c15e Mon Sep 17 00:00:00 2001
From: Eric Reed
Date: Thu, 8 Aug 2013 16:56:04 -0700
Subject: [PATCH 1/6] Derive Clone for IpAddr and SocketAddr

---
 src/libstd/rt/io/net/ip.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/libstd/rt/io/net/ip.rs b/src/libstd/rt/io/net/ip.rs
index 77176088801de..3b3ea80eafa8f 100644
--- a/src/libstd/rt/io/net/ip.rs
+++ b/src/libstd/rt/io/net/ip.rs
@@ -17,7 +17,7 @@ use option::{Option, None, Some};
 
 type Port = u16;
 
-#[deriving(Eq, TotalEq)]
+#[deriving(Eq, TotalEq, Clone)]
 pub enum IpAddr {
     Ipv4Addr(u8, u8, u8, u8),
     Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16)
@@ -62,7 +62,7 @@ impl ToStr for IpAddr {
     }
 }
 
-#[deriving(Eq, TotalEq)]
+#[deriving(Eq, TotalEq, Clone)]
 pub struct SocketAddr {
     ip: IpAddr,
     port: Port,

From f68514c128e536c5868368861871cb665ed9fdd3 Mon Sep 17 00:00:00 2001
From: Eric Reed
Date: Thu, 8 Aug 2013 17:10:22 -0700
Subject: [PATCH 2/6] Do not execute the callback before cleaning up resources.

---
 src/libstd/rt/io/net/tcp.rs | 8 ++------
 src/libstd/rt/uv/net.rs     | 6 ++++--
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs
index 27222542e087d..746fa5668a5f0 100644
--- a/src/libstd/rt/io/net/tcp.rs
+++ b/src/libstd/rt/io/net/tcp.rs
@@ -88,9 +88,7 @@ impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) {
         match (**self).write(buf) {
             Ok(_) => (),
-            Err(ioerr) => {
-                io_error::cond.raise(ioerr);
-            }
+            Err(ioerr) => io_error::cond.raise(ioerr),
         }
     }
 
@@ -129,9 +127,7 @@ impl TcpListener {
 impl Listener<TcpStream> for TcpListener {
     fn accept(&mut self) -> Option<TcpStream> {
         match (**self).accept() {
-            Ok(s) => {
-                Some(TcpStream::new(s))
-            }
+            Ok(s) => Some(TcpStream::new(s)),
             Err(ioerr) => {
                 io_error::cond.raise(ioerr);
                 return None;
diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs
index c8b3d41a78d79..e8d0296e543a4 100644
--- a/src/libstd/rt/uv/net.rs
+++ b/src/libstd/rt/uv/net.rs
@@ -190,9 +190,10 @@ impl StreamWatcher {
         extern fn close_cb(handle: *uvll::uv_stream_t) {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            stream_watcher.get_watcher_data().close_cb.take_unwrap()();
+            let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
             stream_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
+            cb();
         }
     }
 }
@@ -411,9 +412,10 @@ impl UdpWatcher {
         extern fn close_cb(handle: *uvll::uv_udp_t) {
             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
-            udp_watcher.get_watcher_data().close_cb.take_unwrap()();
+            let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
             udp_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
+            cb();
         }
     }
 }

From 88f718341ec279738c07f83289058aadf7c5d235 Mon Sep 17 00:00:00 2001
From: Eric Reed
Date: Thu, 8 Aug 2013 17:04:19 -0700
Subject: [PATCH 3/6] Instruct event loops to ignore SIGPIPE when constructed.

libuv does not always catch SIGPIPE.
---
 src/libstd/rt/uv/uvll.rs | 1 +
 src/rt/rust_uv.cpp       | 9 +++++++++
 2 files changed, 10 insertions(+)

diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
index 65c0cffe5a073..8ed1287fe0108 100644
--- a/src/libstd/rt/uv/uvll.rs
+++ b/src/libstd/rt/uv/uvll.rs
@@ -172,6 +172,7 @@ fn request_sanity_check() {
     }
 }
 
+// XXX Event loops ignore SIGPIPE by default.
 pub unsafe fn loop_new() -> *c_void {
     #[fixed_stack_segment]; #[inline(never)];
 
diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp
index 0462789af9ff9..b5d6e02b46a4c 100644
--- a/src/rt/rust_uv.cpp
+++ b/src/rt/rust_uv.cpp
@@ -13,12 +13,21 @@
 #include
 #endif
 
+#ifndef __WIN32__
+// for signal
+#include <signal.h>
+#endif
+
 #include "uv.h"
 #include "rust_globals.h"
 
 extern "C" void*
 rust_uv_loop_new() {
+// XXX libuv doesn't always ignore SIGPIPE even though we don't need it.
+#ifndef __WIN32__
+    signal(SIGPIPE, SIG_IGN);
+#endif
     return (void*)uv_loop_new();
 }

From d7b6fcba2978cbbcfccce83e6f4f54c5eec998f3 Mon Sep 17 00:00:00 2001
From: Eric Reed
Date: Wed, 7 Aug 2013 02:57:33 -0700
Subject: [PATCH 4/6] Working homing UDP socket prototype.

---
 src/libstd/rt/uv/uvio.rs | 152 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 151 insertions(+), 1 deletion(-)

diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index d4794da9b0f28..07ba44101c89f 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr};
 use rt::io::{standard_error, OtherIoError};
 use rt::local::Local;
 use rt::rtio::*;
-use rt::sched::Scheduler;
+use rt::sched::{Scheduler, SchedHandle};
 use rt::tube::Tube;
 use rt::uv::*;
 use rt::uv::idle::IdleWatcher;
@@ -239,6 +239,27 @@ impl UvIoFactory {
     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
         match self { &UvIoFactory(ref mut ptr) => ptr }
     }
+
+    pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
+        let mut watcher = UdpWatcher::new(self.uv_loop());
+        match watcher.bind(addr) {
+            Ok(_) => {
+                let home = do Local::borrow::<Scheduler, SchedHandle> |sched| { sched.make_handle() };
+                Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
+            }
+            Err(uverr) => {
+                let scheduler = Local::take::<Scheduler>();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    let task_cell = Cell::new(task);
+                    do watcher.close {
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
+    }
 }
 
 impl IoFactory for UvIoFactory {
@@ -582,6 +603,135 @@ impl RtioTcpStream for UvTcpStream {
     }
 }
 
+pub struct HomedUvUdpSocket {
+    watcher: UdpWatcher,
+    home: SchedHandle,
+}
+
+impl HomedUvUdpSocket {
+    fn go_home(&mut self) {
+        use rt::sched::PinnedTask;
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            do task.wake().map_move |task| { self.home.send(PinnedTask(task)); };
+        }
+    }
+}
+
+impl Drop for HomedUvUdpSocket {
+    fn drop(&self) {
+        rtdebug!("closing homed udp socket");
+        // first go home
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&HomedUvUdpSocket, &mut HomedUvUdpSocket>(self) };
+        this.go_home();
+        // now we're home so block the task and start IO
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do this.watcher.close {
+                // now IO is finished so resume the blocked task
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
+            }
+        }
+    }
+}
+
+impl RtioSocket for HomedUvUdpSocket {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        self.go_home();
+        socket_name(Udp, self.watcher)
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_only() {
+    do run_in_newsched_task {
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let addr = next_test_ip4();
+            let maybe_socket = (*io).homed_udp_bind(addr);
+            assert!(maybe_socket.is_ok());
+        }
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown, TaskFromFriend};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+        let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+        let on_exit: ~fn(bool) = |exit_status| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let test_function: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
+            // this socket is bound to this event loop
+            assert!(maybe_socket.is_ok());
+
+            // block self on sched1
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // unblock task
+                do task.wake().map_move |task| {
+                    // send self to sched2
+                    tasksFriendHandle.take().send(TaskFromFriend(task));
+                };
+                // sched1 should now sleep since it has nothing else to do
+            }
+            // sched2 will wake up and get the task
+            // as we do nothing else, the function ends and the socket goes out of scope
+            // sched2 will start to run the destructor
+            // the destructor will first block the task, set it's home as sched1, then enqueue it
+            // sched2 will dequeue the task, see that it has a home, and send it to sched1
+            // sched1 will wake up, execute the close function on the correct loop, and then we're done
+        };
+
+        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
+        main_task.death.on_exit = Some(on_exit);
+        let main_task = Cell::new(main_task);
+
+        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        // XXX could there be a race on the threads that causes a crash?
+ let thread1 = do Thread::start { + sched1.take().bootstrap(main_task.take()); + }; + let thread2 = do Thread::start { + sched2.take().bootstrap(null_task.take()); + }; + + thread1.join(); + thread2.join(); + } +} + pub struct UvUdpSocket(UdpWatcher); impl Drop for UvUdpSocket { From d09412ab893f54ef5309cf63d17bcb6110d582b9 Mon Sep 17 00:00:00 2001 From: Eric Reed Date: Wed, 7 Aug 2013 04:05:06 -0700 Subject: [PATCH 5/6] Homed UDP sockets --- src/libstd/rt/rtio.rs | 2 +- src/libstd/rt/uv/uvio.rs | 235 +++++++++++++++++++++++++++++++++++---- 2 files changed, 213 insertions(+), 24 deletions(-) diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index a7c794fb5f142..2bec782847b38 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; -pub type RtioUdpSocketObject = uvio::UvUdpSocket; +pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; pub trait EventLoop { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 07ba44101c89f..43be09434a4c4 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -239,27 +239,6 @@ impl UvIoFactory { pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop { match self { &UvIoFactory(ref mut ptr) => ptr } } - - pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> { - let mut watcher = UdpWatcher::new(self.uv_loop()); - match watcher.bind(addr) { - Ok(_) => { - let home = do Local::borrow:: |sched| {sched.make_handle()}; - Ok(~HomedUvUdpSocket { watcher: watcher, home: home }) - } - Err(uverr) => { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - Err(uv_error_to_io_error(uverr)) - } - } - } } impl IoFactory for UvIoFactory { @@ -331,6 +310,7 @@ impl IoFactory for UvIoFactory { } } + /* fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { @@ -348,6 +328,28 @@ impl IoFactory for UvIoFactory { } } } + */ + + pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> { + let mut watcher = UdpWatcher::new(self.uv_loop()); + match watcher.bind(addr) { + Ok(_) => { + let home = do Local::borrow:: |sched| {sched.make_handle()}; + Ok(~HomedUvUdpSocket { watcher: watcher, home: home }) + } + Err(uverr) => { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + Err(uv_error_to_io_error(uverr)) + } + } + } fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { Ok(~UvTimer(TimerWatcher::new(self.uv_loop()))) @@ -640,18 +642,205 @@ impl Drop for HomedUvUdpSocket { impl RtioSocket for HomedUvUdpSocket { fn socket_name(&mut self) -> Result { + // first go home self.go_home(); socket_name(Udp, self.watcher) } } +impl RtioUdpSocket for HomedUvUdpSocket { + fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { + // 
first go home + self.go_home(); + + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + rtdebug!("recvfrom: entered scheduler context"); + let task_cell = Cell::new(task); + let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; + do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { + let _ = flags; // /XXX add handling for partials? + + watcher.recv_stop(); + + let result = match status { + None => { + assert!(nread >= 0); + Ok((nread as uint, addr)) + } + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { + // first go home + self.go_home(); + + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + do self.watcher.send(buf, dst) |_watcher, status| { + + let result = match status { + None => Ok(()), + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + do multi.to_str().as_c_str |m_addr| { + uvll::udp_set_membership(self.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_JOIN_GROUP) + } + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + do multi.to_str().as_c_str |m_addr| { + uvll::udp_set_membership(self.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_LEAVE_GROUP) + } + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn loop_multicast_locally(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => 
Ok(()) + } + } + + fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn hear_broadcasts(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } + + fn ignore_broadcasts(&mut self) -> Result<(), IoError> { + // first go home + self.go_home(); + + let r = unsafe { + uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int) + }; + + match status_to_maybe_uv_error(self.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } + } +} + #[test] fn test_simple_homed_udp_io_bind_only() { do run_in_newsched_task { unsafe { let io = Local::unsafe_borrow::(); let addr = next_test_ip4(); - let maybe_socket = (*io).homed_udp_bind(addr); + let maybe_socket = (*io)./*homed_*/udp_bind(addr); assert!(maybe_socket.is_ok()); } } @@ -688,7 +877,7 @@ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() { let test_function: ~fn() = || { let io = unsafe { Local::unsafe_borrow::() }; let addr = next_test_ip4(); - let maybe_socket = unsafe { (*io).homed_udp_bind(addr) }; + let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) }; // this socket is bound to this event loop assert!(maybe_socket.is_ok()); From 35e844ffc1e3c022e868817ad1c548b900db800a Mon Sep 17 00:00:00 2001 From: Eric Reed Date: Thu, 8 Aug 2013 18:58:18 -0700 Subject: [PATCH 6/6] Make IO thread-safe. Each IO handle has a home event loop, which created it. When a task wants to use an IO handle, it must first make sure it is on that home event loop. It uses the scheduler handle in the IO handle to send itself there before starting the IO action. Once the IO action completes, the task restores its previous home state. If it is an AnySched task, then it will be executed on the new scheduler. If it has a normal home, then it will return there before executing any more code after the IO action. 
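
As an illustration (not part of this patch), the same round trip can be
sketched in modern Rust, with one OS thread standing in for the home event
loop and channels standing in for the scheduler messages. Every name below
(Job, EventLoop, HomedHandle, home_for_io) is invented for the sketch:

    use std::sync::mpsc::{channel, Sender};
    use std::thread;

    // Work that must run on the handle's home event loop.
    type Job = Box<dyn FnOnce() + Send>;

    // The "home": one thread owns all IO state and runs every job sent
    // to it, standing in for the libuv event loop.
    struct EventLoop {
        jobs: Sender<Job>,
    }

    impl EventLoop {
        fn new() -> EventLoop {
            let (tx, rx) = channel::<Job>();
            thread::spawn(move || {
                // Jobs execute only on this thread, so the IO state is
                // never touched concurrently.
                for job in rx {
                    job();
                }
            });
            EventLoop { jobs: tx }
        }
    }

    // An IO handle that remembers its home loop, like the SchedHandle
    // stored next to each watcher in this patch.
    struct HomedHandle {
        home: Sender<Job>,
    }

    impl HomedHandle {
        // Ship the IO action home, block until it completes, then carry
        // on where we were; the channel round trip plays the role of the
        // deschedule/resume pair in the scheduler version.
        fn home_for_io<R, F>(&self, io: F) -> R
        where
            R: Send + 'static,
            F: FnOnce() -> R + Send + 'static,
        {
            let (tx, rx) = channel();
            self.home.send(Box::new(move || { let _ = tx.send(io()); })).unwrap();
            rx.recv().unwrap()
        }
    }

    fn main() {
        let event_loop = EventLoop::new();
        let handle = HomedHandle { home: event_loop.jobs.clone() };
        // Runs on the loop thread even though it is called from main.
        assert_eq!(handle.home_for_io(|| 2 + 2), 4);
    }

The patch itself expresses that round trip with scheduler messages instead:
PinnedTask carries the task to the home event loop, and TaskFromFriend hands
it back to a scheduler once the IO action is done.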
---
 src/libstd/rt/io/timer.rs |   10 +-
 src/libstd/rt/rtio.rs     |    4 +-
 src/libstd/rt/uv/uvio.rs  | 1297 ++++++++++++++++++-------------------
 src/libstd/rt/uv/uvll.rs  |    2 +-
 4 files changed, 653 insertions(+), 660 deletions(-)

diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs
index c7820ebf6238b..bfd1ed48ac180 100644
--- a/src/libstd/rt/io/timer.rs
+++ b/src/libstd/rt/io/timer.rs
@@ -41,7 +41,7 @@ impl Timer {
 }
 
 impl RtioTimer for Timer {
-    fn sleep(&self, msecs: u64) {
+    fn sleep(&mut self, msecs: u64) {
         (**self).sleep(msecs);
     }
 }
@@ -50,15 +50,11 @@ mod test {
     use super::*;
     use rt::test::*;
-    use option::{Some, None};
 
     #[test]
     fn test_io_timer_sleep_simple() {
         do run_in_newsched_task {
             let timer = Timer::new();
-            match timer {
-                Some(t) => t.sleep(1),
-                None => assert!(false)
-            }
+            do timer.map_move |mut t| { t.sleep(1) };
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 2bec782847b38..36eb37a3630fb 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
 pub type IoFactoryObject = uvio::UvIoFactory;
 pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
-pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket;
+pub type RtioUdpSocketObject = uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
 
 pub trait EventLoop {
@@ -88,5 +88,5 @@ pub trait RtioUdpSocket : RtioSocket {
 }
 
 pub trait RtioTimer {
-    fn sleep(&self, msecs: u64);
+    fn sleep(&mut self, msecs: u64);
 }
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 43be09434a4c4..1250a4512f713 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -37,6 +37,49 @@ use unstable::sync::Exclusive;
                        run_in_newsched_task};
 #[cfg(test)] use iterator::{Iterator, range};
 
+// XXX we should not be calling uvll functions in here.
+
+trait HomingIO {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
+    /* XXX This will move pinned tasks to do IO on the proper scheduler
+     * and then move them back to their home.
+     */
+    fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
+        use rt::sched::{PinnedTask, TaskFromFriend};
+        // go home
+        let old_home = Cell::new_empty();
+        let old_home_ptr = &old_home;
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            // get the old home first
+            do task.wake().map_move |mut task| {
+                old_home_ptr.put_back(task.take_unwrap_home());
+                self.home().send(PinnedTask(task));
+            };
+        }
+
+        // do IO
+        let a = io(self);
+
+        // unhome home
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+            do task.wake().map_move |mut task| {
+                task.give_home(old_home.take());
+                scheduler.make_handle().send(TaskFromFriend(task));
+            };
+        }
+
+        // return the result of the IO
+        a
+    }
+}
+
+// get a handle for the current scheduler
+macro_rules!
get_handle_to_current_scheduler( + () => (do Local::borrow:: |sched| { sched.make_handle() }) +) + enum SocketNameKind { TcpPeer, Tcp, @@ -45,12 +88,10 @@ enum SocketNameKind { fn socket_name>(sk: SocketNameKind, handle: U) -> Result { - #[fixed_stack_segment]; #[inline(never)]; - let getsockname = match sk { - TcpPeer => uvll::rust_uv_tcp_getpeername, - Tcp => uvll::rust_uv_tcp_getsockname, - Udp => uvll::rust_uv_udp_getsockname + TcpPeer => uvll::tcp_getpeername, + Tcp => uvll::tcp_getsockname, + Udp => uvll::udp_getsockname, }; // Allocate a sockaddr_storage @@ -80,6 +121,7 @@ fn socket_name>(sk: SocketNameKind, } +// Obviously an Event Loop is always home. pub struct UvEventLoop { uvio: UvIoFactory } @@ -149,6 +191,7 @@ fn test_callback_run_once() { } } +// The entire point of async is to call into a loop from other threads so it does not need to home. pub struct UvRemoteCallback { // The uv async handle for triggering the callback async: AsyncWatcher, @@ -251,40 +294,38 @@ impl IoFactory for UvIoFactory { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - // Block this task and take ownership, switch to scheduler context + let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { - rtdebug!("connect: entered scheduler context"); - let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); + let mut tcp = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); // Wait for a connection - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connect: in connect callback"); - if status.is_none() { - rtdebug!("status is none"); - let tcp_watcher = - NativeHandle::from_native_handle(stream_watcher.native_handle()); - let res = Ok(~UvTcpStream(tcp_watcher)); - - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(res); } - - // Context switch - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } else { - rtdebug!("status is some"); - let task_cell = Cell::new(task_cell.take()); - do stream_watcher.close { - let res = Err(uv_error_to_io_error(status.unwrap())); + do tcp.connect(addr) |stream, status| { + match status { + None => { + let tcp = NativeHandle::from_native_handle(stream.native_handle()); + let home = get_handle_to_current_scheduler!(); + let res = Ok(~UvTcpStream { watcher: tcp, home: home }); + + // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } + + // Context switch let scheduler = Local::take::(); scheduler.resume_blocked_task_immediately(task_cell.take()); } - }; + Some(_) => { + let task_cell = Cell::new(task_cell.take()); + do stream.close { + let res = Err(uv_error_to_io_error(status.unwrap())); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } } } @@ -295,7 +336,10 @@ impl IoFactory for UvIoFactory { fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); match watcher.bind(addr) { - Ok(_) => Ok(~UvTcpListener::new(watcher)), + Ok(_) => { + let home = get_handle_to_current_scheduler!(); + Ok(~UvTcpListener::new(watcher, home)) + } Err(uverr) => { let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { @@ -310,32 +354,12 @@ impl IoFactory for UvIoFactory { } } - /* fn udp_bind(&mut self, addr: SocketAddr) -> 
Result<~RtioUdpSocketObject, IoError> { - let mut watcher = UdpWatcher::new(self.uv_loop()); - match watcher.bind(addr) { - Ok(_) => Ok(~UvUdpSocket(watcher)), - Err(uverr) => { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - Err(uv_error_to_io_error(uverr)) - } - } - } - */ - - pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => { - let home = do Local::borrow:: |sched| {sched.make_handle()}; - Ok(~HomedUvUdpSocket { watcher: watcher, home: home }) + let home = get_handle_to_current_scheduler!(); + Ok(~UvUdpSocket { watcher: watcher, home: home }) } Err(uverr) => { let scheduler = Local::take::(); @@ -352,22 +376,30 @@ impl IoFactory for UvIoFactory { } fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { - Ok(~UvTimer(TimerWatcher::new(self.uv_loop()))) + let watcher = TimerWatcher::new(self.uv_loop()); + let home = get_handle_to_current_scheduler!(); + Ok(~UvTimer::new(watcher, home)) } } pub struct UvTcpListener { watcher: TcpWatcher, listening: bool, - incoming_streams: Tube> + incoming_streams: Tube>, + home: SchedHandle, +} + +impl HomingIO for UvTcpListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } impl UvTcpListener { - fn new(watcher: TcpWatcher) -> UvTcpListener { + fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener { UvTcpListener { watcher: watcher, listening: false, - incoming_streams: Tube::new() + incoming_streams: Tube::new(), + home: home, } } @@ -376,13 +408,16 @@ impl UvTcpListener { impl Drop for UvTcpListener { fn drop(&self) { - let watcher = self.watcher(); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.as_stream().close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + // XXX need mutable finalizer + let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) }; + do self_.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher().as_stream().close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } @@ -390,83 +425,92 @@ impl Drop for UvTcpListener { impl RtioSocket for UvTcpListener { fn socket_name(&mut self) -> Result { - socket_name(Tcp, self.watcher) + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } } } impl RtioTcpListener for UvTcpListener { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { - rtdebug!("entering listen"); - - if self.listening { - return self.incoming_streams.recv(); - } - - self.listening = true; - - let server_tcp_watcher = self.watcher(); - let incoming_streams_cell = Cell::new(self.incoming_streams.clone()); - - let incoming_streams_cell = Cell::new(incoming_streams_cell.take()); - let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |mut server_stream_watcher, status| { - let maybe_stream = if status.is_none() { - let mut loop_ = server_stream_watcher.event_loop(); - let client_tcp_watcher = TcpWatcher::new(&mut loop_); - // XXX: Need's 
to be surfaced in interface - server_stream_watcher.accept(client_tcp_watcher.as_stream()); - Ok(~UvTcpStream(client_tcp_watcher)) - } else { - Err(standard_error(OtherIoError)) - }; + do self.home_for_io |self_| { + + if !self_.listening { + self_.listening = true; + + let incoming_streams_cell = Cell::new(self_.incoming_streams.clone()); + + do self_.watcher().listen |mut server, status| { + let stream = match status { + Some(_) => Err(standard_error(OtherIoError)), + None => { + let client = TcpWatcher::new(&server.event_loop()); + // XXX: needs to be surfaced in interface + server.accept(client.as_stream()); + let home = get_handle_to_current_scheduler!(); + Ok(~UvTcpStream { watcher: client, home: home }) + } + }; + + let mut incoming_streams = incoming_streams_cell.take(); + incoming_streams.send(stream); + incoming_streams_cell.put_back(incoming_streams); + } - let mut incoming_streams = incoming_streams_cell.take(); - incoming_streams.send(maybe_stream); - incoming_streams_cell.put_back(incoming_streams); + } + self_.incoming_streams.recv() } - - return self.incoming_streams.recv(); } fn accept_simultaneously(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher(), r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher(), r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } } -pub struct UvTcpStream(TcpWatcher); +pub struct UvTcpStream { + watcher: TcpWatcher, + home: SchedHandle, +} + +impl HomingIO for UvTcpStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} impl Drop for UvTcpStream { fn drop(&self) { - rtdebug!("closing tcp stream"); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.as_stream().close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + // XXX need mutable finalizer + let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; + do this.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.as_stream().close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } @@ -474,692 +518,572 @@ impl Drop for UvTcpStream { impl RtioSocket for UvTcpStream { fn socket_name(&mut self) -> Result { - socket_name(Tcp, **self) + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } } } impl RtioTcpStream for UvTcpStream { fn read(&mut self, 
buf: &mut [u8]) -> Result { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let scheduler = Local::take::(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_sched, task| { - rtdebug!("read: entered scheduler context"); - let task_cell = Cell::new(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) - }; - let mut watcher = self.as_stream(); - do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { - - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - watcher.read_stop(); - - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(uv_error_to_io_error(status.unwrap())) + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_sched, task| { + let task_cell = Cell::new(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) }; + let mut watcher = self_.watcher.as_stream(); + do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { - unsafe { (*result_cell_ptr).put_back(result); } + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? + watcher.read_stop(); - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let result = if status.is_none() { + assert!(nread >= 0); + Ok(nread as uint) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self.as_stream(); - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(uv_error_to_io_error(status.unwrap())) - }; - - unsafe { (*result_cell_ptr).put_back(result); } + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + let mut watcher = self_.watcher.as_stream(); + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let scheduler = Local::take::(); + 
scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn peer_name(&mut self) -> Result { - socket_name(TcpPeer, **self) + do self.home_for_io |self_| { + socket_name(TcpPeer, self_.watcher) + } } fn control_congestion(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn nodelay(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int) - }; + do self.home_for_io |self_| { + let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int, - delay_in_seconds as c_uint) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int, + delay_in_seconds as c_uint) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn letdie(&mut self) -> Result<(), IoError> { - #[fixed_stack_segment]; #[inline(never)]; - - let r = unsafe { - uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint) - }; + do self.home_for_io |self_| { + let r = unsafe { + uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint) + }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } } -pub struct HomedUvUdpSocket { +pub struct UvUdpSocket { watcher: UdpWatcher, home: SchedHandle, } -impl HomedUvUdpSocket { - fn go_home(&mut self) { - use rt::sched::PinnedTask; - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - do task.wake().map_move |task| { self.home.send(PinnedTask(task)); }; - } - } +impl HomingIO for UvUdpSocket { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } -impl Drop for HomedUvUdpSocket { +impl Drop for UvUdpSocket { fn drop(&self) { - rtdebug!("closing homed udp socket"); - // first go home // XXX need mutable finalizer - let this = unsafe { transmute::<&HomedUvUdpSocket, &mut HomedUvUdpSocket>(self) }; - this.go_home(); - // now we're home so block the task and start IO - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do this.watcher.close { - // now IO is 
finished so resume the blocked task - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) }; + do this.home_for_io |_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do this.watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } } -impl RtioSocket for HomedUvUdpSocket { +impl RtioSocket for UvUdpSocket { fn socket_name(&mut self) -> Result { - // first go home - self.go_home(); - socket_name(Udp, self.watcher) + do self.home_for_io |self_| { + socket_name(Udp, self_.watcher) + } } } -impl RtioUdpSocket for HomedUvUdpSocket { +impl RtioUdpSocket for UvUdpSocket { fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { - // first go home - self.go_home(); - - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let scheduler = Local::take::(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - rtdebug!("recvfrom: entered scheduler context"); - let task_cell = Cell::new(task); - let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { - let _ = flags; // /XXX add handling for partials? - - watcher.recv_stop(); - - let result = match status { - None => { - assert!(nread >= 0); - Ok((nread as uint, addr)) - } - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - return result_cell.take(); - } - - fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; + do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { + let _ = flags; // /XXX add handling for partials? 
+ + watcher.recv_stop(); + + let result = match status { + None => { + assert!(nread >= 0); + Ok((nread as uint, addr)) + } + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.watcher.send(buf, dst) |_watcher, status| { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } - let result = match status { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)), - }; + assert!(!result_cell.is_empty()); + result_cell.take() + } + } - unsafe { (*result_cell_ptr).put_back(result); } + fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { + do self.home_for_io |self_| { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + do self_.watcher.send(buf, dst) |_watcher, status| { + + let result = match status { + None => Ok(()), + Some(err) => Err(uv_error_to_io_error(err)), + }; + + unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); + assert!(!result_cell.is_empty()); + result_cell.take() + } } fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { + let r = unsafe { + do multi.to_str().with_c_str |m_addr| { + uvll::udp_set_membership(self_.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_JOIN_GROUP) + } + }; - let r = unsafe { - do multi.to_str().as_c_str |m_addr| { - uvll::udp_set_membership(self.watcher.native_handle(), m_addr, - ptr::null(), uvll::UV_JOIN_GROUP) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) } - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) } } fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { + let r = unsafe { + do multi.to_str().with_c_str |m_addr| { + uvll::udp_set_membership(self_.watcher.native_handle(), m_addr, + ptr::null(), uvll::UV_LEAVE_GROUP) + } + }; - let r = unsafe { - do multi.to_str().as_c_str |m_addr| { - uvll::udp_set_membership(self.watcher.native_handle(), m_addr, - ptr::null(), uvll::UV_LEAVE_GROUP) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) } - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) } } fn loop_multicast_locally(&mut self) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { - let r = unsafe { - uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int) - }; + let r = unsafe { + 
uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { - let r = unsafe { - uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int) - }; + let r = unsafe { + uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { - let r = unsafe { - uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int) - }; + let r = unsafe { + uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { - let r = unsafe { - uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int) - }; + let r = unsafe { + uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn hear_broadcasts(&mut self) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { - let r = unsafe { - uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int) - }; + let r = unsafe { + uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } fn ignore_broadcasts(&mut self) -> Result<(), IoError> { - // first go home - self.go_home(); + do self.home_for_io |self_| { - let r = unsafe { - uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int) - }; + let r = unsafe { + uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int) + }; - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) + match status_to_maybe_uv_error(self_.watcher, r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } } } } -#[test] -fn test_simple_homed_udp_io_bind_only() { - do run_in_newsched_task { - unsafe { - let io = Local::unsafe_borrow::(); - let addr = next_test_ip4(); - let maybe_socket = (*io)./*homed_*/udp_bind(addr); - assert!(maybe_socket.is_ok()); - } - } +pub struct UvTimer { + watcher: timer::TimerWatcher, + home: SchedHandle, } -#[test] -fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() { - use 
rt::sleeper_list::SleeperList; - use rt::work_queue::WorkQueue; - use rt::thread::Thread; - use rt::task::Task; - use rt::sched::{Shutdown, TaskFromFriend}; - do run_in_bare_thread { - let sleepers = SleeperList::new(); - let work_queue1 = WorkQueue::new(); - let work_queue2 = WorkQueue::new(); - let queues = ~[work_queue1.clone(), work_queue2.clone()]; - - let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), - sleepers.clone()); - let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), - sleepers.clone()); - - let handle1 = Cell::new(sched1.make_handle()); - let handle2 = Cell::new(sched2.make_handle()); - let tasksFriendHandle = Cell::new(sched2.make_handle()); - - let on_exit: ~fn(bool) = |exit_status| { - handle1.take().send(Shutdown); - handle2.take().send(Shutdown); - rtassert!(exit_status); - }; - - let test_function: ~fn() = || { - let io = unsafe { Local::unsafe_borrow::() }; - let addr = next_test_ip4(); - let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) }; - // this socket is bound to this event loop - assert!(maybe_socket.is_ok()); - - // block self on sched1 - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - // unblock task - do task.wake().map_move |task| { - // send self to sched2 - tasksFriendHandle.take().send(TaskFromFriend(task)); - }; - // sched1 should now sleep since it has nothing else to do - } - // sched2 will wake up and get the task - // as we do nothing else, the function ends and the socket goes out of scope - // sched2 will start to run the destructor - // the destructor will first block the task, set it's home as sched1, then enqueue it - // sched2 will dequeue the task, see that it has a home, and send it to sched1 - // sched1 will wake up, execute the close function on the correct loop, and then we're done - }; - - let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function); - main_task.death.on_exit = Some(on_exit); - let main_task = Cell::new(main_task); - - let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {}); - - let sched1 = Cell::new(sched1); - let sched2 = Cell::new(sched2); - - // XXX could there be a race on the threads that causes a crash? 
- let thread1 = do Thread::start { - sched1.take().bootstrap(main_task.take()); - }; - let thread2 = do Thread::start { - sched2.take().bootstrap(null_task.take()); - }; - - thread1.join(); - thread2.join(); - } +impl HomingIO for UvTimer { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } -pub struct UvUdpSocket(UdpWatcher); +impl UvTimer { + fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer { + UvTimer { watcher: w, home: home } + } +} -impl Drop for UvUdpSocket { +impl Drop for UvTimer { fn drop(&self) { - rtdebug!("closing udp socket"); - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.close { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) }; + do self_.home_for_io |self_| { + rtdebug!("closing UvTimer"); + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } } } } } -impl RtioSocket for UvUdpSocket { - fn socket_name(&mut self) -> Result { - socket_name(Udp, **self) +impl RtioTimer for UvTimer { + fn sleep(&mut self, msecs: u64) { + do self.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_sched, task| { + rtdebug!("sleep: entered scheduler context"); + let task_cell = Cell::new(task); + do self_.watcher.start(msecs, 0) |_, status| { + assert!(status.is_none()); + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + self_.watcher.stop(); + } } } -impl RtioUdpSocket for UvUdpSocket { - fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let scheduler = Local::take::(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_sched, task| { - rtdebug!("recvfrom: entered scheduler context"); - let task_cell = Cell::new(task); - let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { - let _ = flags; // XXX add handling for partials? 
- - watcher.recv_stop(); - - let result = match status { - None => { - assert!(nread >= 0); - Ok((nread as uint, addr)) - } - Some(err) => Err(uv_error_to_io_error(err)) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } +#[test] +fn test_simple_io_no_connect() { + do run_in_newsched_task { + unsafe { + let io = Local::unsafe_borrow::(); + let addr = next_test_ip4(); + let maybe_chan = (*io).tcp_connect(addr); + assert!(maybe_chan.is_err()); } - - assert!(!result_cell.is_empty()); - return result_cell.take(); } +} - fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.send(buf, dst) |_watcher, status| { - - let result = match status { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } +#[test] +fn test_simple_udp_io_bind_only() { + do run_in_newsched_task { + unsafe { + let io = Local::unsafe_borrow::(); + let addr = next_test_ip4(); + let maybe_socket = (*io).udp_bind(addr); + assert!(maybe_socket.is_ok()); } - - assert!(!result_cell.is_empty()); - return result_cell.take(); } +} - fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - let r = unsafe { - do multi.to_str().with_c_str |m_addr| { - uvll::udp_set_membership(self.native_handle(), m_addr, - ptr::null(), uvll::UV_JOIN_GROUP) - } +#[test] +fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use rt::thread::Thread; + use rt::task::Task; + use rt::sched::{Shutdown, TaskFromFriend}; + do run_in_bare_thread { + let sleepers = SleeperList::new(); + let work_queue1 = WorkQueue::new(); + let work_queue2 = WorkQueue::new(); + let queues = ~[work_queue1.clone(), work_queue2.clone()]; + + let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + sleepers.clone()); + let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + sleepers.clone()); + + let handle1 = Cell::new(sched1.make_handle()); + let handle2 = Cell::new(sched2.make_handle()); + let tasksFriendHandle = Cell::new(sched2.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + handle1.take().send(Shutdown); + handle2.take().send(Shutdown); + rtassert!(exit_status); }; - match status_to_maybe_uv_error(**self, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } - } + let test_function: ~fn() = || { + let io = unsafe { Local::unsafe_borrow::() }; + let addr = next_test_ip4(); + let maybe_socket = unsafe { (*io).udp_bind(addr) }; + // this socket is bound to this event loop + assert!(maybe_socket.is_ok()); - fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - let r = unsafe { - do multi.to_str().with_c_str |m_addr| { - uvll::udp_set_membership(self.native_handle(), m_addr, - ptr::null(), uvll::UV_LEAVE_GROUP) + // block self on sched1 + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // unblock task + do task.wake().map_move |task| 
-    fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_JOIN_GROUP)
-            }
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown, TaskFromFriend};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+        let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+        let on_exit: ~fn(bool) = |exit_status| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit_status);
         };
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
+        let test_function: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let maybe_socket = unsafe { (*io).udp_bind(addr) };
+            // this socket is bound to this event loop
+            assert!(maybe_socket.is_ok());
-
-    fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_LEAVE_GROUP)
+            // block self on sched1
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // unblock the task...
+                do task.wake().map_move |task| {
+                    // ...by sending it to sched2
+                    tasksFriendHandle.take().send(TaskFromFriend(task));
+                };
+                // sched1 should now sleep since it has nothing else to do
             }
+            // sched2 will wake up and get the task
+            // as we do nothing else, the function ends and the socket goes out of scope
+            // sched2 will start to run the destructor
+            // the destructor will first block the task, set its home to sched1, then enqueue it
+            // sched2 will dequeue the task, see that it has a home, and send it to sched1
+            // sched1 will wake up, execute the close function on the correct loop, and then we're done
         };
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
+        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
+        main_task.death.on_exit = Some(on_exit);
+        let main_task = Cell::new(main_task);
-
-    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
-        };
+        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
-
-    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(main_task.take());
         };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
-
-    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(null_task.take());
         };
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
+        thread1.join();
+        thread2.join();
+    }
+}
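Note: the handoff at the heart of this test distills to a few lines: reclaim the blocked task's handle with wake(), then mail it to another scheduler. A minimal sketch under the same APIs; friend is assumed to be a Cell holding a SchedHandle for the target scheduler, just as tasksFriendHandle is above:

    use rt::sched::TaskFromFriend;

    // Block the running task and enqueue it on a different scheduler.
    let scheduler = Local::take::<Scheduler>();
    do scheduler.deschedule_running_task_and_then |_, task| {
        do task.wake().map_move |task| {
            friend.take().send(TaskFromFriend(task));  // 'friend' is an assumed Cell<SchedHandle>
        };
    }
    // by the time this line runs, the other scheduler has resumed the task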
-
-    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
-        };
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::comm::oneshot;
+    use rt::sched::Shutdown;
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
-
-    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
-        };
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
-
-    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
+        let body1: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let socket = unsafe { (*io).udp_bind(addr) };
+            assert!(socket.is_ok());
+            chan.take().send(socket);
         };
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
-}
+        let body2: ~fn() = || {
+            let socket = port.take().recv();
+            assert!(socket.is_ok());
+            /* The socket goes out of scope and the destructor is called.
+             * The destructor:
+             *  - sends itself back to sched1
+             *  - frees the socket
+             *  - resets the home of the task to whatever it was previously
+             */
+        };
-
-pub struct UvTimer(timer::TimerWatcher);
+        let on_exit: ~fn(bool) = |exit| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit);
+        };
-
-impl UvTimer {
-    fn new(w: timer::TimerWatcher) -> UvTimer {
-        UvTimer(w)
-    }
-}
+        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
-
-impl Drop for UvTimer {
-    fn drop(&self) {
-        rtdebug!("closing UvTimer");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-    }
-}
+        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
+        task2.death.on_exit = Some(on_exit);
+        let task2 = Cell::new(task2);
-
-impl RtioTimer for UvTimer {
-    fn sleep(&self, msecs: u64) {
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("sleep: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let mut watcher = **self;
-            do watcher.start(msecs, 0) |_, status| {
-                assert!(status.is_none());
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-        let mut w = **self;
-        w.stop();
-    }
-}
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
-
-#[test]
-fn test_simple_io_no_connect() {
-    do run_in_newsched_task {
-        unsafe {
-            let io = Local::unsafe_borrow::<IoFactoryObject>();
-            let addr = next_test_ip4();
-            let maybe_chan = (*io).tcp_connect(addr);
-            assert!(maybe_chan.is_err());
-        }
-    }
-}
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(task1.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(task2.take());
+        };
-
-#[test]
-fn test_simple_udp_io_bind_only() {
-    do run_in_newsched_task {
-        unsafe {
-            let io = Local::unsafe_borrow::<IoFactoryObject>();
-            let addr = next_test_ip4();
-            let maybe_socket = (*io).udp_bind(addr);
-            assert!(maybe_socket.is_ok());
-        }
+        thread1.join();
+        thread2.join();
     }
 }
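Note: the handle-move test above leans on rt::comm::oneshot to pass the bound socket from the task on sched1 to the task on sched2. A minimal sketch of that primitive under the same API as used above; the payload value is arbitrary, and each endpoint is single-use (send consumes the channel, recv consumes the port):

    use rt::comm::oneshot;

    let (port, chan) = oneshot();
    chan.send(~"payload");              // consumes the sending end
    assert!(port.recv() == ~"payload"); // consumes the receiving end, blocking until sent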
@@ -1194,6 +1118,85 @@ fn test_simple_tcp_server_and_client() {
     }
 }
 
+#[test]
+fn test_simple_tcp_server_and_client_on_diff_threads() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+
+        let server_addr = next_test_ip4();
+        let client_addr = server_addr.clone();
+
+        let server_work_queue = WorkQueue::new();
+        let client_work_queue = WorkQueue::new();
+        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
+
+        let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
+                                               queues.clone(), sleepers.clone());
+        let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
+                                               queues.clone(), sleepers.clone());
+
+        let server_handle = Cell::new(server_sched.make_handle());
+        let client_handle = Cell::new(client_sched.make_handle());
+
+        let server_on_exit: ~fn(bool) = |exit_status| {
+            server_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let client_on_exit: ~fn(bool) = |exit_status| {
+            client_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let server_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert_eq!(nread, 8);
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], i as u8);
+            }
+        };
+
+        let client_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut stream = unsafe { (*io).tcp_connect(client_addr) };
+            while stream.is_err() {
+                stream = unsafe { (*io).tcp_connect(client_addr) };
+            }
+            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+        };
+
+        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
+        server_task.death.on_exit = Some(server_on_exit);
+        let server_task = Cell::new(server_task);
+
+        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
+        client_task.death.on_exit = Some(client_on_exit);
+        let client_task = Cell::new(client_task);
+
+        let server_sched = Cell::new(server_sched);
+        let client_sched = Cell::new(client_sched);
+
+        let server_thread = do Thread::start {
+            server_sched.take().bootstrap(server_task.take());
+        };
+        let client_thread = do Thread::start {
+            client_sched.take().bootstrap(client_task.take());
+        };
+
+        server_thread.join();
+        client_thread.join();
+    }
+}
+
 #[test]
 fn test_simple_udp_server_and_client() {
     do run_in_newsched_task {
@@ -1410,19 +1413,13 @@ fn test_udp_many_read() {
     }
 }
 
-fn test_timer_sleep_simple_impl() {
-    unsafe {
-        let io = Local::unsafe_borrow::<IoFactoryObject>();
-        let timer = (*io).timer_init();
-        match timer {
-            Ok(t) => t.sleep(1),
-            Err(_) => assert!(false)
-        }
-    }
-}
 #[test]
 fn test_timer_sleep_simple() {
     do run_in_newsched_task {
-        test_timer_sleep_simple_impl();
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let timer = (*io).timer_init();
+            do timer.map_move |mut t| { t.sleep(1) };
+        }
     }
 }
diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
index 8ed1287fe0108..0ea2175336ab0 100644
--- a/src/libstd/rt/uv/uvll.rs
+++ b/src/libstd/rt/uv/uvll.rs
@@ -288,7 +288,7 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_t {
     return rust_uv_get_udp_handle_from_send_req(send_req);
 }
 
-pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
+pub unsafe fn udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
     #[fixed_stack_segment]; #[inline(never)];
     return rust_uv_udp_getsockname(handle, name);
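A behavioural note on the timer test rewrite above: the old helper asserted when timer_init() returned Err, whereas timer.map_move runs its closure only on Ok and silently yields None on Err, so an init failure no longer fails the test. A sketch that keeps the new shape but preserves the old failure mode, using only the APIs from that hunk:

    unsafe {
        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let timer = (*io).timer_init();
        assert!(timer.is_ok());                   // restore the old Err => test failure behaviour
        do timer.map_move |mut t| { t.sleep(1) }; // sleep 1 ms on the watcher
    }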