diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index 6275eff924988..ac5a118f22a59 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -8,47 +8,100 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::{Option}; +use option::{Option, Some, None}; +use result::{Ok, Err}; use rt::io::net::ip::IpAddr; -use rt::io::{Reader, Writer, Listener}; -use rt::rtio::{RtioUdpStreamObject}; +use rt::io::{Reader, Writer}; +use rt::io::{io_error, read_error, EndOfFile}; +use rt::rtio::{RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject}; +use rt::local::Local; + +pub struct UdpSocket { + rtsocket: ~RtioUdpSocketObject +} + +impl UdpSocket { + fn new(s: ~RtioUdpSocketObject) -> UdpSocket { + UdpSocket { rtsocket: s } + } + + pub fn bind(addr: IpAddr) -> Option { + let socket = unsafe { + let io = Local::unsafe_borrow::(); + (*io).udp_bind(addr) + }; + match socket { + Ok(s) => { Some(UdpSocket { rtsocket: s }) } + Err(ioerr) => { + io_error::cond.raise(ioerr); + return None; + } + } + } + + pub fn recvfrom(&self, buf: &mut [u8]) -> Option<(uint, IpAddr)> { + match (*self.rtsocket).recvfrom(buf) { + Ok((nread, src)) => Some((nread, src)), + Err(ioerr) => { + // EOF is indicated by returning None + // XXX do we ever find EOF reading UDP packets? + if ioerr.kind != EndOfFile { + read_error::cond.raise(ioerr); + } + None + } + } + } + + pub fn sendto(&self, buf: &[u8], dst: IpAddr) { + match (*self.rtsocket).sendto(buf, dst) { + Ok(_) => (), + Err(ioerr) => { + io_error::cond.raise(ioerr); + } + } + } + + // XXX convert ~self to self eventually + pub fn connect(~self, other: IpAddr) -> UdpStream { + UdpStream { socket: self, connectedTo: other } + } +} pub struct UdpStream { - rtstream: ~RtioUdpStreamObject + socket: ~UdpSocket, + connectedTo: IpAddr } impl UdpStream { - fn new(s: ~RtioUdpStreamObject) -> UdpStream { - UdpStream { - rtstream: s - } + pub fn as_socket(&self, f: &fn(&UdpSocket) -> T) -> T { + f(self.socket) } - pub fn connect(_addr: IpAddr) -> Option { - fail!() + pub fn disconnect(self) -> ~UdpSocket { + let UdpStream { socket: s, _ } = self; + s } } impl Reader for UdpStream { - fn read(&mut self, _buf: &mut [u8]) -> Option { fail!() } + fn read(&mut self, buf: &mut [u8]) -> Option { + let conn = self.connectedTo; + do self.as_socket |sock| { + sock.recvfrom(buf) + .map_consume(|(nread,src)| if src == conn {nread} else {0}) + } + } fn eof(&mut self) -> bool { fail!() } } impl Writer for UdpStream { - fn write(&mut self, _buf: &[u8]) { fail!() } - - fn flush(&mut self) { fail!() } -} - -pub struct UdpListener; - -impl UdpListener { - pub fn bind(_addr: IpAddr) -> Option { - fail!() + fn write(&mut self, buf: &[u8]) { + do self.as_socket |sock| { + sock.sendto(buf, self.connectedTo); + } } -} -impl Listener for UdpListener { - fn accept(&mut self) -> Option { fail!() } + fn flush(&mut self) { fail!() } } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 0eebbb61e5b56..e38c952f744f0 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; -pub type RtioUdpStreamObject = uvio::UvUdpStream; +pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub trait EventLoop { fn run(&mut self); @@ -45,7 +45,7 @@ pub trait RemoteCallback { pub trait IoFactory { fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; - // TODO fn udp_connect(&mut self, addr: IpAddr) -> Result<~RtioUdpStreamObject, IoError>; + fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError>; } pub trait RtioTcpListener { @@ -53,11 +53,11 @@ pub trait RtioTcpListener { } pub trait RtioTcpStream { - fn read(&mut self, buf: &mut [u8]) -> Result; - fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn read(&self, buf: &mut [u8]) -> Result; + fn write(&self, buf: &[u8]) -> Result<(), IoError>; } -pub trait RtioUdpStream { - fn read(&mut self, buf: &mut [u8]) -> Result; - fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; +pub trait RtioUdpSocket { + fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>; + fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>; } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index eba84e537f999..828078f48654e 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -25,6 +25,7 @@ use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; use unstable::sync::{Exclusive, exclusive}; +use rt::uv::net::uv_ip4_to_ip4; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -260,6 +261,24 @@ impl IoFactory for UvIoFactory { } } } + + fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError> { + let mut watcher = UdpWatcher::new(self.uv_loop()); + match watcher.bind(addr) { + Ok(_) => Ok(~UvUdpSocket { watcher: watcher }), + Err(uverr) => { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do watcher.close { + let scheduler = Local::take::(); + scheduler.resume_task_immediately(task_cell.take()); + } + } + Err(uv_error_to_io_error(uverr)) + } + } + } } // FIXME #6090: Prefer newtype structs but Drop doesn't work @@ -358,7 +377,7 @@ impl Drop for UvTcpStream { } impl RtioTcpStream for UvTcpStream { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&self, buf: &mut [u8]) -> Result { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; @@ -403,7 +422,7 @@ impl RtioTcpStream for UvTcpStream { return result_cell.take(); } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + fn write(&self, buf: &[u8]) -> Result<(), IoError> { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); @@ -433,23 +452,21 @@ impl RtioTcpStream for UvTcpStream { } } -pub struct UvUdpStream { - watcher: UdpWatcher, - address: IpAddr +pub struct UvUdpSocket { + watcher: UdpWatcher } -impl UvUdpStream { +impl UvUdpSocket { fn watcher(&self) -> UdpWatcher { self.watcher } - fn address(&self) -> IpAddr { self.address } } -impl Drop for UvUdpStream { +impl Drop for UvUdpSocket { fn finalize(&self) { - rtdebug!("closing udp stream"); + rtdebug!("closing udp socket"); let watcher = self.watcher(); let scheduler = Local::take::(); do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell(task); + let task_cell = Cell::new(task); do watcher.close { let scheduler = Local::take::(); scheduler.resume_task_immediately(task_cell.take()); @@ -458,40 +475,31 @@ impl Drop for UvUdpStream { } } -impl RtioUdpStream for UvUdpStream { - fn read(&mut self, buf: &mut [u8]) -> Result { - let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; +impl RtioUdpSocket for UvUdpSocket { + fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); - let connection_address = self.address(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |sched, task| { - rtdebug!("read: entered scheduler context"); + rtdebug!("recvfrom: entered scheduler context"); assert!(!sched.in_task_context()); let mut watcher = watcher; - let task_cell = Cell(task); - // XXX: see note in RtioTcpStream implementation for UvTcpStream - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) - }; - do watcher.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| { - let _ = flags; // TODO actually use flags + let task_cell = Cell::new(task); + let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; + do watcher.recv_start(alloc) |watcher, nread, buf, addr, flags, status| { + let _ = flags; // TODO + let _ = buf; // TODO - // XXX: see note in RtioTcpStream implementation for UvTcpStream let mut watcher = watcher; watcher.recv_stop(); - let incoming_address = net::uv_ip4_to_ip4(&addr); let result = if status.is_none() { assert!(nread >= 0); - if incoming_address != connection_address { - Ok(0u) - } else { - Ok(nread as uint) - } + Ok((nread as uint, uv_ip4_to_ip4(&addr))) } else { Err(uv_error_to_io_error(status.unwrap())) }; @@ -505,11 +513,37 @@ impl RtioUdpStream for UvUdpStream { assert!(!result_cell.is_empty()); return result_cell.take(); + } + fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let scheduler = Local::take::(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let mut watcher = watcher; + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + do watcher.send(buf, dst) |watcher, status| { + let _ = watcher; // TODO + + let result = if status.is_none() { + Ok(()) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_task_immediately(task_cell.take()); + } + } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let _ = buf; - fail!() + assert!(!result_cell.is_empty()); + return result_cell.take(); } } @@ -535,7 +569,7 @@ fn test_simple_tcp_server_and_client() { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let stream = listener.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -549,7 +583,7 @@ fn test_simple_tcp_server_and_client() { do spawntask_immediately { unsafe { let io = Local::unsafe_borrow::(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let stream = (*io).tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); } } @@ -564,7 +598,7 @@ fn test_read_and_block() { do spawntask_immediately { let io = unsafe { Local::unsafe_borrow::() }; let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let stream = listener.accept().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -597,7 +631,7 @@ fn test_read_and_block() { do spawntask_immediately { unsafe { let io = Local::unsafe_borrow::(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let stream = (*io).tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -618,7 +652,7 @@ fn test_read_read_read() { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let stream = listener.accept().unwrap(); let buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX { @@ -631,7 +665,7 @@ fn test_read_read_read() { do spawntask_immediately { unsafe { let io = Local::unsafe_borrow::(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let stream = (*io).tcp_connect(addr).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; while total_bytes_read < MAX {