Skip to content

Commit

Permalink
Feature/network refactor (#1804)
Browse files Browse the repository at this point in the history
* [network] Refactor network to actor service.
* [network] rename NetworkAsyncService to NetworkServiceRef
* [network] Remove self peer id from seeds.
* [network] refactor test and name.
* [network] Filter repeat message from network.
* [network] Handle SyncStatusChangeEvent in network, ignore notification message if not is not synchronized.
* [network] Remove PeerMsgBroadcasterService, handle PropagateNewTransactions in NetworkActorService.
  • Loading branch information
jolestar authored Dec 15, 2020
1 parent 37010d8 commit 77a17fa
Show file tree
Hide file tree
Showing 40 changed files with 1,578 additions and 1,598 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 7 additions & 15 deletions block-relayer/src/block_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::FutureExt;
use logger::prelude::*;
use network_api::messages::{CompactBlockMessage, NotificationMessage, PeerCompactBlockMessage};
use network_api::NetworkService;
use starcoin_network::NetworkAsyncService;
use starcoin_network::NetworkServiceRef;
use starcoin_network_rpc_api::{gen_client::NetworkRpcClient, GetTxnsWithHash};
use starcoin_service_registry::{ActorService, EventHandler, ServiceContext, ServiceFactory};
use starcoin_sync::block_connector::BlockConnectorService;
Expand Down Expand Up @@ -124,16 +124,12 @@ impl BlockRelayer {
Ok(block)
}

fn block_into_compact(&self, block: Block) -> CompactBlock {
CompactBlock::new(&block, vec![])
}

fn handle_block_event(
&self,
compact_block_msg: PeerCompactBlockMessage,
ctx: &mut ServiceContext<BlockRelayer>,
) -> Result<()> {
let network = ctx.get_shared::<NetworkAsyncService>()?;
let network = ctx.get_shared::<NetworkServiceRef>()?;
let block_connector_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let rpc_client = NetworkRpcClient::new(network);
let txpool = self.txpool.clone();
Expand Down Expand Up @@ -195,26 +191,22 @@ impl EventHandler<Self, NewHeadBlock> for BlockRelayer {
debug!("[block-relay] Ignore NewHeadBlock event because the node has not been synchronized yet.");
return;
}
let network = match ctx.get_shared::<NetworkAsyncService>() {
let network = match ctx.get_shared::<NetworkServiceRef>() {
Ok(network) => network,
Err(e) => {
error!("Get network service error: {:?}", e);
return;
}
};
let compact_block = self.block_into_compact(event.0.get_block().clone());
let compact_block = event.0.get_block().clone().into();
let total_difficulty = event.0.get_total_difficulty();
let compact_block_msg = CompactBlockMessage {
compact_block,
total_difficulty,
};
ctx.spawn(async move {
network
.broadcast(NotificationMessage::CompactBlock(Box::new(
compact_block_msg,
)))
.await;
});
network.broadcast(NotificationMessage::CompactBlock(Box::new(
compact_block_msg,
)));
}
}

Expand Down
7 changes: 4 additions & 3 deletions commons/metrics/src/metric_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ pub fn start_server(host: String, port: u16) {
.enable_io()
.build()
.unwrap();
rt.block_on(async {
if let Err(e) = rt.block_on(async {
let server = Server::bind(&addr).serve(make_service);
server.await
})
.unwrap();
}) {
error!("Start metric server failed: {:?}", e);
}
});
}
8 changes: 8 additions & 0 deletions commons/service-registry/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ where
})
}

pub fn put_shared<T>(&self, shared: T) -> Result<()>
where
T: Send + Sync + Clone + 'static,
{
let registry_ref = self.registry_ref();
registry_ref.put_shared_sync(shared)
}

pub fn subscribe<M>(&mut self)
where
M: Send + Clone + Debug + 'static,
Expand Down
6 changes: 2 additions & 4 deletions core/genesis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use starcoin_accumulator::node::AccumulatorStoreType;
use starcoin_accumulator::{Accumulator, MerkleAccumulator};
use starcoin_chain::BlockChain;
use starcoin_config::{genesis_key_pair, ChainNetwork};
use starcoin_crypto::HashValue;
use starcoin_logger::prelude::*;
use starcoin_state_api::ChainState;
use starcoin_statedb::ChainStateDB;
Expand Down Expand Up @@ -352,13 +351,12 @@ impl Genesis {

pub fn init_storage_for_test(
net: &ChainNetwork,
) -> Result<(Arc<Storage>, StartupInfo, HashValue)> {
) -> Result<(Arc<Storage>, StartupInfo, Genesis)> {
debug!("init storage by genesis for test.");
let storage = Arc::new(Storage::new(StorageInstance::new_cache_instance())?);
let genesis = Genesis::load(net)?;
let genesis_hash = genesis.block.header().id();
let startup_info = genesis.execute_genesis_block(net, storage.clone())?;
Ok((storage, startup_info, genesis_hash))
Ok((storage, startup_info, genesis))
}
}

