diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 9f4b35b5de50..a81e76dc16ff 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -10,7 +10,6 @@ use super::peer_manager::PeerManager; use super::{Error, SyncNetworkContext}; use address::Address; use amt::Amt; -use async_std::prelude::*; use async_std::sync::{channel, Receiver, Sender}; use async_std::task; use blocks::{Block, FullTipset, Tipset, TipsetKeys, TxMeta}; @@ -22,6 +21,7 @@ use encoding::{Cbor, Error as EncodingError}; use forest_libp2p::{ hello::HelloMessage, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES, }; +use futures::stream::{FuturesUnordered, StreamExt}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; use log::{debug, info, warn}; @@ -96,7 +96,7 @@ struct MsgMetaData { impl ChainSyncer where - DB: BlockStore, + DB: BlockStore + Sync + Send + 'static, { pub fn new( chain_store: ChainStore, @@ -129,12 +129,7 @@ where next_sync_target: SyncBucket::default(), }) } -} -impl ChainSyncer -where - DB: BlockStore, -{ pub async fn start(mut self) -> Result<(), Error> { self.net_handler.spawn(Arc::clone(&self.peer_manager)); @@ -316,7 +311,8 @@ where } // validate message root from header matches message root - let sm_root = self.compute_msg_data(&bls_msgs, &secp_msgs)?; + let sm_root = + Self::compute_msg_data(self.chain_store.blockstore(), &bls_msgs, &secp_msgs)?; if header.messages() != &sm_root { return Err(Error::InvalidRoots); } @@ -436,7 +432,11 @@ where /// Validates message root from header matches message root generated from the /// bls and secp messages contained in the passed in block and stores them in a key-value store fn validate_msg_data(&self, block: &Block) -> Result<(), Error> { - let sm_root = self.compute_msg_data(block.bls_msgs(), block.secp_msgs())?; + let sm_root = Self::compute_msg_data( + self.chain_store.blockstore(), + block.bls_msgs(), + block.secp_msgs(), + )?; if block.header().messages() != &sm_root { return Err(Error::InvalidRoots); } @@ -448,7 +448,7 @@ where } /// Returns message root CID from bls and secp message contained in the param Block fn compute_msg_data( - &self, + blockstore: &DB, bls_msgs: &[UnsignedMessage], secp_msgs: &[SignedMessage], ) -> Result { @@ -456,8 +456,8 @@ where let bls_cids = cids_from_messages(bls_msgs)?; let secp_cids = cids_from_messages(secp_msgs)?; // generate Amt and batch set message values - let bls_root = Amt::new_from_slice(self.chain_store.blockstore(), &bls_cids)?; - let secp_root = Amt::new_from_slice(self.chain_store.blockstore(), &secp_cids)?; + let bls_root = Amt::new_from_slice(blockstore, &bls_cids)?; + let secp_root = Amt::new_from_slice(blockstore, &secp_cids)?; let meta = TxMeta { bls_message_root: bls_root, @@ -465,9 +465,7 @@ where }; // TODO this should be memoryDB for temp storage // store message roots and receive meta_root - let meta_root = self - .chain_store - .blockstore() + let meta_root = blockstore .put(&meta, Blake2b256) .map_err(|e| Error::Other(e.to_string()))?; @@ -510,16 +508,17 @@ where Ok(fts) } // Block message validation checks - fn check_block_msgs(&self, block: Block, tip: &Tipset) -> Result<(), Error> { + fn check_block_msgs(db: Arc, block: Block, tip: &Tipset) -> Result<(), Error> { + //do the initial loop here + // Check Block Message and Signatures in them let mut pub_keys = Vec::new(); let mut cids = Vec::new(); for m in block.bls_msgs() { - let pk = self - .state_manager - .get_bls_public_key(m.from(), tip.parent_state())?; + let pk = StateManager::get_bls_public_key(&db, m.from(), tip.parent_state())?; pub_keys.push(pk); cids.push(m.cid()?.to_bytes()); } + if let Some(sig) = block.header().bls_aggregate() { if !verify_bls_aggregate( cids.iter() @@ -593,7 +592,7 @@ where let mut msg_meta_data: HashMap = HashMap::default(); // TODO retrieve tipset state and load state tree // temporary - let tree = StateTree::new(self.chain_store.db.as_ref()); + let tree = StateTree::new(db.as_ref()); // loop through bls messages and check msg validity for m in block.bls_msgs() { check_msg(m, &mut msg_meta_data, &tree)?; @@ -607,7 +606,7 @@ where .map_err(|e| Error::Validation(format!("Message signature invalid: {}", e)))?; } // validate message root from header matches message root - let sm_root = self.compute_msg_data(block.bls_msgs(), block.secp_msgs())?; + let sm_root = Self::compute_msg_data(db.as_ref(), block.bls_msgs(), block.secp_msgs())?; if block.header().messages() != &sm_root { return Err(Error::InvalidRoots); } @@ -617,48 +616,89 @@ where /// Validates block semantically according to https://github.com/filecoin-project/specs/blob/6ab401c0b92efb6420c6e198ec387cf56dc86057/validation.md async fn validate(&self, block: &Block) -> Result<(), Error> { + let mut error_vec: Vec = Vec::new(); + let mut validations = FuturesUnordered::new(); + let header = block.header(); // check if block has been signed if header.signature().is_none() { - return Err(Error::Validation("Signature is nil in header".to_owned())); + error_vec.push("Signature is nil in header".to_owned()); } let parent_tipset = self.chain_store.tipset_from_keys(header.parents())?; // time stamp checks - header.validate_timestamps(&parent_tipset)?; + if let Err(err) = header.validate_timestamps(&parent_tipset) { + error_vec.push(err.to_string()); + } + + let b = block.clone(); + let db = Arc::clone(&self.chain_store.db); + let parent_clone = parent_tipset.clone(); // check messages to ensure valid state transitions - self.check_block_msgs(block.clone(), &parent_tipset)?; + let x = task::spawn_blocking(move || Self::check_block_msgs(db, b, &parent_clone)); + validations.push(x); // TODO use computed state_root instead of parent_tipset.parent_state() - let work_addr = self - .state_manager - .get_miner_work_addr(&parent_tipset.parent_state(), header.miner_address())?; + // block signature check - header.check_block_signature(&work_addr)?; + let work_addr_result = self + .state_manager + .get_miner_work_addr(&parent_tipset.parent_state(), header.miner_address()); + match work_addr_result { + Ok(work_addr) => { + let temp_header = header.clone(); + let block_sig_task = task::spawn_blocking(move || { + temp_header + .check_block_signature(&work_addr) + .map_err(Error::Blockchain) + }); + validations.push(block_sig_task) + } + Err(err) => error_vec.push(err.to_string()), + } let slash = self .state_manager - .is_miner_slashed(header.miner_address(), &parent_tipset.parent_state())?; + .is_miner_slashed(header.miner_address(), &parent_tipset.parent_state()) + .unwrap_or_else(|err| { + error_vec.push(err.to_string()); + false + }); if slash { - return Err(Error::Validation( - "Received block was from slashed or invalid miner".to_owned(), - )); + error_vec.push("Received block was from slashed or invalid miner".to_owned()) } - let (c_pow, net_pow) = self + let power_result = self .state_manager - .get_power(&parent_tipset.parent_state(), header.miner_address())?; + .get_power(&parent_tipset.parent_state(), header.miner_address()); // ticket winner check - if !header.is_ticket_winner(c_pow, net_pow) { - return Err(Error::Validation( - "Miner created a block but was not a winner".to_owned(), - )); + match power_result { + Ok(pow_tuple) => { + let (c_pow, net_pow) = pow_tuple; + if !header.is_ticket_winner(c_pow, net_pow) { + error_vec.push("Miner created a block but was not a winner".to_owned()) + } + } + Err(err) => error_vec.push(err.to_string()), } + // TODO verify_ticket_vrf + // collect the errors from the async validations + while let Some(result) = validations.next().await { + if result.is_err() { + error_vec.push(result.err().unwrap().to_string()); + } + } + // combine vec of error strings and return Validation error with this resultant string + if !error_vec.is_empty() { + let error_string = error_vec.join(", "); + return Err(Error::Validation(error_string)); + } + Ok(()) } /// validates tipsets and adds header data to tipset tracker @@ -947,7 +987,8 @@ mod tests { Cid::from_raw_cid("bafy2bzacecujyfvb74s7xxnlajidxpgcpk6abyatk62dlhgq6gcob3iixhgom") .unwrap(); - let root = cs.compute_msg_data(&[bls], &[secp]).unwrap(); + let root = + ChainSyncer::compute_msg_data(cs.chain_store.blockstore(), &[bls], &[secp]).unwrap(); assert_eq!(root, expected_root); } } diff --git a/blockchain/state_manager/src/lib.rs b/blockchain/state_manager/src/lib.rs index cc8ffac4eaa8..df8d47643ddd 100644 --- a/blockchain/state_manager/src/lib.rs +++ b/blockchain/state_manager/src/lib.rs @@ -118,9 +118,13 @@ where } /// Returns a bls public key from provided address - pub fn get_bls_public_key(&self, addr: &Address, state_cid: &Cid) -> Result, Error> { - let state = StateTree::new_from_root(self.bs.as_ref(), state_cid).map_err(Error::State)?; - let kaddr = resolve_to_key_addr(&state, self.bs.as_ref(), addr) + pub fn get_bls_public_key( + db: &Arc, + addr: &Address, + state_cid: &Cid, + ) -> Result, Error> { + let state = StateTree::new_from_root(db.as_ref(), state_cid).map_err(Error::State)?; + let kaddr = resolve_to_key_addr(&state, db.as_ref(), addr) .map_err(|e| format!("Failed to resolve key address, error: {}", e))?; if kaddr.protocol() != Protocol::BLS { return Err("Address must be BLS address to load bls public key"