diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml index 80df8691318..894f861334b 100644 --- a/tokio-io/Cargo.toml +++ b/tokio-io/Cargo.toml @@ -21,6 +21,9 @@ Core I/O primitives for asynchronous I/O in Rust. categories = ["asynchronous"] publish = false +[features] +util = ["memchr"] + [dependencies] bytes = "0.4.7" log = "0.4" @@ -32,6 +35,3 @@ pin-utils = "0.1.0-alpha.4" tokio = { version = "0.2.0", path = "../tokio" } futures-util-preview = "0.3.0-alpha.17" tokio-test = { version = "0.2.0", path = "../tokio-test" } - -[features] -util = ["memchr"] diff --git a/tokio-reactor/src/poll_evented.rs b/tokio-reactor/src/poll_evented.rs index 2b62d352795..c240384cb23 100644 --- a/tokio-reactor/src/poll_evented.rs +++ b/tokio-reactor/src/poll_evented.rs @@ -52,19 +52,33 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and /// [`clear_read_ready`]. /// -/// ```rust,ignore -/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> { -/// let ready = Ready::readable(); +/// ```rust +/// use tokio_reactor::PollEvented; /// -/// try_ready!(self.poll_evented.poll_read_ready(ready)); +/// use futures_core::ready; +/// use mio::Ready; +/// use mio::net::{TcpStream, TcpListener}; +/// use std::io; +/// use std::task::{Context, Poll}; /// -/// match self.poll_evented.get_ref().accept_std() { -/// Ok(pair) => Ok(Async::Ready(pair)), -/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { -/// self.poll_evented.clear_read_ready(ready); -/// Ok(Async::NotReady) +/// struct MyListener { +/// poll_evented: PollEvented, +/// } +/// +/// impl MyListener { +/// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { +/// let ready = Ready::readable(); +/// +/// ready!(self.poll_evented.poll_read_ready(cx, ready))?; +/// +/// match self.poll_evented.get_ref().accept() { +/// Ok((socket, _)) => Poll::Ready(Ok(socket)), +/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +/// self.poll_evented.clear_read_ready(cx, ready); +/// Poll::Pending +/// } +/// Err(e) => Poll::Ready(Err(e)), /// } -/// Err(e) => Err(e), /// } /// } /// ``` @@ -209,8 +223,8 @@ where /// `writable`. HUP is always implicitly included on platforms that support /// it. /// - /// If the resource is not ready for a read then `Async::NotReady` is - /// returned and the current task is notified once a new event is received. + /// If the resource is not ready for a read then `Poll::Pending` is returned + /// and the current task is notified once a new event is received. /// /// The I/O resource will remain in a read-ready state until readiness is /// cleared by calling [`clear_read_ready`]. @@ -241,8 +255,8 @@ where /// Clears the I/O resource's read readiness state and registers the current /// task to be notified once a read readiness event is received. /// - /// After calling this function, `poll_read_ready` will return `NotReady` - /// until a new read readiness event has been received. + /// After calling this function, `poll_read_ready` will return + /// `Poll::Pending` until a new read readiness event has been received. /// /// The `mask` argument specifies the readiness bits to clear. This may not /// include `writable` or `hup`. diff --git a/tokio-reactor/src/registration.rs b/tokio-reactor/src/registration.rs index 940b19e6f92..d094e21839e 100644 --- a/tokio-reactor/src/registration.rs +++ b/tokio-reactor/src/registration.rs @@ -263,15 +263,15 @@ impl Registration { /// /// There are several possible return values: /// - /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received + /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received /// a new readiness event. The readiness value is included. /// - /// * `Ok(NotReady)` means that no new readiness events have been received + /// * `Poll::Pending` means that no new readiness events have been received /// since the last call to `poll_read_ready`. /// - /// * `Err(err)` means that the registration has encountered an error. This - /// error either represents a permanent internal error **or** the fact - /// that [`register`] was not called first. + /// * `Poll::Ready(Err(err))` means that the registration has encountered an + /// error. This error either represents a permanent internal error **or** + /// the fact that [`register`] was not called first. /// /// [`register`]: #method.register /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered @@ -314,15 +314,15 @@ impl Registration { /// /// There are several possible return values: /// - /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received + /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received /// a new readiness event. The readiness value is included. /// - /// * `Ok(NotReady)` means that no new readiness events have been received + /// * `Poll::Pending` means that no new readiness events have been received /// since the last call to `poll_write_ready`. /// - /// * `Err(err)` means that the registration has encountered an error. This - /// error either represents a permanent internal error **or** the fact - /// that [`register`] was not called first. + /// * `Poll::Ready(Err(err))` means that the registration has encountered an + /// error. This error either represents a permanent internal error **or** + /// the fact that [`register`] was not called first. /// /// [`register`]: #method.register /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered diff --git a/tokio-tcp/src/listener.rs b/tokio-tcp/src/listener.rs index 66392a699a7..bc961f566ad 100644 --- a/tokio-tcp/src/listener.rs +++ b/tokio-tcp/src/listener.rs @@ -18,26 +18,22 @@ use tokio_reactor::{Handle, PollEvented}; /// # Examples /// /// ```no_run -/// use futures::stream::Stream; -/// use std::net::SocketAddr; -/// use tokio::net::{TcpListener, TcpStream}; +/// #![feature(async_await)] /// -/// fn process_socket(socket: TcpStream) { -/// // ... -/// } +/// use tokio::net::TcpListener; +/// use std::error::Error; +/// # async fn process_socket(socket: T) {} /// -/// let addr = "127.0.0.1:8080".parse::()?; -/// let listener = TcpListener::bind(&addr)?; +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let addr = "127.0.0.1:8080".parse()?; +/// let mut listener = TcpListener::bind(&addr)?; /// -/// // accept connections and process them -/// tokio::run(listener.incoming() -/// .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e)) -/// .for_each(|socket| { +/// loop { +/// let (socket, _) = listener.accept().await?; /// process_socket(socket); -/// Ok(()) -/// }) -/// ); -/// # Ok::<_, Box>(()) +/// } +/// } /// ``` pub struct TcpListener { io: PollEvented, @@ -64,42 +60,38 @@ impl TcpListener { Ok(TcpListener::new(l)) } - /// Attempt to accept a connection and create a new connected `TcpStream` if - /// successful. - /// - /// Note that typically for simple usage it's easier to treat incoming - /// connections as a `Stream` of `TcpStream`s with the `incoming` method - /// below. - /// - /// # Return - /// - /// On success, returns `Ok(Async::Ready((socket, addr)))`. - /// - /// If the listener is not ready to accept, the method returns - /// `Ok(Async::NotReady)` and arranges for the current task to receive a - /// notification when the listener becomes ready to accept. + /// Accept a new incoming connection from this listener. /// - /// # Panics + /// This function will yield once a new TCP connection is established. When + /// established, the corresponding [`TcpStream`] and the remote peer's + /// address will be returned. /// - /// This function will panic if called from outside of a task context. + /// [`TcpStream`]: ../struct.TcpStream.html /// /// # Examples /// - /// ```no_run - /// use std::net::SocketAddr; + /// ``` + /// #![feature(async_await)] + /// + /// # async fn dox() -> Result<(), Box> { /// use tokio::net::TcpListener; - /// use futures::Async; /// - /// let addr = "127.0.0.1:0".parse::()?; + /// let addr = "127.0.0.1:8080".parse()?; /// let mut listener = TcpListener::bind(&addr)?; - /// match listener.poll_accept() { - /// Ok(Async::Ready((_socket, addr))) => println!("listener ready to accept: {:?}", addr), - /// Ok(Async::NotReady) => println!("listener not ready to accept!"), - /// Err(e) => eprintln!("got an error: {}", e), + /// match listener.accept().await { + /// Ok((_socket, addr)) => println!("new client: {:?}", addr), + /// Err(e) => println!("couldn't get client: {:?}", e), /// } - /// # Ok::<_, Box>(()) + /// # Ok(()) + /// # } /// ``` - pub fn poll_accept( + #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { + use async_util::future::poll_fn; + poll_fn(|cx| self.poll_accept(cx)).await + } + + pub(crate) fn poll_accept( &mut self, cx: &mut Context<'_>, ) -> Poll> { @@ -111,62 +103,7 @@ impl TcpListener { Poll::Ready(Ok((io, addr))) } - /// Accept a new incoming connection from this listener. - /// - /// This function will yield once a new TCP connection is established. When - /// established, the corresponding [`TcpStream`] and the remote peer's - /// address will be returned. - /// - /// [`TcpStream`]: ../struct.TcpStream.html - /// - /// # Examples - /// - /// ``` - /// unimplemented!(); - /// ``` - #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 - pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { - use async_util::future::poll_fn; - poll_fn(|cx| self.poll_accept(cx)).await - } - - /// Attempt to accept a connection and create a new connected `TcpStream` if - /// successful. - /// - /// This function is the same as `accept` above except that it returns a - /// `std::net::TcpStream` instead of a `tokio::net::TcpStream`. This in turn - /// can then allow for the TCP stream to be associated with a different - /// reactor than the one this `TcpListener` is associated with. - /// - /// # Return - /// - /// On success, returns `Ok(Async::Ready((socket, addr)))`. - /// - /// If the listener is not ready to accept, the method returns - /// `Ok(Async::NotReady)` and arranges for the current task to receive a - /// notification when the listener becomes ready to accept. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - /// - /// # Examples - /// - /// ```no_run - /// use std::net::SocketAddr; - /// use tokio::net::TcpListener; - /// use futures::Async; - /// - /// let addr = "127.0.0.1:0".parse::()?; - /// let mut listener = TcpListener::bind(&addr)?; - /// match listener.poll_accept_std() { - /// Ok(Async::Ready((_socket, addr))) => println!("listener ready to accept: {:?}", addr), - /// Ok(Async::NotReady) => println!("listener not ready to accept!"), - /// Err(e) => eprintln!("got an error: {}", e), - /// } - /// # Ok::<_, Box>(()) - /// ``` - pub fn poll_accept_std( + fn poll_accept_std( &mut self, cx: &mut Context<'_>, ) -> Poll> { @@ -267,28 +204,6 @@ impl TcpListener { /// necessarily fatal ‒ for example having too many open file descriptors or the other side /// closing the connection while it waits in an accept queue. These would terminate the stream /// if not handled in any way. - /// - /// If aiming for production, decision what to do about them must be made. The - /// [`tk-listen`](https://crates.io/crates/tk-listen) crate might be of some help. - /// - /// # Examples - /// - /// ``` - /// use tokio::net::TcpListener; - /// use futures::stream::Stream; - /// use std::net::SocketAddr; - /// - /// let addr = "127.0.0.1:0".parse::()?; - /// let listener = TcpListener::bind(&addr)?; - /// - /// listener.incoming() - /// .map_err(|e| eprintln!("failed to accept stream; error = {:?}", e)) - /// .for_each(|_socket| { - /// println!("new socket!"); - /// Ok(()) - /// }); - /// # Ok::<_, Box>(()) - /// ``` #[cfg(feature = "async-traits")] pub fn incoming(self) -> Incoming { Incoming::new(self) diff --git a/tokio-tcp/src/stream.rs b/tokio-tcp/src/stream.rs index 358a19fa728..9eb6d659256 100644 --- a/tokio-tcp/src/stream.rs +++ b/tokio-tcp/src/stream.rs @@ -30,19 +30,25 @@ use tokio_reactor::{Handle, PollEvented}; /// /// # Examples /// -/// ``` -/// use futures::Future; -/// use tokio::io::AsyncWrite; +/// ```no_run +/// #![feature(async_await)] +/// /// use tokio::net::TcpStream; -/// use std::net::SocketAddr; +/// use tokio::prelude::*; +/// use std::error::Error; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let addr = "127.0.0.1:8080".parse()?; +/// +/// // Connect to a peer +/// let mut stream = TcpStream::connect(&addr).await?; +/// +/// // Write some data. +/// stream.write_all(b"hello world!").await?; /// -/// let addr = "127.0.0.1:34254".parse::()?; -/// let stream = TcpStream::connect(&addr); -/// stream.map(|mut stream| { -/// // Attempt to write bytes asynchronously to the stream -/// stream.poll_write(&[1]); -/// }); -/// # Ok::<_, Box>(()) +/// Ok(()) +/// } /// ``` pub struct TcpStream { io: PollEvented, @@ -71,16 +77,25 @@ impl TcpStream { /// /// # Examples /// - /// ``` - /// use futures::Future; + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; /// - /// let addr = "127.0.0.1:34254".parse::()?; - /// let stream = TcpStream::connect(&addr) - /// .map(|stream| - /// println!("successfully connected to {}", stream.local_addr().unwrap())); - /// # Ok::<_, Box>(()) + /// // Connect to a peer + /// let mut stream = TcpStream::connect(&addr).await?; + /// + /// // Write some data. + /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } /// ``` pub fn connect(addr: &SocketAddr) -> impl Future> { use self::ConnectFutureState::*; @@ -108,12 +123,13 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::TcpStream as StdTcpStream; /// use tokio_reactor::Handle; /// - /// let std_stream = StdTcpStream::connect("127.0.0.1:34254")?; + /// # fn dox() -> std::io::Result<()> { + /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; /// let stream = TcpStream::from_std(std_stream, &Handle::default())?; - /// # Ok::<_, Box>(()) + /// # Ok(()) + /// # } /// ``` pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result { let io = mio::net::TcpStream::from_stream(stream)?; @@ -158,111 +174,23 @@ impl TcpStream { ConnectFuture { inner } } - /// Check the TCP stream's read readiness state. - /// - /// The mask argument allows specifying what readiness to notify on. This - /// can be any value, including platform specific readiness, **except** - /// `writable`. HUP is always implicitly included on platforms that support - /// it. - /// - /// If the resource is not ready for a read then `Async::NotReady` is - /// returned and the current task is notified once a new event is received. - /// - /// The stream will remain in a read-ready state until calls to `poll_read` - /// return `NotReady`. - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` includes writable. - /// * called from outside of a task context. + /// Returns the local address that this stream is bound to. /// /// # Examples /// - /// ``` - /// use mio::Ready; - /// use futures::Async; - /// use futures::Future; - /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; - /// - /// let addr = "127.0.0.1:34254".parse::()?; - /// let stream = TcpStream::connect(&addr); - /// - /// stream.map(|stream| { - /// match stream.poll_read_ready(Ready::readable()) { - /// Ok(Async::Ready(_)) => println!("read ready"), - /// Ok(Async::NotReady) => println!("not read ready"), - /// Err(e) => eprintln!("got error: {}", e), - /// } - /// }); - /// # Ok::<_, Box>(()) - /// ``` - pub fn poll_read_ready( - &self, - cx: &mut Context<'_>, - mask: mio::Ready, - ) -> Poll> { - self.io.poll_read_ready(cx, mask) - } - - /// Check the TCP stream's write readiness state. - /// - /// This always checks for writable readiness and also checks for HUP - /// readiness on platforms that support it. - /// - /// If the resource is not ready for a write then `Async::NotReady` is - /// returned and the current task is notified once a new event is received. - /// - /// The I/O resource will remain in a write-ready state until calls to - /// `poll_write` return `NotReady`. - /// - /// # Panics - /// - /// This function panics if called from outside of a task context. - /// - /// # Examples + /// ```no_run + /// #![feature(async_await)] /// - /// ``` - /// use futures::Async; - /// use futures::Future; /// use tokio::net::TcpStream; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:34254".parse::()?; - /// let stream = TcpStream::connect(&addr); - /// - /// stream.map(|stream| { - /// match stream.poll_write_ready() { - /// Ok(Async::Ready(_)) => println!("write ready"), - /// Ok(Async::NotReady) => println!("not write ready"), - /// Err(e) => eprintln!("got error: {}", e), - /// } - /// }); - /// # Ok::<_, Box>(()) - /// ``` - pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.io.poll_write_ready(cx) - } - - /// Returns the local address that this stream is bound to. - /// - /// # Examples + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// ``` - /// use tokio::net::TcpStream; - /// use futures::Future; - /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; - /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); - /// - /// stream.map(|stream| { - /// assert_eq!(stream.local_addr().unwrap(), - /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.local_addr()?); + /// # Ok(()) + /// # } /// ``` pub fn local_addr(&self) -> io::Result { self.io.get_ref().local_addr() @@ -271,65 +199,25 @@ impl TcpStream { /// Returns the remote address that this stream is connected to. /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; - /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// assert_eq!(stream.peer_addr().unwrap(), - /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.peer_addr()?); + /// # Ok(()) + /// # } /// ``` pub fn peer_addr(&self) -> io::Result { self.io.get_ref().peer_addr() } - /// Receives data on the socket from the remote address to which it is - /// connected, without removing that data from the queue. On success, - /// returns the number of bytes peeked. - /// - /// Successive calls return the same data. This is accomplished by passing - /// `MSG_PEEK` as a flag to the underlying recv system call. - /// - /// # Return - /// - /// On success, returns `Ok(Async::Ready(num_bytes_read))`. - /// - /// If no data is available for reading, the method returns - /// `Ok(Async::NotReady)` and arranges for the current task to receive a - /// notification when the socket becomes readable or is closed. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - /// - /// # Examples - /// - /// ``` - /// use tokio::net::TcpStream; - /// use futures::Async; - /// use futures::Future; - /// use std::net::SocketAddr; - /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); - /// - /// stream.map(|mut stream| { - /// let mut buf = [0; 10]; - /// match stream.poll_peek(&mut buf) { - /// Ok(Async::Ready(len)) => println!("read {} bytes", len), - /// Ok(Async::NotReady) => println!("no data available"), - /// Err(e) => eprintln!("got error: {}", e), - /// } - /// }); - /// # Ok::<_, Box>(()) - /// ``` - pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().peek(buf) { @@ -351,8 +239,32 @@ impl TcpStream { /// /// # Examples /// - /// ``` - /// unimplemented!(); + /// ```no_run + /// #![feature(async_await)] + /// + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// + /// // Connect to a peer + /// let mut stream = TcpStream::connect(&addr).await?; + /// + /// let mut b1 = [0; 10]; + /// let mut b2 = [0; 10]; + /// + /// // Peek at the data + /// let n = stream.peek(&mut b1).await?; + /// + /// // Read the data + /// assert_eq!(n, stream.read(&mut b2[..n]).await?); + /// assert_eq!(&b1[..n], &b2[..n]); + /// + /// Ok(()) + /// } /// ``` pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result { poll_fn(|cx| self.poll_peek(cx, buf)).await @@ -366,18 +278,26 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; - /// use std::net::{Shutdown, SocketAddr}; + /// use tokio::prelude::*; + /// use std::error::Error; + /// use std::net::Shutdown; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// // Connect to a peer + /// let mut stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.shutdown(Shutdown::Both) - /// }); - /// # Ok::<_, Box>(()) + /// // Shutdown the stream + /// stream.shutdown(Shutdown::Write)?; + /// + /// Ok(()) + /// } /// ``` pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) @@ -391,19 +311,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_nodelay(true).expect("set_nodelay call failed");; - /// assert_eq!(stream.nodelay().unwrap_or(false), true); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.nodelay()?); + /// # Ok(()) + /// # } /// ``` pub fn nodelay(&self) -> io::Result { self.io.get_ref().nodelay() @@ -419,18 +339,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_nodelay(true).expect("set_nodelay call failed"); - /// }); - /// # Ok::<_, Box>(()) + /// stream.set_nodelay(true)?; + /// # Ok(()) + /// # } /// ``` pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { self.io.get_ref().set_nodelay(nodelay) @@ -444,19 +365,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed"); - /// assert_eq!(stream.recv_buffer_size().unwrap_or(0), 100); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.recv_buffer_size()?); + /// # Ok(()) + /// # } /// ``` pub fn recv_buffer_size(&self) -> io::Result { self.io.get_ref().recv_buffer_size() @@ -469,18 +390,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed"); - /// }); - /// # Ok::<_, Box>(()) + /// stream.set_recv_buffer_size(100)?; + /// # Ok(()) + /// # } /// ``` pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { self.io.get_ref().set_recv_buffer_size(size) @@ -494,19 +416,28 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// Returns whether keepalive messages are enabled on this socket, and if so + /// the duration of time between them. + /// + /// For more information about this option, see [`set_keepalive`]. + /// + /// [`set_keepalive`]: #tymethod.set_keepalive + /// + /// # Examples + /// + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_send_buffer_size(100).expect("set_send_buffer_size failed"); - /// assert_eq!(stream.send_buffer_size().unwrap_or(0), 100); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.send_buffer_size()?); + /// # Ok(()) + /// # } /// ``` pub fn send_buffer_size(&self) -> io::Result { self.io.get_ref().send_buffer_size() @@ -519,18 +450,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_send_buffer_size(100).expect("set_send_buffer_size failed"); - /// }); - /// # Ok::<_, Box>(()) + /// stream.set_send_buffer_size(100)?; + /// # Ok(()) + /// # } /// ``` pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { self.io.get_ref().set_send_buffer_size(size) @@ -545,19 +477,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_keepalive(None).expect("set_keepalive failed"); - /// assert_eq!(stream.keepalive().unwrap(), None); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.keepalive()?); + /// # Ok(()) + /// # } /// ``` pub fn keepalive(&self) -> io::Result> { self.io.get_ref().keepalive() @@ -578,18 +510,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_keepalive(None).expect("set_keepalive failed"); - /// }); - /// # Ok::<_, Box>(()) + /// stream.set_keepalive(None)?; + /// # Ok(()) + /// # } /// ``` pub fn set_keepalive(&self, keepalive: Option) -> io::Result<()> { self.io.get_ref().set_keepalive(keepalive) @@ -603,19 +536,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_ttl(100).expect("set_ttl failed"); - /// assert_eq!(stream.ttl().unwrap_or(0), 100); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.ttl()?); + /// # Ok(()) + /// # } /// ``` pub fn ttl(&self) -> io::Result { self.io.get_ref().ttl() @@ -628,18 +561,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_ttl(100).expect("set_ttl failed"); - /// }); - /// # Ok::<_, Box>(()) + /// stream.set_ttl(123)?; + /// # Ok(()) + /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.io.get_ref().set_ttl(ttl) @@ -654,19 +588,19 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_linger(None).expect("set_linger failed"); - /// assert_eq!(stream.linger().unwrap(), None); - /// }); - /// # Ok::<_, Box>(()) + /// println!("{:?}", stream.linger()?); + /// # Ok(()) + /// # } /// ``` pub fn linger(&self) -> io::Result> { self.io.get_ref().linger() @@ -686,55 +620,24 @@ impl TcpStream { /// /// # Examples /// - /// ``` + /// ```no_run + /// #![feature(async_await)] + /// /// use tokio::net::TcpStream; - /// use futures::Future; /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); + /// # async fn dox() -> Result<(), Box> { + /// let addr = "127.0.0.1:8080".parse()?; + /// let stream = TcpStream::connect(&addr).await?; /// - /// stream.map(|stream| { - /// stream.set_linger(None).expect("set_linger failed"); - /// }); - /// # Ok::<_, Box>(()) + /// stream.set_linger(None)?; + /// # Ok(()) + /// # } /// ``` pub fn set_linger(&self, dur: Option) -> io::Result<()> { self.io.get_ref().set_linger(dur) } - /// Creates a new independently owned handle to the underlying socket. - /// - /// The returned `TcpStream` is a reference to the same stream that this - /// object references. Both handles will read and write the same stream of - /// data, and options set on one stream will be propagated to the other - /// stream. - /// - /// # Examples - /// - /// ``` - /// use tokio::net::TcpStream; - /// use futures::Future; - /// use std::net::SocketAddr; - /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let stream = TcpStream::connect(&addr); - /// - /// stream.map(|stream| { - /// let clone = stream.try_clone().unwrap(); - /// }); - /// # Ok::<_, Box>(()) - /// ``` - #[deprecated(since = "0.1.14", note = "use `split()` instead")] - #[doc(hidden)] - pub fn try_clone(&self) -> io::Result { - // Rationale for deprecation: - // - https://github.com/tokio-rs/tokio/pull/824 - // - https://github.com/tokio-rs/tokio/issues/774#issuecomment-451059317 - let msg = "`TcpStream::try_clone()` is deprecated because it doesn't work as intended"; - Err(io::Error::new(io::ErrorKind::Other, msg)) - } - /// Split a `TcpStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. ///