Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the Reactor::run method #58

Merged
merged 1 commit into from
Dec 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::io;
use tokio_io::AsyncRead;

fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap();

// Create the event loop and TCP listener we'll accept connections on.
let mut core = Reactor::new().unwrap();
let handle = core.handle();
// Create the TCP listener we'll accept connections on.
let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);

Expand Down Expand Up @@ -135,5 +134,5 @@ fn main() {
});

// execute server
core.run(srv).unwrap();
srv.wait().unwrap();
}
7 changes: 3 additions & 4 deletions examples/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures::{Future, Stream, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use flate2::write::GzEncoder;

Expand All @@ -41,8 +41,7 @@ fn main() {
// reactor.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);

Expand All @@ -64,7 +63,7 @@ fn main() {
Ok(())
});

core.run(server).unwrap();
server.wait().unwrap();
}

/// The main workhorse of this example. This'll compress all data read from
Expand Down
10 changes: 4 additions & 6 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::thread;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use futures_cpupool::CpuPool;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

fn main() {
// Determine if we're going to run in TCP or UDP mode
Expand All @@ -47,9 +47,7 @@ fn main() {
});
let addr = addr.parse::<SocketAddr>().unwrap();

// Create the event loop and initiate the connection to the remote server
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();

let pool = CpuPool::new(1);

Expand All @@ -76,9 +74,9 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
core.run(stdout.for_each(|chunk| {
stdout.for_each(|chunk| {
out.write_all(&chunk)
})).unwrap();
}).wait().unwrap();
}

mod tcp {
Expand Down
41 changes: 15 additions & 26 deletions examples/echo-threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ extern crate tokio;
extern crate tokio_io;

use std::env;
use std::net::{self, SocketAddr};
use std::net::SocketAddr;
use std::thread;

use futures::Future;
use futures::prelude::*;
use futures::future::Executor;
use futures::stream::Stream;
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpStream;
use tokio::reactor::Reactor;
use tokio::net::{TcpStream, TcpListener};
use tokio::reactor::Handle;

fn main() {
// First argument, the address to bind
Expand All @@ -42,9 +41,8 @@ fn main() {
let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
.unwrap_or(num_cpus::get());

// Use `std::net` to bind the requested port, we'll use this on the main
// thread below
let listener = net::TcpListener::bind(&addr).expect("failed to bind");
let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);

// Spin up our worker threads, creating a channel routing to each worker
Expand All @@ -56,31 +54,22 @@ fn main() {
thread::spawn(|| worker(rx));
}

// Infinitely accept sockets from our `std::net::TcpListener`, as this'll do
// blocking I/O. Each socket is then shipped round-robin to a particular
// thread which will associate the socket with the corresponding event loop
// and process the connection.
// Infinitely accept sockets from our `TcpListener`. Each socket is then
// shipped round-robin to a particular thread which will associate the
// socket with the corresponding event loop and process the connection.
let mut next = 0;
for socket in listener.incoming() {
let socket = socket.expect("failed to accept");
let srv = listener.incoming().for_each(|(socket, _)| {
channels[next].unbounded_send(socket).expect("worker thread died");
next = (next + 1) % channels.len();
}
Ok(())
});
srv.wait().unwrap();
}

fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let mut core = Reactor::new().unwrap();
let handle = core.handle();

fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
let pool = CpuPool::new(1);

let done = rx.for_each(move |socket| {
// First up when we receive a socket we associate it with our event loop
// using the `TcpStream::from_stream` API. After that the socket is not
// a `tokio::net::TcpStream` meaning it's in nonblocking mode and
// ready to be used with Tokio
let socket = TcpStream::from_std(socket, &handle)
.expect("failed to associate TCP stream");
let addr = socket.peer_addr().expect("failed to get remote address");

// Like the single-threaded `echo` example we split the socket halves
Expand All @@ -101,5 +90,5 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {

Ok(())
});
core.run(done).unwrap();
done.wait().unwrap();
}
13 changes: 5 additions & 8 deletions examples/echo-udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::net::SocketAddr;

use futures::{Future, Poll};
use tokio::net::UdpSocket;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

struct Server {
socket: UdpSocket,
Expand Down Expand Up @@ -54,18 +54,15 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();

// Create the event loop that will drive this server, and also bind the
// socket we'll be listening to.
let mut l = Reactor::new().unwrap();
let handle = l.handle();
let handle = Handle::default();
let socket = UdpSocket::bind(&addr, &handle).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());

// Next we'll create a future to spawn (the one we defined above) and then
// we'll run the event loop by running the future.
l.run(Server {
// we'll block our current thread waiting on the result of the future
Server {
socket: socket,
buf: vec![0; 1024],
to_send: None,
}).unwrap();
}.wait().unwrap();
}
27 changes: 6 additions & 21 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

