Skip to content

Commit

Permalink
Merge 877637c into ae80c23
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar authored Mar 30, 2021
2 parents ae80c23 + 877637c commit c886b38
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 34 deletions.
6 changes: 1 addition & 5 deletions block-relayer/src/block_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl BlockRelayer {
debug!("Receive peer compact block event from peer id:{}", peer_id);
let block_id = compact_block.header.id();
if let Ok(Some(_)) = txpool.get_store().get_failed_block_by_id(block_id) {
debug!("Block is failed block : {:?}", block_id);
warn!("Block is failed block : {:?}", block_id);
} else {
let peers = network.peer_set().await?;
let peer_selector = PeerSelector::new(peers, PeerStrategy::default());
Expand Down Expand Up @@ -209,10 +209,6 @@ impl EventHandler<Self, PeerCompactBlockMessage> for BlockRelayer {
compact_block_msg: PeerCompactBlockMessage,
ctx: &mut ServiceContext<BlockRelayer>,
) {
if !self.is_synced() {
debug!("[block-relay] Ignore PeerCompactBlockMessage because the node has not been synchronized yet.");
return;
}
let sync_status = self
.sync_status
.as_ref()
Expand Down
2 changes: 2 additions & 0 deletions config/example/barnard/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ interval = 0
miner_thread = 3

[network]
max_incoming_peers = 25
max_outgoing_peers = 75
node_name = "alice-node1"
seeds = ["/ip4/1.2.3.3/tcp/9840/p2p/QmRZ6ZwVzhJ6xpVV1CEve2RKiUzK4y2pSx3eg2cvQMsT4f", "/ip4/1.2.3.4/tcp/9840/p2p/12D3KooWCfUex27aoqaKScponiLB4N4FWbgmbHYjVoRebGrQaRYk"]

Expand Down
2 changes: 2 additions & 0 deletions config/example/halley/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ interval = 0
miner_thread = 3

[network]
max_incoming_peers = 25
max_outgoing_peers = 75
node_name = "alice-node1"
seeds = ["/ip4/1.2.3.3/tcp/9840/p2p/QmRZ6ZwVzhJ6xpVV1CEve2RKiUzK4y2pSx3eg2cvQMsT4f", "/ip4/1.2.3.4/tcp/9840/p2p/12D3KooWCfUex27aoqaKScponiLB4N4FWbgmbHYjVoRebGrQaRYk"]

Expand Down
2 changes: 2 additions & 0 deletions config/example/proxima/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ interval = 0
miner_thread = 3

[network]
max_incoming_peers = 25
max_outgoing_peers = 75
node_name = "alice-node1"
seeds = ["/ip4/1.2.3.3/tcp/9840/p2p/QmRZ6ZwVzhJ6xpVV1CEve2RKiUzK4y2pSx3eg2cvQMsT4f", "/ip4/1.2.3.4/tcp/9840/p2p/12D3KooWCfUex27aoqaKScponiLB4N4FWbgmbHYjVoRebGrQaRYk"]

Expand Down
35 changes: 28 additions & 7 deletions config/src/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,25 @@ pub struct NetworkConfig {

#[serde(skip_serializing_if = "Option::is_none")]
#[structopt(long)]
/// min peers to propagate new block and new transactions. Default to 8.
/// min peers to propagate new block and new transactions. Default 8.
min_peers_to_propagate: Option<u32>,

#[serde(skip_serializing_if = "Option::is_none")]
#[structopt(long)]
///max peers to propagate new block and new transactions. Default to 128.
///max peers to propagate new block and new transactions. Default 128.
max_peers_to_propagate: Option<u32>,

#[serde(skip_serializing_if = "Option::is_none")]
#[structopt(long)]
/// Max number of incoming peers. Defaults to 25.
max_incoming_peers: Option<u32>,

#[serde(skip_serializing_if = "Option::is_none")]
#[structopt(long)]
/// Max number of outgoing connected peers. Defaults to 75.
/// Total max peers = max_incoming_peers + max_outgoing_peers.
max_outgoing_peers: Option<u32>,

#[serde(skip_serializing_if = "Option::is_none")]
#[structopt(long)]
/// p2p network listen address, Default is /ip4/0.0.0.0/tcp/9840
Expand Down Expand Up @@ -296,6 +307,14 @@ impl NetworkConfig {
self.min_peers_to_propagate.clone().unwrap_or(8)
}

/// Max number of allowed incoming peer connections.
/// Falls back to 25 when not configured.
pub fn max_incoming_peers(&self) -> u32 {
    // Option<u32> is Copy, so `.clone()` is redundant (clippy: clone_on_copy).
    self.max_incoming_peers.unwrap_or(25)
}

/// Max number of allowed outgoing peer connections.
/// Falls back to 75 when not configured.
pub fn max_outgoing_peers(&self) -> u32 {
    // Option<u32> is Copy, so `.clone()` is redundant (clippy: clone_on_copy).
    self.max_outgoing_peers.unwrap_or(75)
}

pub fn node_name(&self) -> String {
self.node_name.clone().unwrap_or_else(generate_node_name)
}
Expand Down Expand Up @@ -367,11 +386,6 @@ impl ConfigModule for NetworkConfig {
self.disable_seed = opt.network.disable_seed;
}

info!(
"Final bootstrap seeds: {:?}, disable_seed: {}",
self.seeds, self.disable_seed
);

self.network_rpc_quotas
.merge(&opt.network.network_rpc_quotas)?;

Expand Down Expand Up @@ -400,6 +414,13 @@ impl ConfigModule for NetworkConfig {
self.discover_local = opt.network.discover_local;
}

if opt.network.max_incoming_peers.is_some() {
self.max_incoming_peers = opt.network.max_incoming_peers;
}
if opt.network.max_outgoing_peers.is_some() {
self.max_outgoing_peers = opt.network.max_outgoing_peers;
}

self.load_or_generate_keypair()?;
self.generate_listen_address();
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions config/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ fn test_example_config_compact() -> Result<()> {
"alice-node1",
"--discover-local",
"true",
"--max-incoming-peers",
"25",
"--max-outgoing-peers",
"75",
//P2P
"--p2prpc-default-global-api-quota",
"2000/s",
Expand Down
4 changes: 2 additions & 2 deletions kube/manifest/starcoin-barnard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ spec:
spec:
containers:
- name: starcoin
image: starcoin/starcoin:v1.0.0-alpha.1
image: starcoin/starcoin:v1.0.0-beta.2
imagePullPolicy: Always
command:
- bash
Expand All @@ -50,7 +50,7 @@ spec:
if [ ! -z $node_key ]; then
node_key_flag="--node-key ${node_key}";
fi;
/starcoin/starcoin -n barnard --discover-local true -d /sc-data $node_key_flag;
/starcoin/starcoin -n barnard --discover-local true --disable-miner-client true --min-peers-to-propagate 512 --max-peers-to-propagate 1024 --max-outgoing-peers 512 --max-incoming-peers 512 -d /sc-data $node_key_flag;
ports:
- containerPort: 9840
hostPort: 9840
Expand Down
31 changes: 13 additions & 18 deletions network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ActorService for NetworkActorService {

impl EventHandler<Self, SyncStatusChangeEvent> for NetworkActorService {
fn handle_event(&mut self, msg: SyncStatusChangeEvent, _ctx: &mut ServiceContext<Self>) {
self.inner.update_sync_status(msg.0);
self.inner.update_chain_status(msg.0);
}
}

Expand Down Expand Up @@ -294,7 +294,6 @@ pub(crate) struct Inner {
self_peer: Peer,
peers: HashMap<PeerId, Peer>,
peer_message_handler: Arc<dyn PeerMessageHandler>,
sync_status: Option<SyncStatus>,
metrics: Option<NetworkMetrics>,
score_handler: Arc<dyn Score<BlockBroadcastEntry> + 'static>,
}
Expand All @@ -317,26 +316,17 @@ impl Inner {
self_peer: Peer::new(self_info),
peers: HashMap::new(),
peer_message_handler: Arc::new(peer_message_handler),
sync_status: None,
metrics,
score_handler: Arc::new(LinearScore::new(10)),
})
}

/// Whether the node currently considers itself synchronized.
/// A missing sync status is treated as "not synced".
pub(crate) fn is_synced(&self) -> bool {
    self.sync_status
        .as_ref()
        .map_or(false, |status| status.is_synced())
}

pub(crate) fn update_sync_status(&mut self, sync_status: SyncStatus) {
pub(crate) fn update_chain_status(&mut self, sync_status: SyncStatus) {
let chain_status = sync_status.chain_status().clone();
self.self_peer
.peer_info
.update_chain_status(chain_status.clone());
self.network_service.update_chain_status(chain_status);
self.sync_status = Some(sync_status);
}

pub(crate) fn handle_network_message(
Expand Down Expand Up @@ -406,12 +396,8 @@ impl Inner {
};

if let Some(notification) = notification {
if self.is_synced() {
let peer_message = PeerMessage::new(peer_id.clone(), notification);
self.peer_message_handler.handle_message(peer_message);
} else {
debug!("Ignore notification message from peer: {}, protocol: {} , because node is not synchronized.", peer_id, protocol);
}
let peer_message = PeerMessage::new(peer_id.clone(), notification);
self.peer_message_handler.handle_message(peer_message);
BROADCAST_SCORE_METRICS.report_new(
peer_id,
self.score_handler
Expand Down Expand Up @@ -487,6 +473,15 @@ impl Inner {
"update self network chain status, total_difficulty is {}, peer_info is {:?}",
total_difficulty, self.self_peer.peer_info
);
// Update chain status in two cases:
// 1. New block broadcast.
// 2. Sync status change.
// This may be updated by repeated messages, but we have not found a better way.
self.network_service.update_chain_status(ChainStatus::new(
msg.compact_block.header.clone(),
msg.block_info.clone(),
));

self.self_peer.known_blocks.put(id, ());
let mut send_peer_count: usize = 0;
let (protocol_name, message) = notification
Expand Down
7 changes: 6 additions & 1 deletion network/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::*;
use bitflags::_core::time::Duration;
use futures::channel::mpsc::channel;
use futures::prelude::*;
use log::{debug, error};
use log::{debug, error, info};
use network_p2p::config::{RequestResponseConfig, TransportConfig};
use network_p2p::{
identity, NetworkConfiguration, NetworkWorker, NodeKeyConfig, Params, ProtocolId, Secret,
Expand Down Expand Up @@ -74,6 +74,9 @@ pub fn build_network_worker(
};
let allow_non_globals_in_dht = discover_local;
let boot_nodes = node_config.network.seeds();

info!("Final bootstrap seeds: {:?}", boot_nodes);

let config = NetworkConfiguration {
listen_addresses: vec![node_config.network.listen()],
boot_nodes,
Expand All @@ -84,6 +87,8 @@ pub fn build_network_worker(
.expect("decode network node key should success.");
NodeKeyConfig::Ed25519(Secret::Input(secret))
},
in_peers: node_config.network.max_incoming_peers(),
out_peers: node_config.network.max_outgoing_peers(),
notifications_protocols: protocols,
request_response_protocols: rpc_protocols,
transport: transport_config,
Expand Down
2 changes: 1 addition & 1 deletion txpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ impl EventHandler<Self, TxnStatusFullEvent> for TxPoolActorService {

impl EventHandler<Self, PeerTransactionsMessage> for TxPoolActorService {
fn handle_event(&mut self, msg: PeerTransactionsMessage, _ctx: &mut ServiceContext<Self>) {
//TODO should filter msg at the NetworkService
if self.is_synced() {
// Just need to keep at-most-once delivery.
let _ = self.inner.import_txns(msg.message.txns);
} else {
//TODO should keep txn in a buffer, then execute after sync finished.
debug!("[txpool] Ignore PeerTransactions event because the node has not been synchronized yet.");
}
}
Expand Down

0 comments on commit c886b38

Please sign in to comment.