diff --git a/Cargo.lock b/Cargo.lock index 0925b72..8e3de64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,6 +727,7 @@ dependencies = [ "ckb-chain-spec", "ckb-constant", "ckb-error", + "ckb-hash", "ckb-jsonrpc-types", "ckb-launcher", "ckb-merkle-mountain-range", diff --git a/Cargo.toml b/Cargo.toml index fdcc765..bc2bd93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ ckb-traits = "0.108.0" ckb-resource = "0.108.0" ckb-verification = "0.108.0" ckb-systemtime = "0.108.0" +ckb-hash = "0.108.0" ckb-merkle-mountain-range = "0.5.1" golomb-coded-set = "0.2.0" rocksdb = { package = "ckb-rocksdb", version ="=0.19.0", features = ["snappy"], default-features = false } diff --git a/src/main.rs b/src/main.rs index 12b467a..62e735d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,9 @@ mod types; mod utils; mod verify; +// TODO Remove the patches if the code was merged into CKB. +mod patches; + use config::AppConfig; fn main() -> anyhow::Result<()> { diff --git a/src/patches.rs b/src/patches.rs new file mode 100644 index 0000000..e7e17fd --- /dev/null +++ b/src/patches.rs @@ -0,0 +1,73 @@ +use std::io::{Cursor, Write}; + +#[cfg(not(test))] +use ckb_hash::blake2b_256; +use golomb_coded_set::{GCSFilterWriter, SipHasher24Builder, M, P}; + +use ckb_types::{core::TransactionView, packed, prelude::*}; + +/// Provides data for building block filter data. +pub trait FilterDataProvider { + /// Finds the cell through its out point. + fn cell(&self, out_point: &packed::OutPoint) -> Option; +} + +/// Builds filter data for transactions. +pub fn build_filter_data( + provider: P, + transactions: &[TransactionView], +) -> (Vec, Vec) { + let mut filter_writer = Cursor::new(Vec::new()); + let mut filter = build_gcs_filter(&mut filter_writer); + let mut missing_out_points = Vec::new(); + for tx in transactions { + if !tx.is_cellbase() { + for out_point in tx.input_pts_iter() { + if let Some(input_cell) = provider.cell(&out_point) { + filter.add_element(input_cell.calc_lock_hash().as_slice()); + if let Some(type_script) = input_cell.type_().to_opt() { + filter.add_element(type_script.calc_script_hash().as_slice()); + } + } else { + missing_out_points.push(out_point); + } + } + } + for output_cell in tx.outputs() { + filter.add_element(output_cell.calc_lock_hash().as_slice()); + if let Some(type_script) = output_cell.type_().to_opt() { + filter.add_element(type_script.calc_script_hash().as_slice()); + } + } + } + filter + .finish() + .expect("flush to memory writer should be OK"); + let filter_data = filter_writer.into_inner(); + (filter_data, missing_out_points) +} + +/// Calculates a block filter hash. +#[cfg(not(test))] +pub fn calc_filter_hash( + parent_block_filter_hash: &packed::Byte32, + filter_data: &packed::Bytes, +) -> [u8; 32] { + blake2b_256( + [ + parent_block_filter_hash.as_slice(), + filter_data.calc_raw_data_hash().as_slice(), + ] + .concat(), + ) +} + +// TODO Use real block filter hashes in unit tests. +#[cfg(test)] +pub fn calc_filter_hash(_: &packed::Byte32, _: &packed::Bytes) -> [u8; 32] { + Default::default() +} + +fn build_gcs_filter(out: &mut dyn Write) -> GCSFilterWriter { + GCSFilterWriter::new(out, SipHasher24Builder::new(0, 0), M, P) +} diff --git a/src/protocols/filter/block_filter.rs b/src/protocols/filter/block_filter.rs index 7f179ec..a311fb9 100644 --- a/src/protocols/filter/block_filter.rs +++ b/src/protocols/filter/block_filter.rs @@ -6,13 +6,21 @@ use ckb_constant::sync::INIT_BLOCKS_IN_TRANSIT_PER_PEER; use ckb_network::{async_trait, bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex}; use ckb_types::{core::BlockNumber, packed, prelude::*}; use golomb_coded_set::{GCSFilterReader, SipHasher24Builder, M, P}; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, log_enabled, trace, warn, Level}; +use rand::seq::SliceRandom as _; use std::io::Cursor; use std::sync::RwLock; use std::time::Instant; use std::{sync::Arc, time::Duration}; pub(crate) const GET_BLOCK_FILTERS_TOKEN: u64 = 0; +pub(crate) const GET_BLOCK_FILTER_HASHES_TOKEN: u64 = 1; +pub(crate) const GET_BLOCK_FILTER_CHECK_POINTS_TOKEN: u64 = 2; + +pub(crate) const GET_BLOCK_FILTERS_DURATION: Duration = Duration::from_secs(3); +pub(crate) const GET_BLOCK_FILTER_HASHES_DURATION: Duration = Duration::from_secs(10); +pub(crate) const GET_BLOCK_FILTER_CHECK_POINTS_DURATION: Duration = Duration::from_secs(30); + const GET_BLOCK_FILTERS_TIMEOUT: Duration = Duration::from_secs(15); pub struct FilterProtocol { @@ -68,17 +76,145 @@ impl FilterProtocol { .collect() } - fn should_ask(&self) -> bool { + fn should_ask(&self, immediately: bool) -> bool { !self.storage.is_filter_scripts_empty() - && (self.last_ask_time.read().unwrap().is_none() + && (immediately + || self.last_ask_time.read().unwrap().is_none() || self.last_ask_time.read().unwrap().unwrap().elapsed() > GET_BLOCK_FILTERS_TIMEOUT) } pub fn update_min_filtered_block_number(&self, block_number: BlockNumber) { self.storage.update_min_filtered_block_number(block_number); + self.peers.update_min_filtered_block_number(block_number); self.last_ask_time.write().unwrap().replace(Instant::now()); } + + pub(crate) fn try_send_get_block_filters( + &self, + nc: Arc, + immediately: bool, + ) { + let start_number = self.storage.get_min_filtered_block_number() + 1; + let (finalized_check_point_index, _) = self.storage.get_last_check_point(); + let could_ask_more = self + .peers + .could_request_more_block_filters(finalized_check_point_index, start_number); + if log_enabled!(Level::Trace) { + let finalized_check_point_number = self + .peers + .calc_check_point_number(finalized_check_point_index); + let (cached_check_point_index, cached_hashes) = + self.peers.get_cached_block_filter_hashes(); + let cached_check_point_number = + self.peers.calc_check_point_number(cached_check_point_index); + let next_cached_check_point_number = self + .peers + .calc_check_point_number(cached_check_point_index + 1); + trace!( + "could request block filters from {} or not: {}, \ + finalized: index {}, number {}; \ + cached: index {}, number {}, length {}; \ + next cached: number {}", + start_number, + could_ask_more, + finalized_check_point_index, + finalized_check_point_number, + cached_check_point_index, + cached_check_point_number, + cached_hashes.len(), + next_cached_check_point_number + ); + } + if let Some((peer, _prove_state)) = self + .peers + .get_all_prove_states() + .iter() + .max_by_key(|(_, prove_state)| prove_state.get_last_header().total_difficulty()) + { + debug!("found best proved peer {}", peer); + + let mut matched_blocks = self.peers.matched_blocks().write().expect("poisoned"); + if let Some((db_start_number, blocks_count, db_blocks)) = + self.storage.get_earliest_matched_blocks() + { + debug!( + "try recover matched blocks from storage, start_number={}, \ + blocks_count={}, matched_count: {}", + db_start_number, + blocks_count, + matched_blocks.len(), + ); + if matched_blocks.is_empty() { + // recover matched blocks from storage + self.peers + .add_matched_blocks(&mut matched_blocks, db_blocks); + let tip_header = self.storage.get_tip_header(); + prove_or_download_matched_blocks( + Arc::clone(&self.peers), + &tip_header, + &matched_blocks, + nc.as_ref(), + INIT_BLOCKS_IN_TRANSIT_PER_PEER, + ); + if could_ask_more { + debug!( + "send get block filters to {}, start_number={}", + peer, start_number + ); + self.send_get_block_filters(nc, *peer, start_number); + } + } + } else if self.should_ask(immediately) && could_ask_more { + debug!( + "send get block filters to {}, start_number={}", + peer, start_number + ); + self.send_get_block_filters(nc, *peer, start_number); + } else { + trace!("no block filters is required to download"); + } + } else { + debug!("cannot find peers which are proved"); + } + } + + pub(crate) fn try_send_get_block_filter_hashes(&self, nc: Arc) { + let min_filtered_block_number = self.storage.get_min_filtered_block_number(); + self.peers + .update_min_filtered_block_number(min_filtered_block_number); + let finalized_check_point_index = self.storage.get_max_check_point_index(); + let cached_check_point_index = self.peers.get_cached_block_filter_hashes().0; + if let Some(start_number) = self + .peers + .if_cached_block_filter_hashes_require_update(finalized_check_point_index) + { + let best_peers = self + .peers + .get_all_proved_check_points() + .into_iter() + .filter_map(|(peer_index, (cpindex, _check_points))| { + if cpindex >= finalized_check_point_index { + Some(peer_index) + } else { + None + } + }) + .collect::>(); + if let Some(peer) = best_peers.choose(&mut rand::thread_rng()).cloned() { + self.send_get_block_filter_hashes(Arc::clone(&nc), peer, start_number); + } + } else if cached_check_point_index >= finalized_check_point_index { + let peers = self + .peers + .get_peers_which_require_more_latest_block_filter_hashes( + finalized_check_point_index, + ); + for (peer, start_number) in peers { + self.send_get_block_filter_hashes(Arc::clone(&nc), peer, start_number); + } + } + } } impl FilterProtocol { @@ -89,7 +225,12 @@ impl FilterProtocol { message: packed::BlockFilterMessageUnionReader<'_>, ) -> Status { match message { - // TODO: implement check points message processing + packed::BlockFilterMessageUnionReader::BlockFilterCheckPoints(reader) => { + components::BlockFilterCheckPointsProcess::new(reader, self, nc, peer).execute() + } + packed::BlockFilterMessageUnionReader::BlockFilterHashes(reader) => { + components::BlockFilterHashesProcess::new(reader, self, nc, peer).execute() + } packed::BlockFilterMessageUnionReader::BlockFilters(reader) => { components::BlockFiltersProcess::new(reader, self, nc, peer).execute() } @@ -101,8 +242,13 @@ impl FilterProtocol { &self, nc: Arc, peer: PeerIndex, - start_number: u64, + start_number: BlockNumber, ) { + trace!( + "request block filter from peer {}, starts at {}", + peer, + start_number + ); let content = packed::GetBlockFilters::new_builder() .start_number(start_number.pack()) .build(); @@ -110,7 +256,56 @@ impl FilterProtocol { .set(content) .build(); if let Err(err) = nc.send_message_to(peer, message.as_bytes()) { - let error_message = format!("nc.send_message BlockFilterMessage, error: {:?}", err); + let error_message = format!("nc.send_message GetBlockFilters, error: {:?}", err); + error!("{}", error_message); + } + } + + pub(crate) fn send_get_block_filter_hashes( + &self, + nc: Arc, + peer: PeerIndex, + start_number: BlockNumber, + ) { + trace!( + "request block filter hashes from peer {}, starts at {}", + peer, + start_number + ); + let content = packed::GetBlockFilterHashes::new_builder() + .start_number(start_number.pack()) + .build(); + let message = packed::BlockFilterMessage::new_builder() + .set(content) + .build(); + if let Err(err) = nc.send_message_to(peer, message.as_bytes()) { + let error_message = format!("nc.send_message GetBlockFilterHashes, error: {:?}", err); + error!("{}", error_message); + } + } + + pub(crate) fn send_get_block_filter_check_points( + &self, + nc: Arc, + peer: PeerIndex, + start_number: BlockNumber, + ) { + trace!( + "request check points from peer {}, starts at {}", + peer, + start_number + ); + let content = packed::GetBlockFilterCheckPoints::new_builder() + .start_number(start_number.pack()) + .build(); + let message = packed::BlockFilterMessage::new_builder() + .set(content) + .build(); + if let Err(err) = nc.send_message_to(peer, message.as_bytes()) { + let error_message = format!( + "nc.send_message GetBlockFilterCheckPoints, error: {:?}", + err + ); error!("{}", error_message); } } @@ -119,9 +314,21 @@ impl FilterProtocol { #[async_trait] impl CKBProtocolHandler for FilterProtocol { async fn init(&mut self, nc: Arc) { - nc.set_notify(Duration::from_secs(3), GET_BLOCK_FILTERS_TOKEN) + nc.set_notify(GET_BLOCK_FILTERS_DURATION, GET_BLOCK_FILTERS_TOKEN) .await .expect("set_notify should be ok"); + nc.set_notify( + GET_BLOCK_FILTER_HASHES_DURATION, + GET_BLOCK_FILTER_HASHES_TOKEN, + ) + .await + .expect("set_notify should be ok"); + nc.set_notify( + GET_BLOCK_FILTER_CHECK_POINTS_DURATION, + GET_BLOCK_FILTER_CHECK_POINTS_TOKEN, + ) + .await + .expect("set_notify should be ok"); } async fn connected( @@ -161,79 +368,21 @@ impl CKBProtocolHandler for FilterProtocol { let item_name = msg.item_name(); let status = self.try_process(Arc::clone(&nc), peer, msg); - trace!( - "FilterProtocol.received peer={}, message={}", - peer, - item_name - ); - if let Some(ban_time) = status.should_ban() { - error!( - "process {} from {}, ban {:?} since result is {}", - item_name, peer, ban_time, status - ); - nc.ban_peer(peer, ban_time, status.to_string()); - } else if status.should_warn() { - warn!("process {} from {}, result is {}", item_name, peer, status); - } else if !status.is_ok() { - debug!("process {} from {}, result is {}", item_name, peer, status); - } + status.process(nc, peer, "BlockFilter", item_name); } async fn notify(&mut self, nc: Arc, token: u64) { match token { GET_BLOCK_FILTERS_TOKEN => { - let proved_peers = self.peers.get_peers_which_are_proved(); - if let Some((peer, prove_state)) = proved_peers - .iter() - .max_by_key(|(_, prove_state)| prove_state.get_last_header().total_difficulty()) - { - let start_number = self.storage.get_min_filtered_block_number() + 1; - let prove_state_number = prove_state.get_last_header().header().number(); - debug!( - "found proved peer {}, start_number: {}, prove_state number: {:?}", - peer, - start_number, - prove_state.get_last_header().header().number() - ); - - let mut matched_blocks = self.peers.matched_blocks().write().expect("poisoned"); - if let Some((db_start_number, blocks_count, db_blocks)) = - self.storage.get_earliest_matched_blocks() - { - if matched_blocks.is_empty() { - debug!( - "recover matched blocks from storage, start_number={}, blocks_count={}, matched_count: {}", - db_start_number, blocks_count, - matched_blocks.len(), - ); - // recover matched blocks from storage - self.peers - .add_matched_blocks(&mut matched_blocks, db_blocks); - let tip_header = self.storage.get_tip_header(); - prove_or_download_matched_blocks( - Arc::clone(&self.peers), - &tip_header, - &matched_blocks, - nc.as_ref(), - INIT_BLOCKS_IN_TRANSIT_PER_PEER, - ); - if prove_state_number >= start_number { - debug!( - "send get block filters to {}, start_number={}", - peer, start_number - ); - self.send_get_block_filters(Arc::clone(&nc), *peer, start_number); - } - } - } else if self.should_ask() && prove_state_number >= start_number { - debug!( - "send get block filters to {}, start_number={}", - peer, start_number - ); - self.send_get_block_filters(Arc::clone(&nc), *peer, start_number); - } - } else { - debug!("cannot find peers which are proved"); + self.try_send_get_block_filters(nc, false); + } + GET_BLOCK_FILTER_HASHES_TOKEN => { + self.try_send_get_block_filter_hashes(nc); + } + GET_BLOCK_FILTER_CHECK_POINTS_TOKEN => { + let peers = self.peers.get_peers_which_require_more_check_points(); + for (peer, start_number) in peers { + self.send_get_block_filter_check_points(Arc::clone(&nc), peer, start_number); } } _ => unreachable!(), diff --git a/src/protocols/filter/components/block_filter_check_points_process.rs b/src/protocols/filter/components/block_filter_check_points_process.rs new file mode 100644 index 0000000..f7be2ec --- /dev/null +++ b/src/protocols/filter/components/block_filter_check_points_process.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; + +use ckb_network::{CKBProtocolContext, PeerIndex}; +use ckb_types::{core::BlockNumber, packed, prelude::*}; +use log::trace; + +use crate::protocols::{FilterProtocol, Status, StatusCode}; + +pub struct BlockFilterCheckPointsProcess<'a> { + message: packed::BlockFilterCheckPointsReader<'a>, + protocol: &'a FilterProtocol, + nc: Arc, + peer_index: PeerIndex, +} + +impl<'a> BlockFilterCheckPointsProcess<'a> { + pub fn new( + message: packed::BlockFilterCheckPointsReader<'a>, + protocol: &'a FilterProtocol, + nc: Arc, + peer_index: PeerIndex, + ) -> Self { + Self { + message, + nc, + protocol, + peer_index, + } + } + + pub fn execute(self) -> Status { + let peer_state = if let Some(peer_state) = self.protocol.peers.get_state(&self.peer_index) { + peer_state + } else { + let errmsg = "peer is disconnected"; + return StatusCode::Ignore.with_context(errmsg); + }; + + let prove_number = if let Some(prove_state) = peer_state.get_prove_state() { + prove_state.get_last_header().header().number() + } else { + let errmsg = "peer is not proved"; + return StatusCode::Ignore.with_context(errmsg); + }; + + let start_number: BlockNumber = self.message.start_number().unpack(); + let check_points = self + .message + .block_filter_hashes() + .iter() + .map(|item| item.to_entity()) + .collect::>(); + + trace!( + "peer {}: last-state: {}, add check points (start: {}, len: {})", + self.peer_index, + peer_state, + start_number, + check_points.len() + ); + + let next_start_number_opt = return_if_failed!(self.protocol.peers.add_check_points( + self.peer_index, + prove_number, + start_number, + &check_points + )); + + if let Some(next_start_number) = next_start_number_opt { + self.protocol.send_get_block_filter_check_points( + self.nc, + self.peer_index, + next_start_number, + ); + } + + Status::ok() + } +} diff --git a/src/protocols/filter/components/block_filter_hashes_process.rs b/src/protocols/filter/components/block_filter_hashes_process.rs new file mode 100644 index 0000000..adb9920 --- /dev/null +++ b/src/protocols/filter/components/block_filter_hashes_process.rs @@ -0,0 +1,248 @@ +use std::sync::Arc; + +use ckb_network::{CKBProtocolContext, PeerIndex}; +use ckb_types::{core::BlockNumber, packed, prelude::*}; +use log::trace; +use rand::seq::SliceRandom as _; + +use crate::protocols::{FilterProtocol, Status, StatusCode}; + +pub struct BlockFilterHashesProcess<'a> { + message: packed::BlockFilterHashesReader<'a>, + protocol: &'a FilterProtocol, + nc: Arc, + peer_index: PeerIndex, +} + +impl<'a> BlockFilterHashesProcess<'a> { + pub fn new( + message: packed::BlockFilterHashesReader<'a>, + protocol: &'a FilterProtocol, + nc: Arc, + peer_index: PeerIndex, + ) -> Self { + Self { + message, + nc, + protocol, + peer_index, + } + } + + pub fn execute(self) -> Status { + let peer_state = if let Some(peer_state) = self.protocol.peers.get_state(&self.peer_index) { + peer_state + } else { + let errmsg = "peer is disconnected"; + return StatusCode::Ignore.with_context(errmsg); + }; + + let prove_number = if let Some(prove_state) = peer_state.get_prove_state() { + prove_state.get_last_header().header().number() + } else { + let errmsg = "peer is not proved"; + return StatusCode::Ignore.with_context(errmsg); + }; + + let start_number: BlockNumber = self.message.start_number().unpack(); + let parent_block_filter_hash = self.message.parent_block_filter_hash().to_entity(); + let block_filter_hashes = self + .message + .block_filter_hashes() + .iter() + .map(|item| item.to_entity()) + .collect::>(); + + trace!( + "peer {}: last-state: {}, add block filter hashes (start: {}, len: {}) \ + and parent block filter hash is {:#x}", + self.peer_index, + peer_state, + start_number, + block_filter_hashes.len(), + parent_block_filter_hash + ); + + let (finalized_check_point_index, finalized_check_point) = + self.protocol.storage.get_last_check_point(); + let finalized_check_point_number = self + .protocol + .peers + .calc_check_point_number(finalized_check_point_index); + + let (cached_check_point_index, cached_hashes) = + self.protocol.peers.get_cached_block_filter_hashes(); + let cached_check_point_number = self + .protocol + .peers + .calc_check_point_number(cached_check_point_index); + let next_cached_check_point_number = self + .protocol + .peers + .calc_check_point_number(cached_check_point_index + 1); + + trace!( + "finalized: index {}, number {}; \ + cached: index {}, number {}, length {}; \ + next cached: number {}", + finalized_check_point_index, + finalized_check_point_number, + cached_check_point_index, + cached_check_point_number, + cached_hashes.len(), + next_cached_check_point_number + ); + + if start_number <= finalized_check_point_number + && cached_check_point_number < start_number + && start_number <= next_cached_check_point_number + { + // Check block numbers. + let cached_last_number = cached_check_point_number + cached_hashes.len() as BlockNumber; + if start_number > cached_last_number + 1 { + let errmsg = format!( + "start number ({}) is continuous with cached last number ({})", + start_number, cached_last_number + ); + return StatusCode::Ignore.with_context(errmsg); + } + + // Check cached block filter hashes. + let (cached_check_point, next_cached_check_point) = { + let cached_check_points = self + .protocol + .storage + .get_check_points(cached_check_point_index, 2); + ( + cached_check_points[0].clone(), + cached_check_points[1].clone(), + ) + }; + + if start_number == cached_check_point_number + 1 { + if cached_check_point != parent_block_filter_hash { + let errmsg = format!( + "check point for block {} is {:#x} but parent hash is {:#x}", + start_number, cached_check_point, parent_block_filter_hash + ); + return StatusCode::BlockFilterHashesIsUnexpected.with_context(errmsg); + } + } else { + // This branch must be satisfied `start_number > cached_check_point_number + 1`. + let diff = start_number - cached_check_point_number; + let index = diff as usize - 2; + let cached_hash = &cached_hashes[index]; + if *cached_hash != parent_block_filter_hash { + let errmsg = format!( + "cached hash for block {} is {:#x} but parent hash is {:#x}", + start_number - 1, + cached_hash, + parent_block_filter_hash + ); + return StatusCode::Ignore.with_context(errmsg); + } + }; + let end_number = start_number + block_filter_hashes.len() as BlockNumber - 1; + if end_number > next_cached_check_point_number { + let diff = end_number - next_cached_check_point_number; + let index = block_filter_hashes.len() - (diff as usize) - 1; + let new_hash = &block_filter_hashes[index]; + if next_cached_check_point != *new_hash { + let errmsg = format!( + "check point for block {} is {:#x} but got {:#}", + next_cached_check_point_number, next_cached_check_point, new_hash + ); + return StatusCode::BlockFilterHashesIsUnexpected.with_context(errmsg); + } + } + let index_offset = (start_number - (cached_check_point_number + 1)) as usize; + for (index, (old_hash, new_hash)) in cached_hashes[index_offset..] + .iter() + .zip(block_filter_hashes.iter()) + .enumerate() + { + if old_hash != new_hash { + let number = start_number + (index_offset + index) as BlockNumber; + let errmsg = format!( + "cached hash for block {} is {:#x} but new is {:#}", + number, old_hash, new_hash + ); + return StatusCode::Ignore.with_context(errmsg); + } + } + + // Update cached block filter hashes. + let start_index = cached_hashes[index_offset..].len(); + let mut new_cached_hashes = cached_hashes; + if end_number > next_cached_check_point_number { + let excess_size = (end_number - next_cached_check_point_number) as usize; + let new_size = block_filter_hashes.len() - excess_size; + new_cached_hashes.extend_from_slice(&block_filter_hashes[start_index..new_size]); + } else { + new_cached_hashes.extend_from_slice(&block_filter_hashes[start_index..]); + } + self.protocol + .peers + .update_cached_block_filter_hashes(new_cached_hashes); + + if end_number < next_cached_check_point_number { + let best_peers = self + .protocol + .peers + .get_all_proved_check_points() + .into_iter() + .filter_map(|(peer_index, (cpindex, _check_points))| { + if peer_index == self.peer_index { + None + } else if cpindex >= finalized_check_point_index { + Some(peer_index) + } else { + None + } + }) + .collect::>(); + let best_peer = best_peers + .choose(&mut rand::thread_rng()) + .cloned() + .unwrap_or(self.peer_index); + self.protocol + .send_get_block_filter_hashes(self.nc, best_peer, end_number + 1); + } else { + // if couldn't request more block filter hashes, + // check if could request more block filters. + self.protocol.try_send_get_block_filters(self.nc, true); + } + } else if start_number > finalized_check_point_number { + let next_start_number_opt = + return_if_failed!(self.protocol.peers.update_latest_block_filter_hashes( + self.peer_index, + prove_number, + finalized_check_point_index, + &finalized_check_point, + start_number, + &parent_block_filter_hash, + &block_filter_hashes + )); + + if let Some(next_start_number) = next_start_number_opt { + self.protocol.send_get_block_filter_hashes( + self.nc, + self.peer_index, + next_start_number, + ); + } + } else { + let errmsg = format!( + "unknown start block number: {}, \ + cached in ({},{}], finalized starts at {}", + start_number, + cached_check_point_number, + next_cached_check_point_number, + finalized_check_point_number + ); + return StatusCode::Ignore.with_context(errmsg); + } + + Status::ok() + } +} diff --git a/src/protocols/filter/components/block_filters_process.rs b/src/protocols/filter/components/block_filters_process.rs index 0d99c43..bfe3c9b 100644 --- a/src/protocols/filter/components/block_filters_process.rs +++ b/src/protocols/filter/components/block_filters_process.rs @@ -7,7 +7,9 @@ use ckb_types::core::BlockNumber; use ckb_types::{packed, prelude::*}; use log::{info, trace, warn}; use rand::seq::SliceRandom; -use std::sync::Arc; +use std::{cmp, sync::Arc}; + +use crate::patches::calc_filter_hash; pub struct BlockFiltersProcess<'a> { message: packed::BlockFiltersReader<'a>, @@ -43,11 +45,11 @@ impl<'a> BlockFiltersProcess<'a> { } let peer_state = peer_state_opt.expect("checked Some"); - let (prove_state_block_number, prove_state_block_hash) = if let Some(header) = peer_state + let prove_state_block_hash = if let Some(header) = peer_state .get_prove_state() .map(|prove_state| prove_state.get_last_header().header()) { - (header.number(), header.hash()) + header.hash() } else { warn!("ignoring, peer {} prove state is none", self.peer); return Status::ok(); @@ -69,85 +71,183 @@ impl<'a> BlockFiltersProcess<'a> { .storage .update_block_number(min_filtered_block_number); } - } else { - let filters_count = block_filters.filters().len(); - let blocks_count = block_filters.block_hashes().len(); - - if filters_count != blocks_count { - let error_message = format!( - "filters length ({}) not equal to block_hashes length ({})", - filters_count, blocks_count - ); - return StatusCode::MalformedProtocolMessage.with_context(error_message); - } + return Status::ok(); + } - if filters_count == 0 { - info!("no new filters, ignore peer: {}", self.peer); - return Status::ok(); - } + let filters_count = block_filters.filters().len(); + let blocks_count = block_filters.block_hashes().len(); - if prove_state_block_number < start_number { - warn!( - "ignoring, peer {} prove_state_block_number {} is smaller than start_number {}", - self.peer, prove_state_block_number, start_number - ); - return Status::ok(); - } - let limit = (prove_state_block_number - start_number + 1) as usize; - let possible_match_blocks = self.filter.check_filters_data(block_filters, limit); - let possible_match_blocks_len = possible_match_blocks.len(); - trace!( - "peer {}, matched blocks: {}", - self.peer, - possible_match_blocks_len + if filters_count != blocks_count { + let error_message = format!( + "filters length ({}) not equal to block_hashes length ({})", + filters_count, blocks_count ); - let actual_blocks_count = blocks_count.min(limit); - let tip_header = self.filter.storage.get_tip_header(); - let filtered_block_number = start_number - 1 + actual_blocks_count as BlockNumber; + return StatusCode::MalformedProtocolMessage.with_context(error_message); + } - let mut matched_blocks = self - .filter - .peers - .matched_blocks() - .write() - .expect("poisoned"); - if possible_match_blocks_len != 0 { - let blocks = possible_match_blocks - .iter() - .map(|block_hash| (block_hash.clone(), block_hash == &prove_state_block_hash)) - .collect::>(); - self.filter.storage.add_matched_blocks( + if filters_count == 0 { + info!("no new filters, ignore peer: {}", self.peer); + return Status::ok(); + } + + let (finalized_check_point_index, finalized_check_point_hash) = + self.filter.storage.get_last_check_point(); + let finalized_check_point_number = self + .filter + .peers + .calc_check_point_number(finalized_check_point_index); + + let (mut parent_block_filter_hash, expected_block_filter_hashes) = + if start_number <= finalized_check_point_number { + // Use cached block filter hashes to check the block filters. + let (cached_check_point_index, mut cached_block_filter_hashes) = + self.filter.peers.get_cached_block_filter_hashes(); + let cached_check_point_number = self + .filter + .peers + .calc_check_point_number(cached_check_point_index); + let next_cached_check_point_number = self + .filter + .peers + .calc_check_point_number(cached_check_point_index + 1); + trace!( + "check block filters (start: {}, len: {}), \ + with cached block filter hashes: ({},{}]", start_number, - actual_blocks_count as u64, - blocks, + filters_count, + cached_check_point_number, + next_cached_check_point_number ); - if matched_blocks.is_empty() { - if let Some((_start_number, _blocks_count, db_blocks)) = - self.filter.storage.get_earliest_matched_blocks() - { - self.filter - .peers - .add_matched_blocks(&mut matched_blocks, db_blocks); - prove_or_download_matched_blocks( - Arc::clone(&self.filter.peers), - &tip_header, - &matched_blocks, - self.nc.as_ref(), - INIT_BLOCKS_IN_TRANSIT_PER_PEER, - ); - } + if start_number <= cached_check_point_number + || start_number > next_cached_check_point_number + { + let errmsg = format!( + "first block filter (number: {}) could not be checked \ + with cached block filter hashes ({},{}]", + start_number, cached_check_point_number, next_cached_check_point_number + ); + return StatusCode::Ignore.with_context(errmsg); } - } else if matched_blocks.is_empty() { - self.filter - .storage - .update_block_number(filtered_block_number) + if cached_block_filter_hashes.is_empty() { + let errmsg = "cached block filter hashes is empty"; + return StatusCode::Ignore.with_context(errmsg); + } + if start_number == cached_check_point_number + 1 { + let cached_check_point = self + .filter + .storage + .get_check_points(cached_check_point_index, 1) + .get(0) + .cloned() + .expect("all check points before finalized should be existed"); + (cached_check_point, cached_block_filter_hashes) + } else { + let start_index = (start_number - cached_check_point_number) as usize - 2; + let parent_hash = cached_block_filter_hashes[start_index].clone(); + cached_block_filter_hashes.drain(..=start_index); + (parent_hash, cached_block_filter_hashes) + } + } else { + // Use latest block filter hashes to check the block filters. + let mut latest_block_filter_hashes = self + .filter + .peers + .get_latest_block_filter_hashes(finalized_check_point_index); + if start_number == finalized_check_point_number + 1 { + (finalized_check_point_hash, latest_block_filter_hashes) + } else { + let start_index = (start_number - finalized_check_point_number) as usize - 2; + let parent_hash = latest_block_filter_hashes[start_index].clone(); + latest_block_filter_hashes.drain(..=start_index); + (parent_hash, latest_block_filter_hashes) + } + }; + + let limit = cmp::min(filters_count, expected_block_filter_hashes.len()); + + for (index, (filter, expected_hash)) in block_filters + .filters() + .into_iter() + .take(limit) + .zip(expected_block_filter_hashes.into_iter()) + .enumerate() + { + let current_hash = calc_filter_hash(&parent_block_filter_hash, &filter).pack(); + if current_hash != expected_hash { + let errmsg = format!( + "peer {}: block filter hash for block {} expect {:#x} but got {:#x}", + self.peer, + start_number + index as BlockNumber, + expected_hash, + current_hash, + ); + return StatusCode::BlockFilterDataIsUnexpected.with_context(errmsg); } + parent_block_filter_hash = current_hash; + } + + let possible_match_blocks = self.filter.check_filters_data(block_filters, limit); + let possible_match_blocks_len = possible_match_blocks.len(); + trace!( + "peer {}, matched blocks: {}", + self.peer, + possible_match_blocks_len + ); + let actual_blocks_count = blocks_count.min(limit); + let tip_header = self.filter.storage.get_tip_header(); + let filtered_block_number = start_number - 1 + actual_blocks_count as BlockNumber; + let mut matched_blocks = self + .filter + .peers + .matched_blocks() + .write() + .expect("poisoned"); + if possible_match_blocks_len != 0 { + let blocks = possible_match_blocks + .iter() + .map(|block_hash| (block_hash.clone(), block_hash == &prove_state_block_hash)) + .collect::>(); + self.filter.storage.add_matched_blocks( + start_number, + actual_blocks_count as u64, + blocks, + ); + if matched_blocks.is_empty() { + if let Some((_start_number, _blocks_count, db_blocks)) = + self.filter.storage.get_earliest_matched_blocks() + { + self.filter + .peers + .add_matched_blocks(&mut matched_blocks, db_blocks); + prove_or_download_matched_blocks( + Arc::clone(&self.filter.peers), + &tip_header, + &matched_blocks, + self.nc.as_ref(), + INIT_BLOCKS_IN_TRANSIT_PER_PEER, + ); + } + } + } else if matched_blocks.is_empty() { self.filter - .update_min_filtered_block_number(filtered_block_number); + .storage + .update_block_number(filtered_block_number) + } + + self.filter + .update_min_filtered_block_number(filtered_block_number); + + let could_request_more_block_filters = self.filter.peers.could_request_more_block_filters( + finalized_check_point_index, + filtered_block_number + 1, + ); + if could_request_more_block_filters { // send next batch GetBlockFilters message to a random best peer - let best_peers: Vec<_> = self.filter.peers.get_best_proved_peers(&tip_header); - let next_peer = best_peers + let best_peer = self + .filter + .peers + .get_best_proved_peers(&tip_header) .into_iter() .filter(|peer| *peer != self.peer) .collect::>() @@ -155,8 +255,13 @@ impl<'a> BlockFiltersProcess<'a> { .cloned() .unwrap_or(self.peer); self.filter - .send_get_block_filters(self.nc, next_peer, filtered_block_number + 1); + .send_get_block_filters(self.nc, best_peer, filtered_block_number + 1); + } else { + // if couldn't request more block filters, + // check if could request more block filter hashes. + self.filter.try_send_get_block_filter_hashes(self.nc); } + Status::ok() } } diff --git a/src/protocols/filter/components/mod.rs b/src/protocols/filter/components/mod.rs index c72c5d9..3f79b0f 100644 --- a/src/protocols/filter/components/mod.rs +++ b/src/protocols/filter/components/mod.rs @@ -1,3 +1,7 @@ +mod block_filter_check_points_process; +mod block_filter_hashes_process; mod block_filters_process; +pub(crate) use block_filter_check_points_process::BlockFilterCheckPointsProcess; +pub(crate) use block_filter_hashes_process::BlockFilterHashesProcess; pub(crate) use block_filters_process::BlockFiltersProcess; diff --git a/src/protocols/light_client/components/send_last_state.rs b/src/protocols/light_client/components/send_last_state.rs index 4e08cb4..8f36255 100644 --- a/src/protocols/light_client/components/send_last_state.rs +++ b/src/protocols/light_client/components/send_last_state.rs @@ -42,12 +42,10 @@ impl<'a> SendLastStateProcess<'a> { trace!( "peer {}: update last state from {} to {}", self.peer_index, - prev_last_state.verifiable_header().header().number(), - last_state.verifiable_header().header().number() + prev_last_state, + last_state, ); - if prev_last_state.verifiable_header().total_difficulty() - < last_state.verifiable_header().total_difficulty() - { + if prev_last_state.total_difficulty() < last_state.total_difficulty() { if let Some(prove_state) = peer_state.get_prove_state() { if prove_state.is_parent_of(&last_state) { trace!("peer {}: new last state could be trusted", self.peer_index); diff --git a/src/protocols/light_client/mod.rs b/src/protocols/light_client/mod.rs index 0c49e2e..bfa6160 100644 --- a/src/protocols/light_client/mod.rs +++ b/src/protocols/light_client/mod.rs @@ -19,7 +19,7 @@ use ckb_types::{ }; use ckb_systemtime::unix_time_as_millis; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, log_enabled, trace, warn, Level}; mod components; pub mod constant; @@ -126,30 +126,7 @@ impl CKBProtocolHandler for LightClientProtocol { let item_name = msg.item_name(); let status = self.try_process(nc.as_ref(), peer_index, msg); - if let Some(ban_time) = status.should_ban() { - error!( - "LightClient.received {} from {}, result {}, ban {:?}", - item_name, peer_index, status, ban_time - ); - nc.ban_peer(peer_index, ban_time, status.to_string()); - } else if status.should_warn() { - warn!( - "LightClient.received {} from {}, result {}", - item_name, peer_index, status - ); - } else if !status.is_ok() { - debug!( - "LightClient.received {} from {}, result {}", - item_name, peer_index, status - ); - } else { - trace!( - "LightClient.received {} from {}, result {}", - item_name, - peer_index, - status - ); - } + status.process(nc, peer_index, "LightClient", item_name); } async fn notify(&mut self, nc: Arc, token: u64) { @@ -220,7 +197,7 @@ impl LightClientProtocol { .expect("checked: should have state"); if let Some(last_state) = peer_state.get_last_state() { - let last_header = last_state.verifiable_header(); + let last_header = last_state.as_ref(); let is_proved = peer_state .get_prove_state() @@ -544,6 +521,174 @@ impl LightClientProtocol { ); } } + self.finalize_check_points(nc); + } + + fn finalize_check_points(&mut self, nc: &dyn CKBProtocolContext) { + let peers = self.peers(); + let required_peers_count = peers.required_peers_count(); + let mut peers_with_data = peers.get_all_proved_check_points(); + if log_enabled!(Level::Trace) { + for (peer_index, (start_cpindex, check_points)) in peers_with_data.iter() { + trace!( + "check points for peer {} in [{},{}]", + peer_index, + start_cpindex, + start_cpindex + check_points.len() as u32 - 1, + ); + } + } + + if peers_with_data.len() < required_peers_count { + debug!( + "no enough peers for finalizing check points, \ + requires {} but got {}", + required_peers_count, + peers_with_data.len() + ); + return; + } + trace!( + "requires {} peers for finalizing check points and got {}", + required_peers_count, + peers_with_data.len() + ); + let (last_cpindex, last_check_point) = self.storage.get_last_check_point(); + trace!( + "finalized check point is {}, {:#x}", + last_cpindex, + last_check_point + ); + // Clean finalized check points for new proved peers. + { + let mut peers_should_be_skipped = Vec::new(); + for (peer_index, (start_cpindex, check_points)) in peers_with_data.iter_mut() { + if *start_cpindex > last_cpindex { + // Impossible, in fact. + error!( + "peer {} will be banned \ + since start check point {} is later than finalized {}", + peer_index, start_cpindex, last_cpindex + ); + peers_should_be_skipped.push((*peer_index, true)); + continue; + } + let index = (last_cpindex - *start_cpindex) as usize; + if index >= check_points.len() { + peers_should_be_skipped.push((*peer_index, false)); + continue; + } + if check_points[index] != last_check_point { + info!( + "peer {} will be banned \ + since its {}-th check point is {:#x} but finalized is {:#x}", + peer_index, last_cpindex, check_points[index], last_check_point + ); + peers_should_be_skipped.push((*peer_index, true)); + continue; + } + if index > 0 { + check_points.drain(..index); + *start_cpindex = last_cpindex; + peers.remove_first_n_check_points(*peer_index, index); + trace!( + "peer {} remove first {} check points, \ + new start check point is {}, {:#x}", + peer_index, + index, + *start_cpindex, + check_points[0] + ); + } + } + for (peer_index, should_ban) in peers_should_be_skipped { + if should_ban { + nc.ban_peer( + peer_index, + BAD_MESSAGE_BAN_TIME, + String::from("incorrect check points"), + ); + } + peers_with_data.remove(&peer_index); + } + } + if peers_with_data.len() < required_peers_count { + trace!( + "no enough peers for finalizing check points after cleaning, \ + requires {} but got {}", + required_peers_count, + peers_with_data.len() + ); + return; + } + // Find a new check point to finalized. + let check_point_opt = { + let length_max = { + let mut check_points_sizes = peers_with_data + .values() + .map(|(_cpindex, check_points)| check_points.len()) + .collect::>(); + check_points_sizes.sort(); + check_points_sizes[required_peers_count - 1] + }; + trace!( + "new last check point will be less than or equal to {}", + last_cpindex + length_max as u32 - 1 + ); + let mut index = 1; + let mut check_point_opt = None; + // Q. Why don't check from bigger to smaller? + // A. We have to make sure if all check points are matched. + // To avoid that a bad peer sends us only start checkpoints and last points are correct. + while index < length_max { + let map = peers_with_data + .values() + .map(|(_cpindex, check_points)| check_points.get(index).cloned()) + .fold(HashMap::new(), |mut map, cp_opt| { + if let Some(cp) = cp_opt { + map.entry(cp).and_modify(|count| *count += 1).or_insert(1); + } + map + }); + let count_max = map.values().max().cloned().unwrap_or(0); + if count_max >= required_peers_count { + let mut cp_opt = None; + for (cp, count) in map { + if count == count_max { + cp_opt = Some(cp); + break; + } + } + let cp = cp_opt.expect("checked: must be found"); + if count_max != peers_with_data.len() { + peers_with_data.retain(|_, (_, check_points)| { + check_points + .get(index) + .map(|tmp| *tmp == cp) + .unwrap_or(false) + }); + } + check_point_opt = Some((index, cp)); + } else { + break; + } + index += 1; + } + check_point_opt + }; + if let Some((index, check_point)) = check_point_opt { + let new_last_cpindex = last_cpindex + index as u32; + info!( + "finalize {} new check points, stop at index {}, value {:#x}", + index, new_last_cpindex, check_point + ); + let (_, check_points) = peers_with_data.into_values().next().expect("always exists"); + self.storage + .update_check_points(last_cpindex + 1, &check_points[1..=index]); + self.storage.update_max_check_point_index(new_last_cpindex); + } else { + info!("no check point is found which could be finalized"); + } } fn get_idle_blocks(&mut self, nc: &dyn CKBProtocolContext) { diff --git a/src/protocols/light_client/peers.rs b/src/protocols/light_client/peers.rs index 6e2a84c..e5f6416 100644 --- a/src/protocols/light_client/peers.rs +++ b/src/protocols/light_client/peers.rs @@ -1,8 +1,12 @@ use ckb_network::PeerIndex; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ - core::HeaderView, packed, packed::Byte32, prelude::*, - utilities::merkle_mountain_range::VerifiableHeader, H256, + core::{BlockNumber, HeaderView}, + packed, + packed::Byte32, + prelude::*, + utilities::merkle_mountain_range::VerifiableHeader, + H256, U256, }; use dashmap::DashMap; use std::{ @@ -14,7 +18,6 @@ use std::{ use super::prelude::*; use crate::protocols::{Status, StatusCode, MESSAGE_TIMEOUT}; -#[derive(Default)] pub struct Peers { inner: DashMap, // verified last N block headers @@ -28,15 +31,28 @@ pub struct Peers { // * if the block is proved // * the downloaded block matched_blocks: RwLock)>>, + + // Data: + // - Cached check point index. + // - Block filter hashes between current cached check point and next cached check point. + // - Exclude the cached check point. + // - Include at the next cached check point. + cached_block_filter_hashes: RwLock<(u32, Vec)>, + + max_outbound_peers: u32, + check_point_interval: BlockNumber, + start_check_point: (u32, packed::Byte32), } -#[derive(Default, Clone)] +#[derive(Clone)] pub struct Peer { // The peer is just discovered when it's `None`. state: PeerState, blocks_proof_request: Option, blocks_request: Option, txs_proof_request: Option, + check_points: CheckPoints, + latest_block_filter_hashes: LatestBlockFilterHashes, } pub struct FetchInfo { @@ -50,7 +66,7 @@ pub struct FetchInfo { missing: bool, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct LastState { header: VerifiableHeader, update_ts: u64, @@ -118,7 +134,7 @@ pub(crate) struct ProveRequest { long_fork_detected: bool, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct ProveState { last_state: LastState, reorg_last_headers: Vec, @@ -144,6 +160,23 @@ pub(crate) struct TransactionsProofRequest { when_sent: u64, } +#[derive(Clone)] +pub(crate) struct CheckPoints { + check_point_interval: BlockNumber, + // The index of the first check point in the memory. + index_of_first_check_point: u32, + // Exists at least 1 check point. + // N.B. Do NOT leak any API that could make this vector be empty. + inner: Vec, +} + +#[derive(Clone)] +pub(crate) struct LatestBlockFilterHashes { + // The previous block number of the first block filter hash. + check_point_number: BlockNumber, + inner: Vec, +} + impl FetchInfo { #[cfg(test)] pub fn new(added_ts: u64, first_sent: u64, timeout: bool, missing: bool) -> FetchInfo { @@ -178,6 +211,23 @@ impl AsRef for LastState { } } +impl fmt::Display for LastState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let header = self.header.header(); + if f.alternate() { + write!( + f, + "LastState {{ num: {}, hash: {:#x}, ts: {} }}", + header.number(), + header.hash(), + self.update_ts + ) + } else { + write!(f, "{}", header.number()) + } + } +} + impl LastState { pub(crate) fn new(header: VerifiableHeader) -> LastState { LastState { @@ -186,8 +236,35 @@ impl LastState { } } - pub(crate) fn verifiable_header(&self) -> &VerifiableHeader { - self.as_ref() + pub(crate) fn total_difficulty(&self) -> U256 { + self.as_ref().total_difficulty() + } + + pub(crate) fn header(&self) -> &HeaderView { + self.as_ref().header() + } +} + +impl fmt::Display for ProveRequest { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let tau_status = if self.skip_check_tau { + "skipped" + } else { + "normal" + }; + if f.alternate() { + write!( + f, + "LastState {{ last_state: {:#}, tau: {}, fork: {} }}", + self.last_state, tau_status, self.long_fork_detected, + ) + } else { + write!( + f, + "{} (tau: {}, fork: {})", + self.last_state, tau_status, self.long_fork_detected, + ) + } } } @@ -202,7 +279,7 @@ impl ProveRequest { } pub(crate) fn get_last_header(&self) -> &VerifiableHeader { - self.last_state.verifiable_header() + self.last_state.as_ref() } pub(crate) fn is_same_as(&self, another: &VerifiableHeader) -> bool { @@ -230,6 +307,39 @@ impl ProveRequest { } } +impl fmt::Display for ProveState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if f.alternate() { + write!(f, "ProveState {{ last_state: {:#}", self.last_state)?; + if self.reorg_last_headers.is_empty() { + write!(f, ", reorg: None")?; + } else { + let len = self.reorg_last_headers.len(); + let start = self.reorg_last_headers[0].number(); + let end = self.reorg_last_headers[len - 1].number(); + write!(f, ", reorg: [{}, {}]", start, end)?; + } + if self.last_headers.is_empty() { + write!(f, ", last: None")?; + } else { + let len = self.last_headers.len(); + let start = self.last_headers[0].number(); + let end = self.last_headers[len - 1].number(); + write!(f, ", last: [{}, {}]", start, end)?; + } + write!(f, " }}") + } else { + write!( + f, + "{} (reorg: {}, last: {})", + self.last_state, + self.reorg_last_headers.len(), + self.last_headers.len() + ) + } + } +} + impl ProveState { pub(crate) fn new_from_request( request: ProveRequest, @@ -263,11 +373,11 @@ impl ProveState { pub(crate) fn is_parent_of(&self, child_last_state: &LastState) -> bool { self.get_last_header() .header() - .is_parent_of(child_last_state.verifiable_header().header()) + .is_parent_of(child_last_state.header()) } pub(crate) fn get_last_header(&self) -> &VerifiableHeader { - self.last_state.verifiable_header() + self.last_state.as_ref() } pub(crate) fn is_same_as(&self, another: &VerifiableHeader) -> bool { @@ -373,6 +483,263 @@ impl TransactionsProofRequest { } } +impl CheckPoints { + fn new( + check_point_interval: BlockNumber, + index_of_first_check_point: u32, + first_check_point: packed::Byte32, + ) -> Self { + Self { + check_point_interval, + index_of_first_check_point, + inner: vec![first_check_point], + } + } + + fn get_start_index(&self) -> u32 { + self.index_of_first_check_point + } + + fn get_check_points(&self) -> Vec { + self.inner.clone() + } + + fn number_of_first_check_point(&self) -> BlockNumber { + self.check_point_interval * BlockNumber::from(self.index_of_first_check_point) + } + + fn number_of_last_check_point(&self) -> BlockNumber { + let first = self.number_of_first_check_point(); + let count = self.inner.len() as BlockNumber; + first + self.check_point_interval * (count - 1) + } + + fn number_of_next_check_point(&self) -> BlockNumber { + self.number_of_last_check_point() + } + + fn if_require_next_check_point(&self, last_proved_number: BlockNumber) -> bool { + self.number_of_next_check_point() + self.check_point_interval * 2 <= last_proved_number + } + + fn add_check_points( + &mut self, + last_proved_number: BlockNumber, + start_number: BlockNumber, + check_points: &[packed::Byte32], + ) -> Result, Status> { + if check_points.is_empty() { + return Err(StatusCode::CheckPointsIsEmpty.into()); + } + if start_number % self.check_point_interval != 0 { + let errmsg = format!( + "check points should at `{} * N` but got {}", + self.check_point_interval, start_number + ); + return Err(StatusCode::CheckPointsIsUnaligned.with_context(errmsg)); + } + let next_number = self.number_of_next_check_point(); + if start_number != next_number { + let errmsg = format!( + "expect starting from {} but got {}", + next_number, start_number + ); + return Err(StatusCode::CheckPointsIsUnexpected.with_context(errmsg)); + } + let prev_last_check_point = &self.inner[self.inner.len() - 1]; + let curr_first_check_point = &check_points[0]; + if prev_last_check_point != curr_first_check_point { + let errmsg = format!( + "expect hash for number {} is {:#x} but got {:#x}", + start_number, prev_last_check_point, curr_first_check_point + ); + return Err(StatusCode::CheckPointsIsUnexpected.with_context(errmsg)); + } + if check_points.len() < 2 { + let errmsg = format!( + "expect at least 2 check points but got only {}", + check_points.len() + ); + return Err(StatusCode::CheckPointsIsUnexpected.with_context(errmsg)); + } + let check_points_len = check_points.len() as BlockNumber; + if start_number + self.check_point_interval * check_points_len <= last_proved_number { + self.inner.extend_from_slice(&check_points[1..]); + } else if check_points.len() > 2 { + let end = check_points.len() - 2; + self.inner.extend_from_slice(&check_points[1..=end]); + } + if self.if_require_next_check_point(last_proved_number) { + Ok(Some(self.number_of_next_check_point())) + } else { + Ok(None) + } + } + + fn remove_first_n_check_points(&mut self, n: usize) { + self.index_of_first_check_point += n as u32; + self.inner.drain(..n); + } +} + +impl LatestBlockFilterHashes { + fn new(check_point_number: BlockNumber) -> Self { + Self { + check_point_number, + inner: Vec::new(), + } + } + + #[cfg(test)] + fn mock(check_point_number: BlockNumber, inner: Vec) -> Self { + Self { + check_point_number, + inner, + } + } + + fn get_check_point_number(&self) -> BlockNumber { + self.check_point_number + } + + fn get_last_number(&self) -> BlockNumber { + self.get_check_point_number() + self.inner.len() as BlockNumber + } + + fn get_hashes(&self) -> Vec { + self.inner.clone() + } + + fn clear(&mut self) { + self.inner.clear(); + } + + fn reset(&mut self, new_check_point_number: BlockNumber) { + self.check_point_number = new_check_point_number; + self.clear(); + } + + fn update_latest_block_filter_hashes( + &mut self, + last_proved_number: BlockNumber, + finalized_check_point_number: BlockNumber, + finalized_check_point: &packed::Byte32, + start_number: BlockNumber, + parent_block_filter_hash: &packed::Byte32, + mut block_filter_hashes: &[packed::Byte32], + ) -> Result, Status> { + if block_filter_hashes.is_empty() { + return Err(StatusCode::BlockFilterHashesIsEmpty.into()); + } + // Check block numbers. + if finalized_check_point_number >= last_proved_number { + let errmsg = format!( + "finalized check point ({}) is not less than proved number ({})", + finalized_check_point_number, last_proved_number + ); + return Err(StatusCode::Ignore.with_context(errmsg)); + } + let check_point_number = self.get_check_point_number(); + if finalized_check_point_number != check_point_number { + let errmsg = format!( + "finalized check point ({}) is not same as cached ({})", + finalized_check_point_number, check_point_number + ); + return Err(StatusCode::Ignore.with_context(errmsg)); + } + let mut end_number = start_number + block_filter_hashes.len() as BlockNumber - 1; + if finalized_check_point_number >= end_number { + let errmsg = format!( + "finalized check point ({}) is not less than end number ({})", + finalized_check_point_number, end_number, + ); + return Err(StatusCode::Ignore.with_context(errmsg)); + } + if start_number > last_proved_number { + let errmsg = format!( + "start number ({}) is greater than the proved number ({})", + start_number, last_proved_number + ); + return Err(StatusCode::Ignore.with_context(errmsg)); + } + let last_filter_number = self.get_last_number(); + if start_number > last_filter_number + 1 { + let errmsg = format!( + "start number ({}) is continuous with last filter block number ({})", + start_number, last_filter_number + ); + return Err(StatusCode::Ignore.with_context(errmsg)); + } + if end_number > last_proved_number { + let diff = end_number - last_proved_number; + let new_length = block_filter_hashes.len() - diff as usize; + block_filter_hashes = &block_filter_hashes[..new_length]; + end_number = last_proved_number; + } + // Check block filter hashes. + let (start_index_for_old, start_index_for_new) = if start_number + <= finalized_check_point_number + { + let diff = finalized_check_point_number - start_number; + let index = diff as usize; + let check_hash = &block_filter_hashes[index]; + if check_hash != finalized_check_point { + let errmsg = format!( + "check point for block {} is {:#x} but check hash is {:#}", + finalized_check_point_number, finalized_check_point, check_hash + ); + return Err(StatusCode::BlockFilterHashesIsUnexpected.with_context(errmsg)); + } + (0, index + 1) + } else if start_number == finalized_check_point_number + 1 { + if parent_block_filter_hash != finalized_check_point { + let errmsg = format!( + "check point for block {} is {:#x} but parent hash is {:#}", + finalized_check_point_number, finalized_check_point, parent_block_filter_hash + ); + return Err(StatusCode::BlockFilterHashesIsUnexpected.with_context(errmsg)); + } + (0, 0) + } else { + let diff = start_number - finalized_check_point_number; + let index = diff as usize - 2; + let filter_hash = &self.inner[index]; + if filter_hash != parent_block_filter_hash { + let errmsg = format!( + "filter hash for block {} is {:#x} but parent hash is {:#}", + start_number - 1, + filter_hash, + parent_block_filter_hash + ); + return Err(StatusCode::BlockFilterHashesIsUnexpected.with_context(errmsg)); + } + (index + 1, 0) + }; + for (index, (old_hash, new_hash)) in self.inner[start_index_for_old..] + .iter() + .zip(block_filter_hashes[start_index_for_new..].iter()) + .enumerate() + { + if old_hash != new_hash { + let number = start_number + (start_index_for_old + index) as BlockNumber; + let errmsg = format!( + "old filter hash for block {} is {:#x} but new is {:#}", + number, old_hash, new_hash + ); + return Err(StatusCode::Ignore.with_context(errmsg)); + } + } + // Update block filter hashes. + let index = start_index_for_new + self.inner[start_index_for_old..].len(); + self.inner.extend_from_slice(&block_filter_hashes[index..]); + if end_number < last_proved_number { + Ok(Some(end_number + 1)) + } else { + Ok(None) + } + } +} + impl Default for PeerState { fn default() -> Self { Self::Initialized @@ -381,25 +748,89 @@ impl Default for PeerState { impl fmt::Display for PeerState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::Initialized => write!(f, "PeerState::Initialized"), - Self::RequestFirstLastState { .. } => write!(f, "PeerState::RequestFirstLastState"), - Self::OnlyHasLastState { .. } => write!(f, "PeerState::OnlyHasLastState"), - Self::RequestFirstLastStateProof { .. } => { - write!(f, "PeerState::RequestFirstLastStateProof") - } - Self::Ready { .. } => write!(f, "PeerState::Ready"), - Self::RequestNewLastState { .. } => { - write!(f, "PeerState::RequestNewLastState") + let fullname = format!("PeerState::{}", self.name()); + if f.alternate() { + match self { + Self::Initialized => { + write!(f, "{}", fullname) + } + Self::RequestFirstLastState { when_sent } => { + write!(f, "{} {{ when_sent: {} }}", fullname, when_sent) + } + Self::OnlyHasLastState { last_state } => { + write!(f, "{} {{ last_state: {} }}", fullname, last_state) + } + Self::RequestFirstLastStateProof { + last_state, + request, + when_sent, + } => { + write!(f, "{} {{ last_state: {}", fullname, last_state)?; + write!(f, ", request: {}", request)?; + write!(f, ", when_sent: {}", when_sent)?; + write!(f, "}}") + } + Self::Ready { + last_state, + prove_state, + } => { + write!(f, "{} {{ last_state: {}", fullname, last_state)?; + write!(f, ", prove_state: {}", prove_state)?; + write!(f, "}}") + } + Self::RequestNewLastState { + last_state, + prove_state, + when_sent, + } => { + write!(f, "{} {{ last_state: {}", fullname, last_state)?; + write!(f, ", prove_state: {}", prove_state)?; + write!(f, ", when_sent: {}", when_sent)?; + write!(f, "}}") + } + Self::RequestNewLastStateProof { + last_state, + prove_state, + request, + when_sent, + } => { + write!(f, "{} {{ last_state: {}", fullname, last_state)?; + write!(f, ", prove_state: {}", prove_state)?; + write!(f, ", request: {}", request)?; + write!(f, ", when_sent: {}", when_sent)?; + write!(f, "}}") + } } - Self::RequestNewLastStateProof { .. } => { - write!(f, "PeerState::RequestNewLastStateProof") + } else { + match self { + Self::Initialized | Self::RequestFirstLastState { .. } => { + write!(f, "{}", fullname) + } + Self::OnlyHasLastState { last_state, .. } + | Self::RequestFirstLastStateProof { last_state, .. } + | Self::Ready { last_state, .. } + | Self::RequestNewLastState { last_state, .. } + | Self::RequestNewLastStateProof { last_state, .. } => { + write!(f, "{} {{ last_state: {} }}", fullname, last_state) + } } } } } impl PeerState { + fn name(&self) -> &'static str { + match self { + Self::Initialized => "Initialized", + Self::RequestFirstLastState { .. } => "RequestFirstLastState", + Self::OnlyHasLastState { .. } => "OnlyHasLastState", + Self::RequestFirstLastStateProof { .. } => "RequestFirstLastStateProof", + Self::Ready { .. } => "Ready", + Self::RequestNewLastState { .. } => "RequestNewLastState", + Self::RequestNewLastStateProof { .. } => "RequestNewLastStateProof", + } + } + fn take(&mut self) -> Self { let mut ret = Self::Initialized; mem::swap(self, &mut ret); @@ -606,12 +1037,21 @@ impl PeerState { } impl Peer { - fn new() -> Self { + fn new(check_point_interval: BlockNumber, start_check_point: (u32, packed::Byte32)) -> Self { + let check_points = CheckPoints::new( + check_point_interval, + start_check_point.0, + start_check_point.1, + ); + let check_point_number = check_point_interval * BlockNumber::from(start_check_point.0); + let latest_block_filter_hashes = LatestBlockFilterHashes::new(check_point_number); Self { state: Default::default(), blocks_proof_request: None, blocks_request: None, txs_proof_request: None, + check_points, + latest_block_filter_hashes, } } @@ -641,16 +1081,38 @@ impl Peer { } impl Peers { - // only used in unit tests now - #[cfg(test)] - pub fn new(last_headers: RwLock>) -> Self { + pub fn new( + max_outbound_peers: u32, + check_point_interval: BlockNumber, + start_check_point: (u32, packed::Byte32), + ) -> Self { Self { inner: Default::default(), - last_headers, + last_headers: Default::default(), fetching_headers: DashMap::new(), fetching_txs: DashMap::new(), matched_blocks: Default::default(), + cached_block_filter_hashes: Default::default(), + max_outbound_peers, + check_point_interval, + start_check_point, + } + } + + pub(crate) fn required_peers_count(&self) -> usize { + let required_peers_count = ((self.get_max_outbound_peers() + 1) / 2) as usize; + if required_peers_count == 0 { + panic!("max outbound peers shouldn't be zero!"); } + required_peers_count + } + + pub(crate) fn calc_check_point_number(&self, index: u32) -> BlockNumber { + self.check_point_interval * BlockNumber::from(index) + } + + fn calc_best_check_point_index_not_greater_than(&self, number: BlockNumber) -> u32 { + (number / self.check_point_interval) as u32 } pub(crate) fn last_headers(&self) -> &RwLock> { @@ -750,8 +1212,12 @@ impl Peers { &self.matched_blocks } + pub(crate) fn get_max_outbound_peers(&self) -> u32 { + self.max_outbound_peers + } + pub(crate) fn add_peer(&self, index: PeerIndex) { - let peer = Peer::new(); + let peer = Peer::new(self.check_point_interval, self.start_check_point.clone()); self.inner.insert(index, peer); } @@ -839,7 +1305,11 @@ impl Peers { ) -> Result<(), Status> { *self.last_headers.write().expect("poisoned") = state.get_last_headers().to_vec(); if let Some(mut peer) = self.inner.get_mut(&index) { + let has_reorg = !state.reorg_last_headers.is_empty(); peer.state = peer.state.take().receive_last_state_proof(state)?; + if has_reorg { + peer.latest_block_filter_hashes.clear(); + } } Ok(()) } @@ -1029,6 +1499,112 @@ impl Peers { } } + pub(crate) fn add_check_points( + &self, + index: PeerIndex, + last_proved_number: BlockNumber, + start_number: BlockNumber, + check_points: &[packed::Byte32], + ) -> Result, Status> { + if let Some(mut peer) = self.inner.get_mut(&index) { + peer.check_points + .add_check_points(last_proved_number, start_number, check_points) + } else { + Err(StatusCode::PeerIsNotFound.into()) + } + } + + pub(crate) fn remove_first_n_check_points(&self, index: PeerIndex, n: usize) { + if let Some(mut peer) = self.inner.get_mut(&index) { + peer.check_points.remove_first_n_check_points(n); + let number = peer.check_points.number_of_first_check_point(); + peer.latest_block_filter_hashes.reset(number); + } + } + + #[cfg(test)] + pub(crate) fn mock_latest_block_filter_hashes( + &self, + index: PeerIndex, + check_point_number: BlockNumber, + block_filter_hashes: Vec, + ) { + if let Some(mut peer) = self.inner.get_mut(&index) { + peer.latest_block_filter_hashes = + LatestBlockFilterHashes::mock(check_point_number, block_filter_hashes); + } + } + + #[allow(clippy::too_many_arguments)] // TODO fix clippy + pub(crate) fn update_latest_block_filter_hashes( + &self, + index: PeerIndex, + last_proved_number: BlockNumber, + finalized_check_point_index: u32, + finalized_check_point: &packed::Byte32, + start_number: BlockNumber, + parent_block_filter_hash: &packed::Byte32, + block_filter_hashes: &[packed::Byte32], + ) -> Result, Status> { + if let Some(mut peer) = self.inner.get_mut(&index) { + let finalized_check_point_number = + self.calc_check_point_number(finalized_check_point_index); + peer.latest_block_filter_hashes + .update_latest_block_filter_hashes( + last_proved_number, + finalized_check_point_number, + finalized_check_point, + start_number, + parent_block_filter_hash, + block_filter_hashes, + ) + } else { + Err(StatusCode::PeerIsNotFound.into()) + } + } + + pub(crate) fn update_min_filtered_block_number(&self, min_filtered_block_number: BlockNumber) { + let should_cached_check_point_index = + self.calc_best_check_point_index_not_greater_than(min_filtered_block_number); + let current_cached_check_point_index = + self.cached_block_filter_hashes.read().expect("poisoned").0; + if current_cached_check_point_index != should_cached_check_point_index { + let mut tmp = self.cached_block_filter_hashes.write().expect("poisoned"); + tmp.0 = should_cached_check_point_index; + tmp.1.clear(); + } + } + + pub(crate) fn get_cached_block_filter_hashes(&self) -> (u32, Vec) { + self.cached_block_filter_hashes + .read() + .expect("poisoned") + .clone() + } + + pub(crate) fn update_cached_block_filter_hashes(&self, hashes: Vec) { + self.cached_block_filter_hashes.write().expect("poisoned").1 = hashes; + } + + pub(crate) fn if_cached_block_filter_hashes_require_update( + &self, + finalized_check_point_index: u32, + ) -> Option { + let (cached_index, cached_length) = { + let tmp = self.cached_block_filter_hashes.read().expect("poisoned"); + (tmp.0, tmp.1.len()) + }; + if cached_index >= finalized_check_point_index { + return None; + } + if cached_length as BlockNumber >= self.check_point_interval { + return None; + } + let cached_last_number = + self.calc_check_point_number(cached_index) + cached_length as BlockNumber; + Some(cached_last_number + 1) + } + pub(crate) fn get_peers_which_require_new_state(&self, before_ts: u64) -> Vec { self.inner .iter() @@ -1057,6 +1633,149 @@ impl Peers { .collect() } + pub(crate) fn get_peers_which_require_more_check_points( + &self, + ) -> Vec<(PeerIndex, BlockNumber)> { + self.inner + .iter() + .filter_map(|item| { + item.value().state.get_prove_state().and_then(|state| { + let proved_number = state.get_last_header().header().number(); + let check_points = &item.value().check_points; + if check_points.if_require_next_check_point(proved_number) { + let next_check_point_number = check_points.number_of_next_check_point(); + Some((*item.key(), next_check_point_number)) + } else { + None + } + }) + }) + .collect() + } + + pub(crate) fn get_peers_which_require_more_latest_block_filter_hashes( + &self, + finalized_check_point_index: u32, + ) -> Vec<(PeerIndex, BlockNumber)> { + self.inner + .iter() + .filter_map(|item| { + item.value().state.get_prove_state().and_then(|state| { + let latest_block_filter_hashes = &item.value().latest_block_filter_hashes; + let check_point_number = latest_block_filter_hashes.get_check_point_number(); + let finalized_check_point_number = + self.calc_check_point_number(finalized_check_point_index); + if check_point_number == finalized_check_point_number { + let proved_number = state.get_last_header().header().number(); + let last_number = latest_block_filter_hashes.get_last_number(); + if last_number < proved_number { + Some((*item.key(), last_number + 1)) + } else { + None + } + } else { + None + } + }) + }) + .collect() + } + + pub(crate) fn get_latest_block_filter_hashes( + &self, + finalized_check_point_index: u32, + ) -> Vec { + let finalized_check_point_number = + self.calc_check_point_number(finalized_check_point_index); + let mut peers_with_data = self + .inner + .iter() + .filter_map(|item| { + item.value().state.get_prove_state().and_then(|_| { + let latest_block_filter_hashes = &item.value().latest_block_filter_hashes; + let check_point_number = latest_block_filter_hashes.get_check_point_number(); + if finalized_check_point_number == check_point_number { + Some((*item.key(), latest_block_filter_hashes.get_hashes())) + } else { + None + } + }) + }) + .collect::>(); + let mut hashes = Vec::new(); + let required_peers_count = self.required_peers_count(); + if peers_with_data.len() < required_peers_count { + return hashes; + } + let length_max = { + let mut hashes_sizes = peers_with_data + .values() + .map(|hashes| hashes.len()) + .collect::>(); + hashes_sizes.sort(); + hashes_sizes[required_peers_count - 1] + }; + let mut index = 0; + while index < length_max { + let map = peers_with_data + .values() + .map(|hashes| hashes.get(index).cloned()) + .fold(HashMap::new(), |mut map, hash_opt| { + if let Some(h) = hash_opt { + map.entry(h).and_modify(|count| *count += 1).or_insert(1); + } + map + }); + let count_max = map.values().max().cloned().unwrap_or(0); + if count_max >= required_peers_count { + let mut hash_opt = None; + for (hash, count) in map { + if count == count_max { + hash_opt = Some(hash); + break; + } + } + let hash = hash_opt.expect("checked: must be found"); + if count_max != peers_with_data.len() { + peers_with_data.retain(|_, hashes| { + hashes.get(index).map(|tmp| *tmp == hash).unwrap_or(false) + }); + } + hashes.push(hash); + } else { + break; + } + index += 1; + } + hashes + } + + pub(crate) fn could_request_more_block_filters( + &self, + finalized_check_point_index: u32, + min_filtered_block_number: BlockNumber, + ) -> bool { + let should_cached_check_point_index = + self.calc_best_check_point_index_not_greater_than(min_filtered_block_number); + if should_cached_check_point_index >= finalized_check_point_index { + let finalized_check_point_number = + self.calc_check_point_number(finalized_check_point_index); + let latest_block_filter_hashes_count = self + .get_latest_block_filter_hashes(finalized_check_point_index) + .len(); + finalized_check_point_number + latest_block_filter_hashes_count as BlockNumber + >= min_filtered_block_number + } else { + // Check: + // - If cached block filter hashes is same check point as the required, + // - If all block filter hashes in that check point are downloaded. + let cached_data = self.get_cached_block_filter_hashes(); + let current_cached_check_point_index = cached_data.0; + should_cached_check_point_index == current_cached_check_point_index + && cached_data.1.len() as BlockNumber == self.check_point_interval + } + } + pub(crate) fn get_peers_which_have_timeout(&self, now: u64) -> Vec { self.inner .iter() @@ -1102,7 +1821,27 @@ impl Peers { .collect() } - pub(crate) fn get_peers_which_are_proved(&self) -> Vec<(PeerIndex, ProveState)> { + pub(crate) fn get_all_proved_check_points( + &self, + ) -> HashMap)> { + self.inner + .iter() + .filter_map(|item| { + item.value().state.get_prove_state().map(|_| { + let check_points = &item.value().check_points; + ( + *item.key(), + ( + check_points.get_start_index(), + check_points.get_check_points(), + ), + ) + }) + }) + .collect() + } + + pub(crate) fn get_all_prove_states(&self) -> Vec<(PeerIndex, ProveState)> { self.inner .iter() .filter_map(|item| { @@ -1133,7 +1872,7 @@ impl Peers { } pub(crate) fn get_best_proved_peers(&self, best_tip: &packed::Header) -> Vec { - self.get_peers_which_are_proved() + self.get_all_prove_states() .into_iter() .filter(|(_, prove_state)| { Some(prove_state.get_last_header().header()) diff --git a/src/protocols/mod.rs b/src/protocols/mod.rs index 1da5ae5..fbdb11d 100644 --- a/src/protocols/mod.rs +++ b/src/protocols/mod.rs @@ -31,3 +31,5 @@ pub const LAST_N_BLOCKS: BlockNumber = 100; pub const GET_BLOCKS_PROOF_LIMIT: usize = 1000; // Copy from ckb/util/light-client-protocol-server pub const GET_TRANSACTIONS_PROOF_LIMIT: usize = 1000; +// Copy from ckb/sync +pub const CHECK_POINT_INTERVAL: BlockNumber = 2000; diff --git a/src/protocols/status.rs b/src/protocols/status.rs index a2279b2..df1fd6b 100644 --- a/src/protocols/status.rs +++ b/src/protocols/status.rs @@ -1,5 +1,7 @@ -#![allow(dead_code)] -use std::{fmt, time::Duration}; +use std::{fmt, sync::Arc, time::Duration}; + +use ckb_network::{CKBProtocolContext, PeerIndex}; +use log::{debug, error, trace, warn}; use super::BAD_MESSAGE_BAN_TIME; @@ -14,6 +16,7 @@ use super::BAD_MESSAGE_BAN_TIME; /// - 5xx: Local errors - The client failed to process a response. #[repr(u16)] #[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[allow(dead_code)] pub enum StatusCode { /// OK OK = 200, @@ -57,10 +60,27 @@ pub enum StatusCode { /// Reorg headers for a last state proof is invalid. InvalidReorgHeaders = 452, + // Errors for block filter protocol. + /// Check points is empty. + CheckPointsIsEmpty = 471, + /// Check points is unaligned. + CheckPointsIsUnaligned = 472, + /// Check points is unexpected. + CheckPointsIsUnexpected = 473, + /// Block filter hashes is empty. + BlockFilterHashesIsEmpty = 481, + /// Block filter hashes is unexpected. + BlockFilterHashesIsUnexpected = 482, + /// Block filter data is unexpected. + BlockFilterDataIsUnexpected = 483, + /// Throws an internal error. InternalError = 500, /// Throws an error from the network. Network = 501, + + /// Throws an error that could be ignored. + Ignore = 599, } /// Process message status. @@ -130,12 +150,13 @@ impl Status { /// Whether the code is `OK` or not. pub fn is_ok(&self) -> bool { - self.code == StatusCode::OK || self.code == StatusCode::RequireRecheck + let code = self.code(); + code == StatusCode::OK || code == StatusCode::RequireRecheck } /// Whether the session should be banned. pub fn should_ban(&self) -> Option { - let code = self.code as u16; + let code = self.code() as u16; if (400..500).contains(&code) { Some(BAD_MESSAGE_BAN_TIME) } else { @@ -145,7 +166,7 @@ impl Status { /// Whether a warning log should be output. pub fn should_warn(&self) -> bool { - let code = self.code as u16; + let code = self.code() as u16; (500..600).contains(&code) } @@ -153,4 +174,38 @@ impl Status { pub fn code(&self) -> StatusCode { self.code } + + pub fn process( + &self, + nc: Arc, + index: PeerIndex, + protocol: &str, + message: &str, + ) { + if let Some(ban_time) = self.should_ban() { + error!( + "{}Protocol.received {} from {}, result {}, ban {:?}", + protocol, message, index, self, ban_time + ); + nc.ban_peer(index, ban_time, self.to_string()); + } else if self.should_warn() { + warn!( + "{}Protocol.received {} from {}, result {}", + protocol, message, index, self + ); + } else if self.is_ok() { + trace!( + "{}Protocol.received {} from {}, result {}", + protocol, + message, + index, + self + ); + } else { + debug!( + "{}Protocol.received {} from {}, result {}", + protocol, message, index, self + ); + } + } } diff --git a/src/storage.rs b/src/storage.rs index 66dd0ed..1a48a8d 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -10,7 +10,7 @@ use ckb_types::{ bytes::Bytes, core::{ cell::{CellMeta, CellProvider, CellStatus}, - BlockNumber, HeaderView, TransactionInfo, + BlockNumber, BlockView, HeaderView, TransactionInfo, }, packed::{self, Block, Byte32, CellOutput, Header, OutPoint, Script, Transaction}, prelude::*, @@ -20,6 +20,7 @@ use ckb_types::{ use rocksdb::{prelude::*, Direction, IteratorMode, WriteBatch, DB}; use crate::error::Result; +use crate::patches::{build_filter_data, calc_filter_hash, FilterDataProvider}; use crate::protocols::Peers; pub const LAST_STATE_KEY: &str = "LAST_STATE"; @@ -28,6 +29,7 @@ const FILTER_SCRIPTS_KEY: &str = "FILTER_SCRIPTS"; const MATCHED_FILTER_BLOCKS_KEY: &str = "MATCHED_BLOCKS"; const MIN_FILTERED_BLOCK_NUMBER: &str = "MIN_FILTERED_NUMBER"; const LAST_N_HEADERS_KEY: &str = "LAST_N_HEADERS"; +const MAX_CHECK_POINT_INDEX: &str = "MAX_CHECK_POINT_INDEX"; pub struct ScriptStatus { pub script: Script, @@ -53,6 +55,34 @@ pub enum ScriptType { Type, } +struct WrappedBlockView<'a> { + inner: &'a BlockView, + index: HashMap, +} + +impl<'a> WrappedBlockView<'a> { + fn new(inner: &'a BlockView) -> Self { + let index = inner + .transactions() + .into_iter() + .enumerate() + .map(|(index, tx)| (tx.hash(), index)) + .collect(); + Self { inner, index } + } +} + +impl<'a> FilterDataProvider for WrappedBlockView<'a> { + fn cell(&self, out_point: &OutPoint) -> Option { + self.index.get(&out_point.tx_hash()).and_then(|tx_index| { + self.inner + .transactions() + .get(*tx_index) + .and_then(|tx| tx.outputs().get(out_point.index().unpack())) + }) + } +} + #[derive(Clone)] pub struct Storage { pub(crate) db: Arc, @@ -128,6 +158,20 @@ impl Storage { .expect("batch put should be ok"); batch.commit().expect("batch commit should be ok"); self.update_last_state(&U256::zero(), &block.header(), &[]); + let genesis_block_filter_hash: Byte32 = { + let block_view = block.into_view(); + let provider = WrappedBlockView::new(&block_view); + let parent_block_filter_hash = Byte32::zero(); + let (genesis_block_filter_vec, missing_out_points) = + build_filter_data(provider, &block_view.transactions()); + if !missing_out_points.is_empty() { + panic!("Genesis block shouldn't missing any out points."); + } + let genesis_block_filter_data = genesis_block_filter_vec.pack(); + calc_filter_hash(&parent_block_filter_hash, &genesis_block_filter_data).pack() + }; + self.update_max_check_point_index(0); + self.update_check_points(0, &[genesis_block_filter_hash]); self.update_min_filtered_block_number(0); } } @@ -517,6 +561,57 @@ impl Storage { .expect("db put min filtered block number should be ok"); } + pub fn get_last_check_point(&self) -> (CpIndex, Byte32) { + let index = self.get_max_check_point_index(); + let hash = self + .get_check_points(index, 1) + .get(0) + .cloned() + .expect("db get last check point should be ok"); + (index, hash) + } + + pub fn get_max_check_point_index(&self) -> CpIndex { + let key = Key::Meta(MAX_CHECK_POINT_INDEX).into_vec(); + self.db + .get_pinned(&key) + .expect("db get max check point index should be ok") + .map(|data| CpIndex::from_be_bytes(data.as_ref().try_into().unwrap())) + .expect("db get max check point index should be ok") + } + + pub fn update_max_check_point_index(&self, index: CpIndex) { + let key = Key::Meta(MAX_CHECK_POINT_INDEX).into_vec(); + let value = index.to_be_bytes(); + self.db + .put(key, value) + .expect("db put max check point index should be ok"); + } + + pub fn get_check_points(&self, start_index: CpIndex, limit: usize) -> Vec { + let start_key = Key::CheckPointIndex(start_index).into_vec(); + let key_prefix = [KeyPrefix::CheckPointIndex as u8]; + let mode = IteratorMode::From(start_key.as_ref(), Direction::Forward); + self.db + .iterator(mode) + .take_while(|(key, _value)| key.starts_with(&key_prefix)) + .take(limit) + .map(|(_key, value)| Byte32::from_slice(&value).expect("stored block filter hash")) + .collect() + } + + pub fn update_check_points(&self, start_index: CpIndex, check_points: &[Byte32]) { + let mut index = start_index; + let mut batch = self.batch(); + for cp in check_points { + let key = Key::CheckPointIndex(index).into_vec(); + let value = Value::BlockFilterHash(cp); + batch.put_kv(key, value).expect("batch put should be ok"); + index += 1; + } + batch.commit().expect("batch commit should be ok"); + } + pub fn update_block_number(&self, block_number: BlockNumber) { let key_prefix = Key::Meta(FILTER_SCRIPTS_KEY).into_vec(); let mode = IteratorMode::From(key_prefix.as_ref(), Direction::Forward); @@ -1084,6 +1179,7 @@ impl Batch { } pub type TxIndex = u32; +pub type CpIndex = u32; pub type OutputIndex = u32; pub type CellIndex = u32; pub enum CellType { @@ -1102,6 +1198,7 @@ pub enum CellType { /// | 128 | TxTypeScript | TxHash | /// | 160 | BlockHash | Header | /// | 192 | BlockNumber | BlockHash | +/// | 208 | CheckPointIndex | BlockFilterHash | /// | 224 | Meta | Meta | /// +--------------+--------------------+--------------------------+ /// @@ -1113,6 +1210,8 @@ pub enum Key<'a> { TxTypeScript(&'a Script, BlockNumber, TxIndex, CellIndex, CellType), BlockHash(&'a Byte32), BlockNumber(BlockNumber), + // The index number for check points. + CheckPointIndex(CpIndex), Meta(&'a str), } @@ -1121,6 +1220,7 @@ pub enum Value<'a> { TxHash(&'a Byte32), Header(&'a Header), BlockHash(&'a Byte32), + BlockFilterHash(&'a Byte32), Meta(Vec), } @@ -1133,6 +1233,7 @@ pub enum KeyPrefix { TxTypeScript = 128, BlockHash = 160, BlockNumber = 192, + CheckPointIndex = 208, Meta = 224, } @@ -1183,6 +1284,10 @@ impl<'a> From> for Vec { encoded.push(KeyPrefix::BlockNumber as u8); encoded.extend_from_slice(&block_number.to_be_bytes()); } + Key::CheckPointIndex(index) => { + encoded.push(KeyPrefix::CheckPointIndex as u8); + encoded.extend_from_slice(&index.to_be_bytes()); + } Key::Meta(meta_key) => { encoded.push(KeyPrefix::Meta as u8); encoded.extend_from_slice(meta_key.as_bytes()); @@ -1205,6 +1310,7 @@ impl<'a> From> for Vec { Value::TxHash(tx_hash) => tx_hash.as_slice().into(), Value::Header(header) => header.as_slice().into(), Value::BlockHash(block_hash) => block_hash.as_slice().into(), + Value::BlockFilterHash(block_filter_hash) => block_filter_hash.as_slice().into(), Value::Meta(meta_value) => meta_value, } } diff --git a/src/subcmds.rs b/src/subcmds.rs index bc6531d..f37803f 100644 --- a/src/subcmds.rs +++ b/src/subcmds.rs @@ -13,6 +13,7 @@ use crate::{ error::{Error, Result}, protocols::{ FilterProtocol, LightClientProtocol, Peers, PendingTxs, RelayProtocol, SyncProtocol, + CHECK_POINT_INTERVAL, }, service::Service, storage::Storage, @@ -38,6 +39,7 @@ impl RunConfig { storage.init_genesis_block(consensus.genesis_block().data()); let pending_txs = Arc::new(RwLock::new(PendingTxs::new(64))); + let max_outbound_peers = self.run_env.network.max_outbound_peers; let network_state = NetworkState::from_config(self.run_env.network) .map(|network_state| { Arc::new(network_state.required_flags( @@ -58,7 +60,11 @@ impl RunConfig { SupportProtocols::Filter.protocol_id(), ]; - let peers = Arc::new(Peers::default()); + let peers = Arc::new(Peers::new( + max_outbound_peers, + CHECK_POINT_INTERVAL, + storage.get_last_check_point(), + )); let sync_protocol = SyncProtocol::new(storage.clone(), Arc::clone(&peers)); let relay_protocol = RelayProtocol::new(pending_txs.clone(), Arc::clone(&peers)); let light_client: Box = Box::new(LightClientProtocol::new( diff --git a/src/tests/prelude.rs b/src/tests/prelude.rs index b549646..04ca005 100644 --- a/src/tests/prelude.rs +++ b/src/tests/prelude.rs @@ -18,6 +18,7 @@ use ckb_types::{ use crate::{ protocols::{ FilterProtocol, LastState, LightClientProtocol, Peers, ProveRequest, SyncProtocol, + CHECK_POINT_INTERVAL, }, storage::Storage, tests::{ALWAYS_SUCCESS_BIN, ALWAYS_SUCCESS_SCRIPT}, @@ -70,6 +71,16 @@ pub(crate) trait ChainExt { fn consensus(&self) -> &Consensus; + fn create_peers(&self) -> Arc { + let max_outbound_peers = 1; + let peers = Peers::new( + max_outbound_peers, + CHECK_POINT_INTERVAL, + self.client_storage().get_last_check_point(), + ); + Arc::new(peers) + } + fn create_light_client_protocol(&self, peers: Arc) -> LightClientProtocol { let storage = self.client_storage().to_owned(); let consensus = self.consensus().to_owned(); diff --git a/src/tests/protocols/block_filter.rs b/src/tests/protocols/block_filter.rs index c810132..a0144c6 100644 --- a/src/tests/protocols/block_filter.rs +++ b/src/tests/protocols/block_filter.rs @@ -16,7 +16,7 @@ use ckb_types::{ use crate::storage::SetScriptsCommand; use crate::storage::{ScriptStatus, ScriptType}; use crate::{ - protocols::{Peers, BAD_MESSAGE_BAN_TIME, GET_BLOCK_FILTERS_TOKEN}, + protocols::{BAD_MESSAGE_BAN_TIME, GET_BLOCK_FILTERS_TOKEN}, tests::{ prelude::*, utils::{MockChain, MockNetworkContext}, @@ -28,7 +28,7 @@ async fn test_block_filter_malformed_message() { let chain = MockChain::new_with_dummy_pow("test-block-filter"); let nc = MockNetworkContext::new(SupportProtocols::Filter); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let mut protocol = chain.create_filter_protocol(peers); let peer_index = PeerIndex::new(3); @@ -67,7 +67,7 @@ async fn test_block_filter_ignore_start_number() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -117,7 +117,7 @@ async fn test_block_filter_empty_filters() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -167,7 +167,7 @@ async fn test_block_filter_invalid_filters_count() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -222,12 +222,12 @@ async fn test_block_filter_start_number_greater_then_proved_number() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers }; - let mut protocol = chain.create_filter_protocol(peers); + let mut protocol = chain.create_filter_protocol(Arc::clone(&peers)); let content = packed::BlockFilters::new_builder() .start_number(start_number.pack()) .block_hashes(vec![H256(rand::random()).pack(), H256(rand::random()).pack()].pack()) @@ -237,7 +237,11 @@ async fn test_block_filter_start_number_greater_then_proved_number() { .set(content) .build(); - let peer_index = PeerIndex::new(3); + peers.mock_latest_block_filter_hashes( + peer_index, + 0, + vec![Default::default(); proved_number as usize], + ); protocol .received(nc.context(), peer_index, message.as_bytes()) .await; @@ -274,12 +278,12 @@ async fn test_block_filter_ok_with_blocks_not_matched() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers }; - let mut protocol = chain.create_filter_protocol(peers); + let mut protocol = chain.create_filter_protocol(Arc::clone(&peers)); let block_hashes = vec![H256(rand::random()).pack(), H256(rand::random()).pack()]; let blocks_count = block_hashes.len(); let content = packed::BlockFilters::new_builder() @@ -291,6 +295,11 @@ async fn test_block_filter_ok_with_blocks_not_matched() { .set(content) .build(); + peers.mock_latest_block_filter_hashes( + peer_index, + 0, + vec![Default::default(); proved_number as usize], + ); protocol .received(nc.context(), peer_index, message.as_bytes()) .await; @@ -356,7 +365,7 @@ async fn test_block_filter_ok_with_blocks_matched() { let peer_index = PeerIndex::new(3); let (peers, prove_state_block_hash) = { let prove_state_block_hash = header.hash(); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); (peers, prove_state_block_hash) @@ -383,7 +392,12 @@ async fn test_block_filter_ok_with_blocks_matched() { .build() .as_bytes(); - let mut protocol = chain.create_filter_protocol(peers); + let mut protocol = chain.create_filter_protocol(Arc::clone(&peers)); + peers.mock_latest_block_filter_hashes( + peer_index, + 0, + vec![Default::default(); start_number as usize + 2], + ); protocol.received(nc.context(), peer_index, message).await; assert!(nc.not_banned(peer_index)); @@ -399,9 +413,7 @@ async fn test_block_filter_ok_with_blocks_matched() { }; let get_block_filters_message = { let blocks_count = 2; - let limit = proved_number - start_number + 1; - let actual_blocks_count = blocks_count.min(limit); - let new_start_number = start_number - 1 + actual_blocks_count + 1; + let new_start_number = start_number - 1 + blocks_count + 1; let content = packed::GetBlockFilters::new_builder() .start_number(new_start_number.pack()) .build(); @@ -454,13 +466,18 @@ async fn test_block_filter_notify_ask_filters() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers }; - let mut protocol = chain.create_filter_protocol(peers); + let mut protocol = chain.create_filter_protocol(Arc::clone(&peers)); + peers.mock_latest_block_filter_hashes( + peer_index, + 0, + vec![Default::default(); min_filtered_block_number as usize + 1], + ); protocol.notify(nc.context(), GET_BLOCK_FILTERS_TOKEN).await; let message = { let start_number: u64 = min_filtered_block_number + 1; @@ -489,7 +506,7 @@ async fn test_block_filter_notify_no_proved_peers() { let peer_index = PeerIndex::new(3); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -527,7 +544,7 @@ async fn test_block_filter_notify_not_reach_ask() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -567,7 +584,7 @@ async fn test_block_filter_notify_proved_number_not_big_enough() { None, Default::default(), ); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -604,7 +621,7 @@ async fn test_block_filter_notify_recover_matched_blocks() { .client_storage() .update_last_state(&U256::one(), &tip_header.header().data(), &[]); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers @@ -618,8 +635,13 @@ async fn test_block_filter_notify_recover_matched_blocks() { chain .client_storage() .add_matched_blocks(2, 2, matched_blocks); - let mut protocol = chain.create_filter_protocol(peers); + let mut protocol = chain.create_filter_protocol(Arc::clone(&peers)); + peers.mock_latest_block_filter_hashes( + peer_index, + 0, + vec![Default::default(); min_filtered_block_number as usize + 2], + ); protocol.notify(nc.context(), GET_BLOCK_FILTERS_TOKEN).await; let get_blocks_proof_message = { diff --git a/src/tests/protocols/light_client/mod.rs b/src/tests/protocols/light_client/mod.rs index 969ca56..66b9471 100644 --- a/src/tests/protocols/light_client/mod.rs +++ b/src/tests/protocols/light_client/mod.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use ckb_network::{bytes::Bytes, CKBProtocolHandler, PeerIndex, SupportProtocols}; use ckb_types::{ core::{BlockNumber, EpochNumberWithFraction, HeaderBuilder}, @@ -10,9 +8,7 @@ use ckb_types::{ }; use crate::{ - protocols::{ - light_client::constant::GET_IDLE_BLOCKS_TOKEN, PeerState, Peers, BAD_MESSAGE_BAN_TIME, - }, + protocols::{light_client::constant::GET_IDLE_BLOCKS_TOKEN, PeerState, BAD_MESSAGE_BAN_TIME}, tests::{ prelude::*, utils::{MockChain, MockNetworkContext}, @@ -29,7 +25,7 @@ async fn malformed_message() { let chain = MockChain::new_with_dummy_pow("test-light-client"); let nc = MockNetworkContext::new(SupportProtocols::LightClient); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let mut protocol = chain.create_light_client_protocol(peers); let peer_index = PeerIndex::new(3); @@ -46,7 +42,7 @@ async fn malformed_message() { fn build_prove_request_content() { let chain = MockChain::new_with_dummy_pow("test-light-client"); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let protocol = chain.create_light_client_protocol(peers); let storage = chain.client_storage(); @@ -179,7 +175,7 @@ async fn test_light_client_get_idle_matched_blocks() { .update_last_state(&U256::one(), &tip_header.header().data(), &[]); let tip_hash = tip_header.header().hash(); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.mock_prove_state(peer_index, tip_header).unwrap(); peers diff --git a/src/tests/protocols/light_client/send_blocks_proof.rs b/src/tests/protocols/light_client/send_blocks_proof.rs index e7f5297..76de814 100644 --- a/src/tests/protocols/light_client/send_blocks_proof.rs +++ b/src/tests/protocols/light_client/send_blocks_proof.rs @@ -1,12 +1,10 @@ -use std::sync::Arc; - use ckb_network::{CKBProtocolHandler, PeerIndex, SupportProtocols}; use ckb_types::{ core::BlockNumber, h256, packed, prelude::*, utilities::merkle_mountain_range::VerifiableHeader, }; use crate::{ - protocols::{LastState, Peers, ProveRequest, ProveState, StatusCode}, + protocols::{LastState, ProveRequest, ProveState, StatusCode}, tests::{ prelude::*, utils::{MockChain, MockNetworkContext}, @@ -18,7 +16,7 @@ async fn peer_state_is_not_found() { let chain = MockChain::new_with_dummy_pow("test-light-client"); let nc = MockNetworkContext::new(SupportProtocols::LightClient); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let mut protocol = chain.create_light_client_protocol(peers); let data = { @@ -42,7 +40,7 @@ async fn no_matched_request() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -69,7 +67,7 @@ async fn last_state_is_changed() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -162,7 +160,7 @@ async fn unexpected_response() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -268,7 +266,7 @@ async fn get_blocks_with_chunks() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -618,7 +616,7 @@ async fn test_send_blocks_proof(param: TestParameter) { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers diff --git a/src/tests/protocols/light_client/send_last_state.rs b/src/tests/protocols/light_client/send_last_state.rs index 09d167d..a1f3104 100644 --- a/src/tests/protocols/light_client/send_last_state.rs +++ b/src/tests/protocols/light_client/send_last_state.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use ckb_network::{CKBProtocolHandler, PeerIndex, SupportProtocols}; use ckb_types::{ core::{EpochNumberWithFraction, HeaderBuilder}, @@ -9,7 +7,7 @@ use ckb_types::{ }; use crate::{ - protocols::{LastState, Peers, ProveRequest, ProveState, StatusCode}, + protocols::{LastState, ProveRequest, ProveState, StatusCode}, tests::{ prelude::*, utils::{MockChain, MockNetworkContext}, @@ -21,7 +19,7 @@ async fn peer_state_is_not_found() { let chain = MockChain::new_with_dummy_pow("test-light-client"); let nc = MockNetworkContext::new(SupportProtocols::LightClient); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let mut protocol = chain.create_light_client_protocol(peers); let data = { @@ -45,7 +43,7 @@ async fn invalid_nonce() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -72,7 +70,7 @@ async fn invalid_chain_root() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -108,7 +106,7 @@ async fn initialize_last_state() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -169,7 +167,7 @@ async fn update_to_continuous_last_state() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -268,7 +266,7 @@ async fn update_to_noncontinuous_last_state() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -367,7 +365,7 @@ async fn update_to_continuous_but_forked_last_state() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -493,7 +491,7 @@ async fn update_to_proved_last_state() { let peer_index = PeerIndex::new(1); let peer_index_proved = PeerIndex::new(2); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.add_peer(peer_index_proved); peers diff --git a/src/tests/protocols/light_client/send_last_state_proof.rs b/src/tests/protocols/light_client/send_last_state_proof.rs index 15045e3..d0f90a8 100644 --- a/src/tests/protocols/light_client/send_last_state_proof.rs +++ b/src/tests/protocols/light_client/send_last_state_proof.rs @@ -8,7 +8,7 @@ use ckb_types::{ use log::debug; use crate::{ - protocols::{light_client::prelude::*, LastState, Peers, ProveRequest, ProveState, StatusCode}, + protocols::{light_client::prelude::*, LastState, ProveRequest, ProveState, StatusCode}, tests::{ prelude::*, utils::{setup, MockChain, MockNetworkContext}, @@ -34,7 +34,7 @@ async fn peer_state_is_not_found() { let chain = MockChain::new_with_dummy_pow("test-light-client"); let nc = MockNetworkContext::new(SupportProtocols::LightClient); - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let mut protocol = chain.create_light_client_protocol(peers); let data = { @@ -58,7 +58,7 @@ async fn no_matched_request() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -90,7 +90,7 @@ async fn update_last_state() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -174,7 +174,7 @@ async fn unknown_proof() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -247,7 +247,7 @@ async fn headers_should_be_sorted() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -333,7 +333,7 @@ async fn valid_proof_with_boundary_not_in_last_n() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -428,7 +428,7 @@ async fn valid_proof_with_boundary_in_last_n() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -523,7 +523,7 @@ async fn valid_proof_with_no_matched_sample() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -652,7 +652,7 @@ async fn valid_proof_with_prove_state() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -782,7 +782,7 @@ async fn valid_proof_with_reorg_blocks() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -925,7 +925,7 @@ async fn test_parent_chain_root_for_the_genesis_block(should_passed: bool) { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -1034,7 +1034,7 @@ async fn invalid_parent_chain_root_for_non_genesis_blocks() { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -1331,7 +1331,7 @@ async fn test_send_last_state_proof(param: TestParameter) { let peer_index = PeerIndex::new(1); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers @@ -1690,7 +1690,7 @@ async fn test_with_reorg_blocks(param: ReorgTestParameter) { let peer_index = PeerIndex::new(1); let downloading_matched_block = H256(rand::random()); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); peers.request_last_state(peer_index).unwrap(); peers diff --git a/src/tests/protocols/light_client/send_transactions_proof.rs b/src/tests/protocols/light_client/send_transactions_proof.rs index f45bbef..eb310a1 100644 --- a/src/tests/protocols/light_client/send_transactions_proof.rs +++ b/src/tests/protocols/light_client/send_transactions_proof.rs @@ -10,7 +10,7 @@ use ckb_types::{ }; use crate::{ - protocols::{light_client::constant::FETCH_HEADER_TX_TOKEN, FetchInfo, Peers, StatusCode}, + protocols::{light_client::constant::FETCH_HEADER_TX_TOKEN, FetchInfo, StatusCode}, tests::{ prelude::*, utils::{MockChain, MockNetworkContext}, @@ -109,7 +109,7 @@ async fn test_send_txs_proof_ok() { }; let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let txs_proof_request = packed::GetTransactionsProof::new_builder() .last_hash(last_header.header().calc_header_hash()) .tx_hashes( @@ -246,7 +246,7 @@ async fn test_send_txs_proof_invalid_mmr_proof() { }; let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let txs_proof_request = packed::GetTransactionsProof::new_builder() .last_hash(last_header.header().calc_header_hash()) .tx_hashes(tx_hashes.clone().pack()) @@ -376,7 +376,7 @@ async fn test_send_txs_proof_invalid_merkle_proof() { }; let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let txs_proof_request = packed::GetTransactionsProof::new_builder() .last_hash(last_header.header().calc_header_hash()) .tx_hashes(tx_hashes.clone().pack()) @@ -431,7 +431,7 @@ async fn test_send_txs_proof_is_empty() { }; let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); let txs_proof_request = packed::GetTransactionsProof::new_builder() .last_hash(last_header.header().calc_header_hash()) .build(); @@ -459,7 +459,7 @@ async fn test_send_headers_txs_request() { let peer_index = PeerIndex::new(3); let peers = { - let peers = Arc::new(Peers::new(Default::default())); + let peers = chain.create_peers(); peers.fetching_headers().insert( h256!("0xaa22").pack(), FetchInfo::new(111, 3344, false, false), diff --git a/src/tests/protocols/synchronizer.rs b/src/tests/protocols/synchronizer.rs index cf0e8b8..04b0cc8 100644 --- a/src/tests/protocols/synchronizer.rs +++ b/src/tests/protocols/synchronizer.rs @@ -8,7 +8,6 @@ use ckb_types::{ }; use crate::{ - protocols::Peers, storage::{ScriptStatus, ScriptType}, tests::{ prelude::*, @@ -48,7 +47,7 @@ async fn test_sync_add_block() { ); let peer_index = PeerIndex::new(3); let peers = { - let peers = Arc::new(Peers::default()); + let peers = chain.create_peers(); peers.add_peer(peer_index); { let mut matched_blocks = peers.matched_blocks().write().unwrap(); diff --git a/src/tests/service.rs b/src/tests/service.rs index 13b66d3..f8d9e4a 100644 --- a/src/tests/service.rs +++ b/src/tests/service.rs @@ -14,23 +14,20 @@ use ckb_types::{ }; use crate::{ - protocols::{FetchInfo, Peers, PendingTxs}, + protocols::{FetchInfo, PendingTxs}, service::{ BlockFilterRpc, BlockFilterRpcImpl, ChainRpc, ChainRpcImpl, FetchStatus, Order, ScriptStatus, ScriptType, SearchKey, SearchKeyFilter, SetScriptsCommand, Status, TransactionRpc, TransactionRpcImpl, TransactionWithStatus, TxStatus, }, storage::{self, StorageWithChainData}, - tests::utils::new_storage, + tests::utils::{create_peers, new_storage}, }; #[test] fn rpc() { let storage = new_storage("rpc"); - let swc = StorageWithChainData::new( - storage.clone(), - Arc::new(Peers::new(RwLock::new(Vec::new()))), - ); + let swc = StorageWithChainData::new(storage.clone(), create_peers()); let rpc = BlockFilterRpcImpl { swc }; // setup test data @@ -735,7 +732,12 @@ fn rpc() { .collect(); // insert fetched headers - let peers = Arc::new(Peers::new(RwLock::new(vec![extra_header.clone()]))); + let peers = create_peers(); + peers + .last_headers() + .write() + .unwrap() + .push(extra_header.clone()); peers.fetching_headers().insert( h256!("0xaa22").pack(), FetchInfo::new(1111, 3344, false, false), @@ -822,10 +824,7 @@ fn rpc() { "rollback should update script filter block number" ); - let swc = StorageWithChainData::new( - storage.clone(), - Arc::new(Peers::new(RwLock::new(Vec::new()))), - ); + let swc = StorageWithChainData::new(storage.clone(), create_peers()); let rpc = BlockFilterRpcImpl { swc }; // test get_cells rpc after rollback @@ -1009,10 +1008,7 @@ fn rpc() { #[test] fn get_cells_capacity_bug() { let storage = new_storage("get_cells_capacity_bug"); - let swc = StorageWithChainData::new( - storage.clone(), - Arc::new(Peers::new(RwLock::new(Vec::new()))), - ); + let swc = StorageWithChainData::new(storage.clone(), create_peers()); let rpc = BlockFilterRpcImpl { swc }; // setup test data @@ -1137,10 +1133,7 @@ fn get_cells_capacity_bug() { #[test] fn get_cells_after_rollback_bug() { let storage = new_storage("get_cells_after_rollback_bug"); - let swc = StorageWithChainData::new( - storage.clone(), - Arc::new(Peers::new(RwLock::new(Vec::new()))), - ); + let swc = StorageWithChainData::new(storage.clone(), create_peers()); let rpc = BlockFilterRpcImpl { swc }; // setup test data @@ -1333,7 +1326,7 @@ fn get_cells_after_rollback_bug() { #[test] fn test_set_scripts_clear_matched_blocks() { let storage = new_storage("set-scripts-clear-matched-blocks"); - let peers = Arc::new(Peers::new(RwLock::new(Vec::new()))); + let peers = create_peers(); let swc = StorageWithChainData::new(storage.clone(), Arc::clone(&peers)); let rpc = BlockFilterRpcImpl { swc }; @@ -1383,7 +1376,7 @@ fn test_set_scripts_clear_matched_blocks() { #[test] fn test_set_scripts_command() { let storage = new_storage("set-scripts-command"); - let peers = Arc::new(Peers::new(RwLock::new(Vec::new()))); + let peers = create_peers(); let swc = StorageWithChainData::new(storage.clone(), Arc::clone(&peers)); let rpc = BlockFilterRpcImpl { swc }; diff --git a/src/tests/utils/mod.rs b/src/tests/utils/mod.rs index 27564d3..4caac6b 100644 --- a/src/tests/utils/mod.rs +++ b/src/tests/utils/mod.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use env_logger::{Builder, Target}; use log::LevelFilter; @@ -7,7 +9,7 @@ mod network_context; pub(crate) use chain::MockChain; pub(crate) use network_context::MockNetworkContext; -use crate::storage::Storage; +use crate::{protocols::Peers, protocols::CHECK_POINT_INTERVAL, storage::Storage}; pub(crate) fn setup() { let _ = Builder::new() @@ -23,3 +25,13 @@ pub(crate) fn new_storage(prefix: &str) -> Storage { let tmp_dir = tempfile::Builder::new().prefix(prefix).tempdir().unwrap(); Storage::new(tmp_dir.path().to_str().unwrap()) } + +pub(crate) fn create_peers() -> Arc { + let max_outbound_peers = 1; + let peers = Peers::new( + max_outbound_peers, + CHECK_POINT_INTERVAL, + (0, Default::default()), + ); + Arc::new(peers) +} diff --git a/src/tests/verify.rs b/src/tests/verify.rs index f8a1998..4dcebf8 100644 --- a/src/tests/verify.rs +++ b/src/tests/verify.rs @@ -29,7 +29,7 @@ fn verify_valid_transaction() { // https://pudge.explorer.nervos.org/transaction/0xf34f4eaac4a662927fb52d4cb608e603150b9e0678a0f5ed941e3cfd5b68fb30 let transaction: packed::Transaction = serde_json::from_str::(r#"{"cell_deps":[{"dep_type":"dep_group","out_point":{"index":"0x0","tx_hash":"0xf8de3bb47d055cdf460d93a2a6e1b05f7432f9777c8c474abf4eec1d4aee5d37"}}],"header_deps":[],"inputs":[{"previous_output":{"index":"0x7","tx_hash":"0x8f8c79eb6671709633fe6a46de93c0fedc9c1b8a6527a18d3983879542635c9f"},"since":"0x0"}],"outputs":[{"capacity":"0x470de4df820000","lock":{"args":"0xff5094c2c5f476fc38510018609a3fd921dd28ad","code_hash":"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8","hash_type":"type"},"type":null},{"capacity":"0xb61134e5a35e800","lock":{"args":"0x64257f00b6b63e987609fa9be2d0c86d351020fb","code_hash":"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8","hash_type":"type"},"type":null}],"outputs_data":["0x","0x"],"version":"0x0","witnesses":["0x5500000010000000550000005500000041000000af34b54bebf8c5971da6a880f2df5a186c3f8d0b5c9a1fe1a90c95b8a4fb89ef3bab1ccec13797dcb3fee80400f953227dd7741227e08032e3598e16ccdaa49c00"]}"#).unwrap().into(); - let swc = StorageWithChainData::new(storage.to_owned(), Default::default()); + let swc = StorageWithChainData::new(storage.to_owned(), chain.create_peers()); let result = verify_tx(transaction.into_view(), &swc, &consensus).unwrap(); assert_eq!(1682789, result); } @@ -39,7 +39,7 @@ fn non_contextual_transaction_verifier() { let chain = MockChain::new_with_default_pow("non_contextual_transaction_verifier"); let storage = chain.client_storage(); let consensus = chain.consensus(); - let swc = StorageWithChainData::new(storage.to_owned(), Default::default()); + let swc = StorageWithChainData::new(storage.to_owned(), chain.create_peers()); // duplicate cell deps base on a valid transaction // https://pudge.explorer.nervos.org/transaction/0xf34f4eaac4a662927fb52d4cb608e603150b9e0678a0f5ed941e3cfd5b68fb30