Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp: update API documentation #1392

Merged
merged 1 commit into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tokio-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ Core I/O primitives for asynchronous I/O in Rust.
categories = ["asynchronous"]
publish = false

[features]
util = ["memchr"]

[dependencies]
bytes = "0.4.7"
log = "0.4"
Expand All @@ -32,6 +35,3 @@ pin-utils = "0.1.0-alpha.4"
tokio = { version = "0.2.0", path = "../tokio" }
futures-util-preview = "0.3.0-alpha.17"
tokio-test = { version = "0.2.0", path = "../tokio-test" }

[features]
util = ["memchr"]
42 changes: 28 additions & 14 deletions tokio-reactor/src/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,33 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
/// [`clear_read_ready`].
///
/// ```rust,ignore
/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
/// let ready = Ready::readable();
/// ```rust
/// use tokio_reactor::PollEvented;
///
/// try_ready!(self.poll_evented.poll_read_ready(ready));
/// use futures_core::ready;
/// use mio::Ready;
/// use mio::net::{TcpStream, TcpListener};
/// use std::io;
/// use std::task::{Context, Poll};
///
/// match self.poll_evented.get_ref().accept_std() {
/// Ok(pair) => Ok(Async::Ready(pair)),
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// self.poll_evented.clear_read_ready(ready);
/// Ok(Async::NotReady)
/// struct MyListener {
/// poll_evented: PollEvented<TcpListener>,
/// }
///
/// impl MyListener {
/// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
/// let ready = Ready::readable();
///
/// ready!(self.poll_evented.poll_read_ready(cx, ready))?;
///
/// match self.poll_evented.get_ref().accept() {
/// Ok((socket, _)) => Poll::Ready(Ok(socket)),
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// self.poll_evented.clear_read_ready(cx, ready);
/// Poll::Pending
/// }
/// Err(e) => Poll::Ready(Err(e)),
/// }
/// Err(e) => Err(e),
/// }
/// }
/// ```
Expand Down Expand Up @@ -209,8 +223,8 @@ where
/// `writable`. HUP is always implicitly included on platforms that support
/// it.
///
/// If the resource is not ready for a read then `Async::NotReady` is
/// returned and the current task is notified once a new event is received.
/// If the resource is not ready for a read then `Poll::Pending` is returned
/// and the current task is notified once a new event is received.
///
/// The I/O resource will remain in a read-ready state until readiness is
/// cleared by calling [`clear_read_ready`].
Expand Down Expand Up @@ -241,8 +255,8 @@ where
/// Clears the I/O resource's read readiness state and registers the current
/// task to be notified once a read readiness event is received.
///
/// After calling this function, `poll_read_ready` will return `NotReady`
/// until a new read readiness event has been received.
/// After calling this function, `poll_read_ready` will return
/// `Poll::Pending` until a new read readiness event has been received.
///
/// The `mask` argument specifies the readiness bits to clear. This may not
/// include `writable` or `hup`.
Expand Down
20 changes: 10 additions & 10 deletions tokio-reactor/src/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ impl Registration {
///
/// There are several possible return values:
///
/// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
/// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
/// a new readiness event. The readiness value is included.
///
/// * `Ok(NotReady)` means that no new readiness events have been received
/// * `Poll::Pending` means that no new readiness events have been received
/// since the last call to `poll_read_ready`.
///
/// * `Err(err)` means that the registration has encountered an error. This
/// error either represents a permanent internal error **or** the fact
/// that [`register`] was not called first.
/// * `Poll::Ready(Err(err))` means that the registration has encountered an
/// error. This error either represents a permanent internal error **or**
/// the fact that [`register`] was not called first.
///
/// [`register`]: #method.register
/// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
Expand Down Expand Up @@ -314,15 +314,15 @@ impl Registration {
///
/// There are several possible return values:
///
/// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
/// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
/// a new readiness event. The readiness value is included.
///
/// * `Ok(NotReady)` means that no new readiness events have been received
/// * `Poll::Pending` means that no new readiness events have been received
/// since the last call to `poll_write_ready`.
///
/// * `Err(err)` means that the registration has encountered an error. This
/// error either represents a permanent internal error **or** the fact
/// that [`register`] was not called first.
/// * `Poll::Ready(Err(err))` means that the registration has encountered an
/// error. This error either represents a permanent internal error **or**
/// the fact that [`register`] was not called first.
///
/// [`register`]: #method.register
/// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
Expand Down
155 changes: 35 additions & 120 deletions tokio-tcp/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,22 @@ use tokio_reactor::{Handle, PollEvented};
/// # Examples
///
/// ```no_run
/// use futures::stream::Stream;
/// use std::net::SocketAddr;
/// use tokio::net::{TcpListener, TcpStream};
/// #![feature(async_await)]
///
/// fn process_socket(socket: TcpStream) {
/// // ...
/// }
/// use tokio::net::TcpListener;
/// use std::error::Error;
/// # async fn process_socket<T>(socket: T) {}
///
/// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
/// let listener = TcpListener::bind(&addr)?;
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let mut listener = TcpListener::bind(&addr)?;
///
/// // accept connections and process them
/// tokio::run(listener.incoming()
/// .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
/// .for_each(|socket| {
/// loop {
/// let (socket, _) = listener.accept().await?;
/// process_socket(socket);
/// Ok(())
/// })
/// );
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// }
/// }
/// ```
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
Expand All @@ -64,42 +60,38 @@ impl TcpListener {
Ok(TcpListener::new(l))
}

/// Attempt to accept a connection and create a new connected `TcpStream` if
/// successful.
///
/// Note that typically for simple usage it's easier to treat incoming
/// connections as a `Stream` of `TcpStream`s with the `incoming` method
/// below.
///
/// # Return
///
/// On success, returns `Ok(Async::Ready((socket, addr)))`.
///
/// If the listener is not ready to accept, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the listener becomes ready to accept.
/// Accept a new incoming connection from this listener.
///
/// # Panics
/// This function will yield once a new TCP connection is established. When
/// established, the corresponding [`TcpStream`] and the remote peer's
/// address will be returned.
///
/// This function will panic if called from outside of a task context.
/// [`TcpStream`]: ../struct.TcpStream.html
///
/// # Examples
///
/// ```no_run
/// use std::net::SocketAddr;
/// ```
/// #![feature(async_await)]
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// use tokio::net::TcpListener;
/// use futures::Async;
///
/// let addr = "127.0.0.1:0".parse::<SocketAddr>()?;
/// let addr = "127.0.0.1:8080".parse()?;
/// let mut listener = TcpListener::bind(&addr)?;
/// match listener.poll_accept() {
/// Ok(Async::Ready((_socket, addr))) => println!("listener ready to accept: {:?}", addr),
/// Ok(Async::NotReady) => println!("listener not ready to accept!"),
/// Err(e) => eprintln!("got an error: {}", e),
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
/// Err(e) => println!("couldn't get client: {:?}", e),
/// }
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// # Ok(())
/// # }
/// ```
pub fn poll_accept(
#[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
use async_util::future::poll_fn;
poll_fn(|cx| self.poll_accept(cx)).await
}

pub(crate) fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
Expand All @@ -111,62 +103,7 @@ impl TcpListener {
Poll::Ready(Ok((io, addr)))
}

/// Accept a new incoming connection from this listener.
///
/// This function will yield once a new TCP connection is established. When
/// established, the corresponding [`TcpStream`] and the remote peer's
/// address will be returned.
///
/// [`TcpStream`]: ../struct.TcpStream.html
///
/// # Examples
///
/// ```
/// unimplemented!();
/// ```
#[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
use async_util::future::poll_fn;
poll_fn(|cx| self.poll_accept(cx)).await
}

/// Attempt to accept a connection and create a new connected `TcpStream` if
/// successful.
///
/// This function is the same as `accept` above except that it returns a
/// `std::net::TcpStream` instead of a `tokio::net::TcpStream`. This in turn
/// can then allow for the TCP stream to be associated with a different
/// reactor than the one this `TcpListener` is associated with.
///
/// # Return
///
/// On success, returns `Ok(Async::Ready((socket, addr)))`.
///
/// If the listener is not ready to accept, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the listener becomes ready to accept.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
///
/// # Examples
///
/// ```no_run
/// use std::net::SocketAddr;
/// use tokio::net::TcpListener;
/// use futures::Async;
///
/// let addr = "127.0.0.1:0".parse::<SocketAddr>()?;
/// let mut listener = TcpListener::bind(&addr)?;
/// match listener.poll_accept_std() {
/// Ok(Async::Ready((_socket, addr))) => println!("listener ready to accept: {:?}", addr),
/// Ok(Async::NotReady) => println!("listener not ready to accept!"),
/// Err(e) => eprintln!("got an error: {}", e),
/// }
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
pub fn poll_accept_std(
fn poll_accept_std(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> {
Expand Down Expand Up @@ -267,28 +204,6 @@ impl TcpListener {
/// necessarily fatal ‒ for example having too many open file descriptors or the other side
/// closing the connection while it waits in an accept queue. These would terminate the stream
/// if not handled in any way.
///
/// If aiming for production, decision what to do about them must be made. The
/// [`tk-listen`](https://crates.io/crates/tk-listen) crate might be of some help.
///
/// # Examples
///
/// ```
/// use tokio::net::TcpListener;
/// use futures::stream::Stream;
/// use std::net::SocketAddr;
///
/// let addr = "127.0.0.1:0".parse::<SocketAddr>()?;
/// let listener = TcpListener::bind(&addr)?;
///
/// listener.incoming()
/// .map_err(|e| eprintln!("failed to accept stream; error = {:?}", e))
/// .for_each(|_socket| {
/// println!("new socket!");
/// Ok(())
/// });
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
#[cfg(feature = "async-traits")]
pub fn incoming(self) -> Incoming {
Incoming::new(self)
Expand Down
Loading