Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
sorted with shutdown-wait
Browse files Browse the repository at this point in the history
  • Loading branch information
NikVolf committed Aug 26, 2016
1 parent c685a72 commit 9ee993d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 17 deletions.
11 changes: 5 additions & 6 deletions ipc/hypervisor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use service::{HypervisorService, IpcModuleId};
use std::process::{Command,Child};
use std::collections::HashMap;

pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};
pub use service::{HypervisorServiceClient, ControlService, CLIENT_MODULE_ID, SYNC_MODULE_ID};

pub type BinaryId = &'static str;

Expand Down Expand Up @@ -195,22 +195,21 @@ impl Hypervisor {
}

/// Shutdown the ipc and all managed child processes
pub fn shutdown(&self, wait_time: Option<std::time::Duration>) {
if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) }

pub fn shutdown(&self) {
let mut childs = self.processes.write().unwrap();
for (ref mut module, _) in childs.iter_mut() {
trace!(target: "hypervisor", "Stopping process module: {}", module);
self.service.send_shutdown(**module);
}

trace!(target: "hypervisor", "Waiting for shutdown...");
self.wait_for_shutdown();
trace!(target: "hypervisor", "All modules reported shutdown");
}
}

impl Drop for Hypervisor {
fn drop(&mut self) {
self.shutdown(Some(std::time::Duration::new(1, 0)));
self.shutdown();
}
}

Expand Down
14 changes: 11 additions & 3 deletions ipc/hypervisor/src/service.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,31 @@ pub struct ModuleState {


#[derive(Ipc)]
trait ControlService {
pub trait ControlService {
fn shutdown(&self);
}

#[derive(Ipc)]
impl HypervisorService {
fn module_ready(&self, control_url: String, module_id: u64) {
// return type for making method synchronous
fn module_ready(&self, module_id: u64, control_url: String) -> bool {
let mut modules = self.modules.write().unwrap();
modules.get_mut(&module_id).map(|mut module| {
module.started = true;
module.control_url = control_url;
});
trace!(target: "hypervisor", "Module ready: {}", module_id);
true
}

fn module_shutdown(&self, module_id: u64) {
// return type for making method synchronous
fn module_shutdown(&self, module_id: u64) -> bool {
let mut modules = self.modules.write().unwrap();
modules.get_mut(&module_id).map(|mut module| {
module.shutdown = true;
});
trace!(target: "hypervisor", "Module shutdown: {}", module_id);
true
}
}

Expand Down Expand Up @@ -103,8 +109,10 @@ impl HypervisorService {
pub fn send_shutdown(&self, module_id: IpcModuleId) {
let modules = self.modules.read().unwrap();
modules.get(&module_id).map(|module| {
trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url);
let client = nanoipc::init_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
client.shutdown();
trace!(target: "hypervisor", "Sent shutdown to {}", module_id);
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions parity/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
.map_err(|binary_error| BootError::DecodeArgs(binary_error))
}

pub fn register(hv_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
hypervisor_client.handshake().unwrap();
hypervisor_client.module_ready(module_id);
hypervisor_client.module_ready(module_id, control_url.to_owned());

hypervisor_client
}
Expand Down
1 change: 1 addition & 0 deletions parity/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod service_urls {
pub const SYNC: &'static str = "parity-sync.ipc";
pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc";
pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc";
pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc";
#[cfg(feature="stratum")]
pub const STRATUM: &'static str = "parity-stratum.ipc";
#[cfg(feature="stratum")]
Expand Down
37 changes: 31 additions & 6 deletions parity/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,26 @@

//! Parity sync service
use std;
use std::sync::Arc;
use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL};
use std::sync::atomic::AtomicBool;
use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService};
use ethcore::client::{RemoteClient, ChainNotify};
use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
use std::thread;
use modules::service_urls;
use boot;
use nanoipc;

#[derive(Default)]
struct SyncControlService {
pub stop: Arc<AtomicBool>,
}

impl ControlService for SyncControlService {
fn shutdown(&self) {
trace!(target: "hypervisor", "Received shutdown from control service");
self.stop.store(true, ::std::sync::atomic::Ordering::Relaxed);
}
}

pub fn main() {
boot::setup_cli_logger("sync");
Expand All @@ -33,11 +45,14 @@ pub fn main() {

let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT));

let stop = boot::main_thread();
let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap();

let _ = boot::register(
let _ = boot::main_thread();
let stop = Arc::new(AtomicBool::new(false));

let hypervisor = boot::register(
&service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL),
&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL),
SYNC_MODULE_ID
);

Expand All @@ -57,7 +72,17 @@ pub fn main() {
sync.clone() as Arc<ChainNotify>
);

let control_service = Arc::new(SyncControlService { stop: stop.clone() });
let as_control = control_service.clone() as Arc<ControlService>;
let mut worker = nanoipc::Worker::<ControlService>::new(&as_control);
worker.add_reqrep(
&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL)
).unwrap();

while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
thread::park_timeout(std::time::Duration::from_millis(1000));
worker.poll();
}

hypervisor.module_shutdown(SYNC_MODULE_ID);
trace!(target: "hypervisor", "Sync process terminated gracefully");
}

0 comments on commit 9ee993d

Please sign in to comment.