fn main() {
// Allow passing an address to listen on as the first argument of this
Expand All @@ -41,17 +41,7 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();

// First up we'll create the event loop that's going to drive this server.
// This is done by creating an instance of the `Reactor` type, tokio-core's
// event loop. Most functions in tokio-core return an `io::Result`, and
// `Reactor::new` is no exception. For this example, though, we're mostly just
// ignoring errors, so we unwrap the return value.
//
// After the event loop is created we acquire a handle to it through the
// `handle` method. With this handle we'll then later be able to create I/O
// objects.
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();

// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
Expand Down Expand Up @@ -125,13 +115,8 @@ fn main() {
Ok(())
});

// And finally now that we've define what our server is, we run it! We
// didn't actually do much I/O up to this point and this `Reactor::run` method
// is responsible for driving the entire server to completion.
//
// The `run` method will return the result of the future that it's running,
// but in our case the `done` future won't ever finish because a TCP
// listener is never done accepting clients. That basically just means that
// we're going to be running the server until it's killed (e.g. ctrl-c).
core.run(done).unwrap();
// And finally now that we've define what our server is, we run it! Here we
// just need to execute the future we've created and wait for it to complete
// using the standard methods in the `futures` crate.
done.wait().unwrap();
}
10 changes: 5 additions & 5 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ extern crate tokio_io;
use std::env;
use std::net::SocketAddr;

use futures::stream::Stream;
use tokio::reactor::Reactor;
use futures::prelude::*;
use tokio::net::TcpListener;
use tokio::reactor::Handle;

fn main() {
env_logger::init().unwrap();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();

let mut core = Reactor::new().unwrap();
let listener = TcpListener::bind(&addr, &core.handle()).unwrap();
let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).unwrap();

let addr = listener.local_addr().unwrap();
println!("Listening for connections on {}", addr);
Expand All @@ -42,5 +42,5 @@ fn main() {
Ok(())
});

core.run(server).unwrap();
server.wait().unwrap();
}
10 changes: 4 additions & 6 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use futures::{Future, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{copy, shutdown};

Expand All @@ -42,14 +42,12 @@ fn main() {
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();

// Create the event loop that will drive this server.
let mut l = Reactor::new().unwrap();
let handle = l.handle();
let handle = Handle::default();

let pool = CpuPool::new(1);

// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap();
let socket = TcpListener::bind(&listen_addr, &handle).unwrap();
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

Expand Down Expand Up @@ -97,7 +95,7 @@ fn main() {

Ok(())
});
l.run(done).unwrap();
done.wait().unwrap();
}

// This is a custom type used to have a custom implementation of the
Expand Down
7 changes: 3 additions & 4 deletions examples/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Reactor;
use tokio::reactor::Handle;

fn main() {
env_logger::init().unwrap();
Expand All @@ -40,16 +40,15 @@ fn main() {

let pool = CpuPool::new(1);

let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let server = socket.incoming().for_each(|(socket, addr)| {
println!("got a socket: {}", addr);
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
core.run(server).unwrap();
server.wait().unwrap();
}

fn write(socket: TcpStream) -> IoFuture<()> {
Expand Down
11 changes: 5 additions & 6 deletions examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use futures::prelude::*;
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Reactor;
use tokio::reactor::Handle;
use tokio_io::AsyncRead;
use tokio_io::io::{lines, write_all};

Expand All @@ -80,12 +80,11 @@ enum Response {
}

fn main() {
// Parse the address we're going to run this server on, create a `Reactor`, and
// set up our TCP listener to accept connections.
// Parse the address we're going to run this server on
// and set up our TCP listener to accept connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Reactor::new().unwrap();
let handle = core.handle();
let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);

Expand Down Expand Up @@ -163,7 +162,7 @@ fn main() {
Ok(())
});

core.run(done).unwrap();
done.wait().unwrap();
}

impl Request {
Expand Down
Loading