From 000f4016236ebe896b6e9bf507655e0a0d453ccf Mon Sep 17 00:00:00 2001 From: jolestar Date: Mon, 30 Nov 2020 18:29:38 +0800 Subject: [PATCH 1/3] [network] Refactor PeerMessage 1. Decode PeerMessage by protocol name. 2. Network api provider broadcast function. 3. Cleanup unneeded message and crate(tx-relay and block-relayer-api). --- Cargo.lock | 38 +- Cargo.toml | 2 - block-relayer/Cargo.toml | 1 - block-relayer/api/Cargo.toml | 16 - block-relayer/api/src/lib.rs | 19 - block-relayer/src/block_relayer.rs | 53 ++- miner/Cargo.toml | 12 +- miner/src/generate_block_event_pacemaker.rs | 2 +- network-p2p/Cargo.toml | 2 +- network-p2p/types/src/lib.rs | 4 +- network-rpc/Cargo.toml | 1 - network-rpc/api/src/lib.rs | 3 - network-rpc/src/lib.rs | 25 +- network/Cargo.toml | 4 +- network/api/Cargo.toml | 9 +- network/api/src/lib.rs | 13 +- network/api/src/messages.rs | 178 ++++++- network/api/src/peer_message_handler.rs | 6 +- network/src/lib.rs | 2 - network/src/message_processor.rs | 200 -------- network/src/net_test.rs | 6 +- network/src/network.rs | 493 +++++++------------- network/src/network_metrics.rs | 22 +- node/Cargo.toml | 2 - node/src/peer_message_handler.rs | 60 ++- test-helper/Cargo.toml | 2 - test-helper/src/dummy_network_service.rs | 11 +- test-helper/src/network.rs | 22 +- txpool/Cargo.toml | 4 +- txpool/api/src/lib.rs | 15 + txpool/src/lib.rs | 11 +- txpool/src/test.rs | 8 +- txpool/tx-relay/Cargo.toml | 11 - txpool/tx-relay/src/lib.rs | 54 --- types/src/lib.rs | 19 +- 35 files changed, 490 insertions(+), 840 deletions(-) delete mode 100644 block-relayer/api/Cargo.toml delete mode 100644 block-relayer/api/src/lib.rs delete mode 100644 network/src/message_processor.rs delete mode 100644 txpool/tx-relay/Cargo.toml delete mode 100644 txpool/tx-relay/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0924653319..ca8f8bf33d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4366,20 +4366,18 @@ dependencies = [ name = "network-api" version = "0.8.0" dependencies = [ - "actix", "anyhow", "async-trait", "futures 0.3.8", + "hex", "itertools 0.9.0", "network-p2p-types", "network-rpc-core", "rand 0.7.3", "serde", - "starcoin-block-relayer-api", "starcoin-canonical-serialization", "starcoin-crypto", "starcoin-service-registry", - "starcoin-tx-relay", "starcoin-types", ] @@ -6663,7 +6661,6 @@ dependencies = [ "hex", "network-api", "once_cell", - "starcoin-block-relayer-api", "starcoin-canonical-serialization", "starcoin-config", "starcoin-crypto", @@ -6682,19 +6679,6 @@ dependencies = [ "tokio 0.2.23", ] -[[package]] -name = "starcoin-block-relayer-api" -version = "0.8.0" -dependencies = [ - "actix", - "anyhow", - "futures 0.3.8", - "once_cell", - "starcoin-canonical-serialization", - "starcoin-crypto", - "starcoin-types", -] - [[package]] name = "starcoin-canonical-serialization" version = "0.8.0" @@ -7242,7 +7226,6 @@ dependencies = [ "starcoin-sync", "starcoin-sync-api", "starcoin-traits", - "starcoin-tx-relay", "starcoin-txpool", "starcoin-txpool-api", "starcoin-types", @@ -7349,17 +7332,17 @@ dependencies = [ "slog", "slog_derive", "smallvec 1.5.0", - "starcoin-block-relayer-api", "starcoin-canonical-serialization", "starcoin-config", "starcoin-crypto", "starcoin-logger", + "starcoin-metrics", "starcoin-network-rpc", "starcoin-network-rpc-api", "starcoin-service-registry", "starcoin-storage", "starcoin-sync-api", - "starcoin-tx-relay", + "starcoin-txpool-api", "starcoin-types", "stest", "tempfile", @@ -7392,7 +7375,6 @@ dependencies = [ "starcoin-account-api", "starcoin-accumulator", "starcoin-block-relayer", - "starcoin-block-relayer-api", "starcoin-canonical-serialization", "starcoin-chain-service", "starcoin-config", @@ -7458,7 +7440,6 @@ dependencies = [ "starcoin-account-api", "starcoin-account-service", "starcoin-block-relayer", - "starcoin-block-relayer-api", "starcoin-chain-notify", "starcoin-chain-service", "starcoin-config", @@ -7483,7 +7464,6 @@ dependencies = [ "starcoin-sync", "starcoin-sync-api", "starcoin-traits", - "starcoin-tx-relay", "starcoin-txpool", "starcoin-txpool-api", "starcoin-types", @@ -7981,14 +7961,6 @@ dependencies = [ "tokio-compat", ] -[[package]] -name = "starcoin-tx-relay" -version = "0.8.0" -dependencies = [ - "actix", - "starcoin-types", -] - [[package]] name = "starcoin-txpool" version = "0.8.0" @@ -8002,6 +7974,7 @@ dependencies = [ "futures-channel", "linked-hash-map", "log 0.4.11", + "network-api", "once_cell", "parking_lot 0.11.1", "prometheus", @@ -8023,7 +7996,6 @@ dependencies = [ "starcoin-state-tree", "starcoin-statedb", "starcoin-storage", - "starcoin-tx-relay", "starcoin-txpool-api", "starcoin-types", "stdlib", @@ -8492,7 +8464,6 @@ dependencies = [ "starcoin-account-service", "starcoin-accumulator", "starcoin-block-relayer", - "starcoin-block-relayer-api", "starcoin-canonical-serialization", "starcoin-chain", "starcoin-chain-notify", @@ -8522,7 +8493,6 @@ dependencies = [ "starcoin-sync", "starcoin-sync-api", "starcoin-traits", - "starcoin-tx-relay", "starcoin-txpool", "starcoin-txpool-api", "starcoin-types", diff --git a/Cargo.toml b/Cargo.toml index 71da828760..eb0c2fa46b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ members = [ "txpool", "txpool/api", "txpool/mock-service", - "txpool/tx-relay", "executor", "executor/benchmark", "chain", @@ -49,7 +48,6 @@ members = [ "sync", "sync/api", "block-relayer", - "block-relayer/api", "miner", "node", "network-p2p", diff --git a/block-relayer/Cargo.toml b/block-relayer/Cargo.toml index 3b5c574c86..66d8f28dfc 100644 --- a/block-relayer/Cargo.toml +++ b/block-relayer/Cargo.toml @@ -21,7 +21,6 @@ network-api = { package = "network-api", path = "../network/api" } starcoin-sync-api = {package="starcoin-sync-api", path="../sync/api"} starcoin-sync = {package="starcoin-sync", path="../sync"} starcoin-network ={path = "../network"} -starcoin-block-relayer-api = { path = "./api"} starcoin-canonical-serialization = { package="starcoin-canonical-serialization", path = "../commons/scs"} starcoin-types = {path = "../types", package = "starcoin-types" } starcoin-metrics = {path = "../commons/metrics"} diff --git a/block-relayer/api/Cargo.toml b/block-relayer/api/Cargo.toml deleted file mode 100644 index d4c70fe207..0000000000 --- a/block-relayer/api/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "starcoin-block-relayer-api" -version = "0.8.0" -authors = ["Starcoin Core Dev "] -license = "Apache-2.0" -publish = false -edition = "2018" - -[dependencies] -anyhow = "1.0.34" -actix = "0.10.0" -futures = "0.3" -once_cell = "1.5.2" -crypto = {package="starcoin-crypto", path = "../../commons/crypto"} -starcoin-canonical-serialization = { package="starcoin-canonical-serialization", path = "../../commons/scs"} -starcoin-types = {path = "../../types", package = "starcoin-types" } diff --git a/block-relayer/api/src/lib.rs b/block-relayer/api/src/lib.rs deleted file mode 100644 index 43b8ddc8f0..0000000000 --- a/block-relayer/api/src/lib.rs +++ /dev/null @@ -1,19 +0,0 @@ -use actix::prelude::*; -use starcoin_types::cmpact_block::CompactBlock; -use starcoin_types::{peer_info::PeerId, U256}; - -/// Event of received compact block -#[derive(Message, Clone, Debug)] -#[rtype(result = "()")] -pub struct PeerCmpctBlockEvent { - pub peer_id: PeerId, - pub compact_block: CompactBlock, -} - -/// Message of sending compact block to network -#[derive(Message, Clone, Debug)] -#[rtype(result = "()")] -pub struct NetCmpctBlockMessage { - pub compact_block: CompactBlock, - pub total_difficulty: U256, -} diff --git a/block-relayer/src/block_relayer.rs b/block-relayer/src/block_relayer.rs index 85e60b3372..a36191a4e7 100644 --- a/block-relayer/src/block_relayer.rs +++ b/block-relayer/src/block_relayer.rs @@ -6,7 +6,8 @@ use anyhow::Result; use crypto::HashValue; use futures::FutureExt; use logger::prelude::*; -use starcoin_block_relayer_api::{NetCmpctBlockMessage, PeerCmpctBlockEvent}; +use network_api::messages::{CompactBlockMessage, NotificationMessage, PeerCompactBlockMessage}; +use network_api::NetworkService; use starcoin_network::NetworkAsyncService; use starcoin_network_rpc_api::{gen_client::NetworkRpcClient, GetTxns}; use starcoin_service_registry::{ActorService, EventHandler, ServiceContext, ServiceFactory}; @@ -130,17 +131,16 @@ impl BlockRelayer { fn handle_block_event( &self, - cmpct_block_msg: PeerCmpctBlockEvent, + compact_block_msg: PeerCompactBlockMessage, ctx: &mut ServiceContext, ) -> Result<()> { let network = ctx.get_shared::()?; let block_connector_service = ctx.service_ref::()?.clone(); - //TODO use VerifiedRpcClient and filter peers, avoid fetch from a no synced peer. let rpc_client = NetworkRpcClient::new(network); let txpool = self.txpool.clone(); let fut = async move { - let compact_block = cmpct_block_msg.compact_block; - let peer_id = cmpct_block_msg.peer_id; + let compact_block = compact_block_msg.message.compact_block; + let peer_id = compact_block_msg.peer_id; debug!("Receive peer compact block event from peer id:{}", peer_id); let block = BlockRelayer::fill_compact_block( txpool, @@ -191,30 +191,55 @@ impl EventHandler for BlockRelayer { debug!("[block-relay] Ignore NewHeadBlock event because the node has not been synchronized yet."); return; } + let network = match ctx.get_shared::() { + Ok(network) => network, + Err(e) => { + error!("Get network service error: {:?}", e); + return; + } + }; let compact_block = self.block_into_compact(event.0.get_block().clone()); let total_difficulty = event.0.get_total_difficulty(); - let net_cmpct_block_msg = NetCmpctBlockMessage { + let compact_block_msg = CompactBlockMessage { compact_block, total_difficulty, }; - //TODO directly send to network. - ctx.broadcast(net_cmpct_block_msg); + ctx.spawn(async move { + network + .broadcast(NotificationMessage::CompactBlock(Box::new( + compact_block_msg, + ))) + .await; + }); } } -impl EventHandler for BlockRelayer { +impl EventHandler for BlockRelayer { fn handle_event( &mut self, - cmpct_block_msg: PeerCmpctBlockEvent, + compact_block_msg: PeerCompactBlockMessage, ctx: &mut ServiceContext, ) { if !self.is_synced() { - debug!("[block-relay] Ignore PeerCmpctBlock event because the node has not been synchronized yet."); + debug!("[block-relay] Ignore PeerCompactBlockMessage because the node has not been synchronized yet."); + return; + } + let sync_status = self + .sync_status + .as_ref() + .expect("Sync status should bean some at here"); + let current_total_difficulty = sync_status.chain_status().total_difficulty(); + let block_total_difficulty = compact_block_msg.message.total_difficulty; + let block_id = compact_block_msg.message.compact_block.header.id(); + if current_total_difficulty > block_total_difficulty { + debug!("[block-relay] Ignore PeerCompactBlockMessage because node current total_difficulty({}) > block({})'s total_difficulty({}).", current_total_difficulty, block_id, block_total_difficulty); return; } - //TODO filter by total_difficulty and block number, ignore too old block. - if let Err(e) = self.handle_block_event(cmpct_block_msg, ctx) { - error!("[block-relay] handle PeerCmpctBlock event error: {:?}", e); + if let Err(e) = self.handle_block_event(compact_block_msg, ctx) { + error!( + "[block-relay] handle PeerCompactBlockMessage error: {:?}", + e + ); } } } diff --git a/miner/Cargo.toml b/miner/Cargo.toml index 5c65f7a98d..e37b4b1a83 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -10,6 +10,11 @@ actix = "0.10.0" actix-rt = "1.1" futures = { version = "0.3" } futures-timer = "3.0" +hex = { version = "0.4.2", default-features = false } +thiserror = "1.0" +once_cell = "1.5.2" +parking_lot = "0.11.1" + starcoin-config = { path = "../config" } network = { path = "../network", package = "starcoin-network" } chain = {path = "../chain", package="starcoin-chain" } @@ -22,18 +27,13 @@ consensus = {path = "../consensus", package="starcoin-consensus" } starcoin-storage = { path = "../storage" } executor = { path = "../executor", package = "starcoin-executor" } starcoin-txpool = { path = "../txpool" } -tx-relay = { path = "../txpool/tx-relay", package = "starcoin-tx-relay" } +starcoin-txpool-api = { path = "../txpool/api" } tokio = { version = "0.2", features = ["full"] } logger = {path = "../commons/logger", package="starcoin-logger"} crypto = { package="starcoin-crypto", path = "../commons/crypto"} starcoin-accumulator = {path = "../core/accumulator", package="starcoin-accumulator"} starcoin-account-api = { path = "../account/api" } starcoin-account-service = { path = "../account/service" } -starcoin-txpool-api = { path = "../txpool/api" } -hex = { version = "0.4.2", default-features = false } -thiserror = "1.0" -once_cell = "1.5.2" -parking_lot = "0.11.1" starcoin-metrics = { path = "../commons/metrics" } starcoin-miner-client = { path = "../cmd/miner_client" } scs = { package="starcoin-canonical-serialization", path = "../commons/scs"} diff --git a/miner/src/generate_block_event_pacemaker.rs b/miner/src/generate_block_event_pacemaker.rs index 163ac0309d..15a4fbf364 100644 --- a/miner/src/generate_block_event_pacemaker.rs +++ b/miner/src/generate_block_event_pacemaker.rs @@ -5,7 +5,7 @@ use crate::GenerateBlockEvent; use anyhow::Result; use logger::prelude::*; use starcoin_service_registry::{ActorService, EventHandler, ServiceContext}; -use tx_relay::PropagateNewTransactions; +use starcoin_txpool_api::PropagateNewTransactions; use types::{ sync_status::SyncStatus, system_events::{NewHeadBlock, SyncStatusChangeEvent}, diff --git a/network-p2p/Cargo.toml b/network-p2p/Cargo.toml index 59f8b510c7..6932e1be95 100644 --- a/network-p2p/Cargo.toml +++ b/network-p2p/Cargo.toml @@ -47,7 +47,7 @@ wasm-timer = "0.2" sc-peerset = { path = "peerset" } logger = {path = "../commons/logger",package="starcoin-logger"} crypto = { package="starcoin-crypto", path = "../commons/crypto"} -starcoin-metrics = {path = "../commons/metrics",package="starcoin-metrics"} +starcoin-metrics = {path = "../commons/metrics"} starcoin-types = {path = "../types"} network-p2p-types = {path = "./types"} prometheus = "0.10" diff --git a/network-p2p/types/src/lib.rs b/network-p2p/types/src/lib.rs index 0f103f4b94..bc2a98eed9 100644 --- a/network-p2p/types/src/lib.rs +++ b/network-p2p/types/src/lib.rs @@ -1,15 +1,15 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 +use libp2p::futures::channel::oneshot; +use std::borrow::Cow; use std::convert::TryFrom; use std::fmt; use std::str::FromStr; pub use libp2p::core::{identity, multiaddr, Multiaddr, PeerId, PublicKey}; -use libp2p::futures::channel::oneshot; pub use libp2p::multihash; pub use libp2p::request_response::{InboundFailure, OutboundFailure}; -use std::borrow::Cow; /// Parses a string address and splits it into Multiaddress and PeerId, if /// valid. diff --git a/network-rpc/Cargo.toml b/network-rpc/Cargo.toml index eb745c1fc7..fbbfc4a55b 100644 --- a/network-rpc/Cargo.toml +++ b/network-rpc/Cargo.toml @@ -28,7 +28,6 @@ crypto = { package="starcoin-crypto", path = "../commons/crypto"} prometheus = "0.10" network-api = { path="../network/api" } scs = { path = "../commons/scs", package = "starcoin-canonical-serialization" } -block-relayer-api = { path = "../block-relayer/api/", package = "starcoin-block-relayer-api" } starcoin-types = { path = "../types", package = "starcoin-types" } accumulator = { path = "../core/accumulator", package = "starcoin-accumulator" } state-tree = { path = "../state/state-tree", package = "starcoin-state-tree" } diff --git a/network-rpc/api/src/lib.rs b/network-rpc/api/src/lib.rs index a3e77a4c8a..84a4b5a98e 100644 --- a/network-rpc/api/src/lib.rs +++ b/network-rpc/api/src/lib.rs @@ -23,9 +23,6 @@ pub use remote_chain_state::RemoteChainStateReader; use starcoin_types::account_address::AccountAddress; use starcoin_types::account_state::AccountState; -//TODO move this constants from types -pub use starcoin_types::CHAIN_PROTOCOL_NAME; - #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TransactionsData { pub txns: Vec, diff --git a/network-rpc/src/lib.rs b/network-rpc/src/lib.rs index 003c782d5a..2f36b94808 100644 --- a/network-rpc/src/lib.rs +++ b/network-rpc/src/lib.rs @@ -3,14 +3,13 @@ use crate::rpc::NetworkRpcImpl; use anyhow::Result; -use network_api::messages::RawRpcRequestMessage; use network_rpc_core::server::NetworkRpcServer; use network_rpc_core::RawRpcServer; use starcoin_chain_service::ChainReaderService; use starcoin_logger::prelude::*; use starcoin_network_rpc_api::gen_server::NetworkRpc; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRef, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceRef, }; use starcoin_state_service::ChainStateService; use starcoin_storage::{Storage, Store}; @@ -60,28 +59,6 @@ impl ServiceFactory for NetworkRpcService { impl ActorService for NetworkRpcService {} -impl ServiceHandler for NetworkRpcService { - fn handle( - &mut self, - req_msg: RawRpcRequestMessage, - ctx: &mut ServiceContext, - ) { - let rpc_server = self.rpc_server.clone(); - let (peer_id, rpc_path, message) = req_msg.request; - let mut responder = req_msg.responder; - ctx.spawn(async move { - let result = rpc_server - .handle_raw_request(peer_id, rpc_path, message) - .await; - let resp = scs::to_bytes(&result).expect("NetRpc Result must encode success."); - - if let Err(e) = responder.try_send(resp) { - error!("Send response to rpc call failed:{:?}", e); - } - }); - } -} - impl EventHandler for NetworkRpcService { fn handle_event(&mut self, msg: ProtocolRequest, ctx: &mut ServiceContext) { let rpc_server = self.rpc_server.clone(); diff --git a/network/Cargo.toml b/network/Cargo.toml index 2aa841bba1..c6742f947e 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -20,7 +20,7 @@ bytes = "0.5.0" config = {path = "../config", package="starcoin-config"} types = {path = "../types", package="starcoin-types"} -tx-relay = {path="../txpool/tx-relay", package="starcoin-tx-relay"} +starcoin-txpool-api = {path = "../txpool/api"} network-p2p-types = {path = "../network-p2p/types"} network-p2p = {path = "../network-p2p"} logger = {path = "../commons/logger",package="starcoin-logger"} @@ -50,8 +50,8 @@ async-std = "1.7" prometheus = "0.10" network-api = { package = "network-api", path = "../network/api" } +starcoin-metrics = {path = "../commons/metrics"} starcoin-sync-api = { package = "starcoin-sync-api", path = "../sync/api" } -starcoin-block-relayer-api = { path = "../block-relayer/api/" } starcoin-service-registry = { path = "../commons/service-registry" } starcoin-network-rpc = { path = "../network-rpc" } network-rpc-core = { path = "../network-rpc/core" } diff --git a/network/api/Cargo.toml b/network/api/Cargo.toml index 619dddcaab..4fb8d9bf86 100644 --- a/network/api/Cargo.toml +++ b/network/api/Cargo.toml @@ -5,18 +5,17 @@ authors = ["Starcoin Core Dev "] edition = "2018" [dependencies] -starcoin-types = { path = "../../types" } -actix = "0.10.0" +hex= "0.4.2" anyhow = "1.0.34" futures = "0.3" serde = { version = "1.0", default-features = false } rand = "0.7.3" itertools = "0.9.0" +async-trait = "0.1.42" + +starcoin-types = { path = "../../types" } starcoin-crypto = { path = "../../commons/crypto" } scs = { package = "starcoin-canonical-serialization", path = "../../commons/scs" } -async-trait = "0.1.42" starcoin-service-registry = { path = "../../commons/service-registry" } network-rpc-core = { path = "../../network-rpc/core" } -starcoin-tx-relay= { path="../../txpool/tx-relay"} -starcoin-block-relayer-api = { path = "../../block-relayer/api/" } network-p2p-types = {path = "../../network-p2p/types"} \ No newline at end of file diff --git a/network/api/src/lib.rs b/network/api/src/lib.rs index 48ea3d7fa3..fe3ce598be 100644 --- a/network/api/src/lib.rs +++ b/network/api/src/lib.rs @@ -1,11 +1,10 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::messages::PeerMessage; +use crate::messages::{NotificationMessage, PeerMessage}; use anyhow::*; use async_trait::async_trait; use network_rpc_core::RawRpcClient; -use std::borrow::Cow; pub mod messages; mod peer_message_handler; @@ -21,10 +20,8 @@ pub use starcoin_types::peer_info::{PeerId, PeerInfo}; pub trait NetworkService: Send + Sync + Clone + Sized + std::marker::Unpin + RawRpcClient + PeerProvider { - async fn send_peer_message( - &self, - protocol_name: Cow<'static, str>, - peer_id: PeerId, - msg: PeerMessage, - ) -> Result<()>; + /// send notification message to peer. + async fn send_peer_message(&self, msg: PeerMessage) -> Result<()>; + /// Broadcast notification to all connected peers + async fn broadcast(&self, notification: NotificationMessage); } diff --git a/network/api/src/messages.rs b/network/api/src/messages.rs index 5ffe66095d..29ae68006a 100644 --- a/network/api/src/messages.rs +++ b/network/api/src/messages.rs @@ -1,41 +1,175 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use actix::prelude::*; use anyhow::*; -use futures::channel::mpsc::Sender; +use scs::SCSCodec; use serde::{Deserialize, Serialize}; -use starcoin_service_registry::ServiceRequest; use starcoin_types::peer_info::PeerId; use starcoin_types::startup_info::ChainInfo; use starcoin_types::transaction::SignedUserTransaction; use starcoin_types::{cmpact_block::CompactBlock, U256}; +use std::borrow::Cow; -/// message from peer -#[rtype(result = "Result<()>")] -#[derive(Clone, Debug, Serialize, Deserialize, Message)] -#[allow(clippy::large_enum_variant)] -pub enum PeerMessage { - NewTransactions(Vec), - CompactBlock(CompactBlock, U256), - RawRPCRequest(u128, String, Vec), - RawRPCResponse(u128, Vec), +pub const TXN_PROTOCOL_NAME: &str = "/starcoin/txn/1"; +pub const BLOCK_PROTOCOL_NAME: &str = "/starcoin/block/1"; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TransactionsMessage { + pub txns: Vec, +} + +impl TransactionsMessage { + pub fn new(txns: Vec) -> Self { + Self { txns } + } + + pub fn transactions(self) -> Vec { + self.txns + } +} + +/// Message of sending or receive block notification to network +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompactBlockMessage { + pub compact_block: CompactBlock, + pub total_difficulty: U256, +} + +#[derive(Clone, Debug)] +pub enum NotificationMessage { + Transactions(TransactionsMessage), + CompactBlock(Box), +} + +impl NotificationMessage { + pub fn decode_notification(protocol_name: Cow<'static, str>, bytes: &[u8]) -> Result { + Ok(match protocol_name.as_ref() { + TXN_PROTOCOL_NAME => { + NotificationMessage::Transactions(TransactionsMessage::decode(bytes)?) + } + BLOCK_PROTOCOL_NAME => { + NotificationMessage::CompactBlock(Box::new(CompactBlockMessage::decode(bytes)?)) + } + unknown_protocol => bail!( + "Unknown protocol {}'s message: {}", + unknown_protocol, + hex::encode(bytes) + ), + }) + } + + pub fn encode_notification(&self) -> Result<(Cow<'static, str>, Vec)> { + Ok(match self { + NotificationMessage::Transactions(msg) => (TXN_PROTOCOL_NAME.into(), msg.encode()?), + NotificationMessage::CompactBlock(msg) => (TXN_PROTOCOL_NAME.into(), msg.encode()?), + }) + } + + pub fn protocol_name(&self) -> Cow<'static, str> { + match self { + Self::Transactions(_) => TXN_PROTOCOL_NAME.into(), + Self::CompactBlock(_) => BLOCK_PROTOCOL_NAME.into(), + } + } + + pub fn protocols() -> Vec> { + vec![TXN_PROTOCOL_NAME.into(), BLOCK_PROTOCOL_NAME.into()] + } + + pub fn into_transactions(self) -> Option { + match self { + NotificationMessage::Transactions(message) => Some(message), + _ => None, + } + } + + pub fn into_compact_block(self) -> Option { + match self { + NotificationMessage::CompactBlock(message) => Some(*message), + _ => None, + } + } +} + +/// Message for send or receive from peer +#[derive(Clone, Debug)] +pub struct PeerMessage { + pub peer_id: PeerId, + pub notification: NotificationMessage, +} + +impl PeerMessage { + pub fn new(peer_id: PeerId, notification: NotificationMessage) -> Self { + Self { + peer_id, + notification, + } + } + pub fn new_transactions(peer_id: PeerId, transactions: TransactionsMessage) -> Self { + Self::new(peer_id, NotificationMessage::Transactions(transactions)) + } + + pub fn new_compact_block(peer_id: PeerId, compact_block: CompactBlockMessage) -> Self { + Self::new( + peer_id, + NotificationMessage::CompactBlock(Box::new(compact_block)), + ) + } + + pub fn into_transactions(self) -> Option { + let peer_id = self.peer_id; + self.notification + .into_transactions() + .map(|message| PeerTransactionsMessage { peer_id, message }) + } + + pub fn into_compact_block(self) -> Option { + let peer_id = self.peer_id; + self.notification + .into_compact_block() + .map(|message| PeerCompactBlockMessage { peer_id, message }) + } +} + +/// Message for combine PeerId and TransactionsMessage +#[derive(Clone, Debug)] +pub struct PeerTransactionsMessage { + pub peer_id: PeerId, + pub message: TransactionsMessage, +} + +impl PeerTransactionsMessage { + pub fn new(peer_id: PeerId, message: TransactionsMessage) -> Self { + Self { peer_id, message } + } +} + +impl Into for PeerTransactionsMessage { + fn into(self) -> PeerMessage { + PeerMessage::new_transactions(self.peer_id, self.message) + } +} + +/// Message for combine PeerId and CompactBlockMessage +#[derive(Clone, Debug)] +pub struct PeerCompactBlockMessage { + pub peer_id: PeerId, + pub message: CompactBlockMessage, } -#[rtype(result = "Result<()>")] -#[derive(Debug, Message, Clone)] -pub struct RawRpcRequestMessage { - pub request: (PeerId, String, Vec), - pub responder: Sender>, +impl PeerCompactBlockMessage { + pub fn new(peer_id: PeerId, message: CompactBlockMessage) -> Self { + Self { peer_id, message } + } } -// TODO remove RawRpcRequestMessage responder and set response. -impl ServiceRequest for RawRpcRequestMessage { - type Response = (); +impl Into for PeerCompactBlockMessage { + fn into(self) -> PeerMessage { + PeerMessage::new_compact_block(self.peer_id, self.message) + } } -#[rtype(result = "Result<()>")] -#[derive(Debug, Eq, PartialEq, Message, Clone)] +#[derive(Debug, Eq, PartialEq, Clone)] pub enum PeerEvent { Open(PeerId, Box), Close(PeerId), diff --git a/network/api/src/peer_message_handler.rs b/network/api/src/peer_message_handler.rs index ac12ad28ba..7e15481444 100644 --- a/network/api/src/peer_message_handler.rs +++ b/network/api/src/peer_message_handler.rs @@ -1,12 +1,10 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use starcoin_block_relayer_api::PeerCmpctBlockEvent; -use starcoin_tx_relay::PeerTransactions; +use crate::messages::PeerMessage; //TODO unify peer events /// Handle broadcast message from peer pub trait PeerMessageHandler: Send + Sync { - fn handle_transaction(&self, transaction: PeerTransactions); - fn handle_block(&self, block: PeerCmpctBlockEvent); + fn handle_message(&self, peer_message: PeerMessage); } diff --git a/network/src/lib.rs b/network/src/lib.rs index 2e38c2d411..b94fa791d0 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -7,8 +7,6 @@ extern crate log; extern crate prometheus; pub mod helper; -//TODO cleanup -pub mod message_processor; pub mod net; #[cfg(test)] mod net_test; diff --git a/network/src/message_processor.rs b/network/src/message_processor.rs deleted file mode 100644 index d669b63528..0000000000 --- a/network/src/message_processor.rs +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright (c) The Starcoin Core Contributors -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::*; -use futures::lock::Mutex; -use futures::{ - channel::mpsc::{Receiver, Sender}, - sink::SinkExt, - task::{Context, Poll}, - Future, Stream, -}; -use network_p2p_types::PeerId; -use std::cmp::Eq; -use std::fmt::Debug; -use std::hash::Hash; -use std::pin::Pin; -use std::{collections::HashMap, sync::Arc}; - -pub struct MessageFuture { - rx: Receiver>, -} - -impl MessageFuture { - pub fn new(rx: Receiver>) -> Self { - Self { rx } - } -} - -impl Future for MessageFuture { - type Output = Result; - - //FIXME - #[allow(clippy::never_loop)] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - while let Poll::Ready(v) = Pin::new(&mut self.rx).poll_next(cx) { - match v { - Some(v) => match v { - Ok(v) => { - return Poll::Ready(Ok(v)); - } - Err(e) => { - return Poll::Ready(Err(e)); - } - }, - None => { - debug!("no data,return timeout"); - return Poll::Ready(Err(anyhow!("future time out"))); - } - } - } - Poll::Pending - } -} - -#[derive(Clone, Default)] -pub struct MessageProcessor { - tx_map: Arc>, PeerId)>>>, -} - -impl MessageProcessor -where - K: Send + Sync + Hash + Eq + Debug + 'static, - T: Send + Sync + 'static, -{ - pub fn new() -> Self { - Self { - tx_map: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub async fn add_future(&self, id: K, sender: Sender>, to_peer: PeerId) { - self.tx_map - .lock() - .await - .entry(id) - .or_insert_with(|| (sender.clone(), to_peer)); - } - - pub async fn send_response(&self, id: K, value: T) -> Result<()> { - let mut tx_map = self.tx_map.lock().await; - match tx_map.get(&id) { - Some((tx, _)) => { - match tx.clone().send(Ok(value)).await { - Ok(_new_tx) => { - debug!("send message {:?} succ", id); - tx_map.remove(&id); - } - Err(_) => debug!("send message {:?} error", id), - }; - } - _ => debug!("tx id {:?} not in map", id), - } - Ok(()) - } - // - pub async fn remove_future(&self, id: K) -> bool { - let mut tx_map = self.tx_map.lock().await; - if let Some((tx, peer_id)) = tx_map.get(&id) { - if let Err(e) = tx - .clone() - .send(Err(anyhow!( - "request {:?} send to peer {:?} future time out", - id, - peer_id - ))) - .await - { - warn!("Send timeout error fail {:?}.", e) - } - tx_map.remove(&id); - // if find tx ,means timeout - return true; - } - false - } -} - -#[cfg(test)] -mod tests { - use crate::get_unix_ts; - use crate::message_processor::{MessageFuture, MessageProcessor}; - use anyhow::{format_err, Result}; - use futures::{Future, SinkExt}; - use network_p2p_types::PeerId; - use std::pin::Pin; - - #[stest::test] - async fn test_message_future_err() { - let (mut tx, rx) = futures::channel::mpsc::channel::>(1); - let message_future = MessageFuture::new(rx); - let _ = tx.send(Err(format_err!("test error."))).await; - let response = message_future.await; - assert!(response.is_err()); - } - - #[stest::test] - fn test_message_future_none() { - let (_, rx) = futures::channel::mpsc::channel::>(1); - let mut message_future = MessageFuture::new(rx); - let response = futures::executor::block_on(futures::future::poll_fn(move |cx| { - Pin::new(&mut message_future).poll(cx) - })); - assert!(response.is_err()); - } - - #[stest::test] - async fn test_add_future() { - let message_processor = MessageProcessor::::new(); - let request_id = get_unix_ts(); - let (tx, _) = futures::channel::mpsc::channel::>(1); - message_processor - .add_future(request_id, tx.clone(), PeerId::random()) - .await; - assert!(message_processor - .tx_map - .lock() - .await - .contains_key(&request_id)); - message_processor - .add_future(request_id, tx, PeerId::random()) - .await; - assert_eq!(message_processor.tx_map.lock().await.len(), 1); - } - - #[stest::test] - async fn test_send_response_error() { - let message_processor = MessageProcessor::::new(); - let request_id = get_unix_ts(); - let (tx, _) = futures::channel::mpsc::channel::>(1); - message_processor - .add_future(request_id, tx.clone(), PeerId::random()) - .await; - assert!(message_processor - .send_response(request_id, ()) - .await - .is_ok()); - } - - #[stest::test] - async fn test_send_response_none() { - let message_processor = MessageProcessor::::new(); - let request_id = get_unix_ts(); - assert!(message_processor - .send_response(request_id, ()) - .await - .is_ok()); - } - - #[stest::test] - async fn test_remove_future() { - let message_processor = MessageProcessor::::new(); - let request_id = get_unix_ts(); - let (tx, rx) = futures::channel::mpsc::channel::>(1); - message_processor - .add_future(request_id, tx.clone(), PeerId::random()) - .await; - assert!(message_processor.remove_future(request_id).await); - drop(rx); - } -} diff --git a/network/src/net_test.rs b/network/src/net_test.rs index 4448162193..022d3ecdeb 100644 --- a/network/src/net_test.rs +++ b/network/src/net_test.rs @@ -16,6 +16,7 @@ mod tests { stream::StreamExt, }; use futures_timer::Delay; + use network_api::messages::NotificationMessage; use network_p2p::{identity, DhtEvent, Event}; use network_p2p::{NetworkConfiguration, NetworkWorker, NodeKeyConfig, Params, Secret}; use network_p2p_types::PeerId; @@ -23,7 +24,6 @@ mod tests { use std::pin::Pin; use std::{thread, time::Duration}; use types::startup_info::{ChainInfo, ChainStatus}; - use types::PROTOCOLS; const PROTOCOL_ID: &str = "starcoin"; @@ -80,7 +80,7 @@ mod tests { .unwrap(); NodeKeyConfig::Ed25519(Secret::Input(secret)) }, - protocols: PROTOCOLS.clone(), + protocols: NotificationMessage::protocols(), ..NetworkConfiguration::default() }; @@ -155,7 +155,7 @@ mod tests { if first_addr.is_none() { first_addr = Some(config.listen.to_string()); } - let mut protocols = PROTOCOLS.clone(); + let mut protocols = NotificationMessage::protocols(); protocols.push(TEST_PROTOCOL_NAME.into()); let server = build_network_service(chain_info.clone(), &config, protocols, None); result.push({ diff --git a/network/src/network.rs b/network/src/network.rs index 5c34d74dc7..eb62f07fb1 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -1,78 +1,56 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::helper::get_unix_ts; -use crate::message_processor::{MessageFuture, MessageProcessor}; use crate::net::{build_network_service, SNetworkService, RPC_PROTOCOL_PREFIX}; use crate::network_metrics::NetworkMetrics; use crate::{NetworkMessage, PeerEvent, PeerMessage}; -use anyhow::{format_err, Result}; +use anyhow::Result; use async_trait::async_trait; -use bitflags::_core::ops::Deref; -use bitflags::_core::sync::atomic::Ordering; use config::NodeConfig; -use crypto::{hash::PlainCryptoHash, HashValue}; +use crypto::HashValue; use futures::future::BoxFuture; use futures::lock::Mutex; use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt}; use futures::{FutureExt, TryFutureExt}; -use futures_timer::Delay; use lru::LruCache; +use network_api::messages::NotificationMessage; use network_api::{ - messages::RawRpcRequestMessage, NetworkService, PeerMessageHandler, PeerProvider, + messages::TransactionsMessage, NetworkService, PeerMessageHandler, PeerProvider, }; use network_p2p::Multiaddr; use network_p2p_types::PeerId; use network_rpc_core::RawRpcClient; -use scs::SCSCodec; -use starcoin_block_relayer_api::{NetCmpctBlockMessage, PeerCmpctBlockEvent}; use starcoin_network_rpc::NetworkRpcService; -use starcoin_network_rpc_api::CHAIN_PROTOCOL_NAME; use starcoin_service_registry::bus::{Bus, BusService}; use starcoin_service_registry::{ ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceRef, }; -use std::borrow::Cow; +use starcoin_txpool_api::PropagateNewTransactions; use std::collections::HashMap; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tx_relay::*; use types::peer_info::{PeerInfo, RpcInfo}; use types::startup_info::{ChainInfo, ChainStatus}; -use types::transaction::SignedUserTransaction; -use types::{BLOCK_PROTOCOL_NAME, PROTOCOLS, TXN_PROTOCOL_NAME}; const LRU_CACHE_SIZE: usize = 1024; #[derive(Clone)] pub struct NetworkAsyncService { - raw_message_processor: MessageProcessor>, /// TODO: tx is unused? tx: mpsc::UnboundedSender, - network_service: SNetworkService, peer_id: PeerId, inner: Arc, - metrics: Option, -} - -impl Deref for NetworkAsyncService { - type Target = SNetworkService; - - fn deref(&self) -> &Self::Target { - &self.network_service - } } struct Inner { network_service: SNetworkService, bus: ServiceRef, - raw_message_processor: MessageProcessor>, peers: Arc>>, connected_tx: mpsc::Sender, need_send_event: AtomicBool, - network_rpc_service: ServiceRef, peer_message_handler: Arc, + metrics: Option, } #[derive(Debug)] @@ -99,18 +77,21 @@ impl PeerInfoNet { #[async_trait] impl NetworkService for NetworkAsyncService { - async fn send_peer_message( - &self, - protocol_name: Cow<'static, str>, - peer_id: types::peer_info::PeerId, - msg: PeerMessage, - ) -> Result<()> { - let data = msg.encode()?; - self.network_service - .send_message(peer_id.into(), protocol_name, data) - .await?; + async fn send_peer_message(&self, msg: PeerMessage) -> Result<()> { + self.inner.send_peer_message(msg).await + } - Ok(()) + async fn broadcast(&self, notification: NotificationMessage) { + let inner = self.inner.clone(); + async_std::task::spawn(async move { + let protocol = notification.protocol_name(); + if let Err(e) = inner.broadcast(notification).await { + warn!( + "[network-broadcast] Broadcast {} message error: {:?}", + protocol, e + ); + } + }); } } @@ -137,7 +118,9 @@ impl RawRpcClient for NetworkAsyncService { _timeout: Duration, ) -> BoxFuture>> { let protocol = format!("{}{}", RPC_PROTOCOL_PREFIX, rpc_path); - self.request(peer_id, protocol, message) + self.inner + .network_service + .request(peer_id, protocol, message) .map_err(|e| e.into()) .boxed() } @@ -167,74 +150,13 @@ impl NetworkAsyncService { self.inner.peers.clone() } - async fn _send_request_bytes( - &self, - peer_id: Option, - rpc_path: String, - message: Vec, - time_out: Duration, - ) -> Result> { - let request_id = get_unix_ts(); - let peer_msg = PeerMessage::RawRPCRequest(request_id, rpc_path, message); - let data = peer_msg.encode()?; - let peer_id = match peer_id { - Some(peer_id) => peer_id, - None => self - .best_peer() - .await? - .ok_or_else(|| format_err!("No connected peers to request for {:?}", peer_msg))? - .peer_id(), - }; - debug!( - "Send request to {} with id {} and msg: {:?}", - peer_id, request_id, peer_msg - ); - - self.network_service - .send_message(peer_id.clone().into(), CHAIN_PROTOCOL_NAME.into(), data) - .await?; - - let (tx, rx) = futures::channel::mpsc::channel(1); - let message_future = MessageFuture::new(rx); - self.raw_message_processor - .add_future(request_id, tx, peer_id.clone().into()) - .await; - - let processor = self.raw_message_processor.clone(); - - if let Some(metrics) = &self.metrics { - metrics.request_count.inc(); - } - - let metrics = self.metrics.clone(); - let peer_id_for_task = peer_id.clone(); - let task = async move { - Delay::new(time_out).await; - let timeout = processor.remove_future(request_id).await; - if !timeout { - return; - } - debug!( - "send request to {} with id {} timeout", - peer_id_for_task, request_id - ); - if let Some(metrics) = metrics { - metrics.request_timeout_count.inc(); - } - }; - - async_std::task::spawn(task); - let response = message_future.await; - debug!("receive response from {} with id {}", peer_id, request_id); - response - } - pub fn add_peer(&self, peer: String) -> Result<()> { - self.network_service.add_peer(peer) + self.inner.network_service.add_peer(peer) } pub async fn connected_peers(&self) -> Vec { - self.network_service + self.inner + .network_service .connected_peers() .await .into_iter() @@ -243,7 +165,7 @@ impl NetworkAsyncService { } pub async fn get_address(&self, peer_id: types::peer_info::PeerId) -> Vec { - self.network_service.get_address(peer_id.into()).await + self.inner.network_service.get_address(peer_id.into()).await } pub fn start( @@ -272,8 +194,8 @@ impl NetworkAsyncService { let (service, tx, rx, event_rx, tx_command) = build_network_service( chain_info, &config, - PROTOCOLS.clone(), - Some((rpc_info, network_rpc_service.clone())), + NotificationMessage::protocols(), + Some((rpc_info, network_rpc_service)), ); info!( "network started at {} with seed {},network address is {}", @@ -286,9 +208,6 @@ impl NetworkAsyncService { service.identify() ); - let raw_message_processor = MessageProcessor::new(); - let raw_message_processor_clone = raw_message_processor.clone(); - let peer_id = service.identify().clone(); let mut peers = HashMap::new(); @@ -305,14 +224,13 @@ impl NetworkAsyncService { let metrics = NetworkMetrics::register().ok(); let inner = Inner { - network_service: service.clone(), + network_service: service, bus, - raw_message_processor: raw_message_processor_clone, peers, connected_tx, need_send_event, - network_rpc_service, peer_message_handler: Arc::new(peer_message_handler), + metrics, }; let inner = Arc::new(inner); @@ -330,14 +248,7 @@ impl NetworkAsyncService { }); } - Ok(NetworkAsyncService { - raw_message_processor, - network_service: service, - tx, - peer_id, - inner, - metrics, - }) + Ok(NetworkAsyncService { tx, peer_id, inner }) } } @@ -373,114 +284,55 @@ impl Inner { "Receive network_message from peer: {:?}", network_msg.peer_id, ); - // TODO: we should decode msg based on protocol name. - // when protocol upgrade, we can decoded data based on the new protocol. - - let message = PeerMessage::decode(&network_msg.data); - match message { - Ok(msg) => { - if let Err(e) = inner.handle_network_message(network_msg.peer_id, msg).await { - warn!("Handle_network_message error: {:?}", e); - } - } - Err(e) => { - warn!("Decode network message {:?} error {:?}", network_msg, e); - } + if let Err(e) = inner.handle_network_message(network_msg).await { + warn!("Handle_network_message error: {:?}", e); } } - async fn handle_network_message(&self, peer_id: PeerId, msg: PeerMessage) -> Result<()> { - match msg { - PeerMessage::NewTransactions(txns) => { - debug!("receive new txn list from {:?} ", peer_id); - if let Some(peer_info) = self.peers.lock().await.get_mut(&peer_id) { - for txn in &txns { - let id = txn.crypto_hash(); - if !peer_info.known_transactions.contains(&id) { - peer_info.known_transactions.put(id, ()); - } else { - return Ok(()); - } + async fn handle_network_message(&self, network_msg: NetworkMessage) -> Result<()> { + let peer_id = network_msg.peer_id; + if let Some(peer_info) = self.peers.lock().await.get_mut(&peer_id) { + let notification = NotificationMessage::decode_notification( + network_msg.protocol_name, + network_msg.data.as_slice(), + )?; + match ¬ification { + NotificationMessage::Transactions(peer_transactions) => { + for txn in &peer_transactions.txns { + let id = txn.id(); + peer_info.known_transactions.put(id, ()); } } - self.peer_message_handler - .handle_transaction(PeerTransactions::new(txns)); - } - PeerMessage::CompactBlock(compact_block, total_difficulty) => { - //TODO: Check total difficulty - let block_header = compact_block.header.clone(); - debug!( - "Receive new compact block from {:?} with hash {:?}", - peer_id, - block_header.id() - ); - - if let Some(peer_info) = self.peers.lock().await.get_mut(&peer_id) { + NotificationMessage::CompactBlock(compact_block_message) => { + let block_header = compact_block_message.compact_block.header.clone(); + let total_difficulty = compact_block_message.total_difficulty; + let block_id = block_header.id(); + debug!( + "Receive new compact block from {:?} with hash {:?}", + peer_id, block_id + ); debug!( "total_difficulty is {},peer_info is {:?}", total_difficulty, peer_info ); - if total_difficulty > peer_info.peer_info.total_difficulty() { - peer_info - .peer_info - .update_chain_status(ChainStatus::new(block_header, total_difficulty)); - } - } else { - error!( - "Receive compat block from {}, but can not find it peer info.", - peer_id - ) + peer_info.known_blocks.put(block_id, ()); + peer_info + .peer_info + .update_chain_status(ChainStatus::new(block_header, total_difficulty)); } - self.peer_message_handler.handle_block(PeerCmpctBlockEvent { - peer_id: peer_id.into(), - compact_block, - }); } - PeerMessage::RawRPCRequest(id, rpc_path, request) => { - debug!("do request {} from peer {}", id, peer_id); - let (tx, rx) = mpsc::channel(1); - self.network_rpc_service.try_send(RawRpcRequestMessage { - responder: tx, - request: (peer_id.clone().into(), rpc_path, request), - })?; - let network_service = self.network_service.clone(); - async_std::task::spawn(Self::handle_response(id, peer_id, rx, network_service)); - } - PeerMessage::RawRPCResponse(id, response) => { - debug!("do response {} from peer {}", id, peer_id); - self.raw_message_processor - .send_response(id, response) - .await?; - } + let peer_message = PeerMessage::new(peer_id.into(), notification); + self.peer_message_handler.handle_message(peer_message); + } else { + error!( + "Receive NetworkMessage from unknown peer {}, protocol: {}", + peer_id, network_msg.protocol_name + ) } Ok(()) } - async fn handle_response( - id: u128, - peer_id: PeerId, - mut rx: mpsc::Receiver>, - network_service: SNetworkService, - ) -> Result<()> { - let response = rx.next().await; - match response { - Some(response) => { - let peer_msg = PeerMessage::RawRPCResponse(id, response); - let data = peer_msg.encode()?; - network_service - .send_message(peer_id, CHAIN_PROTOCOL_NAME.into(), data) - .await?; - debug!("send response by id {} succ.", id); - Ok(()) - } - None => { - debug!("can't get response by id {}", id); - Ok(()) - } - } - } - async fn handle_event_receive(inner: Arc, event: PeerEvent) { if let Err(e) = inner.do_handle_event_receive(event).await { warn!("Handle peer event error: {}", e); @@ -519,10 +371,112 @@ impl Inner { async fn on_peer_disconnected(&self, peer_id: PeerId) { self.peers.lock().await.remove(&peer_id); } + + async fn send_peer_message(&self, msg: PeerMessage) -> Result<()> { + let peer_id = msg.peer_id; + let (protocol_name, data) = msg.notification.encode_notification()?; + self.network_service + .send_message(peer_id.into(), protocol_name, data) + .await?; + Ok(()) + } + + async fn broadcast(&self, notification: NotificationMessage) -> Result<()> { + let protocol_name = notification.protocol_name(); + let _timer = self.metrics.as_ref().map(|metrics| { + metrics + .broadcast_duration + .with_label_values(&[protocol_name.as_ref()]) + .start_timer() + }); + let self_id = self.network_service.identify(); + match ¬ification { + NotificationMessage::CompactBlock(msg) => { + let id = msg.compact_block.header.id(); + debug!("broadcast new compact block message {:?}", id); + let block_header = msg.compact_block.header.clone(); + let total_difficulty = msg.total_difficulty; + if let Some(peer_info) = self.peers.lock().await.get_mut(self_id) { + debug!( + "total_difficulty is {}, peer_info is {:?}", + total_difficulty, peer_info + ); + + let chain_status = ChainStatus::new(block_header.clone(), total_difficulty); + peer_info + .peer_info + .update_chain_status(chain_status.clone()); + self.network_service.update_chain_status(chain_status); + } else { + error!("Can not find self peer info {:?}", self_id); + } + + let send_futures = self.peers.lock().await.iter_mut().filter_map(|(peer_id, peer_info)|{ + if peer_id == self_id { + return None + } + if peer_info.known_blocks.contains(&id) + || peer_info.peer_info.total_difficulty() >= total_difficulty + { + debug!("peer({:?})'s total_difficulty is > block({:?})'s total_difficulty or it know this block, so do not broadcast. ", peer_id, id); + None + }else{ + peer_info.known_blocks.put(id, ()); + Some(self + .send_peer_message( + PeerMessage::new(peer_id.clone().into(), notification.clone()) + )) + } + }).collect::>(); + futures::future::join_all(send_futures) + .await + .into_iter() + .collect::>()?; + Ok(()) + } + NotificationMessage::Transactions(msg) => { + let send_futures = self + .peers + .lock() + .await + .iter_mut() + .filter_map(|(peer_id, peer_info)| { + if peer_id == self_id { + return None; + } + let mut txns_unhandled = Vec::new(); + for txn in &msg.txns { + let id = txn.id(); + if !peer_info.known_transactions.contains(&id) { + peer_info.known_transactions.put(id, ()); + txns_unhandled.push(txn.clone()); + } + } + if txns_unhandled.is_empty() { + debug!( + "{} known all transactions, ignore broadcast message.", + peer_id + ); + return None; + } + Some(self.send_peer_message(PeerMessage::new_transactions( + peer_id.clone().into(), + TransactionsMessage::new(txns_unhandled), + ))) + }) + .collect::>(); + futures::future::join_all(send_futures) + .await + .into_iter() + .collect::>()?; + Ok(()) + } + } + } } // TODO: figure out a better place for the actor. -/// Used to manage broadcast new txn and new block event to other network peers. +/// Used to manage broadcast new txn to other network peers. pub struct PeerMsgBroadcasterService { network: NetworkAsyncService, } @@ -545,125 +499,36 @@ impl ServiceFactory for PeerMsgBroadcasterService { impl ActorService for PeerMsgBroadcasterService { fn started(&mut self, ctx: &mut ServiceContext) -> Result<()> { ctx.subscribe::(); - ctx.subscribe::(); Ok(()) } fn stopped(&mut self, ctx: &mut ServiceContext) -> Result<()> { ctx.unsubscribe::(); - ctx.unsubscribe::(); Ok(()) } } -impl EventHandler for PeerMsgBroadcasterService { - fn handle_event( - &mut self, - msg: NetCmpctBlockMessage, - ctx: &mut ServiceContext, - ) { - let id = msg.compact_block.header.id(); - debug!("broadcast new compact block message {:?}", id); - let network = self.network.clone(); - let block_header = msg.compact_block.header.clone(); - let total_difficulty = msg.total_difficulty; - let msg = PeerMessage::CompactBlock(msg.compact_block, total_difficulty); - let self_id: PeerId = self.network.identify().into(); - ctx.spawn(async move { - let peers = network.peers(); - if let Some(peer_info) = peers.lock().await.get_mut(&self_id) { - debug!( - "total_difficulty is {}, peer_info is {:?}", - total_difficulty, peer_info - ); - - let chain_status = ChainStatus::new(block_header.clone(), total_difficulty); - peer_info.peer_info.update_chain_status(chain_status.clone()); - network.update_chain_status(chain_status); - }else{ - error!("Can not find self peer info {:?}", &self_id); - } - - for (peer_id, peer_info) in peers.lock().await.iter_mut() { - if peer_info.known_blocks.contains(&id) - || peer_info.peer_info.total_difficulty() >= total_difficulty - { - debug!("peer({:?})'s total_difficulty is > block({:?})'s total_difficulty or it know this block, so do not broadcast. ", peer_id, id); - continue; - } - - peer_info.known_blocks.put(id, ()); - network - .send_peer_message( - BLOCK_PROTOCOL_NAME.into(), - peer_id.clone().into(), - msg.clone(), - ) - .await?; - } - Ok(()) - }.then(|result: Result<()>| async move{ - if let Err(e) = result{ - error!("[peer-message-broadcaster] Handle NetCmpctBlockMessage error: {:?}", e); - } - })) - } -} - -/// handle txn relayer +// handle txn relayer impl EventHandler for PeerMsgBroadcasterService { fn handle_event( &mut self, msg: PropagateNewTransactions, ctx: &mut ServiceContext, ) { - let (protocol_name, txns) = { - (TXN_PROTOCOL_NAME, msg.propagate_transaction()) - // new version of txn message can come here - }; - // false positive + let txns = msg.propagate_transaction(); if txns.is_empty() { + error!("broadcast PropagateNewTransactions is empty."); return; } debug!("propagate new txns, len: {}", txns.len()); let network_service = self.network.clone(); - let mut txn_map: HashMap = HashMap::new(); - for txn in txns { - txn_map.insert(txn.crypto_hash(), txn); - } - let self_peer_id: PeerId = self.network.identify().into(); - ctx.spawn( - async move { - let peers = network_service.peers(); - for (peer_id, peer_info) in peers.lock().await.iter_mut() { - let mut txns_unhandled = Vec::new(); - for (id, txn) in &txn_map { - if !peer_info.known_transactions.contains(id) - && !peer_id.eq(&self_peer_id.clone()) - { - peer_info.known_transactions.put(*id, ()); - txns_unhandled.push(txn.clone()); - } - } - network_service - .send_peer_message( - Cow::Borrowed(protocol_name), - peer_id.clone().into(), - PeerMessage::NewTransactions(txns_unhandled), - ) - .await?; - } - Ok(()) - } - .then(|result: Result<()>| async move { - if let Err(e) = result { - error!( - "[peer-message-broadcaster] Handle PropagateNewTransactions error: {:?}", - e - ); - } - }), - ); + ctx.spawn(async move { + network_service + .broadcast(NotificationMessage::Transactions(TransactionsMessage::new( + txns, + ))) + .await; + }); } } diff --git a/network/src/network_metrics.rs b/network/src/network_metrics.rs index 366d9a4096..29e6fd13e2 100644 --- a/network/src/network_metrics.rs +++ b/network/src/network_metrics.rs @@ -1,24 +1,18 @@ -use prometheus::{Error as PrometheusError, IntGauge, Opts}; +use prometheus::Error as PrometheusError; +use starcoin_metrics::HistogramVec; #[derive(Clone)] pub struct NetworkMetrics { - pub request_count: IntGauge, - pub request_timeout_count: IntGauge, + pub broadcast_duration: HistogramVec, } impl NetworkMetrics { pub fn register() -> Result { - let request_count = register_int_gauge!( - Opts::new("request_count", "rpc request count").namespace("starcoin") + let broadcast_duration = register_histogram_vec!( + "broadcast_duration", + "network broadcast message duration by protocol", + &["notification_protocol"] )?; - - let request_timeout_count = - register_int_gauge!( - Opts::new("request_timeout_count", "request timeout count").namespace("starcoin") - )?; - Ok(Self { - request_count, - request_timeout_count, - }) + Ok(Self { broadcast_duration }) } } diff --git a/node/Cargo.toml b/node/Cargo.toml index 4d7f0f77a9..70d152747c 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -24,7 +24,6 @@ starcoin-consensus = {path = "../consensus"} starcoin-executor = {path = "../executor"} network-api = {path = "../network/api"} starcoin-network = {path = "../network"} -starcoin-tx-relay = {path="../txpool/tx-relay"} starcoin-txpool = { path = "../txpool" } starcoin-chain-service = { path = "../chain/service" } starcoin-chain-notify = { path = "../chain/chain-notify" } @@ -44,7 +43,6 @@ starcoin-statedb = { path = "../state/statedb"} starcoin-state-service = { path = "../state/service"} starcoin-txpool-api = {path = "../txpool/api"} starcoin-sync-api = {package="starcoin-sync-api", path="../sync/api"} -starcoin-block-relayer-api = { path = "../block-relayer/api/" } starcoin-block-relayer = {path = "../block-relayer"} starcoin-network-rpc = {path = "../network-rpc"} starcoin-network-rpc-api = { path = "../network-rpc/api" } diff --git a/node/src/peer_message_handler.rs b/node/src/peer_message_handler.rs index b1ba67917f..ef8acea599 100644 --- a/node/src/peer_message_handler.rs +++ b/node/src/peer_message_handler.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use crate::metrics::NODE_METRICS; +use network_api::messages::{ + NotificationMessage, PeerCompactBlockMessage, PeerMessage, PeerTransactionsMessage, +}; use network_api::PeerMessageHandler; use starcoin_block_relayer::BlockRelayer; -use starcoin_block_relayer_api::PeerCmpctBlockEvent; use starcoin_logger::prelude::*; use starcoin_service_registry::ServiceRef; -use starcoin_tx_relay::PeerTransactions; use starcoin_txpool::TxPoolActorService; use starcoin_types::time::duration_since_epoch; use std::sync::mpsc::TrySendError; @@ -30,31 +31,40 @@ impl NodePeerMessageHandler { } impl PeerMessageHandler for NodePeerMessageHandler { - fn handle_transaction(&self, transaction: PeerTransactions) { - if let Err(e) = self.txpool_service.notify(transaction) { - match e { - TrySendError::Full(_) => { - warn!("Handle PeerTransaction error, TxPoolService is too busy."); - } - TrySendError::Disconnected(_) => { - error!("Handle PeerTransaction error, TxPoolService is shutdown."); + fn handle_message(&self, peer_message: PeerMessage) { + match peer_message.notification { + NotificationMessage::Transactions(message) => { + if let Err(e) = self + .txpool_service + .notify(PeerTransactionsMessage::new(peer_message.peer_id, message)) + { + match e { + TrySendError::Full(_) => { + warn!("Handle PeerTransaction error, TxPoolService is too busy."); + } + TrySendError::Disconnected(_) => { + error!("Handle PeerTransaction error, TxPoolService is shutdown."); + } + } } } - } - } - - fn handle_block(&self, block: PeerCmpctBlockEvent) { - let header_time = block.compact_block.header.timestamp; - NODE_METRICS - .block_latency - .observe((duration_since_epoch().as_millis() - header_time as u128) as f64); - if let Err(e) = self.block_relayer.notify(block) { - match e { - TrySendError::Full(_) => { - warn!("Handle PeerCmpctBlock error, BlockRelayer is too busy."); - } - TrySendError::Disconnected(_) => { - error!("Handle PeerCmpctBlock error, BlockRelayer is shutdown."); + NotificationMessage::CompactBlock(message) => { + let header_time = message.compact_block.header.timestamp; + NODE_METRICS + .block_latency + .observe((duration_since_epoch().as_millis() - header_time as u128) as f64); + if let Err(e) = self + .block_relayer + .notify(PeerCompactBlockMessage::new(peer_message.peer_id, *message)) + { + match e { + TrySendError::Full(_) => { + warn!("Handle PeerCmpctBlock error, BlockRelayer is too busy."); + } + TrySendError::Disconnected(_) => { + error!("Handle PeerCmpctBlock error, BlockRelayer is shutdown."); + } + } } } } diff --git a/test-helper/Cargo.toml b/test-helper/Cargo.toml index e88996cc85..cc4374e9bf 100644 --- a/test-helper/Cargo.toml +++ b/test-helper/Cargo.toml @@ -37,10 +37,8 @@ starcoin-account-api = { path = "../account/api" } starcoin-account-service = { path = "../account/service" } starcoin-state-api = { path = "../state/api" } starcoin-state-service = { path = "../state/service" } -starcoin-tx-relay= { path="../txpool/tx-relay"} starcoin-txpool-api = { path = "../txpool/api" } starcoin-sync-api = { package = "starcoin-sync-api", path = "../sync/api" } -starcoin-block-relayer-api = { path = "../block-relayer/api" } starcoin-block-relayer = { path = "../block-relayer" } starcoin-network-rpc = { path = "../network-rpc" } starcoin-network-rpc-api = { path = "../network-rpc/api" } diff --git a/test-helper/src/dummy_network_service.rs b/test-helper/src/dummy_network_service.rs index 3317f7844d..1fec0b8392 100644 --- a/test-helper/src/dummy_network_service.rs +++ b/test-helper/src/dummy_network_service.rs @@ -2,6 +2,7 @@ use accumulator::AccumulatorNode; use futures::future::BoxFuture; use futures::FutureExt; +use network_api::messages::NotificationMessage; use network_api::{messages::PeerMessage, NetworkService, PeerId, PeerProvider}; use network_rpc_core::RawRpcClient; use network_rpc_core::Result; @@ -15,7 +16,6 @@ use starcoin_types::block::{BlockHeader, BlockInfo, BlockNumber}; use starcoin_types::peer_info::PeerInfo; use starcoin_types::transaction::TransactionInfo; use state_tree::StateNode; -use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; @@ -227,14 +227,11 @@ impl DummyNetworkService { #[async_trait::async_trait] impl NetworkService for DummyNetworkService { - async fn send_peer_message( - &self, - _protocol_name: Cow<'static, str>, - _peer_id: PeerId, - _msg: PeerMessage, - ) -> anyhow::Result<()> { + async fn send_peer_message(&self, _msg: PeerMessage) -> anyhow::Result<()> { Ok(()) } + + async fn broadcast(&self, _notification: NotificationMessage) {} } impl RawRpcClient for DummyNetworkService { diff --git a/test-helper/src/network.rs b/test-helper/src/network.rs index 900bde77cf..e3abb009b0 100644 --- a/test-helper/src/network.rs +++ b/test-helper/src/network.rs @@ -2,36 +2,30 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::{format_err, Result}; +use network_api::messages::PeerMessage; +use network_api::{MultiaddrWithPeerId, PeerMessageHandler}; use starcoin_config::NodeConfig; use starcoin_genesis::Genesis; use starcoin_network_rpc::NetworkRpcService; use starcoin_service_registry::bus::BusService; use starcoin_service_registry::mocker::MockHandler; use starcoin_service_registry::{RegistryAsyncService, RegistryService, ServiceRef}; +use starcoin_storage::block_info::BlockInfoStore; use starcoin_storage::{BlockStore, Storage}; +use starcoin_types::peer_info::RpcInfo; +use starcoin_types::startup_info::{ChainInfo, ChainStatus}; use std::sync::{Arc, Mutex}; -use network_api::{MultiaddrWithPeerId, PeerMessageHandler}; -use starcoin_block_relayer_api::PeerCmpctBlockEvent; pub use starcoin_network::NetworkAsyncService; -use starcoin_storage::block_info::BlockInfoStore; -use starcoin_tx_relay::PeerTransactions; -use starcoin_types::peer_info::RpcInfo; -use starcoin_types::startup_info::{ChainInfo, ChainStatus}; #[derive(Clone, Default)] pub struct MockPeerMessageHandler { - pub txns: Arc>>, - pub blocks: Arc>>, + pub messages: Arc>>, } impl PeerMessageHandler for MockPeerMessageHandler { - fn handle_transaction(&self, transaction: PeerTransactions) { - self.txns.lock().unwrap().push(transaction); - } - - fn handle_block(&self, block: PeerCmpctBlockEvent) { - self.blocks.lock().unwrap().push(block); + fn handle_message(&self, peer_message: PeerMessage) { + self.messages.lock().unwrap().push(peer_message); } } diff --git a/txpool/Cargo.toml b/txpool/Cargo.toml index 924529d90f..8b8feb26e9 100644 --- a/txpool/Cargo.toml +++ b/txpool/Cargo.toml @@ -37,11 +37,13 @@ starcoin-state-tree={path="../state/state-tree"} starcoin-executor={path="../executor"} starcoin-consensus = {path = "../consensus"} starcoin-config={path="../config"} -tx-relay = { path = "./tx-relay", package = "starcoin-tx-relay" } starcoin-service-registry = { path = "../commons/service-registry" } +network-api = { package = "network-api", path = "../network/api" } + proptest = { version = "0.10.1", default-features = false, optional = true } proptest-derive = { version = "0.2.0", default-features = false, optional = true } + [dev-dependencies] stdlib = { path = "../vm/stdlib" } tempfile="3" diff --git a/txpool/api/src/lib.rs b/txpool/api/src/lib.rs index 9ff6c5d29b..668526ffa8 100644 --- a/txpool/api/src/lib.rs +++ b/txpool/api/src/lib.rs @@ -67,3 +67,18 @@ pub trait TxPoolSyncService: Clone + Send + Sync + Unpin { max_len: Option, ) -> Vec; } + +#[derive(Clone, Debug)] +pub struct PropagateNewTransactions { + txns: Vec, +} + +impl PropagateNewTransactions { + pub fn propagate_transaction(self) -> Vec { + self.txns + } + + pub fn new(txns: Vec) -> Self { + Self { txns } + } +} diff --git a/txpool/src/lib.rs b/txpool/src/lib.rs index 57391eec74..5828918ab7 100644 --- a/txpool/src/lib.rs +++ b/txpool/src/lib.rs @@ -14,13 +14,13 @@ use anyhow::{format_err, Result}; use counters::{TXPOOL_STATUS_GAUGE_VEC, TXPOOL_TXNS_GAUGE}; use starcoin_config::NodeConfig; use starcoin_service_registry::{ActorService, EventHandler, ServiceContext, ServiceFactory}; -use starcoin_txpool_api::TxnStatusFullEvent; +use starcoin_txpool_api::{PropagateNewTransactions, TxnStatusFullEvent}; use std::sync::Arc; use storage::{BlockStore, Storage}; use tx_pool_service_impl::Inner; -use tx_relay::{PeerTransactions, PropagateNewTransactions}; use types::{sync_status::SyncStatus, system_events::SyncStatusChangeEvent}; +use network_api::messages::PeerTransactionsMessage; pub use pool::TxStatus; use serde::export::Option::Some; pub use tx_pool_service_impl::TxPoolService; @@ -149,13 +149,12 @@ impl EventHandler for TxPoolActorService { } } -impl EventHandler for TxPoolActorService { - fn handle_event(&mut self, msg: PeerTransactions, _ctx: &mut ServiceContext) { +impl EventHandler for TxPoolActorService { + fn handle_event(&mut self, msg: PeerTransactionsMessage, _ctx: &mut ServiceContext) { //TODO should filter msg an NetworkService if self.is_synced() { // JUST need to keep at most once delivery. - let txns = msg.peer_transactions(); - let _ = self.inner.import_txns(txns); + let _ = self.inner.import_txns(msg.message.txns); } else { debug!("[txpool] Ignore PeerTransactions event because the node has not been synchronized yet."); } diff --git a/txpool/src/test.rs b/txpool/src/test.rs index e3ac1613cf..8b87d955a0 100644 --- a/txpool/src/test.rs +++ b/txpool/src/test.rs @@ -5,6 +5,8 @@ use crate::pool::AccountSeqNumberClient; use crate::TxStatus; use anyhow::Result; use crypto::{hash::PlainCryptoHash, keygen::KeyGen}; +use network_api::messages::{PeerTransactionsMessage, TransactionsMessage}; +use network_api::PeerId; use parking_lot::RwLock; use starcoin_config::NodeConfig; use starcoin_executor::{ @@ -19,7 +21,6 @@ use std::time::Duration; use std::{collections::HashMap, sync::Arc}; use stest::actix_export::time::delay_for; use storage::BlockStore; -use tx_relay::PeerTransactions; use types::{ account_address::{self, AccountAddress}, account_config, @@ -230,7 +231,10 @@ async fn test_txpool_actor_service() { let txn = generate_txn(config, 0); tx_pool_actor - .notify(PeerTransactions::new(vec![txn.clone()])) + .notify(PeerTransactionsMessage::new( + PeerId::random(), + TransactionsMessage::new(vec![txn.clone()]), + )) .unwrap(); delay_for(Duration::from_millis(200)).await; diff --git a/txpool/tx-relay/Cargo.toml b/txpool/tx-relay/Cargo.toml deleted file mode 100644 index 772e8a9fa3..0000000000 --- a/txpool/tx-relay/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "starcoin-tx-relay" -version = "0.8.0" -authors = ["Starcoin Core Dev "] -license = "Apache-2.0" -publish = false -edition = "2018" - -[dependencies] -actix = "0.10.0" -starcoin-types = { package = "starcoin-types", path = "../../types"} diff --git a/txpool/tx-relay/src/lib.rs b/txpool/tx-relay/src/lib.rs deleted file mode 100644 index 95ec6f8acd..0000000000 --- a/txpool/tx-relay/src/lib.rs +++ /dev/null @@ -1,54 +0,0 @@ -use starcoin_types::transaction::SignedUserTransaction; - -pub enum TxnRelayMessage { - /// propagate local txns to remote peers, - PropagateNewTransactions(PropagateNewTransactions), - /// txns received from remote peers. - PeerTransactions(PeerTransactions), -} - -#[derive(Clone, Debug)] -pub struct PropagateNewTransactions { - txns: Vec, -} - -impl PropagateNewTransactions { - pub fn propagate_transaction(self) -> Vec { - self.txns - } - - pub fn new(txns: Vec) -> Self { - Self { txns } - } -} - -impl actix::Message for PropagateNewTransactions { - type Result = (); -} - -#[derive(Clone, Debug)] -pub struct PeerTransactions { - txns: Vec, -} - -impl actix::Message for PeerTransactions { - type Result = (); -} - -impl PeerTransactions { - pub fn new(txns: Vec) -> Self { - Self { txns } - } - - pub fn peer_transactions(self) -> Vec { - self.txns - } -} - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/types/src/lib.rs b/types/src/lib.rs index 549668381c..df9780408d 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -75,26 +75,9 @@ pub mod write_set { pub use starcoin_vm_types::write_set::{WriteOp, WriteSet, WriteSetMut}; } -use once_cell::sync::Lazy; -use std::borrow::Cow; - pub mod genesis_config { pub use starcoin_vm_types::genesis_config::*; } -pub mod sync_status; - -//TODO should define at here? move to network api. -pub const CHAIN_PROTOCOL_NAME: &str = "/starcoin/chain/1"; -pub const TXN_PROTOCOL_NAME: &str = "/starcoin/txn/1"; -pub const BLOCK_PROTOCOL_NAME: &str = "/starcoin/block/1"; - -pub static PROTOCOLS: Lazy>> = Lazy::new(|| { - vec![ - CHAIN_PROTOCOL_NAME.into(), - TXN_PROTOCOL_NAME.into(), - BLOCK_PROTOCOL_NAME.into(), - ] -}); - pub mod stress_test; +pub mod sync_status; From b4a1ede416b9bafeb5f98b020a3baefba15acadc Mon Sep 17 00:00:00 2001 From: jolestar Date: Mon, 30 Nov 2020 19:35:54 +0800 Subject: [PATCH 2/3] [sync] Add a test for test sync by notification message. --- network/api/src/messages.rs | 2 +- network/src/network.rs | 2 +- sync/tests/full_sync_test.rs | 56 ++++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/network/api/src/messages.rs b/network/api/src/messages.rs index 29ae68006a..c2244da9a0 100644 --- a/network/api/src/messages.rs +++ b/network/api/src/messages.rs @@ -61,7 +61,7 @@ impl NotificationMessage { pub fn encode_notification(&self) -> Result<(Cow<'static, str>, Vec)> { Ok(match self { NotificationMessage::Transactions(msg) => (TXN_PROTOCOL_NAME.into(), msg.encode()?), - NotificationMessage::CompactBlock(msg) => (TXN_PROTOCOL_NAME.into(), msg.encode()?), + NotificationMessage::CompactBlock(msg) => (BLOCK_PROTOCOL_NAME.into(), msg.encode()?), }) } diff --git a/network/src/network.rs b/network/src/network.rs index eb62f07fb1..2ab0db2eb2 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -285,7 +285,7 @@ impl Inner { network_msg.peer_id, ); if let Err(e) = inner.handle_network_message(network_msg).await { - warn!("Handle_network_message error: {:?}", e); + error!("Handle_network_message error: {:?}", e); } } diff --git a/sync/tests/full_sync_test.rs b/sync/tests/full_sync_test.rs index f1c539cd1f..37b3c71450 100644 --- a/sync/tests/full_sync_test.rs +++ b/sync/tests/full_sync_test.rs @@ -3,6 +3,8 @@ mod test_sync; use config::{NodeConfig, SyncMode}; use futures::executor::block_on; use logger::prelude::*; +use starcoin_service_registry::ActorService; +use starcoin_sync::sync2::SyncService2; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -14,6 +16,60 @@ fn test_full_sync() { test_sync::test_sync(SyncMode::FULL) } +#[stest::test(timeout = 120)] +fn test_sync_by_notification() { + let first_config = Arc::new(NodeConfig::random_for_test()); + info!( + "first peer : {:?}", + first_config.network.self_peer_id().unwrap() + ); + let first_node = run_node_by_config(first_config.clone()).unwrap(); + let first_chain = first_node.chain_service().unwrap(); + + let mut second_config = NodeConfig::random_for_test(); + info!( + "second peer : {:?}", + second_config.network.self_peer_id().unwrap() + ); + second_config.network.seeds = vec![first_config.network.self_address().unwrap()]; + second_config.miner.enable_miner_client = false; + + let second_node = run_node_by_config(Arc::new(second_config)).unwrap(); + // stop sync service and just use notification message to sync. + second_node + .stop_service(SyncService2::service_name().to_string()) + .unwrap(); + + let second_chain = second_node.chain_service().unwrap(); + + //wait second node sync service stop. + sleep(Duration::from_millis(500)); + + let count = 5; + for _i in 0..count { + first_node.generate_block().unwrap(); + } + + //wait block generate. + sleep(Duration::from_millis(500)); + let block_1 = block_on(async { first_chain.main_head_block().await.unwrap() }); + let number_1 = block_1.header().number(); + + let mut number_2 = 0; + for i in 0..10 as usize { + std::thread::sleep(Duration::from_secs(2)); + let block_2 = block_on(async { second_chain.main_head_block().await.unwrap() }); + number_2 = block_2.header().number(); + debug!("index : {}, second chain number is {}", i, number_2); + if number_2 == number_1 { + break; + } + } + assert_eq!(number_1, number_2, "two node is not sync."); + second_node.stop().unwrap(); + first_node.stop().unwrap(); +} + //TODO fixme #[ignore] #[stest::test(timeout = 120)] From d39399aae0aa9f4e7cb271b8b2ac25b7dc4812b3 Mon Sep 17 00:00:00 2001 From: jolestar Date: Tue, 1 Dec 2020 16:18:29 +0800 Subject: [PATCH 3/3] [network] Spawn network worker future when service run, and use memory transport in test. --- Cargo.lock | 1 + commons/service-registry/src/handler_proxy.rs | 2 +- commons/service-registry/src/service_actor.rs | 2 +- config/src/network_config.rs | 26 +- network-p2p/types/Cargo.toml | 1 + network-p2p/types/src/lib.rs | 29 +- network/src/net.rs | 87 +++-- network/src/net_test.rs | 313 ++++++++---------- network/src/network.rs | 35 +- sync/tests/full_sync_test.rs | 8 +- 10 files changed, 270 insertions(+), 234 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca8f8bf33d..66bb7a66be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4444,6 +4444,7 @@ dependencies = [ "bytes 0.5.6", "derive_more", "libp2p", + "rand 0.7.3", "serde", "starcoin-canonical-serialization", ] diff --git a/commons/service-registry/src/handler_proxy.rs b/commons/service-registry/src/handler_proxy.rs index a4cd5adf16..e82c9d370f 100644 --- a/commons/service-registry/src/handler_proxy.rs +++ b/commons/service-registry/src/handler_proxy.rs @@ -80,7 +80,7 @@ where fn stop(&mut self, ctx: &mut ServiceContext) -> Result<()> { if self.status().is_stopped() { - warn!("Service {} has bean stopped", S::service_name()); + info!("Service {} has bean stopped", S::service_name()); return Ok(()); } let service = self.service.take(); diff --git a/commons/service-registry/src/service_actor.rs b/commons/service-registry/src/service_actor.rs index 0ddd1e7168..cc790fd9c2 100644 --- a/commons/service-registry/src/service_actor.rs +++ b/commons/service-registry/src/service_actor.rs @@ -255,7 +255,7 @@ where fn handle(&mut self, msg: EventMessage, ctx: &mut Self::Context) -> Self::Result { debug!("{} handle event: {:?}", S::service_name(), &msg.msg); if self.proxy.status().is_stopped() { - error!("Service {} is stopped", S::service_name()); + info!("Service {} is already stopped", S::service_name()); return; } let mut service_ctx = ServiceContext::new(&mut self.cache, ctx); diff --git a/config/src/network_config.rs b/config/src/network_config.rs index 6a5760d19c..a591898e7f 100644 --- a/config/src/network_config.rs +++ b/config/src/network_config.rs @@ -7,6 +7,7 @@ use crate::{ }; use anyhow::{bail, format_err, Result}; use network_p2p_types::{ + is_memory_addr, memory_addr, multiaddr::{Multiaddr, Protocol}, MultiaddrWithPeerId, }; @@ -58,11 +59,14 @@ impl NetworkConfig { fn prepare_peer_id(&mut self) { let peer_id = PeerId::from_ed25519_public_key(self.network_keypair().public_key.clone()); - let host = self - .listen - .clone() - .replace(0, |_p| Some(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))) - .expect("Replace multi address fail."); + let host = if is_memory_addr(&self.listen) { + self.listen.clone() + } else { + self.listen + .clone() + .replace(0, |_p| Some(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))) + .expect("Replace multi address fail.") + }; self.self_address = Some(MultiaddrWithPeerId::new(host, peer_id.clone().into())); self.self_peer_id = Some(peer_id); } @@ -103,10 +107,16 @@ impl ConfigModule for NetworkConfig { } else { DEFAULT_NETWORK_PORT }; - Ok(Self { - listen: format!("/ip4/0.0.0.0/tcp/{}", port) + //test env use in memory transport. + let listen = if base.net.is_test() { + memory_addr(port as u64) + } else { + format!("/ip4/0.0.0.0/tcp/{}", port) .parse() - .expect("Parse multi address fail."), + .expect("Parse multi address fail.") + }; + Ok(Self { + listen, seeds, network_keypair: Some(Arc::new(Self::load_or_generate_keypair(opt, base)?)), self_peer_id: None, diff --git a/network-p2p/types/Cargo.toml b/network-p2p/types/Cargo.toml index d70fd8f041..7b4d892b4a 100644 --- a/network-p2p/types/Cargo.toml +++ b/network-p2p/types/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1.0.34" bitflags = "1.2.0" bytes = "0.5.0" derive_more = "0.99.11" +rand = "0.7.3" scs = { package="starcoin-canonical-serialization", path = "../../commons/scs"} serde = { version = "1.0", features = ["derive"] } libp2p = { version = "0.30.1", default-features = false, features = ["request-response"] } diff --git a/network-p2p/types/src/lib.rs b/network-p2p/types/src/lib.rs index bc2a98eed9..cc29dee60e 100644 --- a/network-p2p/types/src/lib.rs +++ b/network-p2p/types/src/lib.rs @@ -8,8 +8,8 @@ use std::fmt; use std::str::FromStr; pub use libp2p::core::{identity, multiaddr, Multiaddr, PeerId, PublicKey}; -pub use libp2p::multihash; pub use libp2p::request_response::{InboundFailure, OutboundFailure}; +pub use libp2p::{build_multiaddr, multihash}; /// Parses a string address and splits it into Multiaddress and PeerId, if /// valid. @@ -42,6 +42,22 @@ pub fn parse_addr(mut addr: Multiaddr) -> Result<(PeerId, Multiaddr), ParseErr> Ok((who, addr)) } +/// Build memory protocol Multiaddr by port +pub fn memory_addr(port: u64) -> Multiaddr { + build_multiaddr!(Memory(port)) +} + +/// Generate a random memory protocol Multiaddr +pub fn random_memory_addr() -> Multiaddr { + memory_addr(rand::random::()) +} + +/// Check the address is a memory protocol Multiaddr. +pub fn is_memory_addr(addr: &Multiaddr) -> bool { + addr.iter() + .any(|protocol| matches!(protocol, libp2p::core::multiaddr::Protocol::Memory(_))) +} + /// Address of a node, including its identity. /// /// This struct represents a decoded version of a multiaddress that ends with `/p2p/`. @@ -187,3 +203,14 @@ pub struct ProtocolRequest { pub protocol: Cow<'static, str>, pub request: IncomingRequest, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_memory_address() { + let addr = random_memory_addr(); + assert!(is_memory_addr(&addr)); + } +} diff --git a/network/src/net.rs b/network/src/net.rs index 58972b7cc5..cd556513e2 100644 --- a/network/src/net.rs +++ b/network/src/net.rs @@ -13,7 +13,7 @@ use network_p2p::{ identity, Event, Multiaddr, NetworkConfiguration, NetworkService, NetworkWorker, NodeKeyConfig, Params, ProtocolId, Secret, }; -use network_p2p_types::{PeerId, ProtocolRequest, RequestFailure}; +use network_p2p_types::{is_memory_addr, PeerId, ProtocolRequest, RequestFailure}; use prometheus::{default_registry, Registry}; use starcoin_network_rpc::NetworkRpcService; use starcoin_service_registry::ServiceRef; @@ -25,8 +25,11 @@ use types::startup_info::{ChainInfo, ChainStatus}; #[derive(Clone)] pub struct SNetworkService { - inner: NetworkInner, - service: Arc, + protocol: ProtocolId, + chain_info: ChainInfo, + cfg: NetworkConfiguration, + metrics_registry: Option, + service: Option>, net_tx: Option>, } @@ -42,18 +45,12 @@ impl SNetworkService { cfg: NetworkConfiguration, metrics_registry: Option, ) -> Self { - let worker = - NetworkWorker::new(Params::new(cfg, protocol, chain_info, metrics_registry)).unwrap(); - let service = worker.service().clone(); - let worker = worker; - - async_std::task::spawn(worker); - - let inner = NetworkInner::new(service.clone()); - Self { - inner, - service, + protocol, + chain_info, + cfg, + metrics_registry, + service: None, net_tx: None, } } @@ -70,21 +67,28 @@ impl SNetworkService { let (tx, net_rx) = mpsc::unbounded(); let (net_tx, rx) = mpsc::unbounded::(); let (event_tx, event_rx) = mpsc::unbounded::(); - let inner = self.inner.clone(); + let worker = NetworkWorker::new(Params::new( + self.cfg.clone(), + self.protocol.clone(), + self.chain_info.clone(), + self.metrics_registry.clone(), + )) + .unwrap(); self.net_tx = Some(net_tx.clone()); - - async_std::task::spawn(Self::start_network(inner, tx, rx, event_tx, close_rx)); + self.service = Some(worker.service().clone()); + async_std::task::spawn(Self::start_network(worker, tx, rx, event_tx, close_rx)); (net_tx, net_rx, event_rx, close_tx) } async fn start_network( - inner: NetworkInner, + mut worker: NetworkWorker, net_tx: mpsc::UnboundedSender, net_rx: mpsc::UnboundedReceiver, event_tx: mpsc::UnboundedSender, close_rx: mpsc::UnboundedReceiver<()>, ) { + let inner = NetworkInner::new(worker.service().clone()); let mut event_stream = inner.service.event_stream("network").fuse(); let mut net_rx = net_rx.fuse(); let mut close_rx = close_rx.fuse(); @@ -98,10 +102,10 @@ impl SNetworkService { inner.handle_network_receive(event,net_tx.clone(),event_tx.clone()).await; }, _ = close_rx.select_next_some() => { - //TODO - debug!("To shutdown command "); + info!("Network shutdown"); break; } + _ = (&mut worker).fuse() => {}, complete => { debug!("all stream are complete"); break; @@ -109,13 +113,18 @@ impl SNetworkService { } } } + fn service(&self) -> &Arc { + self.service + .as_ref() + .expect("Should call network function after network running.") + } pub async fn is_connected(&self, peer_id: PeerId) -> Result { - Ok(self.service.is_connected(peer_id).await) + Ok(self.service().is_connected(peer_id).await) } pub fn identify(&self) -> &PeerId { - self.service.peer_id() + self.service().peer_id() } pub async fn send_message( @@ -125,7 +134,7 @@ impl SNetworkService { message: Vec, ) -> Result<()> { debug!("Send message to {}", &peer_id); - self.service + self.service() .write_notification(peer_id, protocol_name, message); Ok(()) @@ -139,34 +148,38 @@ impl SNetworkService { ) -> Result, RequestFailure> { let protocol = protocol.into(); debug!("Send request to peer {} and rpc: {:?}", target, protocol); - self.service.request(target.into(), protocol, request).await + self.service() + .request(target.into(), protocol, request) + .await } pub async fn broadcast_message(&mut self, protocol_name: Cow<'static, str>, message: Vec) { debug!("broadcast message, protocol: {:?}", protocol_name); - self.service.broadcast_message(protocol_name, message).await; + self.service() + .broadcast_message(protocol_name, message) + .await; } pub fn add_peer(&self, peer: String) -> Result<()> { - self.service + self.service() .add_reserved_peer(peer) .map_err(|e| format_err!("{:?}", e)) } pub async fn connected_peers(&self) -> HashSet { - self.service.connected_peers().await + self.service().connected_peers().await } pub fn update_chain_status(&self, chain_status: ChainStatus) { - self.service.update_chain_status(chain_status); + self.service().update_chain_status(chain_status); } pub async fn get_address(&self, peer_id: PeerId) -> Vec { - self.service.get_address(peer_id).await + self.service().get_address(peer_id).await } pub async fn exist_notif_proto(&self, protocol_name: Cow<'static, str>) -> bool { - self.service.exist_notif_proto(protocol_name).await + self.service().exist_notif_proto(protocol_name).await } } @@ -267,11 +280,15 @@ pub fn build_network_service( mpsc::UnboundedReceiver, mpsc::UnboundedSender<()>, ) { - let transport_config = TransportConfig::Normal { - //TODO support enable mdns by config. - enable_mdns: false, - allow_private_ipv4: false, - wasm_external_transport: None, + let transport_config = if is_memory_addr(&cfg.listen) { + TransportConfig::MemoryOnly + } else { + TransportConfig::Normal { + //TODO support enable mdns by config. + enable_mdns: false, + allow_private_ipv4: false, + wasm_external_transport: None, + } }; //let rpc_info: Vec = starcoin_network_rpc_api::gen_client::get_rpc_info(); //TODO define RequestResponseConfig by rpc api diff --git a/network/src/net_test.rs b/network/src/net_test.rs index 022d3ecdeb..7df8f05d00 100644 --- a/network/src/net_test.rs +++ b/network/src/net_test.rs @@ -17,9 +17,11 @@ mod tests { }; use futures_timer::Delay; use network_api::messages::NotificationMessage; + use network_api::Multiaddr; use network_p2p::{identity, DhtEvent, Event}; use network_p2p::{NetworkConfiguration, NetworkWorker, NodeKeyConfig, Params, Secret}; - use network_p2p_types::PeerId; + use network_p2p_types::{random_memory_addr, MultiaddrWithPeerId, PeerId}; + use std::borrow::Cow; use std::future::Future; use std::pin::Pin; use std::{thread, time::Duration}; @@ -37,7 +39,7 @@ mod tests { ); async fn build_test_network_pair_not_wait() -> (NetworkComponent, NetworkComponent) { - let (service1, service2) = build_test_network_pair("127.0.0.1".to_string()); + let (service1, service2) = build_test_network_pair(); let from_peer_id = service1.0.identify().clone(); let to_peer_id = service2.0.identify().clone(); thread::sleep(Duration::from_secs(2)); @@ -100,8 +102,8 @@ mod tests { .is_ok()); } - fn build_test_network_pair(host: String) -> (NetworkComponent, NetworkComponent) { - let mut l = build_test_network_services(2, host, get_random_available_port()).into_iter(); + fn build_test_network_pair() -> (NetworkComponent, NetworkComponent) { + let mut l = build_test_network_services(2).into_iter(); let a = l.next().unwrap(); let b = l.next().unwrap(); (a, b) @@ -109,8 +111,6 @@ mod tests { fn build_test_network_services( num: usize, - host: String, - base_port: u16, ) -> Vec<( SNetworkService, UnboundedSender, @@ -127,36 +127,33 @@ mod tests { UnboundedSender<()>, NetworkConfig, )> = Vec::with_capacity(num); - let mut first_addr = None::; + let mut first_addr: Option = None; let chain_info = ChainInfo::new( BuiltinNetworkID::Test.chain_id(), HashValue::random(), ChainStatus::random(), ); - for index in 0..num { + for _index in 0..num { let mut boot_nodes = Vec::new(); if let Some(first_addr) = first_addr.as_ref() { - boot_nodes.push( - format!("{}/p2p/{}", first_addr, result[0].0.identify().to_base58()) - .parse() - .unwrap(), - ); + boot_nodes.push(MultiaddrWithPeerId::new( + first_addr.clone(), + result[0].0.identify().clone(), + )); } let node_config = NodeConfig::random_for_test(); let mut config = node_config.network.clone(); - config.listen = format!("/ip4/{}/tcp/{}", host, base_port + index as u16) - .parse() - .unwrap(); + config.listen = random_memory_addr(); config.seeds = boot_nodes; info!("listen:{:?},boots {:?}", config.listen, config.seeds); if first_addr.is_none() { - first_addr = Some(config.listen.to_string()); + first_addr = Some(config.listen.clone()); } let mut protocols = NotificationMessage::protocols(); - protocols.push(TEST_PROTOCOL_NAME.into()); + protocols.push(TEST_NOTIF_PROTOCOL_NAME.into()); let server = build_network_service(chain_info.clone(), &config, protocols, None); result.push({ let c: NetworkComponent = ( @@ -173,7 +170,7 @@ mod tests { result } - const TEST_PROTOCOL_NAME: &str = "/test_notif"; + const TEST_NOTIF_PROTOCOL_NAME: &str = "/test_notif"; #[test] fn test_send_receive_1() { ::logger::init_for_test(); @@ -181,7 +178,7 @@ mod tests { let ( (service1, tx1, rx1, _event_rx1, close_tx1, _), (service2, tx2, _rx2, _event_rx2, close_tx2, _), - ) = build_test_network_pair("127.0.0.1".to_string()); + ) = build_test_network_pair(); let msg_peer_id_1 = service1.identify().clone(); let msg_peer_id_2 = service2.identify().clone(); // Once sender has been droped, the select_all will return directly. clone it to prevent it. @@ -204,13 +201,13 @@ mod tests { match if count % 2 == 0 { tx2.unbounded_send(NetworkMessage { peer_id: msg_peer_id_1.clone(), - protocol_name: std::borrow::Cow::Borrowed(TEST_PROTOCOL_NAME), + protocol_name: std::borrow::Cow::Borrowed(TEST_NOTIF_PROTOCOL_NAME), data: random_bytes, }) } else { tx1.unbounded_send(NetworkMessage { peer_id: msg_peer_id_2.clone(), - protocol_name: std::borrow::Cow::Borrowed(TEST_PROTOCOL_NAME), + protocol_name: std::borrow::Cow::Borrowed(TEST_NOTIF_PROTOCOL_NAME), data: random_bytes, }) } { @@ -244,61 +241,11 @@ mod tests { task::block_on(task); } - #[test] - fn test_send_receive_2() { - ::logger::init_for_test(); - - let ( - (service1, _tx1, rx1, _event_rx1, _close_tx1, _), - (service2, _tx2, _rx2, _event_rx2, _close_tx2, _), - ) = build_test_network_pair("127.0.0.1".to_string()); - let msg_peer_id = service1.identify().clone(); - let receive_fut = async move { - let mut rx1 = rx1.fuse(); - loop { - futures::select! { - _message = rx1.select_next_some()=>{ - info!("receive message"); - }, - complete => { - info!("complete"); - break; - } - } - } - }; - - task::spawn(receive_fut); - - //wait the network started. - thread::sleep(Duration::from_secs(1)); - - for _x in 0..1000 { - let random_bytes: Vec = (0..10240).map(|_| rand::random::()).collect(); - let service2_clone = service2.clone(); - - let peer_id = msg_peer_id.clone(); - let fut = async move { - assert_eq!( - service2_clone.is_connected(peer_id.clone()).await.unwrap(), - true - ); - - service2_clone - .send_message(peer_id, TEST_PROTOCOL_NAME.into(), random_bytes) - .await - .unwrap(); - }; - task::spawn(fut); - } - thread::sleep(Duration::from_secs(3)); - } - #[test] fn test_connected_nodes() { ::logger::init_for_test(); - let (service1, _service2) = build_test_network_pair("127.0.0.1".to_string()); + let (service1, _service2) = build_test_network_pair(); thread::sleep(Duration::from_secs(2)); let fut = async move { assert_eq!( @@ -323,7 +270,7 @@ mod tests { let random_bytes: Vec = (0..10240).map(|_| rand::random::()).collect(); service1 .0 - .broadcast_message(TEST_PROTOCOL_NAME.into(), random_bytes.clone()) + .broadcast_message(TEST_NOTIF_PROTOCOL_NAME.into(), random_bytes.clone()) .await; let mut receiver = service2.2.select_next_some(); let response = futures::future::poll_fn(move |cx| Pin::new(&mut receiver).poll(cx)).await; @@ -332,12 +279,13 @@ mod tests { #[stest::test] async fn test_network_exist_notify_proto() { - let service: NetworkComponent = - build_test_network_services(1, "127.0.0.1".to_string(), get_random_available_port()) - .into_iter() - .next() - .unwrap(); - assert!(service.0.exist_notif_proto(TEST_PROTOCOL_NAME.into()).await); + let service: NetworkComponent = build_test_network_services(1).into_iter().next().unwrap(); + assert!( + service + .0 + .exist_notif_proto(TEST_NOTIF_PROTOCOL_NAME.into()) + .await + ); } #[stest::test] @@ -370,104 +318,119 @@ mod tests { data.push(Bytes::from(&b"hello"[..])); let event = Event::NotificationsReceived { remote: PeerId::random(), - protocol: TEST_PROTOCOL_NAME.into(), + protocol: TEST_NOTIF_PROTOCOL_NAME.into(), messages: data, }; test_handle_event(event).await; } - // //FIXME temp ignore for #139 - // #[ignore] - // #[test] - // fn test_reconnected_nodes() { - // ::logger::init_for_test(); - // - // let mut node_config1 = NodeConfig::random_for_test().network; - // node_config1.listen = format!("/ip4/127.0.0.1/tcp/{}", config::get_random_available_port()) - // .parse() - // .unwrap(); - // - // let (service1, _net_tx1, _net_rx1, _event_rx1, _command_tx1) = - // build_network_service(&node_config1, HashValue::default(), PeerInfo::random()); - // - // thread::sleep(Duration::from_secs(1)); - // - // let mut node_config2 = NodeConfig::random_for_test().network; - // let addr1_hex = service1.identify().to_base58(); - // let seed: Multiaddr = format!("{}/p2p/{}", &node_config1.listen, addr1_hex) - // .parse() - // .unwrap(); - // node_config2.listen = format!("/ip4/127.0.0.1/tcp/{}", config::get_random_available_port()) - // .parse() - // .unwrap(); - // node_config2.seeds = vec![seed.clone()]; - // let (service2, _net_tx2, _net_rx2, _event_rx2, _command_tx2) = - // build_network_service(&node_config2, HashValue::default(), PeerInfo::random()); - // - // thread::sleep(Duration::from_secs(1)); - // - // let mut node_config3 = NodeConfig::random_for_test().network; - // node_config3.listen = format!("/ip4/127.0.0.1/tcp/{}", config::get_random_available_port()) - // .parse() - // .unwrap(); - // node_config3.seeds = vec![seed]; - // let (service3, _net_tx3, _net_rx3, _event_rx3, _command_tx3) = - // build_network_service(&node_config3, HashValue::default(), PeerInfo::random()); - // - // thread::sleep(Duration::from_secs(1)); - // - // let service1_clone = service1.clone(); - // let fut = async move { - // assert_eq!( - // service1_clone - // .is_connected(service2.identify().clone()) - // .await - // .unwrap(), - // true - // ); - // assert_eq!( - // service1_clone - // .is_connected(service3.identify().clone()) - // .await - // .unwrap(), - // true - // ); - // - // drop(service2); - // drop(service3); - // - // Delay::new(Duration::from_secs(1)).await; - // }; - // task::block_on(fut); - // - // thread::sleep(Duration::from_secs(10)); - // - // let (service2, _net_tx2, _net_rx2, _event_tx2, _command_tx2) = - // build_network_service(&node_config2, HashValue::default(), PeerInfo::random()); - // - // thread::sleep(Duration::from_secs(1)); - // - // let (service3, _net_tx3, _net_rx3, _event_rx3, _command_tx3) = - // build_network_service(&node_config3, HashValue::default(), PeerInfo::random()); - // - // thread::sleep(Duration::from_secs(1)); - // - // let fut = async move { - // assert_eq!( - // service1 - // .is_connected(service2.identify().clone()) - // .await - // .unwrap(), - // true - // ); - // assert_eq!( - // service1 - // .is_connected(service3.identify().clone()) - // .await - // .unwrap(), - // true - // ); - // }; - // task::block_on(fut); - // } + //TOD FIXME provider a shutdown network method, shutdown first, then start. + #[ignore] + #[stest::test] + fn test_reconnected_nodes() { + let protocols: Vec> = vec![TEST_NOTIF_PROTOCOL_NAME.into()]; + let mut node_config1 = NodeConfig::random_for_test().network; + node_config1.listen = format!("/ip4/127.0.0.1/tcp/{}", config::get_random_available_port()) + .parse() + .unwrap(); + + let chain_info = ChainInfo::new( + BuiltinNetworkID::Test.chain_id(), + HashValue::random(), + ChainStatus::random(), + ); + + let (service1, _net_tx1, _net_rx1, _event_rx1, _command_tx1) = + build_network_service(chain_info.clone(), &node_config1, protocols.clone(), None); + + thread::sleep(Duration::from_secs(1)); + + let mut node_config2 = NodeConfig::random_for_test().network; + let addr1_hex = service1.identify().to_base58(); + let seed: MultiaddrWithPeerId = format!("{}/p2p/{}", &node_config1.listen, addr1_hex) + .parse() + .unwrap(); + node_config2.listen = format!("/ip4/127.0.0.1/tcp/{}", config::get_random_available_port()) + .parse() + .unwrap(); + node_config2.seeds = vec![seed.clone()]; + let (service2, net_tx2, net_rx2, event_rx2, command_tx2) = + build_network_service(chain_info.clone(), &node_config2, protocols.clone(), None); + + thread::sleep(Duration::from_secs(1)); + + let mut node_config3 = NodeConfig::random_for_test().network; + node_config3.listen = format!("/ip4/127.0.0.1/tcp/{}", config::get_random_available_port()) + .parse() + .unwrap(); + node_config3.seeds = vec![seed]; + let (service3, net_tx3, net_rx3, event_rx3, command_tx3) = + build_network_service(chain_info.clone(), &node_config3, protocols.clone(), None); + + thread::sleep(Duration::from_secs(1)); + + let service1_clone = service1.clone(); + let fut = async move { + assert_eq!( + service1_clone + .is_connected(service2.identify().clone()) + .await + .unwrap(), + true + ); + assert_eq!( + service1_clone + .is_connected(service3.identify().clone()) + .await + .unwrap(), + true + ); + + drop(service2); + drop(service3); + + drop(net_tx2); + drop(net_rx2); + drop(event_rx2); + + drop(net_tx3); + drop(net_rx3); + drop(event_rx3); + + command_tx2.close_channel(); + command_tx3.close_channel(); + Delay::new(Duration::from_secs(1)).await; + }; + task::block_on(fut); + + thread::sleep(Duration::from_secs(1)); + + let (service2, _net_tx2, _net_rx2, _event_tx2, _command_tx2) = + build_network_service(chain_info.clone(), &node_config2, protocols.clone(), None); + + thread::sleep(Duration::from_secs(1)); + + let (service3, _net_tx3, _net_rx3, _event_rx3, _command_tx3) = + build_network_service(chain_info, &node_config3, protocols.clone(), None); + + thread::sleep(Duration::from_secs(1)); + + let fut = async move { + assert_eq!( + service1 + .is_connected(service2.identify().clone()) + .await + .unwrap(), + true + ); + assert_eq!( + service1 + .is_connected(service3.identify().clone()) + .await + .unwrap(), + true + ); + }; + task::block_on(fut); + } } diff --git a/network/src/network.rs b/network/src/network.rs index 2ab0db2eb2..9f69cf8e4c 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -393,12 +393,11 @@ impl Inner { match ¬ification { NotificationMessage::CompactBlock(msg) => { let id = msg.compact_block.header.id(); - debug!("broadcast new compact block message {:?}", id); let block_header = msg.compact_block.header.clone(); let total_difficulty = msg.total_difficulty; if let Some(peer_info) = self.peers.lock().await.get_mut(self_id) { debug!( - "total_difficulty is {}, peer_info is {:?}", + "update self network chain status, total_difficulty is {}, peer_info is {:?}", total_difficulty, peer_info ); @@ -418,7 +417,7 @@ impl Inner { if peer_info.known_blocks.contains(&id) || peer_info.peer_info.total_difficulty() >= total_difficulty { - debug!("peer({:?})'s total_difficulty is > block({:?})'s total_difficulty or it know this block, so do not broadcast. ", peer_id, id); + debug!("peer({:?})'s total_difficulty is >= block({:?})'s total_difficulty or it know this block, so do not broadcast. ", peer_id, id); None }else{ peer_info.known_blocks.put(id, ()); @@ -428,10 +427,17 @@ impl Inner { )) } }).collect::>(); - futures::future::join_all(send_futures) - .await - .into_iter() - .collect::>()?; + debug!( + "[network] broadcast new compact block message {:?} to {} peers", + id, + send_futures.len() + ); + if !send_futures.is_empty() { + futures::future::join_all(send_futures) + .await + .into_iter() + .collect::>()?; + } Ok(()) } NotificationMessage::Transactions(msg) => { @@ -465,10 +471,17 @@ impl Inner { ))) }) .collect::>(); - futures::future::join_all(send_futures) - .await - .into_iter() - .collect::>()?; + debug!( + "[network] broadcast new {} transactions to {} peers", + msg.txns.len(), + send_futures.len() + ); + if !send_futures.is_empty() { + futures::future::join_all(send_futures) + .await + .into_iter() + .collect::>()?; + } Ok(()) } } diff --git a/sync/tests/full_sync_test.rs b/sync/tests/full_sync_test.rs index 37b3c71450..f1b69ae730 100644 --- a/sync/tests/full_sync_test.rs +++ b/sync/tests/full_sync_test.rs @@ -16,6 +16,7 @@ fn test_full_sync() { test_sync::test_sync(SyncMode::FULL) } +#[ignore] #[stest::test(timeout = 120)] fn test_sync_by_notification() { let first_config = Arc::new(NodeConfig::random_for_test()); @@ -26,6 +27,9 @@ fn test_sync_by_notification() { let first_node = run_node_by_config(first_config.clone()).unwrap(); let first_chain = first_node.chain_service().unwrap(); + //wait node start + sleep(Duration::from_millis(1000)); + let mut second_config = NodeConfig::random_for_test(); info!( "second peer : {:?}", @@ -42,8 +46,8 @@ fn test_sync_by_notification() { let second_chain = second_node.chain_service().unwrap(); - //wait second node sync service stop. - sleep(Duration::from_millis(500)); + //wait node start and sync service stop. + sleep(Duration::from_millis(1000)); let count = 5; for _i in 0..count {