Control service for IPC #2013

Merged: 3 commits, Aug 30, 2016
27 changes: 20 additions & 7 deletions ipc/hypervisor/src/lib.rs
@@ -33,7 +33,7 @@ use service::{HypervisorService, IpcModuleId};
 use std::process::{Command,Child};
 use std::collections::HashMap;
 
-pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};
+pub use service::{HypervisorServiceClient, ControlService, CLIENT_MODULE_ID, SYNC_MODULE_ID};
 
 pub type BinaryId = &'static str;

@@ -174,6 +174,10 @@ impl Hypervisor {
 		self.service.unchecked_count() == 0
 	}
 
+	pub fn modules_shutdown(&self) -> bool {
+		self.service.running_count() == 0
+	}
+
 	/// Waits for every required module to check in
 	pub fn wait_for_startup(&self) {
 		let mut worker = self.ipc_worker.write().unwrap();
@@ -182,21 +186,30 @@
 		}
 	}
 
-	/// Shutdown the ipc and all managed child processes
-	pub fn shutdown(&self, wait_time: Option<std::time::Duration>) {
-		if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) }
+	/// Waits for every module to report shutdown
+	pub fn wait_for_shutdown(&self) {
+		let mut worker = self.ipc_worker.write().unwrap();
+		while !self.modules_shutdown() {
+			worker.poll()
+		}
+	}
+
+	/// Shutdown the ipc and all managed child processes
+	pub fn shutdown(&self) {
 		let mut childs = self.processes.write().unwrap();
-		for (ref mut module, ref mut child) in childs.iter_mut() {
+		for (ref mut module, _) in childs.iter_mut() {
 			trace!(target: "hypervisor", "Stopping process module: {}", module);
-			child.kill().unwrap();
+			self.service.send_shutdown(**module);
 		}
+		trace!(target: "hypervisor", "Waiting for shutdown...");
+		self.wait_for_shutdown();
+		trace!(target: "hypervisor", "All modules reported shutdown");
 	}
 }
 
 impl Drop for Hypervisor {
 	fn drop(&mut self) {
-		self.shutdown(Some(std::time::Duration::new(1, 0)));
+		self.shutdown();
 	}
 }

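Read together, the changes above replace the old kill-after-sleep shutdown with a handshake: shutdown() asks each module to stop over its control socket, then wait_for_shutdown() polls until every module has checked out. A minimal sketch of the resulting lifecycle from the host's side (it assumes a Hypervisor constructed elsewhere, and error handling is elided):

    // Sketch only: the lifecycle implied by the new API.
    fn run_managed(hypervisor: Hypervisor) {
        // Blocks until every registered module has called module_ready().
        hypervisor.wait_for_startup();

        // ... normal operation ...

        // shutdown() now sends ControlService::shutdown over each module's
        // control socket (instead of child.kill()), then blocks in
        // wait_for_shutdown() until running_count() reaches zero.
        hypervisor.shutdown();
    }

Since Drop also calls shutdown(), letting the Hypervisor fall out of scope is equivalent to the explicit call.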
68 changes: 57 additions & 11 deletions ipc/hypervisor/src/service.rs.in
@@ -17,6 +17,7 @@
 use std::sync::{RwLock,Arc};
 use ipc::IpcConfig;
 use std::collections::HashMap;
+use nanoipc;
 
 pub type IpcModuleId = u64;

@@ -28,15 +29,43 @@ pub const SYNC_MODULE_ID: IpcModuleId = 2100;
 
 /// IPC service that handles module management
 pub struct HypervisorService {
-	check_list: RwLock<HashMap<IpcModuleId, bool>>,
+	modules: RwLock<HashMap<IpcModuleId, ModuleState>>,
+}
+
+#[derive(Default)]
+pub struct ModuleState {
+	started: bool,
+	control_url: String,
+	shutdown: bool,
+}
+
+#[derive(Ipc)]
+pub trait ControlService {
+	fn shutdown(&self);
 }
 
 #[derive(Ipc)]
 impl HypervisorService {
-	fn module_ready(&self, module_id: u64) -> bool {
-		let mut check_list = self.check_list.write().unwrap();
-		check_list.get_mut(&module_id).map(|mut status| *status = true);
-		check_list.iter().any(|(_, status)| !status)
+	// bool return type makes the method synchronous
+	fn module_ready(&self, module_id: u64, control_url: String) -> bool {
+		let mut modules = self.modules.write().unwrap();
+		modules.get_mut(&module_id).map(|mut module| {
+			module.started = true;
+			module.control_url = control_url;
+		});
+		trace!(target: "hypervisor", "Module ready: {}", module_id);
+		true
+	}
+
+	// bool return type makes the method synchronous
+	fn module_shutdown(&self, module_id: u64) -> bool {
+		let mut modules = self.modules.write().unwrap();
+		modules.get_mut(&module_id).map(|mut module| {
+			module.shutdown = true;
+		});
+		trace!(target: "hypervisor", "Module shutdown: {}", module_id);
+		true
+	}
 }

@@ -48,29 +77,46 @@ impl HypervisorService {
 
 	/// New service with a list of modules that will report as ready
 	pub fn with_modules(module_ids: Vec<IpcModuleId>) -> Arc<HypervisorService> {
-		let mut check_list = HashMap::new();
+		let mut modules = HashMap::new();
 		for module_id in module_ids {
-			check_list.insert(module_id, false);
+			modules.insert(module_id, ModuleState::default());
 		}
 		Arc::new(HypervisorService {
-			check_list: RwLock::new(check_list),
+			modules: RwLock::new(modules),
 		})
 	}
 
 	/// Add the module to the check-list
 	pub fn add_module(&self, module_id: IpcModuleId) {
-		self.check_list.write().unwrap().insert(module_id, false);
+		self.modules.write().unwrap().insert(module_id, ModuleState::default());
 	}
 
 	/// Number of modules still awaiting check-in
 	pub fn unchecked_count(&self) -> usize {
-		self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()
+		self.modules.read().unwrap().iter().filter(|&(_, module)| !module.started).count()
 	}
 
 	/// List of all modules within this service
 	pub fn module_ids(&self) -> Vec<IpcModuleId> {
-		self.check_list.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect()
+		self.modules.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect()
	}
 
+	/// Number of modules started and running
+	pub fn running_count(&self) -> usize {
+		self.modules.read().unwrap().iter().filter(|&(_, module)| module.started && !module.shutdown).count()
+	}
+
+	/// Sends a shutdown request to the module's control socket
+	pub fn send_shutdown(&self, module_id: IpcModuleId) {
+		let modules = self.modules.read().unwrap();
+		modules.get(&module_id).map(|module| {
+			trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url);
+			let client = nanoipc::init_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
+			client.shutdown();
+			trace!(target: "hypervisor", "Sent shutdown to {}", module_id);
+		});
+	}
 }
 
 impl ::ipc::IpcConfig for HypervisorService {}
 
+impl ::ipc::IpcConfig for ControlService {}
4 changes: 2 additions & 2 deletions parity/boot.rs
@@ -62,10 +62,10 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
 		.map_err(|binary_error| BootError::DecodeArgs(binary_error))
 }
 
-pub fn register(hv_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
+pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>> {
 	let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
 	hypervisor_client.handshake().unwrap();
-	hypervisor_client.module_ready(module_id);
+	hypervisor_client.module_ready(module_id, control_url.to_owned());
 
 	hypervisor_client
 }
1 change: 1 addition & 0 deletions parity/modules.rs
@@ -32,6 +32,7 @@ pub mod service_urls {
 	pub const SYNC: &'static str = "parity-sync.ipc";
 	pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc";
 	pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc";
+	pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc";
 	#[cfg(feature="stratum")]
 	pub const STRATUM: &'static str = "parity-stratum.ipc";
 	#[cfg(feature="stratum")]
4 changes: 4 additions & 0 deletions parity/run.rs
@@ -260,6 +260,10 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
 	// Handle exit
 	wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server);
 
+	// Shut the hypervisor down first, while everything else still works,
+	// so the managed modules can terminate gracefully.
+	drop(hypervisor);
+
 	Ok(())
 }
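The comment in this hunk leans on the fact that an explicit drop() runs Drop::drop immediately, before the rest of the scope unwinds. A self-contained illustration of that ordering (names are illustrative, not from this PR):

    struct Supervisor;

    impl Drop for Supervisor {
        fn drop(&mut self) {
            // Graceful teardown runs here, the way Hypervisor's Drop impl
            // calls shutdown().
            println!("supervisor: shutting down managed modules");
        }
    }

    fn main() {
        let supervisor = Supervisor;
        let servers = vec!["http", "ipc"]; // stand-ins for still-running servers

        // The explicit drop forces teardown now, while `servers` is still
        // alive; otherwise `supervisor` would drop only at end of scope.
        drop(supervisor);
        println!("servers up during teardown: {:?}", servers);
    }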

46 changes: 36 additions & 10 deletions parity/sync.rs
@@ -16,14 +16,26 @@
 
 //! Parity sync service
 
-use std;
 use std::sync::Arc;
-use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL};
+use std::sync::atomic::AtomicBool;
+use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService};
 use ethcore::client::{RemoteClient, ChainNotify};
 use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
-use std::thread;
 use modules::service_urls;
 use boot;
+use nanoipc;
+
+#[derive(Default)]
+struct SyncControlService {
+	pub stop: Arc<AtomicBool>,
+}
+
+impl ControlService for SyncControlService {
+	fn shutdown(&self) {
+		trace!(target: "hypervisor", "Received shutdown from control service");
+		self.stop.store(true, ::std::sync::atomic::Ordering::Relaxed);
+	}
+}
 
 pub fn main() {
 	boot::setup_cli_logger("sync");
@@ -33,31 +45,45 @@
 
 	let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT));
 
-	let stop = boot::main_thread();
 	let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap();
 
-	let _ = boot::register(
+	let _ = boot::main_thread();
+	let service_stop = Arc::new(AtomicBool::new(false));
+
+	let hypervisor = boot::register(
 		&service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL),
+		&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL),
 		SYNC_MODULE_ID
 	);
 
 	boot::host_service(
 		&service_urls::with_base(&service_config.io_path, service_urls::SYNC),
-		stop.clone(),
+		service_stop.clone(),
 		sync.clone() as Arc<SyncProvider>
 	);
 	boot::host_service(
 		&service_urls::with_base(&service_config.io_path, service_urls::NETWORK_MANAGER),
-		stop.clone(),
+		service_stop.clone(),
 		sync.clone() as Arc<ManageNetwork>
 	);
 	boot::host_service(
 		&service_urls::with_base(&service_config.io_path, service_urls::SYNC_NOTIFY),
-		stop.clone(),
+		service_stop.clone(),
 		sync.clone() as Arc<ChainNotify>
 	);
 
-	while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
-		thread::park_timeout(std::time::Duration::from_millis(1000));
+	let control_service = Arc::new(SyncControlService::default());
+	let as_control = control_service.clone() as Arc<ControlService>;
+	let mut worker = nanoipc::Worker::<ControlService>::new(&as_control);
+	worker.add_reqrep(
+		&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL)
+	).unwrap();
+
+	while !control_service.stop.load(::std::sync::atomic::Ordering::Relaxed) {
+		worker.poll();
Review comment from a Collaborator on the worker.poll() line above: Is there a timeout for this poll? (The author replied, but the reply did not load.)
 	}
+	service_stop.store(true, ::std::sync::atomic::Ordering::Relaxed);
+
+	hypervisor.module_shutdown(SYNC_MODULE_ID);
+	trace!(target: "hypervisor", "Sync process terminated gracefully");
 }
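On the reviewer's question: a bare poll() gives the loop no chance to re-check the stop flag unless a request arrives. The sketch below is not nanoipc's API; it uses std::sync::mpsc only to show the bounded-wait shape the question points at, where every wait times out so the flag is re-checked periodically:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::{channel, RecvTimeoutError};
    use std::thread;
    use std::time::Duration;

    fn main() {
        let stop = AtomicBool::new(false);
        let (tx, rx) = channel::<&'static str>();

        // Stand-in for the control socket: a peer requests shutdown later.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            let _ = tx.send("shutdown");
        });

        // Bounded wait: even if no message ever arrives, the loop re-checks
        // `stop` once per second instead of blocking indefinitely.
        while !stop.load(Ordering::Relaxed) {
            match rx.recv_timeout(Duration::from_secs(1)) {
                Ok("shutdown") => stop.store(true, Ordering::Relaxed),
                Ok(_) | Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        println!("control loop exited cleanly");
    }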