Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak the PollEvented::deregister signature #55

Merged
merged 7 commits into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ appveyor = { repository = "alexcrichton/tokio" }
[dependencies]
bytes = "0.4"
log = "0.3"
mio = "0.6.10"
mio = "0.6.11"
slab = "0.4"
iovec = "0.1"
tokio-io = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion examples/echo-threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
// using the `TcpStream::from_stream` API. After that the socket is not
// a `tokio::net::TcpStream` meaning it's in nonblocking mode and
// ready to be used with Tokio
let socket = TcpStream::from_stream(socket, &handle)
let socket = TcpStream::from_std(socket, &handle)
.expect("failed to associate TCP stream");
let addr = socket.peer_addr().expect("failed to get remote address");

Expand Down
2 changes: 1 addition & 1 deletion examples/tinyhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
// request/response types instead of bytes. Here we'll just use our
// framing defined below and then use the `send_all` helper to send the
// responses back on the socket after we've processed them
let socket = future::result(TcpStream::from_stream(socket, &handle));
let socket = future::result(TcpStream::from_std(socket, &handle));
let req = socket.and_then(|socket| {
let (tx, rx) = socket.framed(Http).split();
tx.send_all(rx.and_then(respond))
Expand Down
99 changes: 42 additions & 57 deletions src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::time::Duration;

use bytes::{Buf, BufMut};
use futures::stream::Stream;
use futures::sync::oneshot;
use futures::{Future, Poll, Async};
use iovec::IoVec;
use mio;
Expand All @@ -20,7 +19,6 @@ use reactor::{Handle, PollEvented};
/// various forms of processing.
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
pending_accept: Option<oneshot::Receiver<io::Result<(TcpStream, SocketAddr)>>>,
}

/// Stream returned by the `TcpListener::incoming` function representing the
Expand Down Expand Up @@ -59,50 +57,35 @@ impl TcpListener {
/// future's task. It's recommended to only call this from the
/// implementation of a `Future::poll`, if necessary.
pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
loop {
if let Some(mut pending) = self.pending_accept.take() {
match pending.poll().expect("shouldn't be canceled") {
Async::NotReady => {
self.pending_accept = Some(pending);
return Err(io::ErrorKind::WouldBlock.into())
},
Async::Ready(r) => return r,
}
}
let (stream, addr) = self.accept_std()?;
let stream = TcpStream::from_std(stream, self.io.handle())?;
Ok((stream, addr))
}

if let Async::NotReady = self.io.poll_read() {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready"))
}
/// Attempt to accept a connection and create a new connected `TcpStream` if
/// successful.
///
/// This function is the asme as `accept` above except that it returns a
/// `std::net::TcpStream` instead of a `tokio::net::TcpStream`. This in turn
/// can then allow for the TCP stream to be assoiated with a different
/// reactor than the one this `TcpListener` is associated with.
///
/// # Panics
///
/// This function will panic for the same reasons as `accept`, notably if
/// called outside the context of a future.
pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
if let Async::NotReady = self.io.poll_read() {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready"))
}

match self.io.get_ref().accept() {
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read();
}
return Err(e)
},
Ok((sock, addr)) => {
// Fast path if we haven't left the event loop
if let Some(handle) = self.io.remote().handle() {
let io = try!(PollEvented::new(sock, &handle));
return Ok((TcpStream { io: io }, addr))
}

// If we're off the event loop then send the socket back
// over there to get registered and then we'll get it back
// eventually.
let (tx, rx) = oneshot::channel();
let remote = self.io.remote().clone();
remote.run(move |handle| {
let res = PollEvented::new(sock, handle)
.map(move |io| {
(TcpStream { io: io }, addr)
});
drop(tx.send(res));
});
self.pending_accept = Some(rx);
// continue to polling the `rx` at the beginning of the loop
match self.io.get_ref().accept_std() {
Ok(pair) => Ok(pair),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read()?;
}
Err(e)
}
}
}
Expand Down Expand Up @@ -134,17 +117,17 @@ impl TcpListener {
/// will only be for the same IP version as `addr` specified. That is, if
/// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
/// well (same for IPv6).
pub fn from_listener(listener: net::TcpListener,
addr: &SocketAddr,
handle: &Handle) -> io::Result<TcpListener> {
let l = try!(mio::net::TcpListener::from_listener(listener, addr));
pub fn from_std(listener: net::TcpListener,
addr: &SocketAddr,
handle: &Handle) -> io::Result<TcpListener> {
let l = mio::net::TcpListener::from_listener(listener, addr)?;
TcpListener::new(l, handle)
}

fn new(listener: mio::net::TcpListener, handle: &Handle)
-> io::Result<TcpListener> {
let io = try!(PollEvented::new(listener, handle));
Ok(TcpListener { io: io, pending_accept: None })
Ok(TcpListener { io: io })
}

/// Test whether this socket is ready to be read or not.
Expand Down Expand Up @@ -283,9 +266,10 @@ impl TcpStream {
/// to a TCP stream ready to be used with the provided event loop handle.
/// The stream returned is associated with the event loop and ready to
/// perform I/O.
pub fn from_stream(stream: net::TcpStream, handle: &Handle)
-> io::Result<TcpStream> {
let inner = try!(mio::net::TcpStream::from_stream(stream));
pub fn from_std(stream: net::TcpStream, handle: &Handle)
-> io::Result<TcpStream>
{
let inner = mio::net::TcpStream::from_stream(stream)?;
Ok(TcpStream {
io: try!(PollEvented::new(inner, handle)),
})
Expand All @@ -309,10 +293,11 @@ impl TcpStream {
/// loop. Note that on Windows you must `bind` a socket before it can be
/// connected, so if a custom `TcpBuilder` is used it should be bound
/// (perhaps to `INADDR_ANY`) before this method is called.
pub fn connect_stream(stream: net::TcpStream,
addr: &SocketAddr,
handle: &Handle)
-> Box<Future<Item=TcpStream, Error=io::Error> + Send> {
pub fn connect_std(stream: net::TcpStream,
addr: &SocketAddr,
handle: &Handle)
-> Box<Future<Item=TcpStream, Error=io::Error> + Send>
{
let state = match mio::net::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => TcpStreamNewState::Error(e),
Expand Down Expand Up @@ -605,7 +590,7 @@ impl<'a> AsyncRead for &'a TcpStream {
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read();
self.io.need_read()?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
Expand Down Expand Up @@ -643,7 +628,7 @@ impl<'a> AsyncWrite for &'a TcpStream {
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_write();
self.io.need_write()?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
Expand Down
12 changes: 6 additions & 6 deletions src/net/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ impl UdpSocket {
/// This can be used in conjunction with net2's `UdpBuilder` interface to
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
pub fn from_socket(socket: net::UdpSocket,
handle: &Handle) -> io::Result<UdpSocket> {
pub fn from_std(socket: net::UdpSocket,
handle: &Handle) -> io::Result<UdpSocket> {
let udp = try!(mio::net::UdpSocket::from_socket(socket));
UdpSocket::new(udp, handle)
}
Expand Down Expand Up @@ -93,7 +93,7 @@ impl UdpSocket {
Ok(n) => Ok(n),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_write();
self.io.need_write()?;
}
Err(e)
}
Expand All @@ -115,7 +115,7 @@ impl UdpSocket {
Ok(n) => Ok(n),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read();
self.io.need_read()?;
}
Err(e)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ impl UdpSocket {
Ok(n) => Ok(n),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_write();
self.io.need_write()?;
}
Err(e)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ impl UdpSocket {
Ok(n) => Ok(n),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read();
self.io.need_read()?;
}
Err(e)
}
Expand Down
22 changes: 12 additions & 10 deletions src/reactor/io_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::io;

use mio::event::Evented;

use reactor::{Remote, Handle, Direction};
use reactor::{Handle, Direction};

/// A token that identifies an active I/O resource.
pub struct IoToken {
token: usize,
handle: Remote,
handle: Handle,
}

impl IoToken {
Expand All @@ -29,19 +29,19 @@ impl IoToken {
/// associated with has gone away, or if there is an error communicating
/// with the event loop.
pub fn new(source: &Evented, handle: &Handle) -> io::Result<IoToken> {
match handle.remote.inner.upgrade() {
match handle.inner.upgrade() {
Some(inner) => {
let token = try!(inner.add_source(source));
let handle = handle.remote().clone();
let handle = handle.clone();

Ok(IoToken { token, handle })
}
None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")),
}
}

/// Returns a reference to the remote handle.
pub fn remote(&self) -> &Remote {
/// Returns a reference to this I/O token's event loop's handle.
pub fn handle(&self) -> &Handle {
&self.handle
}

Expand Down Expand Up @@ -92,13 +92,14 @@ impl IoToken {
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self) {
pub fn schedule_read(&self) -> io::Result<()> {
let inner = match self.handle.inner.upgrade() {
Some(inner) => inner,
None => return,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};

inner.schedule(self.token, Direction::Read);
Ok(())
}

/// Schedule the current future task to receive a notification when the
Expand All @@ -124,13 +125,14 @@ impl IoToken {
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self) {
pub fn schedule_write(&self) -> io::Result<()> {
let inner = match self.handle.inner.upgrade() {
Some(inner) => inner,
None => return,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};

inner.schedule(self.token, Direction::Write);
Ok(())
}

/// Unregister all information associated with a token on an event loop,
Expand Down
Loading