Skip to content

Commit

Permalink
tests: handle errors properly in examples (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
liranringel authored and tobz committed Nov 20, 2018
1 parent 477fa55 commit 9b1a45c
Show file tree
Hide file tree
Showing 19 changed files with 143 additions and 118 deletions.
14 changes: 8 additions & 6 deletions examples/chat-combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ use std::env;
use std::io::{BufReader};
use std::sync::{Arc, Mutex};

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
// Create the TCP listener we'll accept connections on.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap();
let addr = addr.parse()?;

let socket = TcpListener::bind(&addr).unwrap();
let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);

// This is running on the Tokio runtime, so it will be multi-threaded. The
Expand All @@ -49,10 +49,10 @@ fn main() {
// The server task asynchronously iterates over and processes each incoming
// connection.
let srv = socket.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.map_err(|e| {println!("failed to accept socket; error = {:?}", e); e})
.for_each(move |stream| {
// The client's socket address
let addr = stream.peer_addr().unwrap();
let addr = stream.peer_addr()?;

println!("New Connection: {}", addr);

Expand Down Expand Up @@ -143,8 +143,10 @@ fn main() {
}));

Ok(())
});
})
.map_err(|err| println!("error occurred: {:?}", err));

// execute server
tokio::run(srv);
Ok(())
}
7 changes: 4 additions & 3 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,20 +426,20 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
tokio::spawn(connection);
}

pub fn main() {
pub fn main() -> Result<(), Box<std::error::Error>> {
// Create the shared state. This is how all the peers communicate.
//
// The server task will hold a handle to this. For every new client, the
// `state` handle is cloned and passed into the task that processes the
// client connection.
let state = Arc::new(Mutex::new(Shared::new()));

let addr = "127.0.0.1:6142".parse().unwrap();
let addr = "127.0.0.1:6142".parse()?;

// Bind a TCP listener to the socket address.
//
// Note that this is the Tokio TcpListener, which is fully async.
let listener = TcpListener::bind(&addr).unwrap();
let listener = TcpListener::bind(&addr)?;

// The server task asynchronously iterates over and processes each
// incoming connection.
Expand Down Expand Up @@ -471,4 +471,5 @@ pub fn main() {
// In our example, we have not defined a shutdown strategy, so this will
// block until `ctrl-c` is pressed at the terminal.
tokio::run(server);
Ok(())
}
48 changes: 28 additions & 20 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::thread;
use tokio::prelude::*;
use futures::sync::mpsc;

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
// Determine if we're going to run in TCP or UDP mode
let mut args = env::args().skip(1).collect::<Vec<_>>();
let tcp = match args.iter().position(|a| a == "--udp") {
Expand All @@ -41,26 +41,27 @@ fn main() {
};

// Parse what address we're going to connect to
let addr = args.first().unwrap_or_else(|| {
panic!("this program requires at least one argument")
});
let addr = addr.parse::<SocketAddr>().unwrap();
let addr = match args.first() {
Some(addr) => addr,
None => Err("this program requires at least one argument")?,
};
let addr = addr.parse::<SocketAddr>()?;

// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
// loop over a standard futures channel.
let (stdin_tx, stdin_rx) = mpsc::channel(0);
thread::spawn(|| read_stdin(stdin_tx));
let stdin_rx = stdin_rx.map_err(|_| panic!()); // errors not possible on rx
let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx"));

// Now that we've got our stdin read we either set up our TCP connection or
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
tcp::connect(&addr, Box::new(stdin_rx))
tcp::connect(&addr, Box::new(stdin_rx))?
} else {
udp::connect(&addr, Box::new(stdin_rx))
udp::connect(&addr, Box::new(stdin_rx))?
};

// And now with our stream of bytes to write to stdout, we execute that in
Expand All @@ -77,6 +78,7 @@ fn main() {
})
.map_err(|e| println!("error reading stdout; error = {:?}", e))
});
Ok(())
}

mod codec {
Expand Down Expand Up @@ -127,12 +129,13 @@ mod tcp {
use bytes::BytesMut;
use codec::Bytes;

use std::error::Error;
use std::io;
use std::net::SocketAddr;

pub fn connect(addr: &SocketAddr,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
-> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
{
let tcp = TcpStream::connect(addr);

Expand All @@ -151,22 +154,24 @@ mod tcp {
// You'll also note that we *spawn* the work to read stdin and write it
// to the TCP stream. This is done to ensure that happens concurrently
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let stream = Box::new(tcp.map(move |stream| {
let (sink, stream) = Bytes.framed(stream).split();

tokio::spawn(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
println!("failed to write to socket: {}", e)
}
Ok(())
}));

stream
}).flatten_stream())
}).flatten_stream());
Ok(stream)
}
}

