From f5e3b9a99300df4cbeac33dfdf7046b920ee5144 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 5 Dec 2017 10:13:45 -0800 Subject: [PATCH] Remove the `Reactor::run` method This commit removes the `Reactor::run` method which has previously been used to execute futures and turn the reactor at the same time. The tests/examples made heavy usage of this method but they have now all temporarily moved to `wait()` until the futures dependency is upgraded. In the meantime this'll allow us to further trim down the `Reactor` APIs to their final state. --- examples/chat.rs | 9 ++--- examples/compress.rs | 7 ++-- examples/connect.rs | 10 ++--- examples/echo-threads.rs | 41 ++++++++------------ examples/echo-udp.rs | 13 +++---- examples/echo.rs | 27 +++----------- examples/hello.rs | 10 ++--- examples/proxy.rs | 10 ++--- examples/sink.rs | 7 ++-- examples/tinydb.rs | 11 +++--- examples/tinyhttp.rs | 11 +++--- examples/udp-codec.rs | 7 ++-- src/lib.rs | 12 +++--- src/reactor/mod.rs | 81 +++------------------------------------- tests/buffered.rs | 8 ++-- tests/chain.rs | 8 ++-- tests/echo.rs | 8 ++-- tests/limit.rs | 8 ++-- tests/line-frames.rs | 21 +++++------ tests/pipe-hup.rs | 11 +++--- tests/stream-buffered.rs | 8 ++-- tests/tcp.rs | 20 +++++----- tests/udp.rs | 32 ++++++++-------- 23 files changed, 133 insertions(+), 247 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index f22d6b6438f..d7d84669911 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -33,7 +33,7 @@ use futures::future::Executor; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::io; use tokio_io::AsyncRead; @@ -41,9 +41,8 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse().unwrap(); - // Create the event loop and TCP listener we'll accept connections on. - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + // Create the TCP listener we'll accept connections on. + let handle = Handle::default(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); @@ -135,5 +134,5 @@ fn main() { }); // execute server - core.run(srv).unwrap(); + srv.wait().unwrap(); } diff --git a/examples/compress.rs b/examples/compress.rs index d158060f5c7..42cbab8e2d0 100644 --- a/examples/compress.rs +++ b/examples/compress.rs @@ -32,7 +32,7 @@ use futures::{Future, Stream, Poll}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use flate2::write::GzEncoder; @@ -41,8 +41,7 @@ fn main() { // reactor. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); @@ -64,7 +63,7 @@ fn main() { Ok(()) }); - core.run(server).unwrap(); + server.wait().unwrap(); } /// The main workhorse of this example. This'll compress all data read from diff --git a/examples/connect.rs b/examples/connect.rs index 235da1af531..5cedadfb930 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -28,7 +28,7 @@ use std::thread; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use futures_cpupool::CpuPool; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; fn main() { // Determine if we're going to run in TCP or UDP mode @@ -47,9 +47,7 @@ fn main() { }); let addr = addr.parse::().unwrap(); - // Create the event loop and initiate the connection to the remote server - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); @@ -76,9 +74,9 @@ fn main() { // loop. In this case, though, we know it's ok as the event loop isn't // otherwise running anything useful. let mut out = io::stdout(); - core.run(stdout.for_each(|chunk| { + stdout.for_each(|chunk| { out.write_all(&chunk) - })).unwrap(); + }).wait().unwrap(); } mod tcp { diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs index ea3ca36219d..8d428e49a91 100644 --- a/examples/echo-threads.rs +++ b/examples/echo-threads.rs @@ -20,18 +20,17 @@ extern crate tokio; extern crate tokio_io; use std::env; -use std::net::{self, SocketAddr}; +use std::net::SocketAddr; use std::thread; -use futures::Future; +use futures::prelude::*; use futures::future::Executor; -use futures::stream::Stream; use futures::sync::mpsc; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; use tokio_io::io::copy; -use tokio::net::TcpStream; -use tokio::reactor::Reactor; +use tokio::net::{TcpStream, TcpListener}; +use tokio::reactor::Handle; fn main() { // First argument, the address to bind @@ -42,9 +41,8 @@ fn main() { let num_threads = env::args().nth(2).and_then(|s| s.parse().ok()) .unwrap_or(num_cpus::get()); - // Use `std::net` to bind the requested port, we'll use this on the main - // thread below - let listener = net::TcpListener::bind(&addr).expect("failed to bind"); + let handle = Handle::default(); + let listener = TcpListener::bind(&addr, &handle).expect("failed to bind"); println!("Listening on: {}", addr); // Spin up our worker threads, creating a channel routing to each worker @@ -56,31 +54,22 @@ fn main() { thread::spawn(|| worker(rx)); } - // Infinitely accept sockets from our `std::net::TcpListener`, as this'll do - // blocking I/O. Each socket is then shipped round-robin to a particular - // thread which will associate the socket with the corresponding event loop - // and process the connection. + // Infinitely accept sockets from our `TcpListener`. Each socket is then + // shipped round-robin to a particular thread which will associate the + // socket with the corresponding event loop and process the connection. let mut next = 0; - for socket in listener.incoming() { - let socket = socket.expect("failed to accept"); + let srv = listener.incoming().for_each(|(socket, _)| { channels[next].unbounded_send(socket).expect("worker thread died"); next = (next + 1) % channels.len(); - } + Ok(()) + }); + srv.wait().unwrap(); } -fn worker(rx: mpsc::UnboundedReceiver) { - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); - +fn worker(rx: mpsc::UnboundedReceiver) { let pool = CpuPool::new(1); let done = rx.for_each(move |socket| { - // First up when we receive a socket we associate it with our event loop - // using the `TcpStream::from_stream` API. After that the socket is not - // a `tokio::net::TcpStream` meaning it's in nonblocking mode and - // ready to be used with Tokio - let socket = TcpStream::from_std(socket, &handle) - .expect("failed to associate TCP stream"); let addr = socket.peer_addr().expect("failed to get remote address"); // Like the single-threaded `echo` example we split the socket halves @@ -101,5 +90,5 @@ fn worker(rx: mpsc::UnboundedReceiver) { Ok(()) }); - core.run(done).unwrap(); + done.wait().unwrap(); } diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index 0e163efe97a..0bcc38079b7 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -20,7 +20,7 @@ use std::net::SocketAddr; use futures::{Future, Poll}; use tokio::net::UdpSocket; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; struct Server { socket: UdpSocket, @@ -54,18 +54,15 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); - // Create the event loop that will drive this server, and also bind the - // socket we'll be listening to. - let mut l = Reactor::new().unwrap(); - let handle = l.handle(); + let handle = Handle::default(); let socket = UdpSocket::bind(&addr, &handle).unwrap(); println!("Listening on: {}", socket.local_addr().unwrap()); // Next we'll create a future to spawn (the one we defined above) and then - // we'll run the event loop by running the future. - l.run(Server { + // we'll block our current thread waiting on the result of the future + Server { socket: socket, buf: vec![0; 1024], to_send: None, - }).unwrap(); + }.wait().unwrap(); } diff --git a/examples/echo.rs b/examples/echo.rs index 07c061c4407..ca081f84e72 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -32,7 +32,7 @@ use futures_cpupool::CpuPool; use tokio_io::AsyncRead; use tokio_io::io::copy; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; fn main() { // Allow passing an address to listen on as the first argument of this @@ -41,17 +41,7 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); - // First up we'll create the event loop that's going to drive this server. - // This is done by creating an instance of the `Reactor` type, tokio-core's - // event loop. Most functions in tokio-core return an `io::Result`, and - // `Reactor::new` is no exception. For this example, though, we're mostly just - // ignoring errors, so we unwrap the return value. - // - // After the event loop is created we acquire a handle to it through the - // `handle` method. With this handle we'll then later be able to create I/O - // objects. - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); // Next up we create a TCP listener which will listen for incoming // connections. This TCP listener is bound to the address we determined @@ -125,13 +115,8 @@ fn main() { Ok(()) }); - // And finally now that we've define what our server is, we run it! We - // didn't actually do much I/O up to this point and this `Reactor::run` method - // is responsible for driving the entire server to completion. - // - // The `run` method will return the result of the future that it's running, - // but in our case the `done` future won't ever finish because a TCP - // listener is never done accepting clients. That basically just means that - // we're going to be running the server until it's killed (e.g. ctrl-c). - core.run(done).unwrap(); + // And finally now that we've define what our server is, we run it! Here we + // just need to execute the future we've created and wait for it to complete + // using the standard methods in the `futures` crate. + done.wait().unwrap(); } diff --git a/examples/hello.rs b/examples/hello.rs index 0bff27e929e..5d1c226ec37 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -19,17 +19,17 @@ extern crate tokio_io; use std::env; use std::net::SocketAddr; -use futures::stream::Stream; -use tokio::reactor::Reactor; +use futures::prelude::*; use tokio::net::TcpListener; +use tokio::reactor::Handle; fn main() { env_logger::init().unwrap(); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); - let mut core = Reactor::new().unwrap(); - let listener = TcpListener::bind(&addr, &core.handle()).unwrap(); + let handle = Handle::default(); + let listener = TcpListener::bind(&addr, &handle).unwrap(); let addr = listener.local_addr().unwrap(); println!("Listening for connections on {}", addr); @@ -42,5 +42,5 @@ fn main() { Ok(()) }); - core.run(server).unwrap(); + server.wait().unwrap(); } diff --git a/examples/proxy.rs b/examples/proxy.rs index 9d77c54f157..03a832048ac 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -31,7 +31,7 @@ use futures::{Future, Poll}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::{copy, shutdown}; @@ -42,14 +42,12 @@ fn main() { let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); let server_addr = server_addr.parse::().unwrap(); - // Create the event loop that will drive this server. - let mut l = Reactor::new().unwrap(); - let handle = l.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); // Create a TCP listener which will listen for incoming connections. - let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap(); + let socket = TcpListener::bind(&listen_addr, &handle).unwrap(); println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); @@ -97,7 +95,7 @@ fn main() { Ok(()) }); - l.run(done).unwrap(); + done.wait().unwrap(); } // This is a custom type used to have a custom implementation of the diff --git a/examples/sink.rs b/examples/sink.rs index 980cb63eebc..48643e05934 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -31,7 +31,7 @@ use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio_io::IoFuture; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; fn main() { env_logger::init().unwrap(); @@ -40,8 +40,7 @@ fn main() { let pool = CpuPool::new(1); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); let server = socket.incoming().for_each(|(socket, addr)| { @@ -49,7 +48,7 @@ fn main() { pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); Ok(()) }); - core.run(server).unwrap(); + server.wait().unwrap(); } fn write(socket: TcpStream) -> IoFuture<()> { diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 9929e369c82..7b5c47d1d31 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -54,7 +54,7 @@ use futures::prelude::*; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::AsyncRead; use tokio_io::io::{lines, write_all}; @@ -80,12 +80,11 @@ enum Response { } fn main() { - // Parse the address we're going to run this server on, create a `Reactor`, and - // set up our TCP listener to accept connections. + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let listener = TcpListener::bind(&addr, &handle).expect("failed to bind"); println!("Listening on: {}", addr); @@ -163,7 +162,7 @@ fn main() { Ok(()) }); - core.run(done).unwrap(); + done.wait().unwrap(); } impl Request { diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index f5513992d02..00c16fec745 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -31,15 +31,15 @@ use std::net::{self, SocketAddr}; use std::thread; use bytes::BytesMut; -use futures::future; use futures::future::Executor; +use futures::future; use futures::sync::mpsc; use futures::{Stream, Future, Sink}; use futures_cpupool::CpuPool; -use http::{Request, Response, StatusCode}; use http::header::HeaderValue; +use http::{Request, Response, StatusCode}; use tokio::net::TcpStream; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::codec::{Encoder, Decoder}; use tokio_io::{AsyncRead}; @@ -70,8 +70,7 @@ fn main() { } fn worker(rx: mpsc::UnboundedReceiver) { - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); @@ -92,7 +91,7 @@ fn worker(rx: mpsc::UnboundedReceiver) { })).unwrap(); Ok(()) }); - core.run(done).unwrap(); + done.wait().unwrap(); } /// "Server logic" is implemented in this function. diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 91fde26d036..f60fc108f96 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -18,7 +18,7 @@ use futures::{Future, Stream, Sink}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{UdpSocket, UdpCodec}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; pub struct LineCodec; @@ -39,8 +39,7 @@ impl UdpCodec for LineCodec { fn main() { drop(env_logger::init()); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); @@ -79,5 +78,5 @@ fn main() { // Spawn the sender of pongs and then wait for our pinger to finish. pool.execute(b.then(|_| Ok(()))).unwrap(); - drop(core.run(a)); + drop(a.wait()); } diff --git a/src/lib.rs b/src/lib.rs index d4698ca70d1..3cfcdb3f2c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,18 +43,16 @@ //! extern crate tokio; //! extern crate tokio_io; //! -//! use futures::{Future, Stream}; +//! use futures::prelude::*; //! use futures::future::Executor; //! use futures_cpupool::CpuPool; //! use tokio_io::AsyncRead; //! use tokio_io::io::copy; //! use tokio::net::TcpListener; -//! use tokio::reactor::Reactor; +//! use tokio::reactor::Handle; //! //! fn main() { -//! // Create the event loop that will drive this server. -//! let mut core = Reactor::new().unwrap(); -//! let handle = core.handle(); +//! let handle = Handle::default(); //! //! let pool = CpuPool::new_num_cpus(); //! @@ -86,8 +84,8 @@ //! Ok(()) //! }); //! -//! // Spin up the server on the event loop. -//! core.run(server).unwrap(); +//! // Spin up the server on this thread +//! server.wait().unwrap(); //! } //! ``` diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 7954954fb32..5687b3a50bc 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -28,9 +28,7 @@ use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; use std::sync::{Arc, Weak, RwLock}; use std::time::{Duration}; -use futures::{Future, Async}; -use futures::executor::{self, Notify}; -use futures::task::{AtomicTask}; +use futures::task::AtomicTask; use mio; use mio::event::Evented; use slab::Slab; @@ -53,12 +51,6 @@ pub struct Reactor { /// State shared between the reactor and the handles. inner: Arc, - - /// Used for determining when the future passed to `run` is ready. Once the - /// registration is passed to `io` above we never touch it again, just keep - /// it alive. - _future_registration: mio::Registration, - future_readiness: Arc, } struct Inner { @@ -90,8 +82,7 @@ enum Direction { Write, } -const TOKEN_FUTURE: mio::Token = mio::Token(1); -const TOKEN_START: usize = 2; +const TOKEN_START: usize = 1; fn _assert_kinds() { fn _assert() {} @@ -106,18 +97,8 @@ impl Reactor { // Create the I/O poller let io = try!(mio::Poll::new()); - // Create a registration for unblocking the reactor when the "run" - // future becomes ready. - let future_pair = mio::Registration::new2(); - try!(io.register(&future_pair.0, - TOKEN_FUTURE, - mio::Ready::readable(), - mio::PollOpt::level())); - Ok(Reactor { events: mio::Events::with_capacity(1024), - _future_registration: future_pair.0, - future_readiness: Arc::new(MySetReadiness(future_pair.1)), inner: Arc::new(Inner { io: io, io_dispatch: RwLock::new(Slab::with_capacity(1)), @@ -137,41 +118,6 @@ impl Reactor { } } - /// Runs a future until completion, driving the event loop while we're - /// otherwise waiting for the future to complete. - /// - /// This function will begin executing the event loop and will finish once - /// the provided future is resolved. Note that the future argument here - /// crucially does not require the `'static` nor `Send` bounds. As a result - /// the future will be "pinned" to not only this thread but also this stack - /// frame. - /// - /// This function will return the value that the future resolves to once - /// the future has finished. If the future never resolves then this function - /// will never return. - /// - /// # Panics - /// - /// This method will **not** catch panics from polling the future `f`. If - /// the future panics then it's the responsibility of the caller to catch - /// that panic and handle it as appropriate. - pub fn run(&mut self, f: F) -> Result - where F: Future, - { - let mut task = executor::spawn(f); - let mut future_fired = true; - - loop { - if future_fired { - let res = task.poll_future_notify(&self.future_readiness, 0)?; - if let Async::Ready(e) = res { - return Ok(e) - } - } - future_fired = self.poll(None); - } - } - /// Performs one iteration of the event loop, blocking on waiting for events /// for at most `max_wait` (forever if `None`). /// @@ -184,32 +130,24 @@ impl Reactor { self.poll(max_wait); } - fn poll(&mut self, max_wait: Option) -> bool { + fn poll(&mut self, max_wait: Option) { // Block waiting for an event to happen, peeling out how many events // happened. match self.inner.io.poll(&mut self.events, max_wait) { Ok(_) => {} - Err(ref e) if e.kind() == ErrorKind::Interrupted => return false, + Err(ref e) if e.kind() == ErrorKind::Interrupted => return, // TODO: This should return an io::Result instead of panic. Err(e) => panic!("error in poll: {}", e), } // Process all the events that came in, dispatching appropriately - let mut fired = false; for i in 0..self.events.len() { let event = self.events.get(i).unwrap(); let token = event.token(); trace!("event {:?} {:?}", event.readiness(), event.token()); - if token == TOKEN_FUTURE { - self.future_readiness.0.set_readiness(mio::Ready::empty()).unwrap(); - fired = true; - } else { - self.dispatch(token, event.readiness()); - } + self.dispatch(token, event.readiness()); } - - return fired } fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { @@ -407,15 +345,6 @@ impl fmt::Debug for Handle { } } -struct MySetReadiness(mio::SetReadiness); - -impl Notify for MySetReadiness { - fn notify(&self, _id: usize) { - self.0.set_readiness(mio::Ready::readable()) - .expect("failed to set readiness"); - } -} - fn read_ready() -> mio::Ready { mio::Ready::readable() | platform::hup() } diff --git a/tests/buffered.rs b/tests/buffered.rs index e070a85fe4b..71b75c15db8 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -11,7 +11,7 @@ use futures::Future; use futures::stream::Stream; use tokio_io::io::copy; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; macro_rules! t { ($e:expr) => (match $e { @@ -25,8 +25,8 @@ fn echo_server() { const N: usize = 1024; drop(env_logger::init()); - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let msg = "foo bar baz"; @@ -56,7 +56,7 @@ fn echo_server() { copy(a, b) }); - let (amt, _, _) = t!(l.run(copied)); + let (amt, _, _) = t!(copied.wait()); let (expected, t2) = t.join().unwrap(); let actual = t2.join().unwrap(); diff --git a/tests/chain.rs b/tests/chain.rs index fb38ed82496..f0a626dddec 100644 --- a/tests/chain.rs +++ b/tests/chain.rs @@ -10,7 +10,7 @@ use futures::Future; use futures::stream::Stream; use tokio_io::io::read_to_end; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; macro_rules! t { ($e:expr) => (match $e { @@ -21,8 +21,8 @@ macro_rules! t { #[test] fn chain_clients() { - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { @@ -44,7 +44,7 @@ fn chain_clients() { read_to_end(a.chain(b).chain(c), Vec::new()) }); - let (_, data) = t!(l.run(copied)); + let (_, data) = t!(copied.wait()); t.join().unwrap(); assert_eq!(data, b"foo bar baz"); diff --git a/tests/echo.rs b/tests/echo.rs index ed172ad8c72..53d4576bdaa 100644 --- a/tests/echo.rs +++ b/tests/echo.rs @@ -10,7 +10,7 @@ use std::thread; use futures::Future; use futures::stream::Stream; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::AsyncRead; use tokio_io::io::copy; @@ -25,8 +25,8 @@ macro_rules! t { fn echo_server() { drop(env_logger::init()); - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let msg = "foo bar baz"; @@ -46,7 +46,7 @@ fn echo_server() { let halves = client.map(|s| s.0.split()); let copied = halves.and_then(|(a, b)| copy(a, b)); - let (amt, _, _) = t!(l.run(copied)); + let (amt, _, _) = t!(copied.wait()); t.join().unwrap(); assert_eq!(amt, msg.len() as u64 * 1024); diff --git a/tests/limit.rs b/tests/limit.rs index b1b1591eb91..ee92c90be35 100644 --- a/tests/limit.rs +++ b/tests/limit.rs @@ -10,7 +10,7 @@ use futures::Future; use futures::stream::Stream; use tokio_io::io::read_to_end; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; macro_rules! t { ($e:expr) => (match $e { @@ -21,8 +21,8 @@ macro_rules! t { #[test] fn limit() { - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { @@ -38,7 +38,7 @@ fn limit() { read_to_end(a.take(4), Vec::new()) }); - let (_, data) = t!(l.run(copied)); + let (_, data) = t!(copied.wait()); t.join().unwrap(); assert_eq!(data, b"foo "); diff --git a/tests/line-frames.rs b/tests/line-frames.rs index 5ada4936da6..7e3847e6589 100644 --- a/tests/line-frames.rs +++ b/tests/line-frames.rs @@ -13,7 +13,7 @@ use futures::{Future, Stream, Sink}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::codec::{Encoder, Decoder}; use tokio_io::io::{write_all, read}; use tokio_io::AsyncRead; @@ -55,10 +55,8 @@ impl Encoder for LineCodec { fn echo() { drop(env_logger::init()); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); - let pool = CpuPool::new(1); + let handle = Handle::default(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap(); let addr = listener.local_addr().unwrap(); @@ -69,24 +67,23 @@ fn echo() { Ok(()) }); - let handle = core.handle(); pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); let client = TcpStream::connect(&addr, &handle); - let client = core.run(client).unwrap(); - let (client, _) = core.run(write_all(client, b"a\n")).unwrap(); - let (client, buf, amt) = core.run(read(client, vec![0; 1024])).unwrap(); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); assert_eq!(amt, 2); assert_eq!(&buf[..2], b"a\n"); - let (client, _) = core.run(write_all(client, b"\n")).unwrap(); - let (client, buf, amt) = core.run(read(client, buf)).unwrap(); + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); assert_eq!(amt, 1); assert_eq!(&buf[..1], b"\n"); - let (client, _) = core.run(write_all(client, b"b")).unwrap(); + let (client, _) = write_all(client, b"b").wait().unwrap(); client.shutdown(Shutdown::Write).unwrap(); - let (_client, buf, amt) = core.run(read(client, buf)).unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); assert_eq!(amt, 1); assert_eq!(&buf[..1], b"b"); } diff --git a/tests/pipe-hup.rs b/tests/pipe-hup.rs index 13aa02c9fd9..a9bf96716b2 100644 --- a/tests/pipe-hup.rs +++ b/tests/pipe-hup.rs @@ -13,10 +13,11 @@ use std::os::unix::io::{AsRawFd, FromRawFd}; use std::thread; use std::time::Duration; +use futures::prelude::*; +use mio::event::Evented; use mio::unix::{UnixReady, EventedFd}; use mio::{PollOpt, Ready, Token}; -use mio::event::Evented; -use tokio::reactor::{Reactor, PollEvented}; +use tokio::reactor::{Handle, PollEvented}; use tokio_io::io::read_to_end; macro_rules! t { @@ -64,7 +65,7 @@ impl Evented for MyFile { fn hup() { drop(env_logger::init()); - let mut l = t!(Reactor::new()); + let handle = Handle::default(); unsafe { let mut pipes = [0; 2]; assert!(libc::pipe(pipes.as_mut_ptr()) != -1, @@ -77,10 +78,10 @@ fn hup() { thread::sleep(Duration::from_millis(100)); }); - let source = PollEvented::new(MyFile::new(read), &l.handle()).unwrap(); + let source = PollEvented::new(MyFile::new(read), &handle).unwrap(); let reader = read_to_end(source, Vec::new()); - let (_, content) = t!(l.run(reader)); + let (_, content) = t!(reader.wait()); assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); t.join().unwrap(); } diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs index 3146f2a71a3..1f62228c012 100644 --- a/tests/stream-buffered.rs +++ b/tests/stream-buffered.rs @@ -12,7 +12,7 @@ use futures::stream::Stream; use tokio_io::io::copy; use tokio_io::AsyncRead; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; macro_rules! t { ($e:expr) => (match $e { @@ -25,8 +25,8 @@ macro_rules! t { fn echo_server() { drop(env_logger::init()); - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { @@ -50,7 +50,7 @@ fn echo_server() { .take(2) .collect(); - t!(l.run(future)); + t!(future.wait()); t.join().unwrap(); } diff --git a/tests/tcp.rs b/tests/tcp.rs index 7999429b387..4b46661cb35 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -8,7 +8,7 @@ use std::thread; use futures::Future; use futures::stream::Stream; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio::net::{TcpListener, TcpStream}; macro_rules! t { @@ -21,15 +21,15 @@ macro_rules! t { #[test] fn connect() { drop(env_logger::init()); - let mut l = t!(Reactor::new()); + let handle = Handle::default(); let srv = t!(net::TcpListener::bind("127.0.0.1:0")); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { t!(srv.accept()).0 }); - let stream = TcpStream::connect(&addr, &l.handle()); - let mine = t!(l.run(stream)); + let stream = TcpStream::connect(&addr, &handle); + let mine = t!(stream.wait()); let theirs = t.join().unwrap(); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); @@ -39,8 +39,8 @@ fn connect() { #[test] fn accept() { drop(env_logger::init()); - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let (tx, rx) = channel(); @@ -53,7 +53,7 @@ fn accept() { net::TcpStream::connect(&addr).unwrap() }); - let (mine, _remaining) = t!(l.run(client)); + let (mine, _remaining) = t!(client.wait()); let mine = mine.unwrap(); let theirs = t.join().unwrap(); @@ -64,8 +64,8 @@ fn accept() { #[test] fn accept2() { drop(env_logger::init()); - let mut l = t!(Reactor::new()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &handle)); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { @@ -79,7 +79,7 @@ fn accept2() { }).into_future().map_err(|e| e.0); assert!(rx.try_recv().is_err()); - let (mine, _remaining) = t!(l.run(client)); + let (mine, _remaining) = t!(client.wait()); mine.unwrap(); t.join().unwrap(); } diff --git a/tests/udp.rs b/tests/udp.rs index bf2cd68e962..01cfcfd00ad 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use futures::{Future, Poll, Stream, Sink}; use tokio::net::{UdpSocket, UdpCodec}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; macro_rules! t { ($e:expr) => (match $e { @@ -18,16 +18,16 @@ macro_rules! t { } fn send_messages(send: S, recv: R) { - let mut l = t!(Reactor::new()); - let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle())); - let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle())); + let handle = Handle::default(); + let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &handle)); + let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &handle)); let a_addr = t!(a.local_addr()); let b_addr = t!(b.local_addr()); { let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); - let (sendt, received) = t!(l.run(send.join(recv))); + let (sendt, received) = t!(send.join(recv).wait()); a = sendt; b = received; } @@ -35,7 +35,7 @@ fn send_messages(send: S, recv: R) { { let send = SendMessage::new(a, send, b_addr, b""); let recv = RecvMessage::new(b, recv, a_addr, b""); - t!(l.run(send.join(recv))); + t!(send.join(recv).wait()); } } @@ -166,16 +166,16 @@ impl Future for RecvMessage { #[test] fn send_dgrams() { - let mut l = t!(Reactor::new()); - let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); - let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &handle)); + let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &handle)); let mut buf = [0u8; 50]; let b_addr = t!(b.local_addr()); { let send = a.send_dgram(&b"4321"[..], b_addr); let recv = b.recv_dgram(&mut buf[..]); - let (sendt, received) = t!(l.run(send.join(recv))); + let (sendt, received) = t!(send.join(recv).wait()); assert_eq!(received.2, 4); assert_eq!(&received.1[..4], b"4321"); a = sendt.0; @@ -185,7 +185,7 @@ fn send_dgrams() { { let send = a.send_dgram(&b""[..], b_addr); let recv = b.recv_dgram(&mut buf[..]); - let received = t!(l.run(send.join(recv))).1; + let received = t!(send.join(recv).wait()).1; assert_eq!(received.2, 0); } } @@ -216,9 +216,9 @@ impl UdpCodec for Codec { #[test] fn send_framed() { - let mut l = t!(Reactor::new()); - let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); - let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let handle = Handle::default(); + let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &handle)); + let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &handle)); let a_addr = t!(a_soc.local_addr()); let b_addr = t!(b_soc.local_addr()); @@ -228,7 +228,7 @@ fn send_framed() { let send = a.send(&b"4567"[..]); let recv = b.into_future().map_err(|e| e.0); - let (sendt, received) = t!(l.run(send.join(recv))); + let (sendt, received) = t!(send.join(recv).wait()); assert_eq!(received.0, Some(())); a_soc = sendt.into_inner(); @@ -241,7 +241,7 @@ fn send_framed() { let send = a.send(&b""[..]); let recv = b.into_future().map_err(|e| e.0); - let received = t!(l.run(send.join(recv))).1; + let received = t!(send.join(recv).wait()).1; assert_eq!(received.0, Some(())); } }