From 3b790c646117bd0d8273491e321dfd6638f0c1d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 7 Oct 2019 14:48:40 +0200 Subject: [PATCH] Fix threading and closing of servers (#495) * Remove unnecessary spawns. * Fix how we manage threads. * Remove unused import. * Parallelize incoming connections. * Add close server test. * Add some docs. * Tone down warnings. * Fix test assert. --- http/src/lib.rs | 157 +++++++++++++------------ http/src/tests.rs | 34 ++++++ ipc/src/server.rs | 10 +- server-utils/Cargo.toml | 1 - server-utils/src/reactor.rs | 61 ++++------ server-utils/src/suspendable_stream.rs | 4 +- tcp/src/server.rs | 19 +-- 7 files changed, 154 insertions(+), 132 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index a25eeebbd..1ea19530e 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -43,7 +43,7 @@ use std::thread; use parking_lot::Mutex; use crate::jsonrpc::futures::sync::oneshot; -use crate::jsonrpc::futures::{self, future, Future, Stream}; +use crate::jsonrpc::futures::{self, Future, Stream}; use crate::jsonrpc::MetaIoHandler; use crate::server_utils::reactor::{Executor, UninitializedExecutor}; use hyper::{server, Body}; @@ -304,6 +304,12 @@ impl> ServerBuilder { /// Sets number of threads of the server to run. /// /// Panics when set to `0`. + /// The first thread will use provided `Executor` instance + /// and all other threads will use `UninitializedExecutor` to spawn + /// a new runtime for futures. + /// So it's also possible to run a multi-threaded server by + /// passing the default `tokio::runtime` executor to this builder + /// and setting `threads` to 1. #[cfg(unix)] pub fn threads(mut self, threads: usize) -> Self { self.threads = threads; @@ -481,89 +487,86 @@ fn serve>( max_request_body_size: usize, ) { let (shutdown_signal, local_addr_tx, done_tx) = signals; - executor.spawn( - future::lazy(move || { - let handle = tokio::reactor::Handle::default(); - - let bind = move || { - let listener = match addr { - SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?, - SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?, - }; - configure_port(reuse_port, &listener)?; - listener.reuse_address(true)?; - listener.bind(&addr)?; - let listener = listener.listen(1024)?; - let listener = tokio::net::TcpListener::from_std(listener, &handle)?; - // Add current host to allowed headers. - // NOTE: we need to use `l.local_addr()` instead of `addr` - // it might be different! - let local_addr = listener.local_addr()?; - - Ok((listener, local_addr)) + executor.spawn({ + let handle = tokio::reactor::Handle::default(); + + let bind = move || { + let listener = match addr { + SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?, + SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?, }; + configure_port(reuse_port, &listener)?; + listener.reuse_address(true)?; + listener.bind(&addr)?; + let listener = listener.listen(1024)?; + let listener = tokio::net::TcpListener::from_std(listener, &handle)?; + // Add current host to allowed headers. + // NOTE: we need to use `l.local_addr()` instead of `addr` + // it might be different! + let local_addr = listener.local_addr()?; + + Ok((listener, local_addr)) + }; - let bind_result = match bind() { - Ok((listener, local_addr)) => { - // Send local address - match local_addr_tx.send(Ok(local_addr)) { - Ok(_) => futures::future::ok((listener, local_addr)), - Err(_) => { - warn!( - "Thread {:?} unable to reach receiver, closing server", - thread::current().name() - ); - futures::future::err(()) - } + let bind_result = match bind() { + Ok((listener, local_addr)) => { + // Send local address + match local_addr_tx.send(Ok(local_addr)) { + Ok(_) => futures::future::ok((listener, local_addr)), + Err(_) => { + warn!( + "Thread {:?} unable to reach receiver, closing server", + thread::current().name() + ); + futures::future::err(()) } } - Err(err) => { - // Send error - let _send_result = local_addr_tx.send(Err(err)); + } + Err(err) => { + // Send error + let _send_result = local_addr_tx.send(Err(err)); - futures::future::err(()) - } - }; + futures::future::err(()) + } + }; - bind_result.and_then(move |(listener, local_addr)| { - let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr); - - let mut http = server::conn::Http::new(); - http.keep_alive(keep_alive); - let tcp_stream = SuspendableStream::new(listener.incoming()); - - tcp_stream - .for_each(move |socket| { - let service = ServerHandler::new( - jsonrpc_handler.clone(), - cors_domains.clone(), - cors_max_age, - allowed_headers.clone(), - allowed_hosts.clone(), - request_middleware.clone(), - rest_api, - health_api.clone(), - max_request_body_size, - keep_alive, - ); - tokio::spawn( - http.serve_connection(socket, service) - .map_err(|e| error!("Error serving connection: {:?}", e)), - ); - Ok(()) - }) - .map_err(|e| { - warn!("Incoming streams error, closing sever: {:?}", e); - }) - .select(shutdown_signal.map_err(|e| { - debug!("Shutdown signaller dropped, closing server: {:?}", e); - })) - .map(|_| ()) - .map_err(|_| ()) - }) + bind_result.and_then(move |(listener, local_addr)| { + let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr); + + let mut http = server::conn::Http::new(); + http.keep_alive(keep_alive); + let tcp_stream = SuspendableStream::new(listener.incoming()); + + tcp_stream + .map(move |socket| { + let service = ServerHandler::new( + jsonrpc_handler.clone(), + cors_domains.clone(), + cors_max_age, + allowed_headers.clone(), + allowed_hosts.clone(), + request_middleware.clone(), + rest_api, + health_api.clone(), + max_request_body_size, + keep_alive, + ); + + http.serve_connection(socket, service) + .map_err(|e| error!("Error serving connection: {:?}", e)) + }) + .buffer_unordered(1024) + .for_each(|_| Ok(())) + .map_err(|e| { + warn!("Incoming streams error, closing sever: {:?}", e); + }) + .select(shutdown_signal.map_err(|e| { + debug!("Shutdown signaller dropped, closing server: {:?}", e); + })) + .map_err(|_| ()) }) - .and_then(|_| done_tx.send(())), - ); + .and_then(|_| done_tx.send(())) + }); } #[cfg(unix)] diff --git a/http/src/tests.rs b/http/src/tests.rs index 29ce30b58..eeecf5570 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -1495,6 +1495,40 @@ fn should_respond_with_close_even_if_client_wants_to_keep_alive() { assert_eq!(response.body, world_batch()); } +#[test] +fn should_drop_io_handler_when_server_is_closed() { + use std::sync::{Arc, Mutex}; + // given + let (weak, _req) = { + let my_ref = Arc::new(Mutex::new(5)); + let weak = Arc::downgrade(&my_ref); + let mut io = IoHandler::default(); + io.add_method("hello", move |_| { + Ok(Value::String(format!("{}", my_ref.lock().unwrap()))) + }); + let server = ServerBuilder::new(io) + .start_http(&"127.0.0.1:0".parse().unwrap()) + .unwrap(); + + let addr = server.address().clone(); + + // when + let req = TcpStream::connect(addr).unwrap(); + server.close(); + (weak, req) + }; + + // then + for _ in 1..1000 { + if weak.upgrade().is_none() { + return; + } + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + panic!("expected server to be closed and io handler to be dropped") +} + fn invalid_host() -> String { "Provided Host header is not whitelisted.\n".into() } diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 037c0b88b..8406afb86 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -7,7 +7,7 @@ use tokio_service::{self, Service as TokioService}; use crate::server_utils::{ codecs, reactor, session, - tokio::{self, reactor::Handle, runtime::TaskExecutor}, + tokio::{reactor::Handle, runtime::TaskExecutor}, tokio_codec::Framed, }; use parking_lot::Mutex; @@ -180,7 +180,7 @@ impl> ServerBuilder { let mut id = 0u64; - let server = connections.for_each(move |(io_stream, remote_id)| { + let server = connections.map(move |(io_stream, remote_id)| { id = id.wrapping_add(1); let session_id = id; let session_stats = session_stats.clone(); @@ -228,9 +228,7 @@ impl> ServerBuilder { Ok(()) }); - tokio::spawn(writer); - - Ok(()) + writer }); start_signal .send(Ok(())) @@ -239,6 +237,8 @@ impl> ServerBuilder { let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into()); future::Either::B( server + .buffer_unordered(1024) + .for_each(|_| Ok(())) .select(stop) .map(|_| { let _ = wait_signal.send(()); diff --git a/server-utils/Cargo.toml b/server-utils/Cargo.toml index 0d4a75581..cdfb2533a 100644 --- a/server-utils/Cargo.toml +++ b/server-utils/Cargo.toml @@ -16,7 +16,6 @@ globset = "0.4" jsonrpc-core = { version = "13.2", path = "../core" } lazy_static = "1.1.0" log = "0.4" -num_cpus = "1.8" tokio = { version = "0.1" } tokio-codec = { version = "0.1" } unicase = "2.0" diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index 85834e30a..bd9d6b90c 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -1,9 +1,11 @@ //! Event Loop Executor +//! //! Either spawns a new event loop, or re-uses provided one. +//! Spawned event loop is always single threaded (mostly for +//! historical/backward compatibility reasons) despite the fact +//! that `tokio::runtime` can be multi-threaded. -use num_cpus; -use std::sync::mpsc; -use std::{io, thread}; +use std::io; use tokio; use crate::core::futures::{self, Future}; @@ -82,7 +84,7 @@ impl Executor { pub struct RpcEventLoop { executor: tokio::runtime::TaskExecutor, close: Option>, - handle: Option>, + handle: Option, } impl Drop for RpcEventLoop { @@ -100,42 +102,21 @@ impl RpcEventLoop { /// Spawns a new named thread with the `EventLoop`. pub fn with_name(name: Option) -> io::Result { let (stop, stopped) = futures::oneshot(); - let (tx, rx) = mpsc::channel(); - let mut tb = thread::Builder::new(); + + let mut tb = tokio::runtime::Builder::new(); + tb.core_threads(1); + if let Some(name) = name { - tb = tb.name(name); + tb.name_prefix(name); } - let handle = tb - .spawn(move || { - let core_threads = match num_cpus::get_physical() { - 1 => 1, - 2..=4 => 2, - _ => 3, - }; - - let runtime = tokio::runtime::Builder::new() - .core_threads(core_threads) - .name_prefix("jsonrpc-eventloop-") - .build(); - - match runtime { - Ok(mut runtime) => { - tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread."); - let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ()); - runtime.spawn(terminate); - runtime.shutdown_on_idle().wait().unwrap(); - } - Err(err) => { - tx.send(Err(err)).expect("Rx is blocking upper thread."); - } - } - }) - .expect("Couldn't spawn a thread."); - - let exec = rx.recv().expect("tx is transfered to a newly spawned thread."); - - exec.map(|executor| RpcEventLoop { + let mut runtime = tb.build()?; + let executor = runtime.executor(); + let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ()); + runtime.spawn(terminate); + let handle = runtime.shutdown_on_idle(); + + Ok(RpcEventLoop { executor, close: Some(stop), handle: Some(handle), @@ -148,11 +129,11 @@ impl RpcEventLoop { } /// Blocks current thread and waits until the event loop is finished. - pub fn wait(mut self) -> thread::Result<()> { + pub fn wait(mut self) -> Result<(), ()> { self.handle .take() - .expect("Handle is always set before self is consumed.") - .join() + .ok_or(())? + .wait() } /// Finishes this event loop. diff --git a/server-utils/src/suspendable_stream.rs b/server-utils/src/suspendable_stream.rs index 303a18b04..f563cdebe 100644 --- a/server-utils/src/suspendable_stream.rs +++ b/server-utils/src/suspendable_stream.rs @@ -74,8 +74,8 @@ where } else { self.next_delay }; - warn!("Error accepting connection: {}", err); - warn!("The server will stop accepting connections for {:?}", self.next_delay); + debug!("Error accepting connection: {}", err); + debug!("The server will stop accepting connections for {:?}", self.next_delay); self.timeout = Some(Delay::new(Instant::now() + self.next_delay)); } } diff --git a/tcp/src/server.rs b/tcp/src/server.rs index 347abd341..8b42ed322 100644 --- a/tcp/src/server.rs +++ b/tcp/src/server.rs @@ -87,7 +87,7 @@ impl + 'static> ServerBuilder { let listener = tokio::net::TcpListener::bind(&address)?; let connections = SuspendableStream::new(listener.incoming()); - let server = connections.for_each(move |socket| { + let server = connections.map(move |socket| { let peer_addr = socket.peer_addr().expect("Unable to determine socket peer address"); trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr); let (sender, receiver) = mpsc::channel(65536); @@ -137,9 +137,7 @@ impl + 'static> ServerBuilder { Ok(()) }); - tokio::spawn(writer); - - Ok(()) + writer }); Ok(server) @@ -149,9 +147,16 @@ impl + 'static> ServerBuilder { match start() { Ok(server) => { tx.send(Ok(())).expect("Rx is blocking parent thread."); - future::Either::A(server.select(stop).map(|_| ()).map_err(|(e, _)| { - error!("Error while executing the server: {:?}", e); - })) + future::Either::A( + server + .buffer_unordered(1024) + .for_each(|_| Ok(())) + .select(stop) + .map(|_| ()) + .map_err(|(e, _)| { + error!("Error while executing the server: {:?}", e); + }) + ) } Err(e) => { tx.send(Err(e)).expect("Rx is blocking parent thread.");