Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: the all-in-one file of core-run #1349

Merged
merged 8 commits into from
Sep 1, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/memory-tracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};

use jemalloc_ctl::{epoch, stats};
use log::error;
use rocksdb::ops::{GetColumnFamilys, GetProperty, GetPropertyCF};
pub use rocksdb::ops::{GetColumnFamilys, GetProperty, GetPropertyCF};

use protocol::tokio;

Expand Down
68 changes: 68 additions & 0 deletions core/run/src/components/extensions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! Optional extensions.

use common_apm::{server::run_prometheus_server, tracing::global_tracer_register};
use common_config_parser::types::{ConfigJaeger, ConfigPrometheus};
use protocol::{tokio, ProtocolResult};

pub(crate) trait ExtensionConfig {
const NAME: &'static str;

/// Try to start and return the result.
fn try_to_start(&self) -> ProtocolResult<bool>;

/// Try to start and ignore the result.
fn start_if_possible(&self) {
match self.try_to_start() {
Ok(started) => {
if started {
log::info!("{} is started", Self::NAME);
} else {
log::info!("{} is disabled", Self::NAME);
}
}
Err(err) => {
log::error!("failed to start {} since {err}", Self::NAME);
}
}
}
}

impl ExtensionConfig for Option<ConfigJaeger> {
const NAME: &'static str = "Jaeger";

fn try_to_start(&self) -> ProtocolResult<bool> {
if let Some(ref config) = self {
if let Some(ref addr) = config.tracing_address {
let service_name = config
.service_name
.as_ref()
.map(ToOwned::to_owned)
.unwrap_or("axon".to_owned());
let tracing_batch_size = config.tracing_batch_size.unwrap_or(50);
global_tracer_register(&service_name, addr.to_owned(), tracing_batch_size);
Ok(true)
} else {
Ok(false)
}
} else {
Ok(false)
}
}
}

impl ExtensionConfig for Option<ConfigPrometheus> {
const NAME: &'static str = "Prometheus";

fn try_to_start(&self) -> ProtocolResult<bool> {
if let Some(ref config) = self {
if let Some(ref addr) = config.listening_address {
tokio::spawn(run_prometheus_server(addr.to_owned()));
Ok(true)
} else {
Ok(false)
}
} else {
Ok(false)
}
}
}
33 changes: 33 additions & 0 deletions core/run/src/components/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
pub(crate) mod extensions;
pub(crate) mod network;
pub(crate) mod storage;
pub(crate) mod system;

#[cfg(all(
not(target_env = "msvc"),
not(target_os = "macos"),
feature = "jemalloc"
))]
pub(crate) mod profiling;

#[cfg(not(all(
not(target_env = "msvc"),
not(target_os = "macos"),
feature = "jemalloc"
)))]
pub(crate) mod profiling {
use std::sync::Arc;

pub(crate) fn start() {
log::warn!("profiling is not supported, so it doesn't start");
}
pub(crate) fn stop() {
log::warn!("profiling is not supported, so it doesn't require stopping");
}
pub(crate) fn track_current_process() {
log::warn!("profiling is not supported, so it doesn't track current process");
}
pub(crate) fn track_db_process<DB>(typ: &str, _db: &Arc<DB>) {
log::warn!("profiling is not supported, so it doesn't track db process for [{typ}]");
}
}
147 changes: 147 additions & 0 deletions core/run/src/components/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//! Configure the network service.

use std::sync::Arc;

use core_consensus::message::{
ChokeMessageHandler, ProposalMessageHandler, PullBlockRpcHandler, PullProofRpcHandler,
PullTxsRpcHandler, QCMessageHandler, RemoteHeightMessageHandler, VoteMessageHandler,
BROADCAST_HEIGHT, END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_CHOKE,
END_GOSSIP_SIGNED_PROPOSAL, END_GOSSIP_SIGNED_VOTE, RPC_RESP_SYNC_PULL_BLOCK,
RPC_RESP_SYNC_PULL_PROOF, RPC_RESP_SYNC_PULL_TXS, RPC_SYNC_PULL_BLOCK, RPC_SYNC_PULL_PROOF,
RPC_SYNC_PULL_TXS,
};
use core_consensus::OverlordSynchronization;
use core_db::RocksAdapter;
use core_mempool::{
NewTxsHandler, PullTxsHandler, END_GOSSIP_NEW_TXS, RPC_PULL_TXS, RPC_RESP_PULL_TXS,
RPC_RESP_PULL_TXS_SYNC,
};
use core_network::{KeyProvider, NetworkService, PeerId, PeerIdExt};
use core_storage::ImplStorage;
use protocol::{
traits::{Consensus, Context, MemPool, Network, SynchronizationAdapter},
types::ValidatorExtend,
ProtocolResult,
};

pub(crate) trait NetworkServiceExt {
fn tag_consensus(&self, validators: &[ValidatorExtend]) -> ProtocolResult<()>;

fn register_mempool_endpoint(
&mut self,
mempool: &Arc<impl MemPool + 'static>,
) -> ProtocolResult<()>;

fn register_consensus_endpoint(
&mut self,
overlord_consensus: &Arc<impl Consensus + 'static>,
) -> ProtocolResult<()>;

fn register_synchronization_endpoint(
&mut self,
synchronization: &Arc<OverlordSynchronization<impl SynchronizationAdapter + 'static>>,
) -> ProtocolResult<()>;

fn register_storage_endpoint(
&mut self,
storage: &Arc<ImplStorage<RocksAdapter>>,
) -> ProtocolResult<()>;

fn register_rpc(&mut self) -> ProtocolResult<()>;
}

