From 58b2ff9f564833f5f4fa077a5708c139738dad8e Mon Sep 17 00:00:00 2001 From: Eric Reed Date: Tue, 27 Aug 2013 10:01:17 -0700 Subject: [PATCH] Split out starting a listener from accepting incoming connections. The Listener trait takes two type parameters, the type of connection and the type of Acceptor, and specifies only one method, listen, which consumes the listener and produces an Acceptor. The Acceptor trait takes one type parameter, the type of connection, and defines two methods. The accept() method waits for an incoming connection attempt and returns the result. The incoming() method creates an iterator over incoming connections and is a default method. Example: let listener = TcpListener.bind(addr); // Bind to a socket let acceptor = listener.listen(); // Start the listener for stream in acceptor.incoming() { // Process incoming connections forever (or until you break out of the loop) } --- src/libstd/rt/io/mod.rs | 38 ++++++++++-- src/libstd/rt/io/net/tcp.rs | 106 +++++++++++++++++--------------- src/libstd/rt/io/net/unix.rs | 8 ++- src/libstd/rt/io/option.rs | 20 ++++-- src/libstd/rt/rtio.rs | 5 ++ src/libstd/rt/uv/uvio.rs | 116 ++++++++++++++++++++--------------- 6 files changed, 181 insertions(+), 112 deletions(-) diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 116d240308a36..c56b20453e5bb 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -474,17 +474,43 @@ pub trait Seek { fn seek(&mut self, pos: i64, style: SeekStyle); } -/// A listener is a value that listens for connections -pub trait Listener { - /// Wait for and accept an incoming connection - /// - /// Returns `None` on timeout. +/// A listener is a value that can consume itself to start listening for connections. +/// Doing so produces some sort of Acceptor. +pub trait Listener> { + /// Spin up the listener and start queueing incoming connections /// /// # Failure /// /// Raises `io_error` condition. If the condition is handled, + /// then `listen` returns `None`. + fn listen(self) -> Option; +} + +/// An acceptor is a value that presents incoming connections +pub trait Acceptor { + /// Wait for and accept an incoming connection + /// + /// # Failure + /// Raise `io_error` condition. If the condition is handled, /// then `accept` returns `None`. - fn accept(&mut self) -> Option; + fn accept(&mut self) -> Option; + + /// Create an iterator over incoming connections + fn incoming<'r>(&'r mut self) -> IncomingIterator<'r, Self> { + IncomingIterator { inc: self } + } +} + +/// An infinite iterator over incoming connection attempts. +/// Calling `next` will block the task until a connection is attempted. +struct IncomingIterator<'self, A> { + priv inc: &'self mut A, +} + +impl<'self, T, A: Acceptor> Iterator for IncomingIterator<'self, A> { + fn next(&mut self) -> Option { + self.inc.accept() + } } /// Common trait for decorator types. diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index ce66cd0de59c2..b7cb703eb2547 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -11,12 +11,13 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; -use rt::io::{Reader, Writer, Listener}; +use rt::io::{Reader, Writer, Listener, Acceptor}; use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, - RtioSocket, RtioTcpListener, - RtioTcpListenerObject, RtioTcpStream, - RtioTcpStreamObject}; + RtioSocket, + RtioTcpListener, RtioTcpListenerObject, + RtioTcpAcceptor, RtioTcpAcceptorObject, + RtioTcpStream, RtioTcpStreamObject}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); @@ -124,13 +125,27 @@ impl TcpListener { } } -impl Listener for TcpListener { +impl Listener for TcpListener { + fn listen(self) -> Option { + match (**self).listen() { + Ok(acceptor) => Some(TcpAcceptor(acceptor)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +pub struct TcpAcceptor(~RtioTcpAcceptorObject); + +impl Acceptor for TcpAcceptor { fn accept(&mut self) -> Option { match (**self).accept() { Ok(s) => Some(TcpStream::new(s)), Err(ioerr) => { io_error::cond.raise(ioerr); - return None; + None } } } @@ -184,8 +199,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); @@ -204,8 +219,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); @@ -224,8 +239,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -244,8 +259,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -265,8 +280,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -288,8 +303,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -311,8 +326,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let buf = [0]; loop { let mut stop = false; @@ -341,8 +356,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let buf = [0]; loop { let mut stop = false; @@ -371,9 +386,8 @@ mod test { let max = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - do max.times { - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); @@ -396,9 +410,8 @@ mod test { let max = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - do max.times { - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); @@ -421,10 +434,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for i in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); @@ -460,10 +472,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for i in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); @@ -499,10 +510,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for _ in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for stream in acceptor.incoming().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); @@ -537,10 +547,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for _ in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for stream in acceptor.incoming().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); @@ -573,10 +582,7 @@ mod test { fn socket_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { - let listener = TcpListener::bind(addr); - - assert!(listener.is_some()); - let mut listener = listener.unwrap(); + let mut listener = TcpListener::bind(addr).unwrap(); // Make sure socket_name gives // us the socket we binded to. @@ -592,9 +598,9 @@ mod test { fn peer_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { - let mut listener = TcpListener::bind(addr); + let mut acceptor = TcpListener::bind(addr).listen(); - listener.accept(); + acceptor.accept(); } do spawntask { diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index b85b7dd059d82..1771a963ba78c 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -40,6 +40,12 @@ impl UnixListener { } } -impl Listener for UnixListener { +impl Listener for UnixListener { + fn listen(self) -> Option { fail!() } +} + +pub struct UnixAcceptor; + +impl Acceptor for UnixAcceptor { fn accept(&mut self) -> Option { fail!() } } diff --git a/src/libstd/rt/io/option.rs b/src/libstd/rt/io/option.rs index 7dadc653e6cc9..098433f299c17 100644 --- a/src/libstd/rt/io/option.rs +++ b/src/libstd/rt/io/option.rs @@ -17,7 +17,7 @@ //! # XXX Seek and Close use option::*; -use super::{Reader, Writer, Listener}; +use super::{Reader, Writer, Listener, Acceptor}; use super::{standard_error, PreviousIoError, io_error, read_error, IoError}; fn prev_io_error() -> IoError { @@ -62,10 +62,22 @@ impl Reader for Option { } } -impl, S> Listener for Option { - fn accept(&mut self) -> Option { +impl, L: Listener> Listener for Option { + fn listen(self) -> Option { + match self { + Some(listener) => listener.listen(), + None => { + io_error::cond.raise(prev_io_error()); + None + } + } + } +} + +impl> Acceptor for Option { + fn accept(&mut self) -> Option { match *self { - Some(ref mut listener) => listener.accept(), + Some(ref mut acceptor) => acceptor.accept(), None => { io_error::cond.raise(prev_io_error()); None diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 1788b7a04e334..6f1b33d1e219f 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -26,6 +26,7 @@ pub type EventLoopObject = uvio::UvEventLoop; pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; +pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor; pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; @@ -75,6 +76,10 @@ pub trait IoFactory { } pub trait RtioTcpListener : RtioSocket { + fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>; +} + +pub trait RtioTcpAcceptor : RtioSocket { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index ad9d2a597947e..e37dfba0cc192 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -599,9 +599,7 @@ impl IoFactory for UvIoFactory { } pub struct UvTcpListener { - watcher: TcpWatcher, - listening: bool, - incoming_streams: Tube>, + watcher : TcpWatcher, home: SchedHandle, } @@ -611,15 +609,8 @@ impl HomingIO for UvTcpListener { impl UvTcpListener { fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener { - UvTcpListener { - watcher: watcher, - listening: false, - incoming_streams: Tube::new(), - home: home, - } + UvTcpListener { watcher: watcher, home: home } } - - fn watcher(&self) -> TcpWatcher { self.watcher } } impl Drop for UvTcpListener { @@ -628,10 +619,10 @@ impl Drop for UvTcpListener { let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) }; do self_.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher().as_stream().close { + let task = Cell::new(task); + do self_.watcher.as_stream().close { let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task.take()); } } } @@ -641,50 +632,71 @@ impl Drop for UvTcpListener { impl RtioSocket for UvTcpListener { fn socket_name(&mut self) -> Result { do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) + socket_name(Tcp, self_.watcher) } } } impl RtioTcpListener for UvTcpListener { - - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { - do self.home_for_io |self_| { - - if !self_.listening { - self_.listening = true; - - let incoming_streams_cell = Cell::new(self_.incoming_streams.clone()); - - do self_.watcher().listen |mut server, status| { - let stream = match status { + fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { + do self.home_for_io_consume |self_| { + let mut acceptor = ~UvTcpAcceptor::new(self_); + let incoming = Cell::new(acceptor.incoming.clone()); + do acceptor.listener.watcher.listen |mut server, status| { + do incoming.with_mut_ref |incoming| { + let inc = match status { Some(_) => Err(standard_error(OtherIoError)), None => { - let client = TcpWatcher::new(&server.event_loop()); - // XXX: needs to be surfaced in interface - server.accept(client.as_stream()); + let inc = TcpWatcher::new(&server.event_loop()); + // first accept call in the callback guarenteed to succeed + server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: client, home: home }) + Ok(~UvTcpStream { watcher: inc, home: home }) } }; - - let mut incoming_streams = incoming_streams_cell.take(); - incoming_streams.send(stream); - incoming_streams_cell.put_back(incoming_streams); + incoming.send(inc); } + }; + Ok(acceptor) + } + } +} - } - self_.incoming_streams.recv() +pub struct UvTcpAcceptor { + listener: UvTcpListener, + incoming: Tube>, +} + +impl HomingIO for UvTcpAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl UvTcpAcceptor { + fn new(listener: UvTcpListener) -> UvTcpAcceptor { + UvTcpAcceptor { listener: listener, incoming: Tube::new() } + } +} + +impl RtioSocket for UvTcpAcceptor { + fn socket_name(&mut self) -> Result { + do self.home_for_io |self_| { + socket_name(Tcp, self_.listener.watcher) } } +} + +impl RtioTcpAcceptor for UvTcpAcceptor { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { + self.incoming.recv() + } fn accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) + uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(self_.listener.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -694,10 +706,10 @@ impl RtioTcpListener for UvTcpListener { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) + uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(self_.listener.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1440,8 +1452,9 @@ fn test_simple_tcp_server_and_client() { do spawntask { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let listener = (*io).tcp_bind(addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -1498,11 +1511,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let server_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; - let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; + let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -1583,8 +1595,9 @@ fn test_read_and_block() { do spawntask { let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let listener = unsafe { (*io).tcp_bind(addr).unwrap() }; + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -1639,8 +1652,9 @@ fn test_read_read_read() { do spawntask { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let listener = (*io).tcp_bind(addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX {