From c685a728d91ab8755f5f7a880dd88745730db04a Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 26 Aug 2016 14:06:58 +0300 Subject: [PATCH 1/3] hypervisor extension --- ipc/hypervisor/src/lib.rs | 18 ++++++++-- ipc/hypervisor/src/service.rs.in | 60 ++++++++++++++++++++++++++------ 2 files changed, 65 insertions(+), 13 deletions(-) diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index b0e1564ab91..43d5785a380 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -174,6 +174,10 @@ impl Hypervisor { self.service.unchecked_count() == 0 } + pub fn modules_shutdown(&self) -> bool { + self.service.running_count() == 0 + } + /// Waits for every required module to check in pub fn wait_for_startup(&self) { let mut worker = self.ipc_worker.write().unwrap(); @@ -182,15 +186,25 @@ impl Hypervisor { } } + /// Waits for every running module to report shutdown + pub fn wait_for_shutdown(&self) { + let mut worker = self.ipc_worker.write().unwrap(); + while !self.modules_shutdown() { + worker.poll() + } + } + /// Shutdown the ipc and all managed child processes pub fn shutdown(&self, wait_time: Option) { if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) } let mut childs = self.processes.write().unwrap(); - for (ref mut module, ref mut child) in childs.iter_mut() { + for (ref mut module, _) in childs.iter_mut() { trace!(target: "hypervisor", "Stopping process module: {}", module); - child.kill().unwrap(); + self.service.send_shutdown(**module); } + + self.wait_for_shutdown(); } } diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 3b1a4d1458b..cfcc8a21cd9 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -17,6 +17,7 @@ use std::sync::{RwLock,Arc}; use ipc::IpcConfig; use std::collections::HashMap; +use nanoipc; pub type IpcModuleId = u64; @@ -28,15 +29,37 @@ pub const SYNC_MODULE_ID: IpcModuleId = 2100; /// IPC service that handles module management pub struct
HypervisorService { - check_list: RwLock>, + modules: RwLock>, +} + +#[derive(Default)] +pub struct ModuleState { + started: bool, + control_url: String, + shutdown: bool, +} + + +#[derive(Ipc)] +trait ControlService { + fn shutdown(&self); } #[derive(Ipc)] impl HypervisorService { - fn module_ready(&self, module_id: u64) -> bool { - let mut check_list = self.check_list.write().unwrap(); - check_list.get_mut(&module_id).map(|mut status| *status = true); - check_list.iter().any(|(_, status)| !status) + fn module_ready(&self, control_url: String, module_id: u64) { + let mut modules = self.modules.write().unwrap(); + modules.get_mut(&module_id).map(|mut module| { + module.started = true; + module.control_url = control_url; + }); + } + + fn module_shutdown(&self, module_id: u64) { + let mut modules = self.modules.write().unwrap(); + modules.get_mut(&module_id).map(|mut module| { + module.shutdown = true; + }); } } @@ -48,29 +71,44 @@ impl HypervisorService { /// New service with list of modules that will report for being ready pub fn with_modules(module_ids: Vec) -> Arc { - let mut check_list = HashMap::new(); + let mut modules = HashMap::new(); for module_id in module_ids { - check_list.insert(module_id, false); + modules.insert(module_id, ModuleState::default()); } Arc::new(HypervisorService { - check_list: RwLock::new(check_list), + modules: RwLock::new(modules), }) } /// Add the module to the check-list pub fn add_module(&self, module_id: IpcModuleId) { - self.check_list.write().unwrap().insert(module_id, false); + self.modules.write().unwrap().insert(module_id, ModuleState::default()); } /// Number of modules still being waited for check-in pub fn unchecked_count(&self) -> usize { - self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count() + self.modules.read().unwrap().iter().filter(|&(_, module)| !module.started).count() } /// List of all modules within this service pub fn module_ids(&self) -> Vec { - 
self.check_list.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect() + self.modules.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect() + } + + /// Number of modules started and running + pub fn running_count(&self) -> usize { + self.modules.read().unwrap().iter().filter(|&(_, module)| module.started && !module.shutdown).count() + } + + pub fn send_shutdown(&self, module_id: IpcModuleId) { + let modules = self.modules.read().unwrap(); + modules.get(&module_id).map(|module| { + let client = nanoipc::init_client::>(&module.control_url).unwrap(); + client.shutdown(); + }); } } impl ::ipc::IpcConfig for HypervisorService {} + +impl ::ipc::IpcConfig for ControlService {} From 9ee993d459ebbddb201fba12b6d58d2a2a31733a Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 26 Aug 2016 16:03:48 +0300 Subject: [PATCH 2/3] sorted with shutdown-wait --- ipc/hypervisor/src/lib.rs | 11 +++++----- ipc/hypervisor/src/service.rs.in | 14 +++++++++--- parity/boot.rs | 4 ++-- parity/modules.rs | 1 + parity/sync.rs | 37 ++++++++++++++++++++++++++------ 5 files changed, 50 insertions(+), 17 deletions(-) diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index 43d5785a380..3cfd464e9f9 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -33,7 +33,7 @@ use service::{HypervisorService, IpcModuleId}; use std::process::{Command,Child}; use std::collections::HashMap; -pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID}; +pub use service::{HypervisorServiceClient, ControlService, CLIENT_MODULE_ID, SYNC_MODULE_ID}; pub type BinaryId = &'static str; @@ -195,22 +195,21 @@ impl Hypervisor { } /// Shutdown the ipc and all managed child processes - pub fn shutdown(&self, wait_time: Option) { - if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) } - + pub fn shutdown(&self) { let mut childs = self.processes.write().unwrap(); for (ref mut module, _) in childs.iter_mut() { trace!(target: 
"hypervisor", "Stopping process module: {}", module); self.service.send_shutdown(**module); } - + trace!(target: "hypervisor", "Waiting for shutdown..."); self.wait_for_shutdown(); + trace!(target: "hypervisor", "All modules reported shutdown"); } } impl Drop for Hypervisor { fn drop(&mut self) { - self.shutdown(Some(std::time::Duration::new(1, 0))); + self.shutdown(); } } diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index cfcc8a21cd9..69585ee6c29 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -41,25 +41,31 @@ pub struct ModuleState { #[derive(Ipc)] -trait ControlService { +pub trait ControlService { fn shutdown(&self); } #[derive(Ipc)] impl HypervisorService { - fn module_ready(&self, control_url: String, module_id: u64) { + // return type for making method synchronous + fn module_ready(&self, module_id: u64, control_url: String) -> bool { let mut modules = self.modules.write().unwrap(); modules.get_mut(&module_id).map(|mut module| { module.started = true; module.control_url = control_url; }); + trace!(target: "hypervisor", "Module ready: {}", module_id); + true } - fn module_shutdown(&self, module_id: u64) { + // return type for making method synchronous + fn module_shutdown(&self, module_id: u64) -> bool { let mut modules = self.modules.write().unwrap(); modules.get_mut(&module_id).map(|mut module| { module.shutdown = true; }); + trace!(target: "hypervisor", "Module shutdown: {}", module_id); + true } } @@ -103,8 +109,10 @@ impl HypervisorService { pub fn send_shutdown(&self, module_id: IpcModuleId) { let modules = self.modules.read().unwrap(); modules.get(&module_id).map(|module| { + trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url); let client = nanoipc::init_client::>(&module.control_url).unwrap(); client.shutdown(); + trace!(target: "hypervisor", "Sent shutdown to {}", module_id); }); } } diff --git a/parity/boot.rs b/parity/boot.rs index 
ddc05437c59..1614317b8b7 100644 --- a/parity/boot.rs +++ b/parity/boot.rs @@ -62,10 +62,10 @@ pub fn payload() -> Result { .map_err(|binary_error| BootError::DecodeArgs(binary_error)) } -pub fn register(hv_url: &str, module_id: IpcModuleId) -> GuardedSocket>{ +pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket>{ let hypervisor_client = nanoipc::init_client::>(hv_url).unwrap(); hypervisor_client.handshake().unwrap(); - hypervisor_client.module_ready(module_id); + hypervisor_client.module_ready(module_id, control_url.to_owned()); hypervisor_client } diff --git a/parity/modules.rs b/parity/modules.rs index 83ae4480241..5edbca7020f 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -32,6 +32,7 @@ pub mod service_urls { pub const SYNC: &'static str = "parity-sync.ipc"; pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc"; pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc"; + pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc"; #[cfg(feature="stratum")] pub const STRATUM: &'static str = "parity-stratum.ipc"; #[cfg(feature="stratum")] diff --git a/parity/sync.rs b/parity/sync.rs index 5d3056acd48..fcba527a811 100644 --- a/parity/sync.rs +++ b/parity/sync.rs @@ -16,14 +16,26 @@ //! 
Parity sync service -use std; use std::sync::Arc; -use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL}; +use std::sync::atomic::AtomicBool; +use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService}; use ethcore::client::{RemoteClient, ChainNotify}; use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration}; -use std::thread; use modules::service_urls; use boot; +use nanoipc; + +#[derive(Default)] +struct SyncControlService { + pub stop: Arc, +} + +impl ControlService for SyncControlService { + fn shutdown(&self) { + trace!(target: "hypervisor", "Received shutdown from control service"); + self.stop.store(true, ::std::sync::atomic::Ordering::Relaxed); + } +} pub fn main() { boot::setup_cli_logger("sync"); @@ -33,11 +45,14 @@ pub fn main() { let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT)); - let stop = boot::main_thread(); let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap(); - let _ = boot::register( + let _ = boot::main_thread(); + let stop = Arc::new(AtomicBool::new(false)); + + let hypervisor = boot::register( &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL), + &service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL), SYNC_MODULE_ID ); @@ -57,7 +72,17 @@ pub fn main() { sync.clone() as Arc ); + let control_service = Arc::new(SyncControlService { stop: stop.clone() }); + let as_control = control_service.clone() as Arc; + let mut worker = nanoipc::Worker::::new(&as_control); + worker.add_reqrep( + &service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL) + ).unwrap(); + while !stop.load(::std::sync::atomic::Ordering::Relaxed) { - thread::park_timeout(std::time::Duration::from_millis(1000)); + worker.poll(); } + + hypervisor.module_shutdown(SYNC_MODULE_ID); + trace!(target: "hypervisor", "Sync process terminated gracefully"); } From 
108e034c59adeade89394f673bbc65640bb14bbd Mon Sep 17 00:00:00 2001 From: NikVolf Date: Sat, 27 Aug 2016 00:55:38 +0300 Subject: [PATCH 3/3] hypervisor lifecycle alter --- parity/run.rs | 4 ++++ parity/sync.rs | 13 +++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/parity/run.rs b/parity/run.rs index 220f7737686..71995cd5f7b 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -260,6 +260,10 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { // Handle exit wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server); + // hypervisor should be shut down first while everything still works and can be + // terminated gracefully + drop(hypervisor); + Ok(()) } diff --git a/parity/sync.rs b/parity/sync.rs index fcba527a811..95c9924c689 100644 --- a/parity/sync.rs +++ b/parity/sync.rs @@ -48,7 +48,7 @@ pub fn main() { let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap(); let _ = boot::main_thread(); - let stop = Arc::new(AtomicBool::new(false)); + let service_stop = Arc::new(AtomicBool::new(false)); let hypervisor = boot::register( &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL), @@ -58,30 +58,31 @@ pub fn main() { boot::host_service( &service_urls::with_base(&service_config.io_path, service_urls::SYNC), - stop.clone(), + service_stop.clone(), sync.clone() as Arc ); boot::host_service( &service_urls::with_base(&service_config.io_path, service_urls::NETWORK_MANAGER), - stop.clone(), + service_stop.clone(), sync.clone() as Arc ); boot::host_service( &service_urls::with_base(&service_config.io_path, service_urls::SYNC_NOTIFY), - stop.clone(), + service_stop.clone(), sync.clone() as Arc ); - let control_service = Arc::new(SyncControlService { stop: stop.clone() }); + let control_service = Arc::new(SyncControlService::default()); let as_control = control_service.clone() as Arc; let mut worker = nanoipc::Worker::::new(&as_control); worker.add_reqrep(
&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL) ).unwrap(); - while !stop.load(::std::sync::atomic::Ordering::Relaxed) { + while !control_service.stop.load(::std::sync::atomic::Ordering::Relaxed) { worker.poll(); } + service_stop.store(true, ::std::sync::atomic::Ordering::Relaxed); hypervisor.module_shutdown(SYNC_MODULE_ID); trace!(target: "hypervisor", "Sync process terminated gracefully");