Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add diskcheck #3598

Merged
merged 12 commits into from
Aug 8, 2022
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 30 additions & 10 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use starcoin_rpc_server::module::{PubSubService, PubSubServiceFactory};
use starcoin_rpc_server::service::RpcService;
use starcoin_service_registry::bus::{Bus, BusService};
use starcoin_service_registry::{
ActorService, RegistryAsyncService, RegistryService, ServiceContext, ServiceFactory,
ServiceHandler, ServiceRef,
ActorService, EventHandler, RegistryAsyncService, RegistryService, ServiceContext,
ServiceFactory, ServiceHandler, ServiceRef,
};
use starcoin_state_service::ChainStateService;
use starcoin_storage::block_info::BlockInfoStore;
Expand All @@ -53,7 +53,7 @@ use starcoin_sync::sync::SyncService;
use starcoin_sync::txn_sync::TxnSyncService;
use starcoin_sync::verified_rpc_client::VerifiedRpcClient;
use starcoin_txpool::TxPoolActorService;
use starcoin_types::system_events::SystemStarted;
use starcoin_types::system_events::{SystemShutdown, SystemStarted};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

Expand All @@ -69,7 +69,23 @@ impl ServiceFactory<Self> for NodeService {
}
}

impl ActorService for NodeService {}
impl ActorService for NodeService {
fn started(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
ctx.subscribe::<SystemShutdown>();
Ok(())
}

fn stopped(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
ctx.unsubscribe::<SystemShutdown>();
Ok(())
}
}

impl EventHandler<Self, SystemShutdown> for NodeService {
fn handle_event(&mut self, _: SystemShutdown, _: &mut ServiceContext<Self>) {
self.shutdown_system();
}
}

impl ServiceHandler<Self, NodeRequest> for NodeService {
fn handle(
Expand Down Expand Up @@ -105,12 +121,7 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
}
NodeRequest::ShutdownSystem => {
info!("Receive StopSystem request, try to stop system.");
if let Err(e) = self.registry.shutdown_system_sync() {
error!("Shutdown registry error: {}", e);
};
//wait a seconds for registry shutdown, then stop System.
std::thread::sleep(Duration::from_millis(2000));
System::current().stop();
self.shutdown_system();
NodeResponse::Result(Ok(()))
}
NodeRequest::StopPacemaker => NodeResponse::Result(
Expand Down Expand Up @@ -404,4 +415,13 @@ impl NodeService {

Ok((registry, node_service))
}

fn shutdown_system(&self) {
if let Err(e) = self.registry.shutdown_system_sync() {
error!("Shutdown registry error: {}", e);
};
//wait a seconds for registry shutdown, then stop System.
std::thread::sleep(Duration::from_millis(2000));
jiangying000 marked this conversation as resolved.
Show resolved Hide resolved
System::current().stop();
}
}
4 changes: 3 additions & 1 deletion sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ futures-retry = "0.6"
itertools = { version = "0.10.3", default-features = false }
pin-project = "1.0.1"
futures-timer = "3.0"
stream-task = { path = "../commons/stream-task" }
sysinfo = "0.25.1"
stream-task ={ path = "../commons/stream-task"}
starcoin-chain = { path = "../chain" }
config = { path = "../config", package = "starcoin-config" }
network = { path = "../network", package = "starcoin-network" }
Expand Down Expand Up @@ -47,6 +48,7 @@ starcoin-chain-service = { path = "../chain/service" }
starcoin-chain-api = { path = "../chain/api" }
network-rpc-core = { path = "../network-rpc/core" }


[dev-dependencies]
tokio = { version = "^1", features = ["full"] }
miner = { path = "../miner", package = "starcoin-miner" }
Expand Down
100 changes: 94 additions & 6 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::BlockConnectedEvent;
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent};
use anyhow::{format_err, Result};
use config::{NodeConfig, G_CRATE_VERSION};
use executor::VMMetrics;
Expand All @@ -18,20 +18,29 @@ use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::PeerNewBlock;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent};
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
use std::sync::Arc;
use sysinfo::{DiskExt, System, SystemExt};
use txpool::TxPoolService;

const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3;
const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5;

pub struct BlockConnectorService {
chain_service: WriteBlockChainService<TxPoolService>,
sync_status: Option<SyncStatus>,
config: Arc<NodeConfig>,
}

