diff --git a/CHANGELOG.md b/CHANGELOG.md index dd379aa8897..4695c30500f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Description of the upcoming release here. ### Changed +- [#1349](https://github.com/FuelLabs/fuel-core/pull/1349): Updated peer-to-peer transactions API to support multiple blocks in a single request, and updated block synchronization to request multiple blocks based on the configured range of headers. - [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`. - [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`. - [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha diff --git a/Cargo.lock b/Cargo.lock index bb3b3c55301..412ed42a608 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3117,6 +3117,7 @@ dependencies = [ "fuel-core-types", "futures", "mockall", + "rand 0.8.5", "test-case", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 656d6318890..a0bb9971f98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ anyhow = "1.0" async-trait = "0.1" cynic = { version = "2.2.1", features = ["http-reqwest"] } clap = "4.1" +derive_more = { version = "0.99" } hyper = { version = "0.14.26" } rand = "0.8" parking_lot = "0.12" diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index cd1963b4981..4c70704d3a4 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -13,7 +13,7 @@ description = "Tx client and schema specification." 
[dependencies] anyhow = { workspace = true } cynic = { workspace = true } -derive_more = { version = "0.99" } +derive_more = { workspace = true } eventsource-client = { version = "0.10.2", optional = true } fuel-core-types = { workspace = true, features = ["serde"] } futures = { workspace = true, optional = true } diff --git a/crates/fuel-core/src/database/sealed_block.rs b/crates/fuel-core/src/database/sealed_block.rs index fa1d4f07b4d..7b9f337fa20 100644 --- a/crates/fuel-core/src/database/sealed_block.rs +++ b/crates/fuel-core/src/database/sealed_block.rs @@ -23,8 +23,8 @@ use fuel_core_types::{ SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, fuel_types::BlockHeight, + services::p2p::Transactions, }; use std::ops::Range; @@ -127,12 +127,21 @@ impl Database { } } - pub fn get_transactions_on_block( + pub fn get_transactions_on_blocks( &self, - block_id: &BlockId, - ) -> StorageResult>> { - Ok(self - .get_sealed_block_by_id(block_id)? - .map(|Sealed { entity: block, .. }| block.into_inner().1)) + block_height_range: Range, + ) -> StorageResult>> { + let transactions = block_height_range + .into_iter() + .map(BlockHeight::from) + .map(|block_height| { + let transactions = self + .get_sealed_block_by_height(&block_height)? + .map(|Sealed { entity: block, .. 
}| block.into_inner().1) + .map(Transactions); + Ok(transactions) + }) + .collect::>()?; + Ok(transactions) } } diff --git a/crates/fuel-core/src/service/adapters/p2p.rs b/crates/fuel-core/src/service/adapters/p2p.rs index 11f23d7be4f..aa3e0766d70 100644 --- a/crates/fuel-core/src/service/adapters/p2p.rs +++ b/crates/fuel-core/src/service/adapters/p2p.rs @@ -8,12 +8,11 @@ use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; use fuel_core_types::{ blockchain::{ - primitives::BlockId, SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, fuel_types::BlockHeight, + services::p2p::Transactions, }; use std::ops::Range; @@ -41,9 +40,9 @@ impl P2pDb for Database { fn get_transactions( &self, - block_id: &BlockId, - ) -> StorageResult>> { - self.get_transactions_on_block(block_id) + block_height_range: Range, + ) -> StorageResult>> { + self.get_transactions_on_blocks(block_height_range) } } diff --git a/crates/fuel-core/src/service/adapters/sync.rs b/crates/fuel-core/src/service/adapters/sync.rs index 5482eec1191..1b63c8c25e1 100644 --- a/crates/fuel-core/src/service/adapters/sync.rs +++ b/crates/fuel-core/src/service/adapters/sync.rs @@ -12,14 +12,10 @@ use fuel_core_sync::ports::{ }; use fuel_core_types::{ blockchain::{ - primitives::{ - BlockId, - DaBlockHeight, - }, + primitives::DaBlockHeight, SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, fuel_types::BlockHeight, services::p2p::{ peer_reputation::{ @@ -28,6 +24,7 @@ use fuel_core_types::{ }, PeerId, SourcePeer, + Transactions, }, }; use std::ops::Range; @@ -50,43 +47,41 @@ impl PeerToPeerPort for P2PAdapter { async fn get_sealed_block_headers( &self, - block_range_height: Range, + block_height_range: Range, ) -> anyhow::Result>>> { - if let Some(service) = &self.service { - let (peer_id, headers) = - service.get_sealed_block_headers(block_range_height).await?; - let sourced_headers = SourcePeer { - peer_id: peer_id.into(), - data: headers, - }; - 
Ok(sourced_headers) + let result = if let Some(service) = &self.service { + service.get_sealed_block_headers(block_height_range).await } else { Err(anyhow::anyhow!("No P2P service available")) + }; + match result { + Ok((peer_id, headers)) => { + let peer_id: PeerId = peer_id.into(); + let headers = peer_id.bind(headers); + Ok(headers) + } + Err(err) => Err(err), } } async fn get_transactions( &self, - block: SourcePeer, - ) -> anyhow::Result>> { + range: SourcePeer>, + ) -> anyhow::Result>> { let SourcePeer { peer_id, - data: block, - } = block; + data: range, + } = range; if let Some(service) = &self.service { service - .get_transactions_from_peer(peer_id.into(), block) + .get_transactions_from_peer(peer_id.into(), range) .await } else { Err(anyhow::anyhow!("No P2P service available")) } } - async fn report_peer( - &self, - peer: PeerId, - report: PeerReportReason, - ) -> anyhow::Result<()> { + fn report_peer(&self, peer: PeerId, report: PeerReportReason) -> anyhow::Result<()> { if let Some(service) = &self.service { let service_name = "Sync"; let new_report = self.process_report(report); diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 50f6e699eb0..930a8c876a9 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -274,13 +274,12 @@ impl ProtocolName for MessageExchangePostcardProtocol { #[cfg(test)] mod tests { - use fuel_core_types::blockchain::primitives::BlockId; - use super::*; #[test] fn test_request_size_fits() { - let m = RequestMessage::Transactions(BlockId::default()); + let arbitrary_range = 2..6; + let m = RequestMessage::Transactions(arbitrary_range); assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE); } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 83912e9199b..0fe5a156a02 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs 
@@ -696,7 +696,6 @@ mod tests { BlockHeader, PartialBlockHeader, }, - primitives::BlockId, SealedBlock, SealedBlockHeader, }, @@ -704,7 +703,10 @@ mod tests { Transaction, TransactionBuilder, }, - services::p2p::GossipsubMessageAcceptance, + services::p2p::{ + GossipsubMessageAcceptance, + Transactions, + }, }; use futures::{ future::join_all, @@ -1557,7 +1559,7 @@ mod tests { tokio::select! { message_sent = rx_test_end.recv() => { // we received a signal to end the test - assert!(message_sent.unwrap(), "Receuved incorrect or missing missing messsage"); + assert!(message_sent.unwrap(), "Received incorrect or missing message"); break; } node_a_event = node_a.next_event() => { @@ -1604,7 +1606,7 @@ mod tests { } }); } - RequestMessage::Transactions(_) => { + RequestMessage::Transactions(_range) => { let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Transactions(tx_orchestrator)).is_ok()); let tx_test_end = tx_test_end.clone(); @@ -1613,7 +1615,8 @@ mod tests { let response_message = rx_orchestrator.await; if let Ok(Some(transactions)) = response_message { - let _ = tx_test_end.send(transactions.len() == 5).await; + let check = transactions.len() == 1 && transactions[0].0.len() == 5; + let _ = tx_test_end.send(check).await; } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1647,7 +1650,8 @@ mod tests { let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeaders(Some(sealed_headers))); } RequestMessage::Transactions(_) => { - let transactions = (0..5).map(|_| Transaction::default_test_tx()).collect(); + let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); + let transactions = vec![Transactions(txs)]; let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions)))); } } @@ -1662,8 +1666,8 @@ mod tests { #[tokio::test] 
#[instrument] async fn request_response_works_with_transactions() { - request_response_works_with(RequestMessage::Transactions(BlockId::default())) - .await + let arbitrary_range = 2..6; + request_response_works_with(RequestMessage::Transactions(arbitrary_range)).await } #[tokio::test] @@ -1675,7 +1679,8 @@ mod tests { #[tokio::test] #[instrument] async fn request_response_works_with_sealed_headers_range_inclusive() { - request_response_works_with(RequestMessage::SealedHeaders(2..6)).await + let arbitrary_range = 2..6; + request_response_works_with(RequestMessage::SealedHeaders(arbitrary_range)).await } #[tokio::test] diff --git a/crates/services/p2p/src/ports.rs b/crates/services/p2p/src/ports.rs index fe326ada5b8..94862f9a64c 100644 --- a/crates/services/p2p/src/ports.rs +++ b/crates/services/p2p/src/ports.rs @@ -2,12 +2,11 @@ use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; use fuel_core_types::{ blockchain::{ - primitives::BlockId, SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, fuel_types::BlockHeight, + services::p2p::Transactions, }; use std::ops::Range; @@ -29,8 +28,8 @@ pub trait P2pDb: Send + Sync { fn get_transactions( &self, - block_id: &BlockId, - ) -> StorageResult>>; + block_height_range: Range, + ) -> StorageResult>>; } pub trait BlockHeightImporter: Send + Sync { diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 56eb6df75dc..68a9eaa7574 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -5,22 +5,17 @@ use std::{ use fuel_core_types::{ blockchain::{ - primitives::BlockId, SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, fuel_types::BlockHeight, + services::p2p::Transactions, }; use libp2p::PeerId; use serde::{ Deserialize, Serialize, }; -use serde_with::{ - serde_as, - FromInto, -}; use thiserror::Error; use tokio::sync::oneshot; @@ 
-34,17 +29,16 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::( // This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format. // The Peer that requested the message receives the response over the wire in `NetworkResponse` format. // It then unpacks it into `ResponseMessage`. -// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receving channel. +// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receiving channel. // Client Peer: `RequestMessage` (send request) // Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response) // Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response) -#[serde_as] #[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)] pub enum RequestMessage { Block(BlockHeight), SealedHeaders(Range), - Transactions(#[serde_as(as = "FromInto<[u8; 32]>")] BlockId), + Transactions(Range), } /// Final Response Message that p2p service sends to the Orchestrator @@ -52,7 +46,7 @@ pub enum RequestMessage { pub enum ResponseMessage { SealedBlock(Box>), SealedHeaders(Option>), - Transactions(Option>), + Transactions(Option>), } /// Holds oneshot channels for specific responses @@ -60,7 +54,7 @@ pub enum ResponseMessage { pub enum ResponseChannelItem { Block(oneshot::Sender>), SealedHeaders(oneshot::Sender<(PeerId, Option>)>), - Transactions(oneshot::Sender>>), + Transactions(oneshot::Sender>>), } /// Response that is sent over the wire @@ -78,7 +72,7 @@ pub enum NetworkResponse { pub enum OutboundResponse { Block(Option>), SealedHeaders(Option>), - Transactions(Option>>), + Transactions(Option>>), } #[derive(Debug, Error)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index f52ccea4d3d..29e9ca155b1 100644 --- a/crates/services/p2p/src/service.rs +++ 
b/crates/services/p2p/src/service.rs @@ -35,7 +35,6 @@ use fuel_core_types::{ blockchain::{ block::Block, consensus::ConsensusVote, - primitives::BlockId, SealedBlock, SealedBlockHeader, }, @@ -52,6 +51,7 @@ use fuel_core_types::{ GossipsubMessageInfo, PeerId as FuelPeerId, TransactionGossipData, + Transactions, }, }; use futures::{ @@ -99,9 +99,9 @@ enum TaskRequest { channel: oneshot::Sender<(PeerId, Option>)>, }, GetTransactions { - block_id: BlockId, + block_height_range: Range, from_peer: PeerId, - channel: oneshot::Sender>>, + channel: oneshot::Sender>>, }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), @@ -486,8 +486,8 @@ where .get_peer_id_with_height(&block_height); let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); } - Some(TaskRequest::GetTransactions { block_id, from_peer, channel }) => { - let request_msg = RequestMessage::Transactions(block_id); + Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { + let request_msg = RequestMessage::Transactions(block_height_range); let channel_item = ResponseChannelItem::Transactions(channel); let _ = self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item); } @@ -549,14 +549,14 @@ where } } } - RequestMessage::Transactions(block_id) => { - match self.db.get_transactions(&block_id) { + RequestMessage::Transactions(range) => { + match self.db.get_transactions(range.clone()) { Ok(maybe_transactions) => { let response = maybe_transactions.map(Arc::new); let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(response)); }, Err(e) => { - tracing::error!("Failed to get transactions for block {:?}: {:?}", block_id, e); + tracing::error!("Failed to get transactions for range {:?}: {:?}", range, e); let response = None; let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(response)); return Err(e.into()) @@ 
-697,18 +697,17 @@ impl SharedState { pub async fn get_transactions_from_peer( &self, peer_id: Vec, - block_id: BlockId, - ) -> anyhow::Result>> { + range: Range, + ) -> anyhow::Result>> { let (sender, receiver) = oneshot::channel(); let from_peer = PeerId::from_bytes(&peer_id).expect("Valid PeerId"); - self.request_sender - .send(TaskRequest::GetTransactions { - block_id, - from_peer, - channel: sender, - }) - .await?; + let request = TaskRequest::GetTransactions { + block_height_range: range, + from_peer, + channel: sender, + }; + self.request_sender.send(request).await?; receiver.await.map_err(|e| anyhow!("{}", e)) } @@ -874,8 +873,8 @@ pub mod tests { fn get_transactions( &self, - _block_id: &fuel_core_types::blockchain::primitives::BlockId, - ) -> StorageResult>> { + _block_height_range: Range, + ) -> StorageResult>> { unimplemented!() } } @@ -996,8 +995,8 @@ pub mod tests { fn get_transactions( &self, - _block_id: &BlockId, - ) -> StorageResult>> { + _block_height_range: Range, + ) -> StorageResult>> { todo!() } } diff --git a/crates/services/sync/Cargo.toml b/crates/services/sync/Cargo.toml index 13f40c3b8fd..f83ba4e90ab 100644 --- a/crates/services/sync/Cargo.toml +++ b/crates/services/sync/Cargo.toml @@ -16,6 +16,7 @@ fuel-core-services = { workspace = true } fuel-core-types = { workspace = true } futures = { workspace = true } mockall = { workspace = true, optional = true } +rand = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index 895542efa70..5e35ff755f1 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -2,29 +2,22 @@ //! This module contains the import task which is responsible for //! importing blocks from the network into the local blockchain. 
-use std::{ - future::Future, - iter, - ops::RangeInclusive, - sync::Arc, -}; - -use anyhow::anyhow; use fuel_core_services::{ SharedMutex, StateWatcher, }; use fuel_core_types::{ + self, blockchain::{ block::Block, - consensus::Sealed, - primitives::BlockId, SealedBlock, SealedBlockHeader, }, + fuel_types::BlockHeight, services::p2p::{ PeerId, SourcePeer, + Transactions, }, }; use futures::{ @@ -32,6 +25,14 @@ use futures::{ FutureExt, Stream, }; +use std::{ + future::Future, + ops::{ + Range, + RangeInclusive, + }, + sync::Arc, +}; use tokio::sync::Notify; use tracing::Instrument; @@ -43,10 +44,7 @@ use crate::{ PeerToPeerPort, }, state::State, - tracing_helpers::{ - TraceErr, - TraceNone, - }, + tracing_helpers::TraceErr, }; #[cfg(any(test, feature = "benchmarking"))] @@ -121,6 +119,31 @@ impl Import { self.notify.notify_one() } } + +#[derive(Debug)] +struct Batch { + peer: PeerId, + range: Range, + results: Vec, +} + +impl Batch { + pub fn new(peer: PeerId, range: Range, results: Vec) -> Self { + Self { + peer, + range, + results, + } + } + + pub fn is_err(&self) -> bool { + self.results.len() < self.range.len() + } +} + +type SealedHeaderBatch = Batch; +type SealedBlockBatch = Batch; + impl Import where P: PeerToPeerPort + Send + Sync + 'static, @@ -139,7 +162,7 @@ where // If there is a range to process, launch the stream. if let Some(range) = self.state.apply(|s| s.process_range()) { // Launch the stream to import the range. - let (count, result) = self.launch_stream(range.clone(), shutdown).await; + let count = self.launch_stream(range.clone(), shutdown).await; // Get the size of the range. let range_len = range.size_hint().0 as u32; @@ -147,13 +170,13 @@ where // If we did not process the entire range, mark the failed heights as failed. 
if (count as u32) < range_len { let incomplete_range = (*range.start() + count as u32)..=*range.end(); - tracing::error!( + self.state + .apply(|s| s.failed_to_process(incomplete_range.clone())); + Err(anyhow::anyhow!( "Failed to import range of blocks: {:?}", incomplete_range - ); - self.state.apply(|s| s.failed_to_process(incomplete_range)); + ))?; } - result?; } Ok(()) } @@ -169,7 +192,7 @@ where &self, range: RangeInclusive, shutdown: &StateWatcher, - ) -> (usize, anyhow::Result<()>) { + ) -> usize { let Self { state, params, @@ -182,83 +205,87 @@ where let shutdown_signal = shutdown.clone(); let (shutdown_guard, mut shutdown_guard_recv) = tokio::sync::mpsc::channel::<()>(1); + let block_stream = get_block_stream(range.clone(), params, p2p.clone(), consensus.clone()); let result = block_stream - .map(move |stream_block_batch| { - let shutdown_guard = shutdown_guard.clone(); - let shutdown_signal = shutdown_signal.clone(); - tokio::spawn(async move { - // Hold a shutdown sender for the lifetime of the spawned task - let _shutdown_guard = shutdown_guard.clone(); - let mut shutdown_signal = shutdown_signal.clone(); - tokio::select! { + .map(move |stream_block_batch| { + let shutdown_guard = shutdown_guard.clone(); + let shutdown_signal = shutdown_signal.clone(); + tokio::spawn(async move { + // Hold a shutdown sender for the lifetime of the spawned task + let _shutdown_guard = shutdown_guard.clone(); + let mut shutdown_signal = shutdown_signal.clone(); + tokio::select! { // Stream a batch of blocks - blocks = stream_block_batch => blocks, + blocks = stream_block_batch => Some(blocks), // If a shutdown signal is received during the stream, terminate early and // return an empty response - _ = shutdown_signal.while_started() => Ok(None) + _ = shutdown_signal.while_started() => None } - }).then(|task| async { task.map_err(|e| anyhow!(e))? }) - }) - // Request up to `block_stream_buffer_size` transactions from the network. 
- .buffered(params.block_stream_buffer_size) - // Continue the stream unless an error or none occurs. - // Note the error will be returned but the stream will close. - .into_scan_none_or_err() - .scan_none_or_err() - // Continue the stream until the shutdown signal is received. - .take_until({ - let mut s = shutdown.clone(); - async move { - let _ = s.while_started().await; - tracing::info!("In progress import stream shutting down"); - } - }) - .then({ - let state = state.clone(); - let executor = executor.clone(); - let p2p = p2p.clone(); - move |res| { - let p2p = p2p.clone(); - let state = state.clone(); - let executor = executor.clone(); + }).map(|task| { + task.trace_err("Failed to join the task").ok().flatten() + }) + }) + // Request up to `block_stream_buffer_size` transactions from the network. + .buffered(params.block_stream_buffer_size) + // Continue the stream until the shutdown signal is received. + .take_until({ + let mut s = shutdown.clone(); async move { - let (peer_id, block) = res?; - - let res = execute_and_commit(executor.as_ref(), &state, block).await; - match &res { - Ok(_) => { - let _ = p2p.report_peer(peer_id.clone(), PeerReportReason::SuccessfulBlockImport) - .await - .map_err(|e| tracing::error!("Failed to report successful block import for peer {:?}: {:?}", peer_id, e)); - }, - Err(e) => { - // If this fails, then it means that consensus has approved a block that is invalid. - // This would suggest a more serious issue than a bad peer, e.g. a fork or an out-of-date client. 
- tracing::error!("Failed to execute and commit block from peer {:?}: {:?}", peer_id, e); - }, + let _ = s.while_started().await; + tracing::info!("In progress import stream shutting down"); + } + }) + .into_scan_none() + .scan_none() + .into_scan_err() + .scan_err() + .then(|batch| { + async move { + let Batch { + peer, + range, + results, + } = batch; + + let mut done = vec![]; + for sealed_block in results { + let res = execute_and_commit(executor.as_ref(), state, sealed_block).await; + + match &res { + Ok(_) => { + done.push(()); + }, + Err(e) => { + // If this fails, then it means that consensus has approved a block that is invalid. + // This would suggest a more serious issue than a bad peer, e.g. a fork or an out-of-date client. + tracing::error!("Failed to execute and commit block from peer {:?}: {:?}", peer, e); + break; + }, + }; } - res + + let batch = Batch::new(peer.clone(), range, done); + + if !batch.is_err() { + report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport); + } + + batch } - } - .instrument(tracing::debug_span!("execute_and_commit")) - .in_current_span() - }) - // Continue the stream unless an error occurs. - .into_scan_err() - .scan_err() - // Count the number of successfully executed blocks and - // find any errors. - // Fold the stream into a count and any errors. - .fold((0usize, Ok(())), |(count, res), result| async move { - match result { - Ok(_) => (count + 1, res), - Err(e) => (count, Err(e)), - } - }) - .in_current_span() - .await; + .instrument(tracing::debug_span!("execute_and_commit")) + .in_current_span() + }) + // Continue the stream unless an error occurs. + .into_scan_err() + .scan_err() + // Count the number of successfully executed blocks. + // Fold the stream into a count. + .fold(0usize, |count, batch| async move { + count + batch.results.len() + }) + .await; // Wait for any spawned tasks to shutdown let _ = shutdown_guard_recv.recv().await; @@ -274,105 +301,108 @@ fn get_block_stream< params: &Config, p2p: Arc

, consensus: Arc, -) -> impl Stream>>> -{ - get_header_stream(range, params, p2p.clone()).map({ - let p2p = p2p.clone(); - let consensus_port = consensus.clone(); - move |batch| { - { - let p2p = p2p.clone(); - let consensus_port = consensus_port.clone(); - get_sealed_blocks(batch, p2p.clone(), consensus_port.clone()) +) -> impl Stream> + '_ { + let header_stream = get_header_batch_stream(range.clone(), params, p2p.clone()); + header_stream + .map({ + let consensus = consensus.clone(); + let p2p = p2p.clone(); + move |header_batch: SealedHeaderBatch| { + let Batch { + peer, + range, + results, + } = header_batch; + let checked_headers = results + .into_iter() + .take_while(|header| { + check_sealed_header(header, peer.clone(), &p2p, &consensus) + }) + .collect::>(); + Batch::new(peer, range, checked_headers) + } + }) + .map(move |headers| { + let consensus = consensus.clone(); + let p2p = p2p.clone(); + async move { + let Batch { + peer, + range, + results, + } = headers; + if results.is_empty() { + SealedBlockBatch::new(peer, range, vec![]) + } else { + await_da_height( + results + .last() + .expect("We checked headers are not empty above"), + &consensus, + ) + .await; + let headers = SealedHeaderBatch::new(peer, range, results); + get_blocks(&p2p, headers).await + } } .instrument(tracing::debug_span!("consensus_and_transactions")) .in_current_span() - } - }) + }) } -fn get_header_stream( +fn get_header_batch_stream( range: RangeInclusive, params: &Config, p2p: Arc

, -) -> impl Stream>> { +) -> impl Stream { let Config { header_batch_size, .. } = params; let ranges = range_chunks(range, *header_batch_size); - let p2p_gen = iter::repeat_with(move || p2p.clone()); - let iter = ranges.zip(p2p_gen); - futures::stream::iter(iter) - .then(move |(range, p2p)| async { - tracing::debug!( - "getting header range from {} to {} inclusive", - range.start(), - range.end() - ); - get_headers_batch(range, p2p).await - }) - .flatten() - .into_scan_none_or_err() - .scan_none_or_err() + futures::stream::iter(ranges).then(move |range| { + let p2p = p2p.clone(); + async move { get_headers_batch(range, &p2p).await } + }) } fn range_chunks( range: RangeInclusive, chunk_size: u32, -) -> impl Iterator> { - let end = *range.end(); +) -> impl Iterator> { + let end = *range.end() + 1; range.step_by(chunk_size as usize).map(move |chunk_start| { let block_end = (chunk_start + chunk_size).min(end); - chunk_start..=block_end + chunk_start..block_end }) } -async fn get_sealed_blocks< +fn check_sealed_header< P: PeerToPeerPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, >( - result: anyhow::Result>, - p2p: Arc

, - consensus_port: Arc, -) -> anyhow::Result> { - let header = match result { - Ok(h) => h, - Err(e) => return Err(e), - }; - let SourcePeer { - peer_id, - data: header, - } = header; - let id = header.entity.id(); - let block_id = SourcePeer { - peer_id: peer_id.clone(), - data: id, - }; - - // Check the consensus is valid on this header. - if !consensus_port - .check_sealed_header(&header) - .trace_err("Failed to check consensus on header")? - { - let _ = p2p - .report_peer(peer_id.clone(), PeerReportReason::BadBlockHeader) - .await - .map_err(|e| { - tracing::error!( - "Failed to report bad block header from peer {:?}: {:?}", - peer_id, - e - ) - }); - return Ok(None) + header: &SealedBlockHeader, + peer_id: PeerId, + p2p: &Arc

, + consensus: &Arc, +) -> bool { + let validity = consensus + .check_sealed_header(header) + .trace_err("Failed to check consensus on header") + .unwrap_or(false); + if !validity { + report_peer(p2p, peer_id.clone(), PeerReportReason::BadBlockHeader); } + validity +} - // Wait for the da to be at least the da height on the header. - consensus_port +async fn await_da_height( + header: &SealedBlockHeader, + consensus: &Arc, +) { + let _ = consensus .await_da_height(&header.entity.da_height) - .await?; - - get_transactions_on_block(p2p.as_ref(), block_id, header, &peer_id).await + .await + .trace_err("Failed to wait for DA layer to sync"); } /// Waits for a notify or shutdown signal. @@ -393,147 +423,125 @@ async fn wait_for_notify_or_shutdown( matches!(r, futures::future::Either::Left(_)) } -async fn get_headers_batch( - mut range: RangeInclusive, - p2p: Arc, -) -> impl Stream>>> { +async fn get_sealed_block_headers

( + range: Range, + p2p: &Arc

, +) -> SourcePeer> +where + P: PeerToPeerPort + Send + Sync + 'static, +{ tracing::debug!( "getting header range from {} to {} inclusive", - range.start(), - range.end() + range.start, + range.end ); - let start = *range.start(); - let end = *range.end() + 1; + p2p.get_sealed_block_headers(range) + .await + .trace_err("Failed to get headers") + .unwrap_or_default() + .map(|inner| inner.unwrap_or_default()) +} + +async fn get_transactions

( + peer_id: PeerId, + range: Range, + p2p: &Arc

, +) -> Option> +where + P: PeerToPeerPort + Send + Sync + 'static, +{ + let range = peer_id.clone().bind(range); let res = p2p - .get_sealed_block_headers(start..end) + .get_transactions(range) .await - .trace_err("Failed to get headers"); - let sorted_headers = match res { - Ok(sourced_headers) => { - let SourcePeer { - peer_id, - data: maybe_headers, - } = sourced_headers; - let cloned_peer_id = peer_id.clone(); - let headers = match maybe_headers { - None => { - tracing::error!( - "No headers received from peer {:?} for range {} to {}", - peer_id, - start, - end - ); - vec![Err(anyhow::anyhow!("Headers provider was unable to fulfill request for unspecified reason. Possibly because requested batch size was too large"))] - } - Some(headers) => headers - .into_iter() - .map(move |header| { - let header = range.next().and_then(|height| { - if *(header.entity.height()) == height.into() { - let sourced_header = SourcePeer { - peer_id: cloned_peer_id.clone(), - data: header, - }; - Some(sourced_header) - } else { - None - } - }); - Ok(header) - }) - .collect(), - }; - if let Some(expected_len) = end.checked_sub(start) { - if headers.len() != expected_len as usize - || headers.iter().any(|h| h.is_err()) - { - let _ = p2p - .report_peer( - peer_id.clone(), - PeerReportReason::MissingBlockHeaders, - ) - .await - .map_err(|e| { - tracing::error!( - "Failed to report missing block header from peer {:?}: {:?}", - peer_id, - e - ) - }); - } - } - headers + .trace_err("Failed to get transactions"); + match res { + Ok(Some(transactions)) => Some(transactions), + _ => { + report_peer(p2p, peer_id.clone(), PeerReportReason::MissingTransactions); + None } + } +} - Err(e) => vec![Err(e)], - }; - futures::stream::iter(sorted_headers) +async fn get_headers_batch

(range: Range, p2p: &Arc

) -> SealedHeaderBatch +where + P: PeerToPeerPort + Send + Sync + 'static, +{ + tracing::debug!( + "getting header range from {} to {} inclusive", + range.start, + range.end + ); + let sourced_headers = get_sealed_block_headers(range.clone(), p2p).await; + let SourcePeer { + peer_id, + data: headers, + } = sourced_headers; + let heights = range.clone().map(BlockHeight::from); + let headers = headers + .into_iter() + .zip(heights) + .take_while(move |(header, expected_height)| { + let height = header.entity.height(); + height == expected_height + }) + .map(|(header, _)| header) + .collect::>(); + if headers.len() != range.len() { + report_peer(p2p, peer_id.clone(), PeerReportReason::MissingBlockHeaders); + } + Batch::new(peer_id, range, headers) } -#[tracing::instrument( - skip(p2p, header), - fields( - height = **header.entity.height(), - id = %header.entity.consensus.generated.application_hash - ), - err -)] -async fn get_transactions_on_block

( - p2p: &P, - block_id: SourcePeer, - header: SealedBlockHeader, - peer_id: &PeerId, -) -> anyhow::Result> +fn report_peer

(p2p: &Arc

, peer_id: PeerId, reason: PeerReportReason) where P: PeerToPeerPort + Send + Sync + 'static, { - let Sealed { - entity: header, - consensus, - } = header; - - // Request the transactions for this block. - let maybe_txs = p2p - .get_transactions(block_id) - .await - .trace_err("Failed to get transactions")? - .trace_none_warn("Could not find transactions for header"); - match maybe_txs { - None => { - let _ = p2p - .report_peer(peer_id.clone(), PeerReportReason::MissingTransactions) - .await - .map_err(|e| { - tracing::error!( - "Failed to report missing transactions from peer {:?}: {:?}", - peer_id, - e - ) - }); - Ok(None) - } - Some(transactions) => { - match Block::try_from_executed(header, transactions) { - Some(block) => Ok(Some(( - peer_id.clone(), - SealedBlock { - entity: block, - consensus, - }, - ))), - None => { - tracing::error!( - "Failed to created block from header and transactions" - ); - let _ = p2p - .report_peer(peer_id.clone(), PeerReportReason::InvalidTransactions) - .await - .map_err(|e| tracing::error!("Failed to report invalid transaction from peer {:?}: {:?}", peer_id, e)); - Ok(None) - } - } + tracing::info!("Reporting peer for {:?}", reason); + + // Failure to report a peer is a non-fatal error; ignore the error + let _ = p2p + .report_peer(peer_id.clone(), reason) + .trace_err(&format!("Failed to report peer {:?}", peer_id)); +} + +/// Get blocks correlating to the headers from a specific peer +#[tracing::instrument(skip(p2p, headers))] +async fn get_blocks

(p2p: &Arc

, headers: SealedHeaderBatch) -> SealedBlockBatch +where + P: PeerToPeerPort + Send + Sync + 'static, +{ + let Batch { + results: headers, + peer, + range, + } = headers; + let Some(transaction_data) = get_transactions(peer.clone(), range.clone(), p2p).await + else { + return Batch::new(peer, range, vec![]) + }; + + let iter = headers.into_iter().zip(transaction_data.into_iter()); + let mut blocks = vec![]; + for (block_header, transactions) in iter { + let SealedBlockHeader { + consensus, + entity: header, + } = block_header; + let block = + Block::try_from_executed(header, transactions.0).map(|block| SealedBlock { + entity: block, + consensus, + }); + if let Some(block) = block { + blocks.push(block); + } else { + report_peer(p2p, peer.clone(), PeerReportReason::InvalidTransactions); + break } } + Batch::new(peer, range, blocks) } #[tracing::instrument( @@ -567,16 +575,12 @@ where /// Extra stream utilities. trait StreamUtil: Sized { - /// Turn a stream of `Result>` into a stream of `Result`. - /// Close the stream if an error occurs or a `None` is received. - /// Return the error if the stream closes. - fn into_scan_none_or_err(self) -> ScanNoneErr { - ScanNoneErr(self) + /// Scan the stream for `None`. + fn into_scan_none(self) -> ScanNone { + ScanNone(self) } - /// Turn a stream of `Result` into a stream of `Result`. - /// Close the stream if an error occurs. - /// Return the error if the stream closes. + /// Scan the stream for errors. fn into_scan_err(self) -> ScanErr { ScanErr(self) } @@ -584,42 +588,35 @@ trait StreamUtil: Sized { impl StreamUtil for S {} -struct ScanNoneErr(S); struct ScanErr(S); +struct ScanNone(S); -impl ScanNoneErr { - /// Scan the stream for `None` or errors. 
- fn scan_none_or_err(self) -> impl Stream> +impl ScanNone { + fn scan_none<'a, T: 'a>(self) -> impl Stream + 'a where - S: Stream>> + Send + 'static, + S: Stream> + Send + 'a, { - let stream = self.0.boxed(); - futures::stream::unfold((false, stream), |(mut is_err, mut stream)| async move { - if is_err { - None - } else { - let result = stream.next().await?; - is_err = result.is_err(); - result.transpose().map(|result| (result, (is_err, stream))) - } + let stream = self.0.boxed::<'a>(); + futures::stream::unfold((false, stream), |(_, mut stream)| async move { + let element = stream.next().await?; + element.map(|e| (e, (false, stream))) }) } } impl ScanErr { - /// Scan the stream for errors. - fn scan_err(self) -> impl Stream> + fn scan_err<'a, T: 'a>(self) -> impl Stream> + 'a where - S: Stream> + Send + 'static, + S: Stream> + Send + 'a, { - let stream = self.0.boxed(); + let stream = self.0.boxed::<'a>(); futures::stream::unfold((false, stream), |(mut err, mut stream)| async move { if err { None } else { - let result = stream.next().await?; - err = result.is_err(); - Some((result, (err, stream))) + let batch = stream.next().await?; + err = batch.is_err(); + Some((batch, (err, stream))) } }) } diff --git a/crates/services/sync/src/import/test_helpers.rs b/crates/services/sync/src/import/test_helpers.rs index a4efd9a6f3f..1329ec82d9f 100644 --- a/crates/services/sync/src/import/test_helpers.rs +++ b/crates/services/sync/src/import/test_helpers.rs @@ -16,22 +16,32 @@ use fuel_core_types::{ }, fuel_types::BlockHeight, }; +use rand::{ + rngs::StdRng, + Rng, + SeedableRng, +}; pub use counts::{ Count, SharedCounts, }; -use fuel_core_types::services::p2p::{ - PeerId, - SourcePeer, -}; +use fuel_core_types::services::p2p::PeerId; + pub use pressure_block_importer::PressureBlockImporter; pub use pressure_consensus::PressureConsensus; pub use pressure_peer_to_peer::PressurePeerToPeer; -pub fn empty_header(h: BlockHeight) -> SealedBlockHeader { +pub fn random_peer() -> 
PeerId { + let mut rng = StdRng::seed_from_u64(0xF00DF00D); + let bytes = rng.gen::<[u8; 32]>().to_vec(); + PeerId::from(bytes) +} + +pub fn empty_header>(i: I) -> SealedBlockHeader { let mut header = BlockHeader::default(); - header.consensus.height = h; + let height = i.into(); + header.consensus.height = height; let transaction_tree = fuel_core_types::fuel_merkle::binary::in_memory::MerkleTree::new(); header.application.generated.transactions_root = transaction_tree.root().into(); @@ -42,19 +52,3 @@ pub fn empty_header(h: BlockHeight) -> SealedBlockHeader { consensus, } } - -pub fn peer_sourced_headers( - headers: Option>, -) -> SourcePeer>> { - peer_sourced_headers_peer_id(headers, vec![].into()) -} - -pub fn peer_sourced_headers_peer_id( - headers: Option>, - peer_id: PeerId, -) -> SourcePeer>> { - SourcePeer { - peer_id, - data: headers, - } -} diff --git a/crates/services/sync/src/import/test_helpers/counts.rs b/crates/services/sync/src/import/test_helpers/counts.rs index d98e75ddd30..0ef831f36a6 100644 --- a/crates/services/sync/src/import/test_helpers/counts.rs +++ b/crates/services/sync/src/import/test_helpers/counts.rs @@ -29,9 +29,16 @@ impl Counts { self.now.transactions += 1; self.max.transactions = self.max.transactions.max(self.now.transactions); } + pub fn add_transactions(&mut self, transactions: usize) { + self.now.transactions += transactions; + self.max.transactions = self.max.transactions.max(self.now.transactions); + } pub fn dec_transactions(&mut self) { self.now.transactions -= 1; } + pub fn sub_transactions(&mut self, transactions: usize) { + self.now.transactions -= transactions; + } pub fn inc_consensus(&mut self) { self.now.consensus += 1; self.max.consensus = self.max.consensus.max(self.now.consensus); diff --git a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs index 108efed4ab8..3b3f16ada36 100644 --- 
a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs +++ b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs @@ -1,6 +1,7 @@ use crate::{ import::test_helpers::{ empty_header, + random_peer, SharedCounts, }, ports::{ @@ -11,15 +12,12 @@ use crate::{ }; use fuel_core_services::stream::BoxStream; use fuel_core_types::{ - blockchain::{ - primitives::BlockId, - SealedBlockHeader, - }, - fuel_tx::Transaction, + blockchain::SealedBlockHeader, fuel_types::BlockHeight, services::p2p::{ PeerId, SourcePeer, + Transactions, }, }; use std::{ @@ -54,15 +52,18 @@ impl PeerToPeerPort for PressurePeerToPeer { async fn get_transactions( &self, - block_id: SourcePeer, - ) -> anyhow::Result>> { - self.counts.apply(|c| c.inc_transactions()); + block_ids: SourcePeer>, + ) -> anyhow::Result>> { + let transactions_count = block_ids.data.len(); + self.counts + .apply(|c| c.add_transactions(transactions_count)); tokio::time::sleep(self.durations[1]).await; - self.counts.apply(|c| c.dec_transactions()); - self.p2p.get_transactions(block_id).await + self.counts + .apply(|c| c.sub_transactions(transactions_count)); + self.p2p.get_transactions(block_ids).await } - async fn report_peer( + fn report_peer( &self, _peer: PeerId, _report: PeerReportReason, @@ -75,22 +76,20 @@ impl PressurePeerToPeer { pub fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { let mut mock = MockPeerToPeerPort::default(); mock.expect_get_sealed_block_headers().returning(|range| { - let headers = Some( - range - .clone() - .map(BlockHeight::from) - .map(empty_header) - .collect(), - ); - let peer_id = vec![].into(); - let source_peer_data = SourcePeer { - peer_id, - data: headers, - }; - Ok(source_peer_data) + let peer = random_peer(); + let headers = range + .clone() + .map(BlockHeight::from) + .map(empty_header) + .collect(); + let headers = peer.bind(Some(headers)); + Ok(headers) + }); + mock.expect_get_transactions().returning(|block_ids| { + let data = 
block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) }); - mock.expect_get_transactions() - .returning(|_| Ok(Some(vec![]))); Self { p2p: mock, durations: delays, diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index 38f7c57f120..b97f27958a6 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -3,8 +3,7 @@ use crate::{ import::test_helpers::{ empty_header, - peer_sourced_headers, - peer_sourced_headers_peer_id, + random_peer, }, ports::{ MockBlockImporterPort, @@ -13,17 +12,169 @@ use crate::{ PeerReportReason, }, }; -use fuel_core_types::fuel_tx::Transaction; -use test_case::test_case; +use fuel_core_types::services::p2p::Transactions; use super::*; -#[test_case(State::new(None, 5), Mocks::times([6]) => (State::new(5, None), true) ; "executes 5")] -#[test_case(State::new(3, 5), Mocks::times([2]) => (State::new(5, None), true) ; "executes 3 to 5")] +fn div_ceil(divisor: usize, dividend: usize) -> usize { + (divisor + (dividend - 1)) / dividend +} + #[tokio::test] -async fn test_import(state: State, mocks: Mocks) -> (State, bool) { +async fn test_import_0_to_5() { + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(6) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + 
}; + let mocks = Mocks { + consensus_port, + p2p, + executor: DefaultMocks::times([6]), + }; + + let state = State::new(None, 5); let state = SharedMutex::new(state); - test_import_inner(state, mocks, None).await + let v = test_import_inner(state, mocks, None, params).await; + let expected = (State::new(5, None), true); + assert_eq!(v, expected); +} + +#[tokio::test] +async fn test_import_3_to_5() { + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; + let mocks = Mocks { + consensus_port, + p2p, + executor: DefaultMocks::times([2]), + }; + + let state = State::new(3, 5); + let state = SharedMutex::new(state); + let v = test_import_inner(state, mocks, None, params).await; + let expected = (State::new(5, None), true); + assert_eq!(v, expected); +} + +#[tokio::test] +async fn test_import_0_to_499() { + // The observed block height + let end = 499; + // The number of headers/blocks in range 0..end + let n = end + 1; + // The number of headers/blocks per batch + let header_batch_size = 10; + + let mut consensus_port = MockConsensusPort::default(); + + // Happens once for each header + let times = n; + consensus_port + .expect_check_sealed_header() + .times(times) + .returning(|_| Ok(true)); + + // Happens once for each batch + let times = div_ceil(n, 
header_batch_size); + consensus_port + .expect_await_da_height() + .times(times) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + + // Happens once for each batch + let times = div_ceil(n, header_batch_size); + p2p.expect_get_sealed_block_headers() + .times(times) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + + // Happens once for each batch + let times = div_ceil(n, header_batch_size); + p2p.expect_get_transactions() + .times(times) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: header_batch_size as u32, + }; + let mocks = Mocks { + consensus_port, + p2p, + executor: DefaultMocks::times([n]), + }; + + let state = State::new(None, end as u32); + let state = SharedMutex::new(state); + let v = test_import_inner(state, mocks, None, params).await; + let expected = (State::new(end as u32, None), true); + assert_eq!(v, expected); } #[tokio::test] @@ -38,19 +189,39 @@ async fn import__signature_fails_on_header_5_only() { .expect_await_da_height() .times(1) .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); let state = State::new(3, 5).into(); let mocks = Mocks { consensus_port, - p2p: DefaultMocks::times([1]), + p2p, executor: DefaultMocks::times([1]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 
10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(4, None), true), res); + assert_eq!((State::new(4, None), false), res); } #[tokio::test] @@ -59,25 +230,46 @@ async fn import__signature_fails_on_header_4_only() { let mut consensus_port = MockConsensusPort::default(); consensus_port .expect_check_sealed_header() - .times(2) + .times(1) .returning(|h| Ok(**h.entity.height() != 4)); consensus_port .expect_await_da_height() - .times(1) + .times(0) .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(0) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + let state = State::new(3, 5).into(); let mocks = Mocks { consensus_port, - p2p: DefaultMocks::times([1]), + p2p, executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(3, None), true), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] @@ -86,7 +278,12 @@ async fn import__header_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(peer_sourced_headers(Some(Vec::new())))); + .returning(|_| { + let peer = random_peer(); + let headers = Some(Vec::new()); + let headers = peer.bind(headers); + Ok(headers) + }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -94,12 +291,16 @@ async fn 
import__header_not_found() { consensus_port: DefaultMocks::times([0]), executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(3, None), true), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] @@ -108,7 +309,12 @@ async fn import__header_response_incomplete() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(peer_sourced_headers(None))); + .returning(|_| { + let peer = random_peer(); + let headers = None; + let headers = peer.bind(headers); + Ok(headers) + }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -116,9 +322,13 @@ async fn import__header_response_incomplete() { consensus_port: DefaultMocks::times([0]), executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then assert_eq!((State::new(3, None), false), res); @@ -130,10 +340,20 @@ async fn import__header_5_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(peer_sourced_headers(Some(vec![empty_header(4.into())])))); + .returning(|_| { + let peer = random_peer(); + let headers = Some(vec![empty_header(4)]); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() .times(1) - .returning(|_| Ok(Some(vec![]))); + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -141,12 +361,16 @@ async fn import__header_5_not_found() { 
consensus_port: DefaultMocks::times([1]), executor: DefaultMocks::times([1]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(4, None), true), res); + assert_eq!((State::new(4, None), false), res); } #[tokio::test] @@ -155,10 +379,13 @@ async fn import__header_4_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(peer_sourced_headers(Some(vec![empty_header(5.into())])))); - p2p.expect_get_transactions() - .times(0) - .returning(|_| Ok(Some(vec![]))); + .returning(|_| { + let peer = random_peer(); + let headers = Some(vec![empty_header(5)]); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -166,114 +393,159 @@ async fn import__header_4_not_found() { consensus_port: DefaultMocks::times([0]), executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(3, None), true), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] async fn import__transactions_not_found() { // given + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| { - Ok(peer_sourced_headers(Some(vec![ - empty_header(4.into()), - empty_header(5.into()), - ]))) + .returning(|range| { + let peer = 
random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) }); p2p.expect_get_transactions() - .times(2) + .times(1) .returning(|_| Ok(None)); let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([2]), + consensus_port, executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(3, None), true), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] async fn import__transactions_not_found_for_header_4() { // given + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| { - Ok(peer_sourced_headers(Some(vec![ - empty_header(4.into()), - empty_header(5.into()), - ]))) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) }); let mut height = 3; - p2p.expect_get_transactions().times(2).returning(move |_| { - height += 1; - if height == 4 { - Ok(None) - } else { - Ok(Some(vec![])) - } - }); + p2p.expect_get_transactions() + .times(1) + .returning(move |block_ids| { + height += 1; + if height == 4 { + Ok(None) + } else { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + } + }); let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([2]), + consensus_port, executor: DefaultMocks::times([0]), }; + let params = Config { + 
block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(3, None), true), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] async fn import__transactions_not_found_for_header_5() { // given + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| { - Ok(peer_sourced_headers(Some(vec![ - empty_header(4.into()), - empty_header(5.into()), - ]))) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) }); - let mut height = 3; - p2p.expect_get_transactions().times(2).returning(move |_| { - height += 1; - if height == 5 { - Ok(None) - } else { - Ok(Some(vec![])) - } + p2p.expect_get_transactions().times(1).returning(move |_| { + let v = vec![Transactions::default()]; + Ok(Some(v)) }); let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([2]), + consensus_port, executor: DefaultMocks::times([1]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(4, None), true), res); + assert_eq!((State::new(4, None), false), res); } #[tokio::test] @@ -283,6 +555,7 @@ async fn import__p2p_error() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|_| Err(anyhow::anyhow!("Some network error"))); + p2p.expect_get_transactions().times(0); let state = State::new(3, 
5).into(); let mocks = Mocks { @@ -290,9 +563,13 @@ async fn import__p2p_error() { consensus_port: DefaultMocks::times([0]), executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then assert_eq!((State::new(3, None), false), res); @@ -301,73 +578,45 @@ async fn import__p2p_error() { #[tokio::test] async fn import__p2p_error_on_4_transactions() { // given - let mut p2p = MockPeerToPeerPort::default(); - p2p.expect_get_sealed_block_headers() + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() .times(1) - .returning(|_| { - Ok(peer_sourced_headers(Some(vec![ - empty_header(4.into()), - empty_header(5.into()), - ]))) - }); - let mut height = 3; - p2p.expect_get_transactions().times(2).returning(move |_| { - height += 1; - if height == 4 { - Err(anyhow::anyhow!("Some network error")) - } else { - Ok(Some(vec![])) - } - }); - - let state = State::new(3, 5).into(); - let mocks = Mocks { - p2p, - consensus_port: DefaultMocks::times([2]), - executor: DefaultMocks::times([0]), - }; - - // when - let res = test_import_inner(state, mocks, None).await; - - // then - assert_eq!((State::new(3, None), false), res); -} + .returning(|_| Ok(())); -#[tokio::test] -async fn import__p2p_error_on_5_transactions() { - // given let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| { - Ok(peer_sourced_headers(Some(vec![ - empty_header(4.into()), - empty_header(5.into()), - ]))) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) }); - let mut height = 3; - 
p2p.expect_get_transactions().times(2).returning(move |_| { - height += 1; - if height == 5 { - Err(anyhow::anyhow!("Some network error")) - } else { - Ok(Some(vec![])) - } - }); + p2p.expect_get_transactions() + .times(1) + .returning(|_| Err(anyhow::anyhow!("Some network error"))); let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([2]), - executor: DefaultMocks::times([1]), + consensus_port, + executor: DefaultMocks::times([0]), + }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(4, None), false), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] @@ -376,7 +625,7 @@ async fn import__consensus_error_on_4() { let mut consensus_port = MockConsensusPort::default(); consensus_port .expect_check_sealed_header() - .times(2) + .times(1) .returning(|h| { if **h.entity.height() == 4 { Err(anyhow::anyhow!("Some consensus error")) @@ -386,18 +635,33 @@ async fn import__consensus_error_on_4() { }); consensus_port .expect_await_da_height() - .times(1) + .times(0) .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions().times(0); + let state = State::new(3, 5).into(); let mocks = Mocks { consensus_port, - p2p: DefaultMocks::times([1]), + p2p, executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then assert_eq!((State::new(3, None), false), res); @@ 
-422,15 +686,36 @@ async fn import__consensus_error_on_5() { .times(1) .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + let state = State::new(3, 5).into(); let mocks = Mocks { consensus_port, - p2p: DefaultMocks::times([1]), + p2p, executor: DefaultMocks::times([1]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then assert_eq!((State::new(4, None), false), res); @@ -439,6 +724,33 @@ async fn import__consensus_error_on_5() { #[tokio::test] async fn import__execution_error_on_header_4() { // given + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + let mut executor = MockBlockImporterPort::default(); executor .expect_execute_and_commit() @@ -453,13 +765,17 @@ async fn import__execution_error_on_header_4() { let state = State::new(3, 5).into(); let 
mocks = Mocks { - consensus_port: DefaultMocks::times([2]), - p2p: DefaultMocks::times([2]), + consensus_port, + p2p, executor, }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then assert_eq!((State::new(3, None), false), res); @@ -468,6 +784,33 @@ async fn import__execution_error_on_header_4() { #[tokio::test] async fn import__execution_error_on_header_5() { // given + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(2) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_get_sealed_block_headers() + .times(1) + .returning(|range| { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }); + p2p.expect_get_transactions() + .times(1) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + let mut executor = MockBlockImporterPort::default(); executor .expect_execute_and_commit() @@ -482,13 +825,17 @@ async fn import__execution_error_on_header_5() { let state = State::new(3, 5).into(); let mocks = Mocks { - consensus_port: DefaultMocks::times([2]), - p2p: DefaultMocks::times([2]), + consensus_port, + p2p, executor, }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then assert_eq!((State::new(4, None), false), res); @@ -500,8 +847,9 @@ async fn signature_always_fails() { let mut consensus_port = MockConsensusPort::default(); consensus_port .expect_check_sealed_header() - 
.times(2) + .times(1) .returning(|_| Ok(false)); + consensus_port.expect_await_da_height().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -509,12 +857,16 @@ async fn signature_always_fails() { p2p: DefaultMocks::times([0]), executor: DefaultMocks::times([0]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(state, mocks, None).await; + let res = test_import_inner(state, mocks, None, params).await; // then - assert_eq!((State::new(3, None), true), res); + assert_eq!((State::new(3, None), false), res); } #[tokio::test] @@ -522,26 +874,48 @@ async fn import__can_work_in_two_loops() { // given let s = SharedMutex::new(State::new(3, 5)); let state = s.clone(); + + let mut consensus_port = MockConsensusPort::default(); + consensus_port + .expect_check_sealed_header() + .times(3) + .returning(|_| Ok(true)); + consensus_port + .expect_await_da_height() + .times(2) + .returning(|_| Ok(())); + let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(2) .returning(move |range| { state.apply(|s| s.observe(6)); - let headers = range.clone().map(|h| empty_header(h.into())).collect(); - Ok(peer_sourced_headers(Some(headers))) + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) }); p2p.expect_get_transactions() - .times(3) - .returning(move |_| Ok(Some(vec![]))); + .times(2) + .returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + let c = DefaultMocks::times([2]); let mocks = Mocks { - consensus_port: DefaultMocks::times([3]), + consensus_port, p2p, executor: DefaultMocks::times([3]), }; + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; // when - let res = test_import_inner(s, mocks, Some(c)).await; + let res = test_import_inner(s, mocks, 
Some(c), params).await; // then assert_eq!((State::new(6, None), true), res); @@ -551,6 +925,7 @@ async fn test_import_inner( state: SharedMutex, mocks: Mocks, count: Option, + params: Config, ) -> (State, bool) { let notify = Arc::new(Notify::new()); let Mocks { @@ -558,10 +933,6 @@ async fn test_import_inner( mut p2p, executor, } = mocks; - let params = Config { - block_stream_buffer_size: 10, - header_batch_size: 10, - }; p2p.expect_report_peer().returning(|_, _| Ok(())); let p2p = Arc::new(p2p); @@ -602,67 +973,67 @@ async fn test_import_inner( #[tokio::test] async fn import__happy_path_sends_good_peer_report() { // Given - PeerReportTestBuider::new() + PeerReportTestBuilder::new() // When (no changes) // Then - .run_with_expected_report(PeerReportReason::SuccessfulBlockImport) + .run_with_expected_reports([PeerReportReason::SuccessfulBlockImport]) .await; } #[tokio::test] async fn import__multiple_blocks_happy_path_sends_good_peer_report() { // Given - PeerReportTestBuider::new() + PeerReportTestBuilder::new() // When .times(3) // Then - .run_with_expected_report(PeerReportReason::SuccessfulBlockImport) + .run_with_expected_reports([PeerReportReason::SuccessfulBlockImport]) .await; } #[tokio::test] async fn import__missing_headers_sends_peer_report() { // Given - PeerReportTestBuider::new() + PeerReportTestBuilder::new() // When - .with_get_headers(None) + .with_get_sealed_block_headers(None) // Then - .run_with_expected_report(PeerReportReason::MissingBlockHeaders) + .run_with_expected_reports([PeerReportReason::MissingBlockHeaders]) .await; } #[tokio::test] async fn import__bad_block_header_sends_peer_report() { // Given - PeerReportTestBuider::new() + PeerReportTestBuilder::new() // When .with_check_sealed_header(false) // Then - .run_with_expected_report(PeerReportReason::BadBlockHeader) + .run_with_expected_reports([PeerReportReason::BadBlockHeader]) .await; } #[tokio::test] async fn import__missing_transactions_sends_peer_report() { // Given - 
PeerReportTestBuider::new() + PeerReportTestBuilder::new() // When .with_get_transactions(None) // Then - .run_with_expected_report(PeerReportReason::MissingTransactions) + .run_with_expected_reports([PeerReportReason::MissingTransactions]) .await; } -struct PeerReportTestBuider { +struct PeerReportTestBuilder { shared_peer_id: Vec, get_sealed_headers: Option>>, - get_transactions: Option>>, + get_transactions: Option>>, check_sealed_header: Option, block_count: u32, debug: bool, } -impl PeerReportTestBuider { +impl PeerReportTestBuilder { pub fn new() -> Self { Self { shared_peer_id: vec![1, 2, 3, 4], @@ -680,7 +1051,7 @@ impl PeerReportTestBuider { self } - pub fn with_get_headers( + pub fn with_get_sealed_block_headers( mut self, get_headers: Option>, ) -> Self { @@ -690,7 +1061,7 @@ impl PeerReportTestBuider { pub fn with_get_transactions( mut self, - get_transactions: Option>, + get_transactions: Option>, ) -> Self { self.get_transactions = Some(get_transactions); self @@ -706,14 +1077,17 @@ impl PeerReportTestBuider { self } - pub async fn run_with_expected_report(self, expected_report: PeerReportReason) { + pub async fn run_with_expected_reports(self, expected_reports: R) + where + R: IntoIterator, + { if self.debug { let _ = tracing_subscriber::fmt() .with_max_level(tracing::Level::DEBUG) .try_init(); } - let p2p = self.p2p(expected_report); + let p2p = self.p2p(expected_reports); let executor = self.executor(); let consensus = self.consensus(); @@ -743,39 +1117,54 @@ impl PeerReportTestBuider { let _ = import.import(&mut watcher).await; } - fn p2p(&self, expected_report: PeerReportReason) -> Arc { - let peer_id = self.shared_peer_id.clone(); + fn p2p(&self, expected_reports: R) -> Arc + where + R: IntoIterator, + { let mut p2p = MockPeerToPeerPort::default(); + let peer_id = self.shared_peer_id.clone(); if let Some(get_headers) = self.get_sealed_headers.clone() { p2p.expect_get_sealed_block_headers().returning(move |_| { - Ok(peer_sourced_headers_peer_id( 
- get_headers.clone(), - peer_id.clone().into(), - )) + let peer: PeerId = peer_id.clone().into(); + let headers = peer.bind(get_headers.clone()); + Ok(headers) }); } else { p2p.expect_get_sealed_block_headers() .returning(move |range| { - Ok(peer_sourced_headers_peer_id( - Some(range.clone().map(|h| empty_header(h.into())).collect()), - peer_id.clone().into(), - )) + let peer: PeerId = peer_id.clone().into(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) }); } - let get_transactions = self.get_transactions.clone().unwrap_or(Some(vec![])); - p2p.expect_get_transactions() - .returning(move |_| Ok(get_transactions.clone())); + let transactions = self.get_transactions.clone(); + if let Some(t) = transactions { + p2p.expect_get_transactions() + .returning(move |_| Ok(t.clone())); + } else { + p2p.expect_get_transactions().returning(|block_ids| { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }); + } - let peer_id = self.shared_peer_id.clone(); - p2p.expect_report_peer() - .times(self.block_count as usize) - .withf(move |peer, report| { - let peer_id = peer_id.clone(); - peer.as_ref() == peer_id && report == &expected_report - }) - .returning(|_, _| Ok(())); + let mut seq = mockall::Sequence::new(); + let peer_id: PeerId = self.shared_peer_id.clone().into(); + let expected_reports = expected_reports.into_iter(); + for expected_report in expected_reports { + p2p.expect_report_peer() + .times(1) + .with( + mockall::predicate::eq(peer_id.clone()), + mockall::predicate::eq(expected_report), + ) + .returning(|_, _| Ok(())) + .in_sequence(&mut seq); + } Arc::new(p2p) } @@ -875,13 +1264,19 @@ impl DefaultMocks for MockPeerToPeerPort { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - Ok(peer_sourced_headers(Some( - range.clone().map(|h| empty_header(h.into())).collect(), - ))) + let peer = random_peer(); + let headers = 
Some(range.map(empty_header).collect());
+                let headers = peer.bind(headers);
+                Ok(headers)
             });
+
         p2p.expect_get_transactions()
             .times(t.next().unwrap())
-            .returning(|_| Ok(Some(vec![])));
+            .returning(|block_ids| {
+                let data = block_ids.data;
+                let v = data.into_iter().map(|_| Transactions::default()).collect();
+                Ok(Some(v))
+            });
         p2p
     }
 }
diff --git a/crates/services/sync/src/lib.rs b/crates/services/sync/src/lib.rs
index 9087e9aadc4..2b2fcdc87c8 100644
--- a/crates/services/sync/src/lib.rs
+++ b/crates/services/sync/src/lib.rs
@@ -13,5 +13,7 @@ mod tracing_helpers;
 
 pub use import::Config;
 
+use rand as _;
+
 #[cfg(test)]
 fuel_core_trace::enable_tracing!();
diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs
index 1d270a70ed8..86f8280489d 100644
--- a/crates/services/sync/src/ports.rs
+++ b/crates/services/sync/src/ports.rs
@@ -3,18 +3,15 @@
 use fuel_core_services::stream::BoxStream;
 use fuel_core_types::{
     blockchain::{
-        primitives::{
-            BlockId,
-            DaBlockHeight,
-        },
+        primitives::DaBlockHeight,
         SealedBlock,
         SealedBlockHeader,
     },
-    fuel_tx::Transaction,
     fuel_types::BlockHeight,
     services::p2p::{
         PeerId,
         SourcePeer,
+        Transactions,
     },
 };
 use std::ops::Range;
@@ -54,15 +51,11 @@ pub trait PeerToPeerPort {
     /// and source peer.
     async fn get_transactions(
         &self,
-        block_id: SourcePeer<BlockId>,
-    ) -> anyhow::Result<Option<Vec<Transaction>>>;
+        block_ids: SourcePeer<Range<u32>>,
+    ) -> anyhow::Result<Option<Vec<Transactions>>>;
 
     /// Report a peer for some reason to modify their reputation.
-    async fn report_peer(
-        &self,
-        peer: PeerId,
-        report: PeerReportReason,
-    ) -> anyhow::Result<()>;
+    fn report_peer(&self, peer: PeerId, report: PeerReportReason) -> anyhow::Result<()>;
 }
 
 #[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)]
diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs
index 64dad7c22b1..57440afae24 100644
--- a/crates/services/sync/src/service/tests.rs
+++ b/crates/services/sync/src/service/tests.rs
@@ -2,6 +2,7 @@ use fuel_core_services::{
     stream::IntoBoxStream,
     Service,
 };
+use fuel_core_types::services::p2p::Transactions;
 use futures::{
     stream,
     StreamExt,
@@ -10,7 +11,7 @@ use futures::{
 use crate::{
     import::test_helpers::{
         empty_header,
-        peer_sourced_headers,
+        random_peer,
     },
     ports::{
         MockBlockImporterPort,
@@ -38,16 +39,16 @@ async fn test_new_service() {
             .into_boxed()
     });
     p2p.expect_get_sealed_block_headers().returning(|range| {
-        Ok(peer_sourced_headers(Some(
-            range
-                .clone()
-                .map(BlockHeight::from)
-                .map(empty_header)
-                .collect(),
-        )))
+        let peer = random_peer();
+        let headers = Some(range.map(empty_header).collect::<Vec<_>>());
+        let headers = peer.bind(headers);
+        Ok(headers)
+    });
+    p2p.expect_get_transactions().returning(|block_ids| {
+        let data = block_ids.data;
+        let v = data.into_iter().map(|_| Transactions::default()).collect();
+        Ok(Some(v))
     });
-    p2p.expect_get_transactions()
-        .returning(|_| Ok(Some(vec![])));
     let mut importer = MockBlockImporterPort::default();
     importer
         .expect_committed_height_stream()
diff --git a/crates/types/src/services/p2p.rs b/crates/types/src/services/p2p.rs
index d0f8e604e59..758ca83df2d 100644
--- a/crates/types/src/services/p2p.rs
+++ b/crates/types/src/services/p2p.rs
@@ -5,9 +5,15 @@ use crate::{
     fuel_types::BlockHeight,
 };
 use std::fmt::Debug;
+
 /// Contains types and logic for Peer Reputation
 pub mod peer_reputation;
 
+/// List of transactions
+#[derive(Debug, Clone, Default)]
+#[cfg_attr(feature = "serde", 
derive(serde::Serialize, serde::Deserialize))]
+pub struct Transactions(pub Vec<Transaction>);
+
 /// Lightweight representation of gossipped data that only includes IDs
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
 #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
@@ -48,7 +54,7 @@ pub struct GossipData<T> {
 /// Transactions gossiped by peers for inclusion into a block
 pub type TransactionGossipData = GossipData<Transaction>;
 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 /// The source of some network data.
 pub struct SourcePeer<T> {
     /// The source of the data.
@@ -57,6 +63,19 @@ pub struct SourcePeer<T> {
     pub data: T,
 }
 
+impl<T> SourcePeer<T> {
+    /// Maps a `SourcePeer<T>` to `SourcePeer<U>` by applying a function to the
+    /// contained data. The internal `peer_id` is maintained.
+    pub fn map<F, U>(self, mut f: F) -> SourcePeer<U>
+    where
+        F: FnMut(T) -> U,
+    {
+        let peer_id = self.peer_id;
+        let data = f(self.data);
+        SourcePeer { peer_id, data }
+    }
+}
+
 impl<T> GossipData<T> {
     /// Construct a new gossip message
     pub fn new(
@@ -93,7 +112,7 @@ pub struct BlockHeightHeartbeatData {
 }
 
 /// Opaque peer identifier.
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
 pub struct PeerId(Vec<u8>);
 
@@ -114,3 +133,14 @@ impl From<PeerId> for Vec<u8> {
         peer_id.0
     }
 }
+
+impl PeerId {
+    /// Bind the PeerId and given data of type T together to generate a
+    /// SourcePeer
+    pub fn bind<T>(self, data: T) -> SourcePeer<T> {
+        SourcePeer {
+            peer_id: self,
+            data,
+        }
+    }
+}