Expand Down
20 changes: 12 additions & 8 deletions miner/src/create_block_template/test_create_block_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ fn test_create_block_template_by_net(net: ChainNetworkID) {
opt.data_dir = Some(temp_path.path().to_path_buf());

let node_config = Arc::new(NodeConfig::load_with_opt(&opt).unwrap());
let (storage, startup_info, genesis_id) =
let (storage, startup_info, genesis) =
StarcoinGenesis::init_storage_for_test(node_config.net())
.expect("init storage by genesis fail.");

let genesis_id = genesis.block().id();
//TODO mock txpool after refactor txpool by service reigstry.
let chain_header = storage
.get_block_header_by_hash(startup_info.main)
Expand Down Expand Up @@ -64,8 +64,9 @@ fn test_create_block_template_by_net(net: ChainNetworkID) {
#[stest::test(timeout = 120)]
fn test_switch_main() {
let node_config = Arc::new(NodeConfig::random_for_test());
let (storage, _, genesis_id) = StarcoinGenesis::init_storage_for_test(node_config.net())
let (storage, _, genesis) = StarcoinGenesis::init_storage_for_test(node_config.net())
.expect("init storage by genesis fail.");
let genesis_id = genesis.block().id();
let times = 10;

let miner_account = AccountInfo::random();
Expand Down Expand Up @@ -166,8 +167,9 @@ fn test_switch_main() {
#[stest::test]
fn test_do_uncles() {
let node_config = Arc::new(NodeConfig::random_for_test());
let (storage, _, genesis_id) = StarcoinGenesis::init_storage_for_test(node_config.net())
let (storage, _, genesis) = StarcoinGenesis::init_storage_for_test(node_config.net())
.expect("init storage by genesis fail.");
let genesis_id = genesis.block().id();
let times = 2;

let miner_account = AccountInfo::random();
Expand Down Expand Up @@ -261,8 +263,9 @@ fn test_do_uncles() {
#[stest::test(timeout = 120)]
fn test_new_head() {
let node_config = Arc::new(NodeConfig::random_for_test());
let (storage, _, genesis_id) = StarcoinGenesis::init_storage_for_test(node_config.net())
let (storage, _, genesis) = StarcoinGenesis::init_storage_for_test(node_config.net())
.expect("init storage by genesis fail.");
let genesis_id = genesis.block().id();
let times = 10;

let miner_account = AccountInfo::random();
Expand Down Expand Up @@ -301,8 +304,9 @@ fn test_new_head() {
#[stest::test(timeout = 120)]
fn test_new_branch() {
let node_config = Arc::new(NodeConfig::random_for_test());
let (storage, _, genesis_id) = StarcoinGenesis::init_storage_for_test(node_config.net())
let (storage, _, genesis) = StarcoinGenesis::init_storage_for_test(node_config.net())
.expect("init storage by genesis fail.");
let genesis_id = genesis.block().id();
let times = 5;

let chain_header = storage
Expand Down Expand Up @@ -369,9 +373,9 @@ async fn test_create_block_template_actor() {
let registry = RegistryService::launch();
registry.put_shared(node_config.clone()).await.unwrap();

let (storage, _, genesis_id) = StarcoinGenesis::init_storage_for_test(node_config.net())
let (storage, _, genesis) = StarcoinGenesis::init_storage_for_test(node_config.net())
.expect("init storage by genesis fail.");

let genesis_id = genesis.block().id();
let chain_header = storage
.get_block_header_by_hash(genesis_id)
.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions miner/tests/miner_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ async fn test_miner_service() {
let registry = RegistryService::launch();
let node_config = Arc::new(config.clone());
registry.put_shared(node_config.clone()).await.unwrap();
let (storage, _startup_info, genesis_hash) =
Genesis::init_storage_for_test(config.net()).unwrap();
let (storage, _startup_info, genesis) = Genesis::init_storage_for_test(config.net()).unwrap();
registry.put_shared(storage.clone()).await.unwrap();

let genesis_hash = genesis.block().id();
let chain_header = storage
.get_block_header_by_hash(genesis_hash)
.unwrap()
Expand Down
3 changes: 0 additions & 3 deletions network-rpc/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod server;
use futures::future::BoxFuture;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use serde::{Deserialize, Serialize};
use std::time::Duration;

pub mod prelude {
pub use crate::NetRpcError;
Expand Down Expand Up @@ -133,7 +132,6 @@ pub trait RawRpcClient {
peer_id: PeerId,
rpc_path: String,
message: Vec<u8>,
timeout: Duration,
) -> BoxFuture<anyhow::Result<Vec<u8>>>;
}

Expand All @@ -158,7 +156,6 @@ impl RawRpcClient for InmemoryRpcClient {
_peer_id: PeerId,
rpc_path: String,
message: Vec<u8>,
_timeout: Duration,
) -> BoxFuture<anyhow::Result<Vec<u8>>> {
Box::pin(
self.server
Expand Down
11 changes: 0 additions & 11 deletions network-rpc/derive/src/to_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,12 @@ pub fn generate_client_module(rpc_trait: &ItemTrait) -> anyhow::Result<TokenStre
pub struct NetworkRpcClient
{
raw_client: Arc<dyn RawRpcClient + Send + Sync>,
timeout: Duration,
}

impl NetworkRpcClient {
pub fn new<C>(raw_rpc_client: C) -> Self where C: RawRpcClient + Send + Sync +'static {
Self {
raw_client: Arc::new(raw_rpc_client),
//TODO support custom timeout.
timeout: Duration::from_secs(15),
}
}

pub fn new_with_timeout<C>(raw_rpc_client: C, timeout: Duration) -> Self where C: RawRpcClient + Send + Sync +'static {
Self {
raw_client: Arc::new(raw_rpc_client),
timeout,
}
}
}
Expand All @@ -108,7 +98,6 @@ pub fn generate_client_module(rpc_trait: &ItemTrait) -> anyhow::Result<TokenStre
peer_id,
path,
request,
self.timeout,
)
.await
}
Expand Down
8 changes: 3 additions & 5 deletions network-rpc/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use anyhow::Result;
use config::*;
use futures::executor::block_on;
use network_api::PeerProvider;
use starcoin_logger::prelude::*;
use starcoin_network_rpc_api::{
gen_client as starcoin_gen_client, GetBlockHeadersByNumber, GetBlockIds, GetStateWithProof,
Expand All @@ -27,16 +26,15 @@ fn test_network_rpc() {
};

let network_1 = handle1.network();
let handle2 = {
let (handle2, peer_id_2) = {
let mut config_2 = NodeConfig::random_for_test();
config_2.network.seeds = vec![net_addr_1];
gen_chain_env(config_2).unwrap()
let peer_id_2 = config_2.network.self_peer_id().unwrap();
(gen_chain_env(config_2).unwrap(), peer_id_2)
};
handle2.generate_block().unwrap();

let network_2 = handle2.network();
// network rpc client for chain 1
let peer_id_2 = network_2.identify();
let client = starcoin_gen_client::NetworkRpcClient::new(network_1);

let access_path = access_path::AccessPath::new(genesis_address(), Epoch::resource_path());
Expand Down
6 changes: 3 additions & 3 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ log = "0.4.11"
hex = "0.4.2"
bytes = "0.5.0"

config = {path = "../config", package="starcoin-config"}
types = {path = "../types", package="starcoin-types"}
starcoin-config = {path = "../config"}
starcoin-types = {path = "../types"}
starcoin-txpool-api = {path = "../txpool/api"}
network-p2p-types = {path = "../network-p2p/types"}
network-p2p = {path = "../network-p2p"}
logger = {path = "../commons/logger",package="starcoin-logger"}

crypto = { package="starcoin-crypto", path = "../commons/crypto"}
starcoin-crypto = {path = "../commons/crypto"}
scs = { package="starcoin-canonical-serialization", path = "../commons/scs"}

fnv = "1.0.6"
Expand Down
1 change: 1 addition & 0 deletions network/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rand = "0.7.3"
itertools = "0.9.0"
async-trait = "0.1.42"

starcoin-logger = { path = "../../commons/logger"}
starcoin-types = { path = "../../types" }
starcoin-crypto = { path = "../../commons/crypto" }
scs = { package = "starcoin-canonical-serialization", path = "../../commons/scs" }
Expand Down
Loading

0 comments on commit 77a17fa

Please sign in to comment.