mod udp {
use std::error::Error;
use std::io;
use std::net::SocketAddr;

Expand All @@ -179,17 +184,19 @@ mod udp {

pub fn connect(&addr: &SocketAddr,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
-> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
{
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
let addr_to_bind = if addr.ip().is_ipv4() {
"0.0.0.0:0".parse().unwrap()
"0.0.0.0:0".parse()?
} else {
"[::]:0".parse().unwrap()
"[::]:0".parse()?
};
let udp = match UdpSocket::bind(&addr_to_bind) {
Ok(udp) => udp,
Err(_) => Err("failed to bind socket")?,
};
let udp = UdpSocket::bind(&addr_to_bind)
.expect("failed to bind socket");

// Like above with TCP we use an instance of `Bytes` codec to transform
// this UDP socket into a framed sink/stream which operates over
Expand All @@ -203,7 +210,7 @@ mod udp {
(chunk, addr)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
println!("failed to write to socket: {}", e)
}
Ok(())
});
Expand All @@ -218,10 +225,11 @@ mod udp {
}
});

Box::new(future::lazy(|| {
let stream = Box::new(future::lazy(|| {
tokio::spawn(forward_stdin);
future::ok(receive)
}).flatten_stream())
}).flatten_stream());
Ok(stream)
}
}

Expand Down
9 changes: 5 additions & 4 deletions examples/echo-udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ impl Future for Server {
}
}

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let addr = addr.parse::<SocketAddr>()?;

let socket = UdpSocket::bind(&addr).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let socket = UdpSocket::bind(&addr)?;
println!("Listening on: {}", socket.local_addr()?);

let server = Server {
socket: socket,
Expand All @@ -70,4 +70,5 @@ fn main() {
//
// `tokio::run` spawns the task on the Tokio runtime and starts running.
tokio::run(server.map_err(|e| println!("server error = {:?}", e)));
Ok(())
}
7 changes: 4 additions & 3 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ use tokio::prelude::*;
use std::env;
use std::net::SocketAddr;

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let addr = addr.parse::<SocketAddr>()?;

// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections.
let socket = TcpListener::bind(&addr).unwrap();
let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);

// Here we convert the `TcpListener` to a stream of incoming connections
Expand Down Expand Up @@ -111,4 +111,5 @@ fn main() {
// never completes (it just keeps accepting sockets), `tokio::run` blocks
// forever (until ctrl-c is pressed).
tokio::run(done);
Ok(())
}
6 changes: 4 additions & 2 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;

pub fn main() {
let addr = "127.0.0.1:6142".parse().unwrap();
pub fn main() -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:6142".parse()?;

// Open a TCP stream to the socket address.
//
Expand Down Expand Up @@ -52,4 +52,6 @@ pub fn main() {
println!("About to create the stream and write to it...");
tokio::run(client);
println!("Stream has been created and written to.");

Ok(())
}
5 changes: 3 additions & 2 deletions examples/manual-runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn run<F: Future<Item = (), Error = ()>>(f: F) -> Result<(), IoError> {
Ok(())
}

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
run(future::lazy(|| {
// Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn().
// It also can use the default reactor and create timeouts.
Expand All @@ -82,5 +82,6 @@ fn main() {
// We can spawn on the default executor, which is also the local one.
tokio::executor::spawn(deadline);
Ok(())
})).unwrap();
}))?;
Ok(())
}
7 changes: 4 additions & 3 deletions examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@ use tokio::codec::Decoder;
use std::env;
use std::net::SocketAddr;

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let addr = addr.parse::<SocketAddr>()?;

// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections.
let socket = TcpListener::bind(&addr).unwrap();
let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);

// Here we convert the `TcpListener` to a stream of incoming connections
Expand Down Expand Up @@ -146,4 +146,5 @@ fn main() {
// never completes (it just keeps accepting sockets), `tokio::run` blocks
// forever (until ctrl-c is pressed).
tokio::run(done);
Ok(())
}
9 changes: 5 additions & 4 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ use tokio::io::{copy, shutdown};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
let listen_addr = listen_addr.parse::<SocketAddr>().unwrap();
let listen_addr = listen_addr.parse::<SocketAddr>()?;

let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();
let server_addr = server_addr.parse::<SocketAddr>()?;

// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr).unwrap();
let socket = TcpListener::bind(&listen_addr)?;
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

Expand Down Expand Up @@ -94,6 +94,7 @@ fn main() {
});

tokio::run(done);
Ok(())
}

// This is a custom type used to have a custom implementation of the
Expand Down
7 changes: 4 additions & 3 deletions examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ enum Response {
Error { msg: String },
}

fn main() {
fn main() -> Result<(), Box<std::error::Error>> {
// Parse the address we're going to run this server on
// and set up our TCP listener to accept connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let listener = TcpListener::bind(&addr).expect("failed to bind");
let addr = addr.parse::<SocketAddr>()?;
let listener = TcpListener::bind(&addr).map_err(|_| "failed to bind")?;
println!("Listening on: {}", addr);

// Create the shared state of this server that will be shared amongst all
Expand Down Expand Up @@ -156,6 +156,7 @@ fn main() {
});

tokio::run(done);
Ok(())
}

impl Request {
Expand Down
Loading

0 comments on commit 9b1a45c

Please sign in to comment.