Skip to content

Commit

Permalink
Remove the Reactor::run method (#58)
Browse files Browse the repository at this point in the history
This commit removes the `Reactor::run` method which has previously been used to
execute futures and turn the reactor at the same time. The tests/examples made
heavy usage of this method but they have now all temporarily moved to `wait()`
until the futures dependency is upgraded. In the meantime this'll allow us to
further trim down the `Reactor` APIs to their final state.
  • Loading branch information
alexcrichton authored and carllerche committed Dec 12, 2017
1 parent 32f2750 commit a577bfc
Show file tree
Hide file tree
Showing 23 changed files with 133 additions and 247 deletions.
9 changes: 4 additions & 5 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::io;
use tokio_io::AsyncRead;

fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap();

// Create the event loop and TCP listener we'll accept connections on.
let mut core = Reactor::new().unwrap();
let handle = core.handle();
// Create the TCP listener we'll accept connections on.
let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);

Expand Down Expand Up @@ -135,5 +134,5 @@ fn main() {
});

// execute server
core.run(srv).unwrap();
srv.wait().unwrap();
}
7 changes: 3 additions & 4 deletions examples/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures::{Future, Stream, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use flate2::write::GzEncoder;

Expand All @@ -41,8 +41,7 @@ fn main() {
// reactor.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);

Expand All @@ -64,7 +63,7 @@ fn main() {
Ok(())
});

core.run(server).unwrap();
server.wait().unwrap();
}

/// The main workhorse of this example. This'll compress all data read from
Expand Down
10 changes: 4 additions & 6 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::thread;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use futures_cpupool::CpuPool;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

fn main() {
// Determine if we're going to run in TCP or UDP mode
Expand All @@ -47,9 +47,7 @@ fn main() {
});
let addr = addr.parse::<SocketAddr>().unwrap();

// Create the event loop and initiate the connection to the remote server
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();

let pool = CpuPool::new(1);

Expand All @@ -76,9 +74,9 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
core.run(stdout.for_each(|chunk| {
stdout.for_each(|chunk| {
out.write_all(&chunk)
})).unwrap();
}).wait().unwrap();
}

mod tcp {
Expand Down
41 changes: 15 additions & 26 deletions examples/echo-threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ extern crate tokio;
extern crate tokio_io;

use std::env;
use std::net::{self, SocketAddr};
use std::net::SocketAddr;
use std::thread;

use futures::Future;
use futures::prelude::*;
use futures::future::Executor;
use futures::stream::Stream;
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpStream;
use tokio::reactor::Reactor;
use tokio::net::{TcpStream, TcpListener};
use tokio::reactor::Handle;

fn main() {
// First argument, the address to bind
Expand All @@ -42,9 +41,8 @@ fn main() {
let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
.unwrap_or(num_cpus::get());

// Use `std::net` to bind the requested port, we'll use this on the main
// thread below
let listener = net::TcpListener::bind(&addr).expect("failed to bind");
let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);

// Spin up our worker threads, creating a channel routing to each worker
Expand All @@ -56,31 +54,22 @@ fn main() {
thread::spawn(|| worker(rx));
}

// Infinitely accept sockets from our `std::net::TcpListener`, as this'll do
// blocking I/O. Each socket is then shipped round-robin to a particular
// thread which will associate the socket with the corresponding event loop
// and process the connection.
// Infinitely accept sockets from our `TcpListener`. Each socket is then
// shipped round-robin to a particular thread which will associate the
// socket with the corresponding event loop and process the connection.
let mut next = 0;
for socket in listener.incoming() {
let socket = socket.expect("failed to accept");
let srv = listener.incoming().for_each(|(socket, _)| {
channels[next].unbounded_send(socket).expect("worker thread died");
next = (next + 1) % channels.len();
}
Ok(())
});
srv.wait().unwrap();
}

fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let mut core = Reactor::new().unwrap();
let handle = core.handle();

fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
let pool = CpuPool::new(1);

let done = rx.for_each(move |socket| {
// First up when we receive a socket we associate it with our event loop
// using the `TcpStream::from_stream` API. After that the socket is not
// a `tokio::net::TcpStream` meaning it's in nonblocking mode and
// ready to be used with Tokio
let socket = TcpStream::from_std(socket, &handle)
.expect("failed to associate TCP stream");
let addr = socket.peer_addr().expect("failed to get remote address");

// Like the single-threaded `echo` example we split the socket halves
Expand All @@ -101,5 +90,5 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {

Ok(())
});
core.run(done).unwrap();
done.wait().unwrap();
}
13 changes: 5 additions & 8 deletions examples/echo-udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::net::SocketAddr;

use futures::{Future, Poll};
use tokio::net::UdpSocket;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

struct Server {
socket: UdpSocket,
Expand Down Expand Up @@ -54,18 +54,15 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();

// Create the event loop that will drive this server, and also bind the
// socket we'll be listening to.
let mut l = Reactor::new().unwrap();
let handle = l.handle();
let handle = Handle::default();
let socket = UdpSocket::bind(&addr, &handle).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());

// Next we'll create a future to spawn (the one we defined above) and then
// we'll run the event loop by running the future.
l.run(Server {
// we'll block our current thread waiting on the result of the future
Server {
socket: socket,
buf: vec![0; 1024],
to_send: None,
}).unwrap();
}.wait().unwrap();
}
27 changes: 6 additions & 21 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

