Skip to content

Commit

Permalink
Fix threading and closing of servers (#495)
Browse files Browse the repository at this point in the history
* Remove unnecessary spawns.

* Fix how we manage threads.

* Remove unused import.

* Parallelize incoming connections.

* Add close server test.

* Add some docs.

* Tone down warnings.

* Fix test assert.
  • Loading branch information
tomusdrw authored Oct 7, 2019
1 parent d8cfec5 commit 3b790c6
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 132 deletions.
157 changes: 80 additions & 77 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::thread;
use parking_lot::Mutex;

use crate::jsonrpc::futures::sync::oneshot;
use crate::jsonrpc::futures::{self, future, Future, Stream};
use crate::jsonrpc::futures::{self, Future, Stream};
use crate::jsonrpc::MetaIoHandler;
use crate::server_utils::reactor::{Executor, UninitializedExecutor};
use hyper::{server, Body};
Expand Down Expand Up @@ -304,6 +304,12 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
/// Sets number of threads of the server to run.
///
/// Panics when set to `0`.
/// The first thread will use provided `Executor` instance
/// and all other threads will use `UninitializedExecutor` to spawn
/// a new runtime for futures.
/// So it's also possible to run a multi-threaded server by
/// passing the default `tokio::runtime` executor to this builder
/// and setting `threads` to 1.
#[cfg(unix)]
pub fn threads(mut self, threads: usize) -> Self {
self.threads = threads;
Expand Down Expand Up @@ -481,89 +487,86 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
max_request_body_size: usize,
) {
let (shutdown_signal, local_addr_tx, done_tx) = signals;
executor.spawn(
future::lazy(move || {
let handle = tokio::reactor::Handle::default();

let bind = move || {
let listener = match addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
};
configure_port(reuse_port, &listener)?;
listener.reuse_address(true)?;
listener.bind(&addr)?;
let listener = listener.listen(1024)?;
let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
// Add current host to allowed headers.
// NOTE: we need to use `l.local_addr()` instead of `addr`
// it might be different!
let local_addr = listener.local_addr()?;

Ok((listener, local_addr))
executor.spawn({
let handle = tokio::reactor::Handle::default();

let bind = move || {
let listener = match addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
};
configure_port(reuse_port, &listener)?;
listener.reuse_address(true)?;
listener.bind(&addr)?;
let listener = listener.listen(1024)?;
let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
// Add current host to allowed headers.
// NOTE: we need to use `l.local_addr()` instead of `addr`
// it might be different!
let local_addr = listener.local_addr()?;

Ok((listener, local_addr))
};

let bind_result = match bind() {
Ok((listener, local_addr)) => {
// Send local address
match local_addr_tx.send(Ok(local_addr)) {
Ok(_) => futures::future::ok((listener, local_addr)),
Err(_) => {
warn!(
"Thread {:?} unable to reach receiver, closing server",
thread::current().name()
);
futures::future::err(())
}
let bind_result = match bind() {
Ok((listener, local_addr)) => {
// Send local address
match local_addr_tx.send(Ok(local_addr)) {
Ok(_) => futures::future::ok((listener, local_addr)),
Err(_) => {
warn!(
"Thread {:?} unable to reach receiver, closing server",
thread::current().name()
);
futures::future::err(())
}
}
Err(err) => {
// Send error
let _send_result = local_addr_tx.send(Err(err));
}
Err(err) => {
// Send error
let _send_result = local_addr_tx.send(Err(err));

futures::future::err(())
}
};
futures::future::err(())
}
};

bind_result.and_then(move |(listener, local_addr)| {
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);

let mut http = server::conn::Http::new();
http.keep_alive(keep_alive);
let tcp_stream = SuspendableStream::new(listener.incoming());

tcp_stream
.for_each(move |socket| {
let service = ServerHandler::new(
jsonrpc_handler.clone(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
allowed_hosts.clone(),
request_middleware.clone(),
rest_api,
health_api.clone(),
max_request_body_size,
keep_alive,
);
tokio::spawn(
http.serve_connection(socket, service)
.map_err(|e| error!("Error serving connection: {:?}", e)),
);
Ok(())
})
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map(|_| ())
.map_err(|_| ())
})
bind_result.and_then(move |(listener, local_addr)| {
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);

let mut http = server::conn::Http::new();
http.keep_alive(keep_alive);
let tcp_stream = SuspendableStream::new(listener.incoming());

tcp_stream
.map(move |socket| {
let service = ServerHandler::new(
jsonrpc_handler.clone(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
allowed_hosts.clone(),
request_middleware.clone(),
rest_api,
health_api.clone(),
max_request_body_size,
keep_alive,
);

http.serve_connection(socket, service)
.map_err(|e| error!("Error serving connection: {:?}", e))
})
.buffer_unordered(1024)
.for_each(|_| Ok(()))
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map_err(|_| ())
})
.and_then(|_| done_tx.send(())),
);
.and_then(|_| done_tx.send(()))
});
}

#[cfg(unix)]
Expand Down
34 changes: 34 additions & 0 deletions http/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,40 @@ fn should_respond_with_close_even_if_client_wants_to_keep_alive() {
assert_eq!(response.body, world_batch());
}

#[test]
fn should_drop_io_handler_when_server_is_closed() {
use std::sync::{Arc, Mutex};
// given
let (weak, _req) = {
let my_ref = Arc::new(Mutex::new(5));
let weak = Arc::downgrade(&my_ref);
let mut io = IoHandler::default();
io.add_method("hello", move |_| {
Ok(Value::String(format!("{}", my_ref.lock().unwrap())))
});
let server = ServerBuilder::new(io)
.start_http(&"127.0.0.1:0".parse().unwrap())
.unwrap();

let addr = server.address().clone();

// when
let req = TcpStream::connect(addr).unwrap();
server.close();
(weak, req)
};

// then
for _ in 1..1000 {
if weak.upgrade().is_none() {
return;
}
std::thread::sleep(std::time::Duration::from_millis(10));
}

panic!("expected server to be closed and io handler to be dropped")
}

fn invalid_host() -> String {
"Provided Host header is not whitelisted.\n".into()
}
Expand Down
10 changes: 5 additions & 5 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio_service::{self, Service as TokioService};

use crate::server_utils::{
codecs, reactor, session,
tokio::{self, reactor::Handle, runtime::TaskExecutor},
tokio::{reactor::Handle, runtime::TaskExecutor},
tokio_codec::Framed,
};
use parking_lot::Mutex;
Expand Down Expand Up @@ -180,7 +180,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {

let mut id = 0u64;

let server = connections.for_each(move |(io_stream, remote_id)| {
let server = connections.map(move |(io_stream, remote_id)| {
id = id.wrapping_add(1);
let session_id = id;
let session_stats = session_stats.clone();
Expand Down Expand Up @@ -228,9 +228,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
Ok(())
});

tokio::spawn(writer);

Ok(())
writer
});
start_signal
.send(Ok(()))
Expand All @@ -239,6 +237,8 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into());
future::Either::B(
server
.buffer_unordered(1024)
.for_each(|_| Ok(()))
.select(stop)
.map(|_| {
let _ = wait_signal.send(());
Expand Down
1 change: 0 additions & 1 deletion server-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ globset = "0.4"
jsonrpc-core = { version = "13.2", path = "../core" }
lazy_static = "1.1.0"
log = "0.4"
num_cpus = "1.8"
tokio = { version = "0.1" }
tokio-codec = { version = "0.1" }
unicase = "2.0"
Expand Down
61 changes: 21 additions & 40 deletions server-utils/src/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! Event Loop Executor
//!
//! Either spawns a new event loop, or re-uses provided one.
//! Spawned event loop is always single threaded (mostly for
//! historical/backward compatibility reasons) despite the fact
//! that `tokio::runtime` can be multi-threaded.
use num_cpus;
use std::sync::mpsc;
use std::{io, thread};
use std::io;
use tokio;

use crate::core::futures::{self, Future};
Expand Down Expand Up @@ -82,7 +84,7 @@ impl Executor {
pub struct RpcEventLoop {
executor: tokio::runtime::TaskExecutor,
close: Option<futures::Complete<()>>,
handle: Option<thread::JoinHandle<()>>,
handle: Option<tokio::runtime::Shutdown>,
}

impl Drop for RpcEventLoop {
Expand All @@ -100,42 +102,21 @@ impl RpcEventLoop {
/// Spawns a new named thread with the `EventLoop`.
pub fn with_name(name: Option<String>) -> io::Result<Self> {
let (stop, stopped) = futures::oneshot();
let (tx, rx) = mpsc::channel();
let mut tb = thread::Builder::new();

let mut tb = tokio::runtime::Builder::new();
tb.core_threads(1);

if let Some(name) = name {
tb = tb.name(name);
tb.name_prefix(name);
}

let handle = tb
.spawn(move || {
let core_threads = match num_cpus::get_physical() {
1 => 1,
2..=4 => 2,
_ => 3,
};

let runtime = tokio::runtime::Builder::new()
.core_threads(core_threads)
.name_prefix("jsonrpc-eventloop-")
.build();

match runtime {
Ok(mut runtime) => {
tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread.");
let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
runtime.spawn(terminate);
runtime.shutdown_on_idle().wait().unwrap();
}
Err(err) => {
tx.send(Err(err)).expect("Rx is blocking upper thread.");
}
}
})
.expect("Couldn't spawn a thread.");

let exec = rx.recv().expect("tx is transfered to a newly spawned thread.");

exec.map(|executor| RpcEventLoop {
let mut runtime = tb.build()?;
let executor = runtime.executor();
let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
runtime.spawn(terminate);
let handle = runtime.shutdown_on_idle();

Ok(RpcEventLoop {
executor,
close: Some(stop),
handle: Some(handle),
Expand All @@ -148,11 +129,11 @@ impl RpcEventLoop {
}

/// Blocks current thread and waits until the event loop is finished.
pub fn wait(mut self) -> thread::Result<()> {
pub fn wait(mut self) -> Result<(), ()> {
self.handle
.take()
.expect("Handle is always set before self is consumed.")
.join()
.ok_or(())?
.wait()
}

/// Finishes this event loop.
Expand Down
4 changes: 2 additions & 2 deletions server-utils/src/suspendable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ where
} else {
self.next_delay
};
warn!("Error accepting connection: {}", err);
warn!("The server will stop accepting connections for {:?}", self.next_delay);
debug!("Error accepting connection: {}", err);
debug!("The server will stop accepting connections for {:?}", self.next_delay);
self.timeout = Some(Delay::new(Instant::now() + self.next_delay));
}
}
Expand Down
Loading

0 comments on commit 3b790c6

Please sign in to comment.