From 7d9f90e5b3ca940b3854f6661b38a84d80f7cc9e Mon Sep 17 00:00:00 2001 From: green Date: Tue, 24 Jan 2023 12:53:56 +0000 Subject: [PATCH 1/2] Fixed some [nit]s after review the PR for `fuel-core-sync` integration --- crates/fuel-core/src/executor.rs | 7 ++-- .../fuel-core/src/query/message_proof/test.rs | 4 +- crates/fuel-core/src/service/adapters.rs | 5 +-- .../src/service/adapters/block_importer.rs | 35 ++++------------- .../service/adapters/consensus_module/poa.rs | 20 +++++++++- .../src/service/adapters/executor.rs | 39 +++++++++++++++++++ .../src/service/adapters/producer.rs | 13 +------ crates/fuel-core/src/service/sub_services.rs | 3 +- .../consensus_module/poa/src/verifier.rs | 31 ++++++--------- .../consensus_module/src/block_verifier.rs | 11 +++++- crates/services/p2p/src/codecs/bincode.rs | 6 +-- crates/services/p2p/src/peer_manager.rs | 5 ++- crates/services/sync/src/service.rs | 4 +- crates/services/sync/src/service/tests.rs | 2 +- crates/types/src/blockchain/block.rs | 27 ++++--------- crates/types/src/blockchain/header.rs | 13 +++++-- tests/tests/sync.rs | 3 +- 17 files changed, 125 insertions(+), 103 deletions(-) create mode 100644 crates/fuel-core/src/service/adapters/executor.rs diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index e9526bd2e97..6aa4310d6cf 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -8,7 +8,6 @@ use crate::{ service::Config, }; use fuel_core_executor::refs::ContractRef; -use fuel_core_producer::ports::Executor as ExecutorTrait; use fuel_core_storage::{ tables::{ Coins, @@ -138,15 +137,15 @@ impl Executor { } } -impl ExecutorTrait for Executor { - fn execute_without_commit( +impl Executor { + pub fn execute_without_commit( &self, block: ExecutionBlock, ) -> ExecutorResult>> { self.execute_inner(block, &self.database) } - fn dry_run( + pub fn dry_run( &self, block: ExecutionBlock, utxo_validation: Option, diff --git a/crates/fuel-core/src/query/message_proof/test.rs b/crates/fuel-core/src/query/message_proof/test.rs index 02768b34be1..11370ab803d 100644 --- a/crates/fuel-core/src/query/message_proof/test.rs +++ b/crates/fuel-core/src/query/message_proof/test.rs @@ -162,7 +162,7 @@ async fn can_build_message_proof() { let header = header.clone(); let message_ids = message_ids.clone(); move |_| { - let header = header.clone().generate(&[vec![]], &message_ids); + let header = header.clone().generate(&[], &message_ids); let transactions = TXNS.to_vec(); Ok(CompressedBlock::test(header, transactions)) } @@ -173,6 +173,6 @@ async fn can_build_message_proof() { .unwrap(); assert_eq!(p.message_id(), message_id); assert_eq!(p.signature, Signature::default()); - let header = header.generate(&[vec![]], &message_ids); + let header = header.generate(&[], &message_ids); assert_eq!(p.header.output_messages_root, header.output_messages_root); } diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index f59def14b5c..01ce86d496e 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -8,14 +8,14 @@ use std::sync::Arc; pub mod block_importer; pub mod consensus_module; +pub mod executor; pub mod graphql_api; #[cfg(feature = "p2p")] pub mod p2p; pub mod producer; -pub mod txpool; - #[cfg(feature = "p2p")] pub mod sync; +pub mod txpool; #[derive(Clone)] pub struct PoAAdapter { @@ -60,7 +60,6 @@ pub struct BlockProducerAdapter { pub struct BlockImporterAdapter { pub block_importer: Arc>, - execution_semaphore: Arc, } #[cfg(feature = "p2p")] diff --git a/crates/fuel-core/src/service/adapters/block_importer.rs b/crates/fuel-core/src/service/adapters/block_importer.rs index 3b133e39891..e2e95d5d4fd 100644 --- a/crates/fuel-core/src/service/adapters/block_importer.rs +++ b/crates/fuel-core/src/service/adapters/block_importer.rs @@ -36,9 +36,10 @@ use fuel_core_types::{ SealedBlock, }, fuel_tx::Bytes32, - services::{ - block_importer::UncommittedResult, - executor::ExecutionBlock, + services::executor::{ + ExecutionBlock, + Result as ExecutorResult, + UncommittedResult as UncommittedExecutionResult, }, }; use std::sync::Arc; @@ -52,7 +53,6 @@ impl BlockImporterAdapter { ) -> Self { Self { block_importer: Arc::new(Importer::new(config, database, executor, verifier)), - execution_semaphore: Arc::new(tokio::sync::Semaphore::new(1)), } } @@ -60,30 +60,15 @@ impl BlockImporterAdapter { &self, sealed_block: SealedBlock, ) -> anyhow::Result<()> { - let permit = self.execution_semaphore.acquire().await?; tokio::task::spawn_blocking({ let importer = self.block_importer.clone(); move || importer.execute_and_commit(sealed_block) }) .await??; - core::mem::drop(permit); Ok(()) } } -impl fuel_core_poa::ports::BlockImporter for BlockImporterAdapter { - type Database = Database; - - fn commit_result( - &self, - result: UncommittedResult>, - ) -> anyhow::Result<()> { - self.block_importer - .commit_result(result) - .map_err(Into::into) - } -} - impl BlockVerifier for VerifierAdapter { fn verify_block_fields( &self, @@ -126,14 +111,8 @@ impl Executor for ExecutorAdapter { fn execute_without_commit( &self, block: ExecutionBlock, - ) -> Result< - fuel_core_types::services::executor::UncommittedResult< - StorageTransaction, - >, - fuel_core_types::services::executor::Error, - > { - fuel_core_producer::ports::Executor::::execute_without_commit( - self, block, - ) + ) -> ExecutorResult>> + { + self._execute_without_commit(block) } } diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index 32fbf3116b4..000a2f879b4 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -2,13 +2,17 @@ use crate::{ database::Database, fuel_core_graphql_api::ports::ConsensusModulePort, service::adapters::{ + BlockImporterAdapter, BlockProducerAdapter, PoAAdapter, TxPoolAdapter, }, }; use fuel_core_poa::{ - ports::TransactionPool, + ports::{ + BlockImporter, + TransactionPool, + }, service::SharedState, }; use fuel_core_services::stream::BoxStream; @@ -22,6 +26,7 @@ use fuel_core_types::{ TxId, }, services::{ + block_importer::UncommittedResult as UncommittedImporterResult, executor::UncommittedResult, txpool::{ ArcPoolTx, @@ -98,3 +103,16 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { .await } } + +impl BlockImporter for BlockImporterAdapter { + type Database = Database; + + fn commit_result( + &self, + result: UncommittedImporterResult>, + ) -> anyhow::Result<()> { + self.block_importer + .commit_result(result) + .map_err(Into::into) + } +} diff --git a/crates/fuel-core/src/service/adapters/executor.rs b/crates/fuel-core/src/service/adapters/executor.rs new file mode 100644 index 00000000000..c9f3a55a7c0 --- /dev/null +++ b/crates/fuel-core/src/service/adapters/executor.rs @@ -0,0 +1,39 @@ +use crate::{ + database::Database, + executor::Executor, + service::adapters::ExecutorAdapter, +}; +use fuel_core_storage::transactional::StorageTransaction; +use fuel_core_types::{ + fuel_tx::Receipt, + services::executor::{ + ExecutionBlock, + Result as ExecutorResult, + UncommittedResult, + }, +}; + +impl ExecutorAdapter { + pub(crate) fn _execute_without_commit( + &self, + block: ExecutionBlock, + ) -> ExecutorResult>> { + let executor = Executor { + database: self.database.clone(), + config: self.config.clone(), + }; + executor.execute_without_commit(block) + } + + pub(crate) fn _dry_run( + &self, + block: ExecutionBlock, + utxo_validation: Option, + ) -> ExecutorResult>> { + let executor = Executor { + database: self.database.clone(), + config: self.config.clone(), + }; + executor.dry_run(block, utxo_validation) + } +} diff --git a/crates/fuel-core/src/service/adapters/producer.rs b/crates/fuel-core/src/service/adapters/producer.rs index 21b3168dc70..7e96ce44536 100644 --- a/crates/fuel-core/src/service/adapters/producer.rs +++ b/crates/fuel-core/src/service/adapters/producer.rs @@ -1,6 +1,5 @@ use crate::{ database::Database, - executor::Executor, service::adapters::{ BlockProducerAdapter, ExecutorAdapter, @@ -66,11 +65,7 @@ impl fuel_core_producer::ports::Executor for ExecutorAdapter { &self, block: ExecutionBlock, ) -> ExecutorResult>> { - let executor = Executor { - database: self.database.clone(), - config: self.config.clone(), - }; - executor.execute_without_commit(block) + self._execute_without_commit(block) } fn dry_run( @@ -78,11 +73,7 @@ impl fuel_core_producer::ports::Executor for ExecutorAdapter { block: ExecutionBlock, utxo_validation: Option, ) -> ExecutorResult>> { - let executor = Executor { - database: self.database.clone(), - config: self.config.clone(), - }; - executor.dry_run(block, utxo_validation) + self._dry_run(block, utxo_validation) } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index e991cb3d761..611945f6764 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -123,9 +123,8 @@ pub fn init_sub_services( #[cfg(feature = "p2p")] let sync = { - let current_fuel_block_height = Some(database.latest_height()?); fuel_core_sync::service::new_service( - current_fuel_block_height, + last_height, p2p_adapter, importer_adapter.clone(), verifier, diff --git a/crates/services/consensus_module/poa/src/verifier.rs b/crates/services/consensus_module/poa/src/verifier.rs index ddd3a181942..411cde8f271 100644 --- a/crates/services/consensus_module/poa/src/verifier.rs +++ b/crates/services/consensus_module/poa/src/verifier.rs @@ -4,9 +4,9 @@ use fuel_core_storage::Result as StorageResult; use fuel_core_types::{ blockchain::{ block::Block, + consensus::poa::PoAConsensus, header::BlockHeader, primitives::BlockHeight, - SealedBlockHeader, }, fuel_tx::Input, fuel_types::Bytes32, @@ -31,28 +31,21 @@ pub trait Database { fn block_header_merkle_root(&self, height: &BlockHeight) -> StorageResult; } +// TODO: Make this function `async` and await the synchronization with the relayer. pub fn verify_consensus( consensus_config: &ConsensusConfig, - header: &SealedBlockHeader, + header: &BlockHeader, + consensus: &PoAConsensus, ) -> bool { - let SealedBlockHeader { - entity: header, - consensus, - } = header; - match consensus { - fuel_core_types::blockchain::consensus::Consensus::PoA(consensus) => { - match consensus_config { - ConsensusConfig::PoA { signing_key } => { - let id = header.id(); - let m = id.as_message(); - consensus - .signature - .recover(m) - .map_or(false, |k| Input::owner(&k) == *signing_key) - } - } + match consensus_config { + ConsensusConfig::PoA { signing_key } => { + let id = header.id(); + let m = id.as_message(); + consensus + .signature + .recover(m) + .map_or(false, |k| Input::owner(&k) == *signing_key) } - _ => true, } } diff --git a/crates/services/consensus_module/src/block_verifier.rs b/crates/services/consensus_module/src/block_verifier.rs index 0c2d0906899..c79f0b4085b 100644 --- a/crates/services/consensus_module/src/block_verifier.rs +++ b/crates/services/consensus_module/src/block_verifier.rs @@ -74,7 +74,16 @@ where /// Verifies the consensus of the block header. pub fn verify_consensus(&self, header: &SealedBlockHeader) -> bool { - verify_consensus(&self.config.chain_config.consensus, header) + let SealedBlockHeader { + entity: header, + consensus, + } = header; + match consensus { + Consensus::Genesis(_) => true, + Consensus::PoA(consensus) => { + verify_consensus(&self.config.chain_config.consensus, header, consensus) + } + } } } diff --git a/crates/services/p2p/src/codecs/bincode.rs b/crates/services/p2p/src/codecs/bincode.rs index e1bc0f86e1b..400b187eb6e 100644 --- a/crates/services/p2p/src/codecs/bincode.rs +++ b/crates/services/p2p/src/codecs/bincode.rs @@ -232,7 +232,7 @@ impl RequestResponseConverter for BincodeCodec { match res_msg { OutboundResponse::Block(sealed_block) => { let response = if let Some(sealed_block) = sealed_block { - Some(self.serialize(&**sealed_block)?) + Some(self.serialize(sealed_block.as_ref())?) } else { None }; @@ -241,7 +241,7 @@ impl RequestResponseConverter for BincodeCodec { } OutboundResponse::SealedHeader(sealed_header) => { let response = if let Some(sealed_header) = sealed_header { - Some(self.serialize(&**sealed_header)?) + Some(self.serialize(sealed_header.as_ref())?) } else { None }; @@ -250,7 +250,7 @@ impl RequestResponseConverter for BincodeCodec { } OutboundResponse::Transactions(transactions) => { let response = if let Some(transactions) = transactions { - Some(self.serialize(&**transactions)?) + Some(self.serialize(transactions.as_ref())?) } else { None }; diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index b9142937930..e2947ee18c6 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -672,11 +672,14 @@ impl PeerManager { /// Find a peer that is holding the given block height. pub fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { let mut range = rand::thread_rng(); + // TODO: Optimize the selection of the peer. + // We can store pair `(peer id, height)` for all nodes(reserved and not) in the + // https://docs.rs/sorted-vec/latest/sorted_vec/struct.SortedVec.html self.non_reserved_connected_peers .iter() .chain(self.reserved_connected_peers.iter()) .filter(|(_, peer_info)| { - peer_info.heartbeat_data.block_height == Some(*height) + peer_info.heartbeat_data.block_height >= Some(*height) }) .map(|(peer_id, _)| *peer_id) .choose(&mut range) diff --git a/crates/services/sync/src/service.rs b/crates/services/sync/src/service.rs index 88e80739ea3..083d357282d 100644 --- a/crates/services/sync/src/service.rs +++ b/crates/services/sync/src/service.rs @@ -37,7 +37,7 @@ mod tests; /// Creates an instance of runnable sync service. pub fn new_service( - current_fuel_block_height: Option, + current_fuel_block_height: BlockHeight, p2p: P, executor: E, consensus: C, @@ -49,7 +49,7 @@ where C: ports::ConsensusPort + Send + Sync + 'static, { let height_stream = p2p.height_stream(); - let state = State::new(current_fuel_block_height.map(Into::into), None); + let state = State::new(Some(current_fuel_block_height.into()), None); Ok(ServiceRunner::new(SyncTask::new( height_stream, state, diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index aa7e7d9433f..db0dddf98c1 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -51,7 +51,7 @@ async fn test_new_service() { max_get_header_requests: 10, max_get_txns_requests: 10, }; - let s = new_service(Some(4u32.into()), p2p, executor, consensus, params).unwrap(); + let s = new_service(4u32.into(), p2p, executor, consensus, params).unwrap(); assert_eq!( s.start_and_await().await.unwrap(), diff --git a/crates/types/src/blockchain/block.rs b/crates/types/src/blockchain/block.rs index 95d38e10cc0..5d84049a618 100644 --- a/crates/types/src/blockchain/block.rs +++ b/crates/types/src/blockchain/block.rs @@ -19,10 +19,7 @@ use crate::{ TxId, UniqueIdentifier, }, - fuel_types::{ - bytes::SerializableVec, - MessageId, - }, + fuel_types::MessageId, }; /// Fuel block with all transaction data included @@ -67,15 +64,11 @@ impl Block { /// the ids from the receipts of messages outputs. pub fn new( header: PartialBlockHeader, - mut transactions: Vec, + transactions: Vec, message_ids: &[MessageId], ) -> Self { - // I think this is safe as it doesn't appear that any of the reads actually mutate the data. - // Alternatively we can clone to be safe. - let transaction_ids: Vec<_> = - transactions.iter_mut().map(|tx| tx.to_bytes()).collect(); Self { - header: header.generate(&transaction_ids[..], message_ids), + header: header.generate(&transactions, message_ids), transactions, } } @@ -85,16 +78,12 @@ impl Block { /// This will fail if the transactions don't match the header. pub fn try_from_executed( header: BlockHeader, - mut transactions: Vec, + transactions: Vec, ) -> Option { - let transaction_ids: Vec<_> = - transactions.iter_mut().map(|tx| tx.to_bytes()).collect(); - header - .validate_transactions(&transaction_ids[..]) - .then_some(Self { - header, - transactions, - }) + header.validate_transactions(&transactions).then_some(Self { + header, + transactions, + }) } /// Compresses the fuel block and replaces transactions with hashes. diff --git a/crates/types/src/blockchain/header.rs b/crates/types/src/blockchain/header.rs index f58d20b773b..51f2395ca8a 100644 --- a/crates/types/src/blockchain/header.rs +++ b/crates/types/src/blockchain/header.rs @@ -11,7 +11,9 @@ use super::{ }; use crate::{ fuel_merkle, + fuel_tx::Transaction, fuel_types::{ + bytes::SerializableVec, Bytes32, MessageId, }, @@ -199,7 +201,7 @@ impl BlockHeader { } /// Validate the transactions match the header. - pub fn validate_transactions(&self, transactions: &[Vec]) -> bool { + pub fn validate_transactions(&self, transactions: &[Transaction]) -> bool { // Generate the transaction merkle root. let transactions_root = generate_txns_root(transactions); @@ -222,7 +224,7 @@ impl PartialBlockHeader { /// The transactions are the bytes of the executed [`Transaction`]s. pub fn generate( self, - transactions: &[Vec], + transactions: &[Transaction], message_ids: &[MessageId], ) -> BlockHeader { // Generate the transaction merkle root. @@ -265,10 +267,13 @@ impl PartialBlockHeader { } } -fn generate_txns_root(transactions: &[Vec]) -> Bytes32 { +fn generate_txns_root(transactions: &[Transaction]) -> Bytes32 { + // TODO: The `to_bytes` requires mutability(but it is problem of the API). + // Remove `clone` when we can use `to_bytes` without mutability. + let transaction_ids = transactions.iter().map(|tx| tx.clone().to_bytes()); // Generate the transaction merkle root. let mut transaction_tree = fuel_merkle::binary::in_memory::MerkleTree::new(); - for id in transactions { + for id in transaction_ids { transaction_tree.push(id.as_ref()); } transaction_tree.root().into() diff --git a/tests/tests/sync.rs b/tests/tests/sync.rs index c3bcbe4fa15..7c3c3dc3e13 100644 --- a/tests/tests/sync.rs +++ b/tests/tests/sync.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use fuel_core::{ chain_config::ChainConfig, database::Database, @@ -24,6 +22,7 @@ use rand::{ Rng, SeedableRng, }; +use std::sync::Arc; #[tokio::test(flavor = "multi_thread")] async fn test_nodes_syncing() { From bb7bd2e08e9d37b5756e33595ee18159e8c95eac Mon Sep 17 00:00:00 2001 From: green Date: Tue, 24 Jan 2023 18:55:26 +0000 Subject: [PATCH 2/2] Renamed `verify_poa_block_fields` into `verify_block_fields` --- .../consensus_module/poa/src/verifier.rs | 2 +- .../poa/src/verifier/tests.rs | 2 +- .../consensus_module/src/block_verifier.rs | 22 +++++++++---------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/services/consensus_module/poa/src/verifier.rs b/crates/services/consensus_module/poa/src/verifier.rs index 411cde8f271..3273f7dd688 100644 --- a/crates/services/consensus_module/poa/src/verifier.rs +++ b/crates/services/consensus_module/poa/src/verifier.rs @@ -49,7 +49,7 @@ pub fn verify_consensus( } } -pub fn verify_poa_block_fields( +pub fn verify_block_fields( config: &Config, database: &D, block: &Block, diff --git a/crates/services/consensus_module/poa/src/verifier/tests.rs b/crates/services/consensus_module/poa/src/verifier/tests.rs index 3071a432784..d3afc80ecce 100644 --- a/crates/services/consensus_module/poa/src/verifier/tests.rs +++ b/crates/services/consensus_module/poa/src/verifier/tests.rs @@ -107,5 +107,5 @@ fn test_verify_genesis_block_fields(input: Input) -> anyhow::Result<()> { let mut b = Block::default(); b.header_mut().consensus = ch; b.header_mut().application = ah; - verify_poa_block_fields(&c, &d, &b) + verify_block_fields(&c, &d, &b) } diff --git a/crates/services/consensus_module/src/block_verifier.rs b/crates/services/consensus_module/src/block_verifier.rs index c79f0b4085b..3ac21fca047 100644 --- a/crates/services/consensus_module/src/block_verifier.rs +++ b/crates/services/consensus_module/src/block_verifier.rs @@ -8,11 +8,7 @@ mod tests; use crate::block_verifier::config::Config; use anyhow::ensure; -use fuel_core_poa::verifier::{ - verify_consensus, - verify_poa_block_fields, - Database as PoAVerifierDatabase, -}; +use fuel_core_poa::verifier::Database as PoAVerifierDatabase; use fuel_core_types::{ blockchain::{ block::Block, @@ -66,9 +62,11 @@ where .unwrap_or_else(|| 0u32.into()); verify_genesis_block_fields(expected_genesis_height, block.header()) } - Consensus::PoA(_) => { - verify_poa_block_fields(&self.config.poa, &self.database, block) - } + Consensus::PoA(_) => fuel_core_poa::verifier::verify_block_fields( + &self.config.poa, + &self.database, + block, + ), } } @@ -80,9 +78,11 @@ where } = header; match consensus { Consensus::Genesis(_) => true, - Consensus::PoA(consensus) => { - verify_consensus(&self.config.chain_config.consensus, header, consensus) - } + Consensus::PoA(consensus) => fuel_core_poa::verifier::verify_consensus( + &self.config.chain_config.consensus, + header, + consensus, + ), } } }