Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Merge pull request #2046 from ethcore/ipc-tweaks-3
Browse files Browse the repository at this point in the history
IPC tweaks
  • Loading branch information
rphmeier authored Sep 9, 2016
2 parents ff65ac7 + 2fc7090 commit 53b22da
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
client.close().unwrap();
Expand All @@ -477,7 +477,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();

client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
Expand All @@ -498,7 +498,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();

client.open_default(path.as_str().to_owned()).unwrap();
assert!(client.get("xxx".as_bytes()).unwrap().is_none());
Expand All @@ -516,7 +516,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();

let transaction = DBTransaction::new();
Expand All @@ -541,7 +541,7 @@ mod client_tests {
let stop = StopGuard::new();
run_worker(&scope, stop.share(), url);

let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();

client.open_default(path.as_str().to_owned()).unwrap();
let mut batch = Vec::new();
Expand Down
4 changes: 2 additions & 2 deletions db/src/lib.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ pub fn extras_service_url(db_path: &str) -> Result<String, ::std::io::Error> {

pub fn blocks_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(blocks_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url));
let client = try!(nanoipc::generic_client::<DatabaseClient<_>>(&url));
Ok(client)
}

pub fn extras_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(extras_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url));
let client = try!(nanoipc::generic_client::<DatabaseClient<_>>(&url));
Ok(client)
}

Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn can_handshake() {
let stop_guard = StopGuard::new();
let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc";
run_test_worker(scope, stop_guard.share(), socket_path);
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap();
let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();

assert!(remote_client.handshake().is_ok());
})
Expand All @@ -68,7 +68,7 @@ fn can_query_block() {
let stop_guard = StopGuard::new();
let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc";
run_test_worker(scope, stop_guard.share(), socket_path);
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap();
let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();

let non_existant_block = remote_client.block_header(BlockID::Number(999));

Expand Down
2 changes: 1 addition & 1 deletion ipc/hypervisor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ mod tests {
::std::thread::spawn(move || {
while !hypervisor_ready.load(Ordering::Relaxed) { }

let client = nanoipc::init_client::<HypervisorServiceClient<_>>(url).unwrap();
let client = nanoipc::fast_client::<HypervisorServiceClient<_>>(url).unwrap();
client.handshake().unwrap();
client.module_ready(test_module_id);
});
Expand Down
2 changes: 1 addition & 1 deletion ipc/hypervisor/src/service.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl HypervisorService {
let modules = self.modules.read().unwrap();
modules.get(&module_id).map(|module| {
trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url);
let client = nanoipc::init_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
let client = nanoipc::fast_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
client.shutdown();
trace!(target: "hypervisor", "Sent shutdown to {}", module_id);
});
Expand Down
2 changes: 1 addition & 1 deletion ipc/nano/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ license = "GPL-3.0"
ethcore-ipc = { path = "../rpc" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
log = "0.3"

lazy_static = "0.2"
44 changes: 39 additions & 5 deletions ipc/nano/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
extern crate ethcore_ipc as ipc;
extern crate nanomsg;
#[macro_use] extern crate log;
#[macro_use] extern crate lazy_static;

pub use ipc::{WithSocket, IpcInterface, IpcConfig};
pub use nanomsg::Socket as NanoSocket;
Expand All @@ -28,7 +29,8 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}
use std::ops::Deref;

const POLL_TIMEOUT: isize = 200;
const CLIENT_CONNECTION_TIMEOUT: isize = 120000;
const DEFAULT_CONNECTION_TIMEOUT: isize = 30000;
const DEBUG_CONNECTION_TIMEOUT: isize = 5000;

/// Generic worker to handle service (binded) sockets
pub struct Worker<S: ?Sized> where S: IpcInterface {
Expand Down Expand Up @@ -68,7 +70,7 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
SocketError::DuplexLink
}));

socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap();
socket.set_receive_timeout(DEFAULT_CONNECTION_TIMEOUT).unwrap();

let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
Expand All @@ -84,26 +86,58 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
/// Spawns client <`S`> over specified address
/// creates socket and connects endpoint to it
/// for request-reply connections to the service
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
pub fn client<S>(socket_addr: &str, receive_timeout: Option<isize>) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::RequestLink
}));

socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap();
if let Some(timeout) = receive_timeout {
socket.set_receive_timeout(timeout).unwrap();
}