impl<K> NetworkServiceExt for NetworkService<K>
where
K: KeyProvider,
{
fn tag_consensus(&self, validators: &[ValidatorExtend]) -> ProtocolResult<()> {
let peer_ids = validators
.iter()
.map(|v| PeerId::from_pubkey_bytes(v.pub_key.as_bytes()).map(PeerIdExt::into_bytes_ext))
.collect::<Result<Vec<_>, _>>()
.unwrap();
self.handle().tag_consensus(Context::new(), peer_ids)
}

fn register_mempool_endpoint(
&mut self,
mempool: &Arc<impl MemPool + 'static>,
) -> ProtocolResult<()> {
// register broadcast new transaction
self.register_endpoint_handler(
END_GOSSIP_NEW_TXS,
NewTxsHandler::new(Arc::clone(mempool)),
)?;
// register pull txs from other node
self.register_endpoint_handler(
RPC_PULL_TXS,
PullTxsHandler::new(Arc::new(self.handle()), Arc::clone(mempool)),
)?;
Ok(())
}

fn register_consensus_endpoint(
&mut self,
overlord_consensus: &Arc<impl Consensus + 'static>,
) -> ProtocolResult<()> {
// register consensus
self.register_endpoint_handler(
END_GOSSIP_SIGNED_PROPOSAL,
ProposalMessageHandler::new(Arc::clone(overlord_consensus)),
)?;
self.register_endpoint_handler(
END_GOSSIP_AGGREGATED_VOTE,
QCMessageHandler::new(Arc::clone(overlord_consensus)),
)?;
self.register_endpoint_handler(
END_GOSSIP_SIGNED_VOTE,
VoteMessageHandler::new(Arc::clone(overlord_consensus)),
)?;
self.register_endpoint_handler(
END_GOSSIP_SIGNED_CHOKE,
ChokeMessageHandler::new(Arc::clone(overlord_consensus)),
)?;
Ok(())
}

fn register_synchronization_endpoint(
&mut self,
synchronization: &Arc<OverlordSynchronization<impl SynchronizationAdapter + 'static>>,
) -> ProtocolResult<()> {
self.register_endpoint_handler(
BROADCAST_HEIGHT,
RemoteHeightMessageHandler::new(Arc::clone(synchronization)),
)?;
Ok(())
}

fn register_storage_endpoint(
&mut self,
storage: &Arc<ImplStorage<RocksAdapter>>,
) -> ProtocolResult<()> {
let handle = Arc::new(self.handle());
// register storage
self.register_endpoint_handler(
RPC_SYNC_PULL_BLOCK,
PullBlockRpcHandler::new(Arc::clone(&handle), Arc::clone(storage)),
)?;
self.register_endpoint_handler(
RPC_SYNC_PULL_PROOF,
PullProofRpcHandler::new(Arc::clone(&handle), Arc::clone(storage)),
)?;
self.register_endpoint_handler(
RPC_SYNC_PULL_TXS,
PullTxsRpcHandler::new(Arc::clone(&handle), Arc::clone(storage)),
)?;
Ok(())
}

fn register_rpc(&mut self) -> ProtocolResult<()> {
self.register_rpc_response(RPC_RESP_PULL_TXS)?;
self.register_rpc_response(RPC_RESP_PULL_TXS_SYNC)?;
self.register_rpc_response(RPC_RESP_SYNC_PULL_BLOCK)?;
self.register_rpc_response(RPC_RESP_SYNC_PULL_PROOF)?;
self.register_rpc_response(RPC_RESP_SYNC_PULL_TXS)?;
Ok(())
}
}
47 changes: 47 additions & 0 deletions core/run/src/components/profiling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Control the profiling related features.

use std::sync::Arc;

use common_memory_tracker::{GetColumnFamilys, GetProperty, GetPropertyCF};
use jemalloc_ctl::{Access, AsName};
use jemallocator::Jemalloc;
use protocol::tokio;

#[global_allocator]
pub static JEMALLOC: Jemalloc = Jemalloc;

pub(crate) fn start() {
set_profile(true);
}

pub(crate) fn stop() {
set_profile(false);
dump_profile();
}

pub(crate) fn track_current_process() {
tokio::spawn(common_memory_tracker::track_current_process());
}

pub(crate) fn track_db_process<DB>(typ: &'static str, db_ref: &Arc<DB>)
where
DB: GetColumnFamilys + GetProperty + GetPropertyCF + Send + Sync + 'static,
{
let db = Arc::clone(db_ref);
tokio::spawn(common_memory_tracker::track_db_process::<DB>(typ, db));
}

fn set_profile(is_active: bool) {
let _ = b"prof.active\0"
.name()
.write(is_active)
.map_err(|e| panic!("Set jemalloc profile error {:?}", e));
}

fn dump_profile() {
let name = b"profile.out\0".as_ref();
b"prof.dump\0"
.name()
.write(name)
.expect("Should succeed to dump profile")
}
Loading