impl BlockConnectorService {
pub fn new(chain_service: WriteBlockChainService<TxPoolService>) -> Self {
pub fn new(
chain_service: WriteBlockChainService<TxPoolService>,
config: Arc<NodeConfig>,
) -> Self {
Self {
chain_service,
sync_status: None,
config,
}
}

Expand All @@ -41,6 +50,53 @@ impl BlockConnectorService {
None => false,
}
}

pub fn check_disk_space(&mut self) -> Option<Result<u64>> {
if System::IS_SUPPORTED {
let mut sys = System::new_all();
if sys.disks().len() == 1 {
let disk = &sys.disks()[0];
dbg!(DISK_CHECKPOINT_FOR_PANIC);
dbg!(disk.available_space());
if DISK_CHECKPOINT_FOR_PANIC > disk.available_space() {
return Some(Err(anyhow::anyhow!(
"Disk space is less than {} GB, please add disk space.",
DISK_CHECKPOINT_FOR_PANIC / 1024 / 1024 / 1024
)));
} else if DISK_CHECKPOINT_FOR_WARN > disk.available_space() {
return Some(Ok(disk.available_space() / 1024 / 1024 / 1024));
}
} else {
sys.sort_disks_by(|a, b| {
if a.mount_point()
.starts_with(b.mount_point().to_str().unwrap())
{
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
}
});

let base_data_dir = self.config.base().base_data_dir.path();
for disk in sys.disks() {
if base_data_dir.starts_with(disk.mount_point()) {
if DISK_CHECKPOINT_FOR_PANIC > disk.available_space() {
return Some(Err(anyhow::anyhow!(
"Disk space is less than {} GB, please add disk space.",
DISK_CHECKPOINT_FOR_PANIC / 1024 / 1024 / 1024
)));
} else if DISK_CHECKPOINT_FOR_WARN > disk.available_space() {
return Some(Ok(disk.available_space() / 1024 / 1024 / 1024));
}

break;
}
}
}
}

None
}
}

impl ServiceFactory<Self> for BlockConnectorService {
Expand All @@ -53,10 +109,16 @@ impl ServiceFactory<Self> for BlockConnectorService {
.get_startup_info()?
.ok_or_else(|| format_err!("Startup info should exist."))?;
let vm_metrics = ctx.get_shared_opt::<VMMetrics>()?;
let chain_service =
WriteBlockChainService::new(config, startup_info, storage, txpool, bus, vm_metrics)?;
let chain_service = WriteBlockChainService::new(
config.clone(),
startup_info,
storage,
txpool,
bus,
vm_metrics,
)?;

Ok(Self::new(chain_service))
Ok(Self::new(chain_service, config))
}
}

Expand All @@ -66,6 +128,11 @@ impl ActorService for BlockConnectorService {
ctx.set_mailbox_capacity(1024);
ctx.subscribe::<SyncStatusChangeEvent>();
ctx.subscribe::<MinedBlock>();

ctx.run_interval(std::time::Duration::from_secs(3), move |ctx| {
ctx.notify(crate::tasks::BlockDiskCheckEvent {});
});

Ok(())
}

Expand All @@ -76,6 +143,26 @@ impl ActorService for BlockConnectorService {
}
}

impl EventHandler<Self, BlockDiskCheckEvent> for BlockConnectorService {
fn handle_event(
&mut self,
_: BlockDiskCheckEvent,
ctx: &mut ServiceContext<BlockConnectorService>,
) {
if let Some(res) = self.check_disk_space() {
YouNeedWork marked this conversation as resolved.
Show resolved Hide resolved
match res {
Ok(available_space) => {
warn!("Available diskspace only {}/GB left ", available_space)
}
Err(e) => {
error!("{}", e);
ctx.broadcast(SystemShutdown);
}
}
}
}
}

impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
fn handle_event(
&mut self,
Expand All @@ -84,6 +171,7 @@ impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
if let Err(e) = self.chain_service.try_connect(block) {
error!("Process connected block error: {:?}", e);
Expand Down
3 changes: 3 additions & 0 deletions sync/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ pub struct BlockConnectedEvent {
pub block: Block,
}

#[derive(Clone, Debug)]
pub struct BlockDiskCheckEvent {}

pub trait BlockConnectedEventHandle: Send + Clone + std::marker::Unpin {
fn handle(&mut self, event: BlockConnectedEvent) -> Result<()>;
}
Expand Down
3 changes: 3 additions & 0 deletions types/src/system_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct MinedBlock(pub Arc<Block>);
#[derive(Clone, Debug)]
pub struct SystemStarted;

#[derive(Clone, Debug)]
pub struct SystemShutdown;

#[derive(Clone, Debug)]
pub struct SyncStatusChangeEvent(pub SyncStatus);

Expand Down