let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::RequestLink
}));

trace!(target: "ipc", "Created cleint for {}", socket_addr);
trace!(target: "ipc", "Created client for {}", socket_addr);
Ok(GuardedSocket {
client: Arc::new(S::init(socket)),
_endpoint: endpoint,
})
}

lazy_static! {
/// Set PARITY_IPC_DEBUG=1 for fail-fast connectivity problems diagnostic
pub static ref DEBUG_FLAG: bool = {
use std::env;

if let Ok(debug) = env::var("PARITY_IPC_DEBUG") {
debug == "1" || debug.to_uppercase() == "TRUE"
}
else { false }
};
}

/// Client with no default timeout on operations
pub fn generic_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
if *DEBUG_FLAG {
client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT))
} else {
client(socket_addr, None)
}
}

/// Client over interface that is supposed to give quick almost non-blocking responses
pub fn fast_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
if *DEBUG_FLAG {
client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT))
} else {
client(socket_addr, Some(DEFAULT_CONNECTION_TIMEOUT))
}
}

/// Error occurred while establising socket or endpoint
#[derive(Debug)]
pub enum SocketError {
Expand Down
4 changes: 2 additions & 2 deletions parity/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
}

pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
let hypervisor_client = nanoipc::fast_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
hypervisor_client.handshake().unwrap();
hypervisor_client.module_ready(module_id, control_url.to_owned());

Expand All @@ -73,7 +73,7 @@ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> Guar
pub fn dependency<C: WithSocket<NanoSocket>>(url: &str)
-> Result<GuardedSocket<C>, BootError>
{
nanoipc::init_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
nanoipc::generic_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
}

pub fn main_thread() -> Arc<AtomicBool> {
Expand Down
4 changes: 3 additions & 1 deletion parity/io_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use ethcore::client::Client;
use ethcore::service::ClientIoMessage;
use ethsync::{SyncProvider, ManageNetwork};
Expand All @@ -31,6 +32,7 @@ pub struct ClientIoHandler {
pub net: Arc<ManageNetwork>,
pub accounts: Arc<AccountProvider>,
pub info: Arc<Informant>,
pub shutdown: Arc<AtomicBool>
}

impl IoHandler<ClientIoMessage> for ClientIoHandler {
Expand All @@ -39,7 +41,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
}

fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if let INFO_TIMER = timer {
if timer == INFO_TIMER && !self.shutdown.load(Ordering::SeqCst) {
self.info.tick();
}
}
Expand Down
8 changes: 4 additions & 4 deletions parity/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mod ipc_deps {
pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration};
pub use ethcore::client::ChainNotifyClient;
pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL};
pub use nanoipc::{GuardedSocket, NanoSocket, init_client};
pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client};
pub use ipc::IpcSocket;
pub use ipc::binary::serialize;
}
Expand Down Expand Up @@ -134,11 +134,11 @@ pub fn sync
hypervisor.start();
hypervisor.wait_for_startup();

let sync_client = init_client::<SyncClient<_>>(
let sync_client = generic_client::<SyncClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap();
let notify_client = init_client::<ChainNotifyClient<_>>(
let notify_client = generic_client::<ChainNotifyClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap();
let manage_client = init_client::<NetworkManagerClient<_>>(
let manage_client = generic_client::<NetworkManagerClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap();

*hypervisor_ref = Some(hypervisor);
Expand Down
8 changes: 7 additions & 1 deletion parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
sync: sync_provider.clone(),
net: manage_network.clone(),
accounts: account_provider.clone(),
shutdown: Default::default(),
});
service.register_io_handler(io_handler).expect("Error registering IO handler");
service.register_io_handler(io_handler.clone()).expect("Error registering IO handler");

// the watcher must be kept alive.
let _watcher = match cmd.no_periodic_snapshot {
Expand Down Expand Up @@ -289,6 +290,11 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
// Handle exit
wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server);

// to make sure timer does not spawn requests while shutdown is in progress
io_handler.shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst);
// just Arc is dropping here, to allow other reference release in its default time
drop(io_handler);

// hypervisor should be shutdown first while everything still works and can be
// terminated gracefully
drop(hypervisor);
Expand Down

0 comments on commit 53b22da

Please sign in to comment.