fn main() {
// Allow passing an address to listen on as the first argument of this
Expand All @@ -41,17 +41,7 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();

// First up we'll create the event loop that's going to drive this server.
// This is done by creating an instance of the `Reactor` type, tokio-core's
// event loop. Most functions in tokio-core return an `io::Result`, and
// `Reactor::new` is no exception. For this example, though, we're mostly just
// ignoring errors, so we unwrap the return value.
//
// After the event loop is created we acquire a handle to it through the
// `handle` method. With this handle we'll then later be able to create I/O
// objects.
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();

// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
Expand Down Expand Up @@ -125,13 +115,8 @@ fn main() {
Ok(())
});

// And finally now that we've define what our server is, we run it! We
// didn't actually do much I/O up to this point and this `Reactor::run` method
// is responsible for driving the entire server to completion.
//
// The `run` method will return the result of the future that it's running,
// but in our case the `done` future won't ever finish because a TCP
// listener is never done accepting clients. That basically just means that
// we're going to be running the server until it's killed (e.g. ctrl-c).
core.run(done).unwrap();
// And finally now that we've define what our server is, we run it! Here we
// just need to execute the future we've created and wait for it to complete
// using the standard methods in the `futures` crate.
done.wait().unwrap();
}
10 changes: 5 additions & 5 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ extern crate tokio_io;
use std::env;
use std::net::SocketAddr;

use futures::stream::Stream;
use tokio::reactor::Reactor;
use futures::prelude::*;
use tokio::net::TcpListener;
use tokio::reactor::Handle;

fn main() {
env_logger::init().unwrap();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();

let mut core = Reactor::new().unwrap();
let listener = TcpListener::bind(&addr, &core.handle()).unwrap();
let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).unwrap();

let addr = listener.local_addr().unwrap();
println!("Listening for connections on {}", addr);
Expand All @@ -42,5 +42,5 @@ fn main() {
Ok(())
});

core.run(server).unwrap();
server.wait().unwrap();
}
10 changes: 4 additions & 6 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use futures::{Future, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{copy, shutdown};

Expand All @@ -42,14 +42,12 @@ fn main() {
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();

// Create the event loop that will drive this server.
let mut l = Reactor::new().unwrap();
let handle = l.handle();
let handle = Handle::default();

let pool = CpuPool::new(1);

// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap();
let socket = TcpListener::bind(&listen_addr, &handle).unwrap();
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

Expand Down Expand Up @@ -97,7 +95,7 @@ fn main() {

Ok(())
});
l.run(done).unwrap();
done.wait().unwrap();
}

// This is a custom type used to have a custom implementation of the
Expand Down
7 changes: 3 additions & 4 deletions examples/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

fn main() {
env_logger::init().unwrap();
Expand All @@ -40,16 +40,15 @@ fn main() {

let pool = CpuPool::new(1);

let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let server = socket.incoming().for_each(|(socket, addr)| {
println!("got a socket: {}", addr);
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
core.run(server).unwrap();
server.wait().unwrap();
}

fn write(socket: TcpStream) -> IoFuture<()> {
Expand Down
11 changes: 5 additions & 6 deletions examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use futures::prelude::*;
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::AsyncRead;
use tokio_io::io::{lines, write_all};

Expand All @@ -80,12 +80,11 @@ enum Response {
}

fn main() {
// Parse the address we're going to run this server on, create a `Reactor`, and
// set up our TCP listener to accept connections.
// Parse the address we're going to run this server on
// and set up our TCP listener to accept connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);

Expand Down Expand Up @@ -163,7 +162,7 @@ fn main() {
Ok(())
});

core.run(done).unwrap();
done.wait().unwrap();
}

impl Request {
Expand Down
Loading

0 comments on commit a577bfc

Please sign in to comment.