Skip to content

Commit

Permalink
[sync] Add a limit to rollback block count, for avoid use too much m…
Browse files Browse the repository at this point in the history
…emory (#1773)

* [sync] Add a limit to rollback block count, for avoid usage too memory.

Optimize BlockConnectedEvent, only try connect block when sync chain's total_difficulty > node's current chain.

* [metric] Fix node panic for block_latency metric. resolve #1771
  • Loading branch information
jolestar authored Dec 8, 2020
1 parent 01675ea commit 5d8345b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 33 deletions.
8 changes: 5 additions & 3 deletions node/src/peer_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ impl PeerMessageHandler for NodePeerMessageHandler {
}
NotificationMessage::CompactBlock(message) => {
let header_time = message.compact_block.header.timestamp;
NODE_METRICS
.block_latency
.observe((duration_since_epoch().as_millis() - header_time as u128) as f64);
NODE_METRICS.block_latency.observe(
duration_since_epoch()
.as_millis()
.saturating_sub(header_time as u128) as f64,
);
if let Err(e) = self
.block_relayer
.notify(PeerCompactBlockMessage::new(peer_message.peer_id, *message))
Expand Down
1 change: 1 addition & 0 deletions sync/src/block_connector/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub static WRITE_BLOCK_CHAIN_METRICS: Lazy<ChainMetrics> =

#[derive(Clone)]
pub struct ChainMetrics {
//TODO change Int to UInt
pub try_connect_count: IntCounter,
pub duplicate_conn_count: IntCounter,
pub broadcast_head_count: IntCounter,
Expand Down
58 changes: 38 additions & 20 deletions sync/src/block_connector/write_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use starcoin_vm_types::on_chain_config::GlobalTimeOnChain;
use std::sync::Arc;
use traits::{ChainReader, ChainWriter, ConnectBlockError, WriteableChainService};

const MAX_ROLL_BACK_BLOCK: usize = 10;

pub struct WriteBlockChainService<P>
where
P: TxPoolSyncService,
Expand Down Expand Up @@ -123,26 +125,28 @@ where
let mut map_be_uncles = Vec::new();
let parent_is_main_head = self.is_main_head(&block_header.parent_hash());
if branch_total_difficulty > main_total_difficulty {
let (enacted_blocks, retracted_blocks) = if !parent_is_main_head {
self.find_ancestors_from_accumulator(&new_branch)?
} else {
(vec![block.clone()], vec![])
};
let (enacted_count, enacted_blocks, retracted_count, retracted_blocks) =
if !parent_is_main_head {
self.find_ancestors_from_accumulator(&new_branch)?
} else {
(1, vec![block.clone()], 0, vec![])
};

debug_assert!(!enacted_blocks.is_empty());
debug_assert_eq!(enacted_blocks.last().unwrap(), &block);
self.update_main(new_branch);
if !parent_is_main_head {
WRITE_BLOCK_CHAIN_METRICS
.rollback_block_size
.set(retracted_blocks.len() as i64);
.set(retracted_count as i64);
}
self.commit_2_txpool(enacted_blocks, retracted_blocks);
WRITE_BLOCK_CHAIN_METRICS.broadcast_head_count.inc();
self.config
.net()
.time_service()
.adjust(GlobalTimeOnChain::new(block.header().timestamp));
info!("[chain] Select new head, id: {}, number: {}, total_difficulty: {}, enacted_block_count: {}, retracted_block_count: {}", block_header.id(), block_header.number,branch_total_difficulty, enacted_count, retracted_count);
self.broadcast_new_head(BlockDetail::new(block, branch_total_difficulty));
} else {
//send new branch event
Expand Down Expand Up @@ -177,7 +181,7 @@ where
fn find_ancestors_from_accumulator(
&self,
new_branch: &BlockChain,
) -> Result<(Vec<Block>, Vec<Block>)> {
) -> Result<(u64, Vec<Block>, u64, Vec<Block>)> {
let new_header_number = new_branch.current_header().number();
let main_header_number = self.get_main().current_header().number();
let mut number = if new_header_number >= main_header_number {
Expand Down Expand Up @@ -212,29 +216,43 @@ where
);

let ancestor = ancestor.expect("Ancestor is none.");
let enacted = self.find_blocks_until(block_enacted, ancestor)?;
let retracted = self.find_blocks_until(block_retracted, ancestor)?;
let ancestor_block = self
.main
.get_block(ancestor)?
.ok_or_else(|| format_err!("Can not find block by id:{}", ancestor))?;
let enacted_count = new_branch.current_header().number() - ancestor_block.header().number();
let retracted_count =
self.main.current_header().number() - ancestor_block.header().number();
let enacted = self.find_blocks_until(block_enacted, ancestor, MAX_ROLL_BACK_BLOCK)?;
let retracted = self.find_blocks_until(block_retracted, ancestor, MAX_ROLL_BACK_BLOCK)?;

debug!(
"commit block num:{}, rollback block num:{}",
enacted.len(),
retracted.len(),
"Commit block count:{}, rollback block count:{}",
enacted_count, retracted_count,
);
Ok((enacted, retracted))
Ok((enacted_count, enacted, retracted_count, retracted))
}

fn find_blocks_until(&self, from: HashValue, until: HashValue) -> Result<Vec<Block>> {
fn find_blocks_until(
&self,
from: HashValue,
until: HashValue,
max_size: usize,
) -> Result<Vec<Block>> {
let mut blocks: Vec<Block> = Vec::new();
let mut tmp = from;
let mut block_id = from;
loop {
if tmp == until {
if block_id == until {
break;
};
}
if blocks.len() >= max_size {
break;
}
let block = self
.storage
.get_block(tmp)?
.ok_or_else(|| format_err!("Can not find block {:?}.", tmp))?;
tmp = block.header().parent_hash();
.get_block(block_id)?
.ok_or_else(|| format_err!("Can not find block {:?}.", block_id))?;
block_id = block.header().parent_hash();
blocks.push(block);
}
blocks.reverse();
Expand Down
29 changes: 21 additions & 8 deletions sync/src/tasks/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::future::BoxFuture;
use futures::FutureExt;
use logger::prelude::*;
use starcoin_accumulator::{Accumulator, MerkleAccumulator};
use starcoin_chain_api::ChainWriter;
use starcoin_chain_api::{ChainReader, ChainWriter};
use starcoin_types::block::{Block, BlockInfo, BlockNumber};
use starcoin_vm_types::on_chain_config::GlobalTimeOnChain;
use std::collections::HashMap;
Expand Down Expand Up @@ -143,23 +143,32 @@ impl TaskState for BlockSyncTask {
}

pub struct BlockCollector {
//node's current block info
current_block_info: BlockInfo,
// the block chain init by ancestor
chain: BlockChain,
event_handle: Box<dyn BlockConnectedEventHandle>,
}

impl BlockCollector {
pub fn new(chain: BlockChain) -> Self {
pub fn new(current_block_info: BlockInfo, chain: BlockChain) -> Self {
Self {
current_block_info,
chain,
event_handle: Box::new(NoOpEventHandle),
}
}

pub fn new_with_handle<H>(chain: BlockChain, event_handle: H) -> Self
pub fn new_with_handle<H>(
current_block_info: BlockInfo,
chain: BlockChain,
event_handle: H,
) -> Self
where
H: BlockConnectedEventHandle + 'static,
{
Self {
current_block_info,
chain,
event_handle: Box::new(event_handle),
}
Expand Down Expand Up @@ -187,11 +196,15 @@ impl TaskResultCollector<(Block, Option<BlockInfo>)> for BlockCollector {
self.chain
.time_service()
.adjust(GlobalTimeOnChain::new(timestamp));
if let Err(e) = self.event_handle.handle(BlockConnectedEvent { block }) {
error!(
"Send BlockConnectedEvent error: {:?}, block_id: {}",
e, block_id
);
let total_difficulty = self.chain.get_total_difficulty()?;
// only try connect block when sync chain total_difficulty > node's current chain.
if total_difficulty > self.current_block_info.total_difficulty {
if let Err(e) = self.event_handle.handle(BlockConnectedEvent { block }) {
error!(
"Send BlockConnectedEvent error: {:?}, block_id: {}",
e, block_id
);
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions sync/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where
let target_block_number = target.block_accumulator_info.num_leaves - 1;
let target_block_accumulator = target.block_accumulator_info;

let current_block_accumulator_info = current_block_info.block_accumulator_info;
let current_block_accumulator_info = current_block_info.block_accumulator_info.clone();

let accumulator_task_fetcher = fetcher.clone();
let block_task_fetcher = fetcher.clone();
Expand Down Expand Up @@ -280,7 +280,7 @@ where
1,
);
let chain = BlockChain::new(time_service, ancestor.id, chain_storage)?;
let block_collector = BlockCollector::new_with_handle(chain, block_event_handle);
let block_collector = BlockCollector::new_with_handle(current_block_info, chain, block_event_handle);
Ok(TaskGenerator::new(
block_sync_task,
5,
Expand Down

0 comments on commit 5d8345b

Please sign in to comment.