Skip to content

Commit

Permalink
net: use &self with TcpListener::accept (#2919)
Browse files Browse the repository at this point in the history
Uses the infrastructure added by #2828 to enable switching
`TcpListener::accept` to use `&self`.

This also switches `poll_accept` to use `&self`. While doing introduces
a hazard, `poll_*` style functions are considered low-level. Most users
will use the `async fn` variants which are more misuse-resistant.

TcpListener::incoming() is temporarily removed as it has the same
problem as `TcpSocket::by_ref()` and will be implemented later.
  • Loading branch information
carllerche authored Oct 8, 2020
1 parent 6259893 commit 066965c
Show file tree
Hide file tree
Showing 27 changed files with 201 additions and 174 deletions.
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Bind a TCP listener to the socket address.
//
// Note that this is the Tokio TcpListener, which is fully async.
let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;

tracing::info!("server running on {}", addr);

Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop.
let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

loop {
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections.
let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

loop {
Expand Down
2 changes: 1 addition & 1 deletion examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

let mut listener = TcpListener::bind(listen_addr).await?;
let listener = TcpListener::bind(listen_addr).await?;

while let Ok((inbound, _)) = listener.accept().await {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
Expand Down
2 changes: 1 addition & 1 deletion examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());

let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

// Create the shared state of this server that will be shared amongst all
Expand Down
8 changes: 3 additions & 5 deletions examples/tinyhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let mut server = TcpListener::bind(&addr).await?;
let mut incoming = server.incoming();
let server = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

while let Some(Ok(stream)) = incoming.next().await {
loop {
let (stream, _) = server.accept().await?;
tokio::spawn(async move {
if let Err(e) = process(stream).await {
println!("failed to process connection; error = {}", e);
}
});
}

Ok(())
}

async fn process(stream: TcpStream) -> Result<(), Box<dyn Error>> {
Expand Down
67 changes: 56 additions & 11 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ cfg_io_readiness! {

#[derive(Debug, Default)]
struct Waiters {
#[cfg(any(feature = "udp", feature = "uds"))]
#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
/// List of all current waiters
list: WaitList,

Expand Down Expand Up @@ -186,33 +186,78 @@ impl ScheduledIo {
}
}

/// Notifies all pending waiters that have registered interest in `ready`.
///
/// There may be many waiters to notify. Waking the pending task **must** be
/// done from outside of the lock otherwise there is a potential for a
/// deadlock.
///
/// A stack array of wakers is created and filled with wakers to notify, the
/// lock is released, and the wakers are notified. Because there may be more
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr = 0;

let mut waiters = self.waiters.lock();

// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
waker.wake();
wakers[curr] = Some(waker);
curr += 1;
}
}

// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
waker.wake();
wakers[curr] = Some(waker);
curr += 1;
}
}

#[cfg(any(feature = "udp", feature = "uds"))]
{
// check list of waiters
for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) {
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
waker.wake();
#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

while curr < NUM_WAKERS {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
wakers[curr] = Some(waker);
curr += 1;
}
}
None => {
break 'outer;
}
}
}

drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}

curr = 0;

// Acquire the lock again.
waiters = self.waiters.lock();
}

// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}
}

Expand Down
15 changes: 13 additions & 2 deletions tokio/src/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,19 @@ impl Registration {
cfg_io_readiness! {
impl Registration {
pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
// TODO: does this need to return a `Result`?
Ok(self.shared.readiness(interest).await)
use std::future::Future;
use std::pin::Pin;

let fut = self.shared.readiness(interest);
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}

Pin::new(&mut fut).poll(cx).map(Ok)
}).await
}
}
}
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ macro_rules! cfg_not_io_driver {
macro_rules! cfg_io_readiness {
($($item:item)*) => {
$(
#[cfg(any(feature = "udp", feature = "uds"))]
#[cfg(any(feature = "udp", feature = "uds", feature = "tcp"))]
$item
)*
}
Expand Down
42 changes: 0 additions & 42 deletions tokio/src/net/tcp/incoming.rs

This file was deleted.

75 changes: 19 additions & 56 deletions tokio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::future::poll_fn;
use crate::io::PollEvented;
use crate::net::tcp::{Incoming, TcpStream};
use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
Expand Down Expand Up @@ -40,7 +39,7 @@ cfg_tcp! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
Expand Down Expand Up @@ -171,7 +170,7 @@ impl TcpListener {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
Expand All @@ -181,18 +180,25 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
poll_fn(|cx| self.poll_accept(cx)).await
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
.async_io(mio::Interest::READABLE, |sock| sock.accept())
.await?;

let stream = TcpStream::new(mio)?;
Ok((stream, addr))
}

/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker.
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
/// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsble to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

Expand Down Expand Up @@ -293,46 +299,6 @@ impl TcpListener {
self.io.get_ref().local_addr()
}

/// Returns a stream over the connections being received on this listener.
///
/// Note that `TcpListener` also directly implements `Stream`.
///
/// The returned stream will never return `None` and will also not yield the
/// peer's `SocketAddr` structure. Iterating over it is equivalent to
/// calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal ‒ for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// ```no_run
/// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}

/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`].
Expand Down Expand Up @@ -390,10 +356,7 @@ impl TcpListener {
impl crate::stream::Stream for TcpListener {
type Item = io::Result<TcpStream>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
Expand Down
4 changes: 0 additions & 4 deletions tokio/src/net/tcp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
//! TCP utility types
pub(crate) mod listener;
pub(crate) use listener::TcpListener;

mod incoming;
pub use incoming::Incoming;

pub(crate) mod socket;

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ cfg_tcp! {
/// traits. Examples import these traits through [the prelude].
///
/// [`connect`]: method@TcpStream::connect
/// [accepting]: method@super::TcpListener::accept
/// [listener]: struct@super::TcpListener
/// [accepting]: method@crate::net::TcpListener::accept
/// [listener]: struct@crate::net::TcpListener
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
/// [the prelude]: crate::prelude
Expand Down
Loading

0 comments on commit 066965c

Please sign in to comment.