diff --git a/CHANGELOG.md b/CHANGELOG.md index 15ffd551b29..f2cf97220f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization. - [#1309](https://github.com/FuelLabs/fuel-core/pull/1309): Add documentation for running debug builds with CLion and Visual Studio Code. - [#1308](https://github.com/FuelLabs/fuel-core/pull/1308): Add support for loading .env files when compiling with the `env` feature. This allows users to conveniently supply CLI arguments in a secure and IDE-agnostic way. - [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions. diff --git a/Cargo.lock b/Cargo.lock index 1c8cbd1d73d..2cf3afcfea5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2659,13 +2659,17 @@ dependencies = [ name = "fuel-core-benches" version = "0.0.0" dependencies = [ + "anyhow", + "async-trait", "clap", "criterion", "ctrlc", "ed25519-dalek", "ethnum", "fuel-core", + "fuel-core-services", "fuel-core-storage", + "fuel-core-sync", "fuel-core-types", "p256 0.13.2", "rand 0.7.3", diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 4aeba3620ce..89e2c62e91b 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -6,6 +6,8 @@ publish = false version = "0.0.0" [dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } criterion = { version = "0.5", features = ["html_reports", "async", "async_tokio"] } ctrlc = "3.2.3" @@ -13,7 +15,9 @@ ed25519-dalek = "1.0" # TODO: upgrade to 2.0 when it's released, and remove rand ed25519-dalek_old_rand = { package = "rand", version = "0.7.3" } ethnum = "1.3" fuel-core = { path = "../crates/fuel-core", default-features = false, features = ["metrics", "rocksdb-production"] } +fuel-core-services = { path = "./../crates/services" } fuel-core-storage = { path = "./../crates/storage" } +fuel-core-sync = { path = "./../crates/services/sync", features = ["benchmarking"] } fuel-core-types = { path = "./../crates/types", features = ["test-helpers"] } p256 = { version = "0.13", default-features = false, features = ["digest", "ecdsa"] } rand = { workspace = true } @@ -23,6 +27,10 @@ serde_yaml = "0.9.13" tikv-jemallocator = { workspace = true } tokio = { workspace = true, features = ["full"] } +[[bench]] +harness = false +name = "import" + [[bench]] harness = false name = "state" @@ -31,6 +39,9 @@ name = "state" harness = false name = "vm" +[features] +default = ["fuel-core/rocksdb"] + [[bench]] harness = false name = "block_target_gas" diff --git a/benches/benches/import.rs b/benches/benches/import.rs new file mode 100644 index 00000000000..c1c1a85412f --- /dev/null +++ b/benches/benches/import.rs @@ -0,0 +1,101 @@ +use criterion::{ + criterion_group, + criterion_main, + measurement::WallTime, + BenchmarkGroup, + Criterion, +}; +use fuel_core_benches::import::{ + provision_import_test, + Durations, + PressureImport, + SharedCounts, +}; +use fuel_core_services::{ + SharedMutex, + StateWatcher, +}; +use fuel_core_sync::state::State; +use std::time::Duration; +use tokio::runtime::Runtime; + +async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) { + import.import(shutdown).await.unwrap(); +} + +fn name(n: u32, durations: Durations, buffer_size: usize) -> String { + format!( + "import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {sz}", + n = n, + d_h = durations.headers.as_millis(), + d_c = durations.consensus.as_millis(), + d_t = durations.transactions.as_millis(), + d_e = durations.executes.as_millis(), + sz = buffer_size + ) +} + +fn bench_imports(c: &mut Criterion) { + let bench_import = |group: &mut BenchmarkGroup, + n: u32, + durations: Durations, + batch_size: u32, + buffer_size: usize| { + let name = name(n, durations, buffer_size); + group.bench_function(name, move |b| { + let rt = Runtime::new().unwrap(); + b.to_async(&rt).iter_custom(|iters| async move { + let mut elapsed_time = Duration::default(); + for _ in 0..iters { + let shared_count = SharedCounts::new(Default::default()); + let state = State::new(None, n); + let shared_state = SharedMutex::new(state); + let (import, _tx, mut shutdown) = provision_import_test( + shared_count.clone(), + shared_state, + durations, + batch_size, + buffer_size, + buffer_size, + ); + import.notify_one(); + let start = std::time::Instant::now(); + execute_import(import, &mut shutdown).await; + elapsed_time += start.elapsed(); + } + elapsed_time + }) + }); + }; + + let mut group = c.benchmark_group("import"); + + let n = 100; + let durations = Durations { + headers: Duration::from_millis(5), + consensus: Duration::from_millis(5), + transactions: Duration::from_millis(5), + executes: Duration::from_millis(10), + }; + + // Header batch size = 10, header/txn buffer size = 10 + bench_import(&mut group, n, durations, 10, 10); + + // Header batch size = 20, header/txn buffer size = 10 + bench_import(&mut group, n, durations, 20, 10); + + // Header batch size = 50, header/txn buffer size = 10 + bench_import(&mut group, n, durations, 20, 10); + + // Header batch size = 10, header/txn buffer size = 20 + bench_import(&mut group, n, durations, 10, 20); + + // Header batch size = 10, header/txn buffer size = 50 + bench_import(&mut group, n, durations, 10, 50); + + // Header batch size = 50, header/txn buffer size = 50 + bench_import(&mut group, n, durations, 10, 20); +} + +criterion_group!(benches, bench_imports); +criterion_main!(benches); diff --git a/benches/src/import.rs b/benches/src/import.rs new file mode 100644 index 00000000000..3d79206bda5 --- /dev/null +++ b/benches/src/import.rs @@ -0,0 +1,80 @@ +use fuel_core_services::{ + SharedMutex, + StateWatcher, +}; +pub use fuel_core_sync::import::test_helpers::SharedCounts; +use fuel_core_sync::{ + import::{ + test_helpers::{ + PressureBlockImporter, + PressureConsensus, + PressurePeerToPeer, + }, + Import, + }, + state::State, + Config, +}; +use std::{ + sync::Arc, + time::Duration, +}; +use tokio::sync::{ + watch::Sender, + Notify, +}; + +pub type PressureImport = + Import; + +#[derive(Default, Clone, Copy)] +pub struct Durations { + pub headers: Duration, + pub consensus: Duration, + pub transactions: Duration, + pub executes: Duration, +} + +pub fn provision_import_test( + shared_count: SharedCounts, + shared_state: SharedMutex, + input: Durations, + header_batch_size: u32, + max_header_batch_requests: usize, + max_get_txns_requests: usize, +) -> ( + PressureImport, + Sender, + StateWatcher, +) { + let shared_notify = Arc::new(Notify::new()); + let params = Config { + max_header_batch_requests, + header_batch_size, + max_get_txns_requests, + }; + let p2p = Arc::new(PressurePeerToPeer::new( + shared_count.clone(), + [input.headers, input.transactions], + )); + let executor = Arc::new(PressureBlockImporter::new( + shared_count.clone(), + input.executes, + )); + let consensus = Arc::new(PressureConsensus::new( + shared_count.clone(), + input.consensus, + )); + + let (tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let watcher = shutdown.into(); + let import = Import::new( + shared_state, + shared_notify, + params, + p2p, + executor, + consensus, + ); + (import, tx, watcher) +} diff --git a/benches/src/lib.rs b/benches/src/lib.rs index 410c3fe231f..550cfe84c60 100644 --- a/benches/src/lib.rs +++ b/benches/src/lib.rs @@ -1,3 +1,5 @@ +pub mod import; + use fuel_core::database::vm_database::VmDatabase; pub use fuel_core::database::Database; use fuel_core_types::{ diff --git a/crates/services/sync/Cargo.toml b/crates/services/sync/Cargo.toml index a6cd607fb50..1b41e15436f 100644 --- a/crates/services/sync/Cargo.toml +++ b/crates/services/sync/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } fuel-core-services = { workspace = true } fuel-core-types = { workspace = true } futures = { workspace = true } +mockall = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } @@ -23,3 +24,6 @@ fuel-core-trace = { path = "../../trace" } fuel-core-types = { path = "../../types", features = ["test-helpers"] } mockall = { workspace = true } test-case = { workspace = true } + +[features] +benchmarking = ["dep:mockall"] diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index 2354df0bf8c..19658e220ee 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -41,8 +41,10 @@ use crate::{ }, }; -#[cfg(test)] -pub(crate) use tests::empty_header; +#[cfg(any(test, feature = "benchmarking"))] +/// Accessories for testing the sync. Available only when compiling under test +/// or benchmarking. +pub mod test_helpers; #[cfg(test)] mod tests; @@ -71,7 +73,9 @@ impl Default for Config { } } -pub(crate) struct Import { +/// The combination of shared state, configuration, and services that define +/// import behavior. +pub struct Import { /// Shared state between import and sync tasks. state: SharedMutex, /// Notify import when sync has new work. @@ -87,7 +91,9 @@ pub(crate) struct Import { } impl Import { - pub(crate) fn new( + /// Configure an import behavior from a shared state, configuration and + /// services that can be executed by an ImportTask. + pub fn new( state: SharedMutex, notify: Arc, params: Config, @@ -104,6 +110,11 @@ impl Import { consensus, } } + + /// Signal other asynchronous tasks that an import event has occurred. + pub fn notify_one(&self) { + self.notify.notify_one() + } } impl Import where @@ -112,10 +123,8 @@ where C: ConsensusPort + Send + Sync + 'static, { #[tracing::instrument(skip_all)] - pub(crate) async fn import( - &self, - shutdown: &mut StateWatcher, - ) -> anyhow::Result { + /// Execute imports until a shutdown is requested. + pub async fn import(&self, shutdown: &mut StateWatcher) -> anyhow::Result { self.import_inner(shutdown).await?; Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await) diff --git a/crates/services/sync/src/import/back_pressure_tests.rs b/crates/services/sync/src/import/back_pressure_tests.rs index 423286df914..6dea34a7678 100644 --- a/crates/services/sync/src/import/back_pressure_tests.rs +++ b/crates/services/sync/src/import/back_pressure_tests.rs @@ -1,29 +1,13 @@ -use std::{ - ops::Range, - time::Duration, +use std::time::Duration; + +use super::*; +use crate::import::test_helpers::{ + Count, + PressureBlockImporter, + PressureConsensus, + PressurePeerToPeer, + SharedCounts, }; - -use fuel_core_services::stream::BoxStream; -use fuel_core_types::{ - blockchain::primitives::{ - BlockId, - DaBlockHeight, - }, - fuel_tx::Transaction, -}; - -use crate::ports::{ - BlockImporterPort, - MockBlockImporterPort, - MockConsensusPort, - MockPeerToPeerPort, -}; - -use super::{ - tests::empty_header, - *, -}; -use fuel_core_types::fuel_types::BlockHeight; use test_case::test_case; #[derive(Default)] @@ -141,168 +125,3 @@ async fn test_back_pressure(input: Input, state: State, params: Config) -> Count import.import(&mut watcher).await.unwrap(); counts.apply(|c| c.max.clone()) } - -#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] -struct Count { - headers: usize, - transactions: usize, - consensus: usize, - executes: usize, - blocks: usize, -} - -#[derive(Debug, Default, PartialEq, Eq)] -struct Counts { - now: Count, - max: Count, -} - -type SharedCounts = SharedMutex; - -struct PressurePeerToPeer { - p2p: MockPeerToPeerPort, - durations: [Duration; 2], - counts: SharedCounts, -} - -struct PressureBlockImporter(MockBlockImporterPort, Duration, SharedCounts); - -struct PressureConsensus(MockConsensusPort, Duration, SharedCounts); - -#[async_trait::async_trait] -impl PeerToPeerPort for PressurePeerToPeer { - fn height_stream(&self) -> BoxStream { - self.p2p.height_stream() - } - - async fn get_sealed_block_headers( - &self, - block_height_range: Range, - ) -> anyhow::Result>>> { - self.counts.apply(|c| c.inc_headers()); - tokio::time::sleep(self.durations[0]).await; - self.counts.apply(|c| c.dec_headers()); - for _ in block_height_range.clone() { - self.counts.apply(|c| c.inc_blocks()); - } - self.p2p.get_sealed_block_headers(block_height_range).await - } - - async fn get_transactions( - &self, - block_id: SourcePeer, - ) -> anyhow::Result>> { - self.counts.apply(|c| c.inc_transactions()); - tokio::time::sleep(self.durations[1]).await; - self.counts.apply(|c| c.dec_transactions()); - self.p2p.get_transactions(block_id).await - } -} - -#[async_trait::async_trait] -impl BlockImporterPort for PressureBlockImporter { - fn committed_height_stream(&self) -> BoxStream { - self.0.committed_height_stream() - } - - async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> { - self.2.apply(|c| c.inc_executes()); - tokio::time::sleep(self.1).await; - self.2.apply(|c| { - c.dec_executes(); - c.dec_blocks(); - }); - self.0.execute_and_commit(block).await - } -} - -#[async_trait::async_trait] -impl ConsensusPort for PressureConsensus { - fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result { - self.0.check_sealed_header(header) - } - - async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()> { - self.2.apply(|c| c.inc_consensus()); - tokio::time::sleep(self.1).await; - self.2.apply(|c| c.dec_consensus()); - self.0.await_da_height(da_height).await - } -} - -impl PressurePeerToPeer { - fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { - let mut mock = MockPeerToPeerPort::default(); - mock.expect_get_sealed_block_headers().returning(|range| { - Ok(Some( - range - .clone() - .map(BlockHeight::from) - .map(empty_header) - .collect(), - )) - }); - mock.expect_get_transactions() - .returning(|_| Ok(Some(vec![]))); - Self { - p2p: mock, - durations: delays, - counts, - } - } -} - -impl PressureBlockImporter { - fn new(counts: SharedCounts, delays: Duration) -> Self { - let mut mock = MockBlockImporterPort::default(); - mock.expect_execute_and_commit().returning(move |_| Ok(())); - Self(mock, delays, counts) - } -} - -impl PressureConsensus { - fn new(counts: SharedCounts, delays: Duration) -> Self { - let mut mock = MockConsensusPort::default(); - mock.expect_await_da_height().returning(|_| Ok(())); - mock.expect_check_sealed_header().returning(|_| Ok(true)); - Self(mock, delays, counts) - } -} - -impl Counts { - fn inc_headers(&mut self) { - self.now.headers += 1; - self.max.headers = self.max.headers.max(self.now.headers); - } - fn dec_headers(&mut self) { - self.now.headers -= 1; - } - fn inc_transactions(&mut self) { - self.now.transactions += 1; - self.max.transactions = self.max.transactions.max(self.now.transactions); - } - fn dec_transactions(&mut self) { - self.now.transactions -= 1; - } - fn inc_consensus(&mut self) { - self.now.consensus += 1; - self.max.consensus = self.max.consensus.max(self.now.consensus); - } - fn dec_consensus(&mut self) { - self.now.consensus -= 1; - } - fn inc_executes(&mut self) { - self.now.executes += 1; - self.max.executes = self.max.executes.max(self.now.executes); - } - fn dec_executes(&mut self) { - self.now.executes -= 1; - } - fn inc_blocks(&mut self) { - self.now.blocks += 1; - self.max.blocks = self.max.blocks.max(self.now.blocks); - } - fn dec_blocks(&mut self) { - self.now.blocks -= 1; - } -} diff --git a/crates/services/sync/src/import/test_helpers.rs b/crates/services/sync/src/import/test_helpers.rs new file mode 100644 index 00000000000..939dbd7befb --- /dev/null +++ b/crates/services/sync/src/import/test_helpers.rs @@ -0,0 +1,45 @@ +#![allow(missing_docs)] + +mod counts; +mod pressure_block_importer; +mod pressure_consensus; +mod pressure_peer_to_peer; + +use fuel_core_types::{ + blockchain::{ + consensus::{ + Consensus, + Sealed, + }, + header::BlockHeader, + SealedBlockHeader, + }, + fuel_types::BlockHeight, + services::p2p::SourcePeer, +}; + +pub use counts::{ + Count, + SharedCounts, +}; +pub use pressure_block_importer::PressureBlockImporter; +pub use pressure_consensus::PressureConsensus; +pub use pressure_peer_to_peer::PressurePeerToPeer; + +pub fn empty_header(h: BlockHeight) -> SourcePeer { + let mut header = BlockHeader::default(); + header.consensus.height = h; + let transaction_tree = + fuel_core_types::fuel_merkle::binary::in_memory::MerkleTree::new(); + header.application.generated.transactions_root = transaction_tree.root().into(); + + let consensus = Consensus::default(); + let sealed = Sealed { + entity: header, + consensus, + }; + SourcePeer { + peer_id: vec![].into(), + data: sealed, + } +} diff --git a/crates/services/sync/src/import/test_helpers/counts.rs b/crates/services/sync/src/import/test_helpers/counts.rs new file mode 100644 index 00000000000..d98e75ddd30 --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/counts.rs @@ -0,0 +1,56 @@ +use fuel_core_services::SharedMutex; + +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct Count { + pub headers: usize, + pub transactions: usize, + pub consensus: usize, + pub executes: usize, + pub blocks: usize, +} + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct Counts { + pub now: Count, + pub max: Count, +} + +pub type SharedCounts = SharedMutex; + +impl Counts { + pub fn inc_headers(&mut self) { + self.now.headers += 1; + self.max.headers = self.max.headers.max(self.now.headers); + } + pub fn dec_headers(&mut self) { + self.now.headers -= 1; + } + pub fn inc_transactions(&mut self) { + self.now.transactions += 1; + self.max.transactions = self.max.transactions.max(self.now.transactions); + } + pub fn dec_transactions(&mut self) { + self.now.transactions -= 1; + } + pub fn inc_consensus(&mut self) { + self.now.consensus += 1; + self.max.consensus = self.max.consensus.max(self.now.consensus); + } + pub fn dec_consensus(&mut self) { + self.now.consensus -= 1; + } + pub fn inc_executes(&mut self) { + self.now.executes += 1; + self.max.executes = self.max.executes.max(self.now.executes); + } + pub fn dec_executes(&mut self) { + self.now.executes -= 1; + } + pub fn inc_blocks(&mut self) { + self.now.blocks += 1; + self.max.blocks = self.max.blocks.max(self.now.blocks); + } + pub fn dec_blocks(&mut self) { + self.now.blocks -= 1; + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_block_importer.rs b/crates/services/sync/src/import/test_helpers/pressure_block_importer.rs new file mode 100644 index 00000000000..217e3fd1c9f --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/pressure_block_importer.rs @@ -0,0 +1,40 @@ +use crate::{ + import::test_helpers::SharedCounts, + ports::{ + BlockImporterPort, + MockBlockImporterPort, + }, +}; +use fuel_core_services::stream::BoxStream; +use fuel_core_types::{ + blockchain::SealedBlock, + fuel_types::BlockHeight, +}; +use std::time::Duration; + +pub struct PressureBlockImporter(MockBlockImporterPort, Duration, SharedCounts); + +#[async_trait::async_trait] +impl BlockImporterPort for PressureBlockImporter { + fn committed_height_stream(&self) -> BoxStream { + self.0.committed_height_stream() + } + + async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> { + self.2.apply(|c| c.inc_executes()); + tokio::time::sleep(self.1).await; + self.2.apply(|c| { + c.dec_executes(); + c.dec_blocks(); + }); + self.0.execute_and_commit(block).await + } +} + +impl PressureBlockImporter { + pub fn new(counts: SharedCounts, delays: Duration) -> Self { + let mut mock = MockBlockImporterPort::default(); + mock.expect_execute_and_commit().returning(move |_| Ok(())); + Self(mock, delays, counts) + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_consensus.rs b/crates/services/sync/src/import/test_helpers/pressure_consensus.rs new file mode 100644 index 00000000000..441dca03563 --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/pressure_consensus.rs @@ -0,0 +1,37 @@ +use crate::{ + import::test_helpers::counts::SharedCounts, + ports::{ + ConsensusPort, + MockConsensusPort, + }, +}; +use fuel_core_types::blockchain::{ + primitives::DaBlockHeight, + SealedBlockHeader, +}; +use std::time::Duration; + +pub struct PressureConsensus(MockConsensusPort, Duration, SharedCounts); + +#[async_trait::async_trait] +impl ConsensusPort for PressureConsensus { + fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result { + self.0.check_sealed_header(header) + } + + async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()> { + self.2.apply(|c| c.inc_consensus()); + tokio::time::sleep(self.1).await; + self.2.apply(|c| c.dec_consensus()); + self.0.await_da_height(da_height).await + } +} + +impl PressureConsensus { + pub fn new(counts: SharedCounts, delays: Duration) -> Self { + let mut mock = MockConsensusPort::default(); + mock.expect_await_da_height().returning(|_| Ok(())); + mock.expect_check_sealed_header().returning(|_| Ok(true)); + Self(mock, delays, counts) + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs new file mode 100644 index 00000000000..b0d5b93b946 --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs @@ -0,0 +1,82 @@ +use crate::{ + import::test_helpers::{ + empty_header, + SharedCounts, + }, + ports::{ + MockPeerToPeerPort, + PeerToPeerPort, + }, +}; +use fuel_core_services::stream::BoxStream; +use fuel_core_types::{ + blockchain::{ + primitives::BlockId, + SealedBlockHeader, + }, + fuel_tx::Transaction, + fuel_types::BlockHeight, + services::p2p::SourcePeer, +}; +use std::{ + ops::Range, + time::Duration, +}; + +pub struct PressurePeerToPeer { + p2p: MockPeerToPeerPort, + durations: [Duration; 2], + counts: SharedCounts, +} + +#[async_trait::async_trait] +impl PeerToPeerPort for PressurePeerToPeer { + fn height_stream(&self) -> BoxStream { + self.p2p.height_stream() + } + + async fn get_sealed_block_headers( + &self, + block_height_range: Range, + ) -> anyhow::Result>>> { + self.counts.apply(|c| c.inc_headers()); + tokio::time::sleep(self.durations[0]).await; + self.counts.apply(|c| c.dec_headers()); + for _ in block_height_range.clone() { + self.counts.apply(|c| c.inc_blocks()); + } + self.p2p.get_sealed_block_headers(block_height_range).await + } + + async fn get_transactions( + &self, + block_id: SourcePeer, + ) -> anyhow::Result>> { + self.counts.apply(|c| c.inc_transactions()); + tokio::time::sleep(self.durations[1]).await; + self.counts.apply(|c| c.dec_transactions()); + self.p2p.get_transactions(block_id).await + } +} + +impl PressurePeerToPeer { + pub fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { + let mut mock = MockPeerToPeerPort::default(); + mock.expect_get_sealed_block_headers().returning(|range| { + Ok(Some( + range + .clone() + .map(BlockHeight::from) + .map(empty_header) + .collect(), + )) + }); + mock.expect_get_transactions() + .returning(|_| Ok(Some(vec![]))); + Self { + p2p: mock, + durations: delays, + counts, + } + } +} diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index 308d58e5347..796dc70094c 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -1,16 +1,13 @@ #![allow(non_snake_case)] -use fuel_core_types::blockchain::{ - consensus::Consensus, - header::BlockHeader, +use crate::{ + import::test_helpers::empty_header, + ports::{ + MockBlockImporterPort, + MockConsensusPort, + MockPeerToPeerPort, + }, }; - -use crate::ports::{ - MockBlockImporterPort, - MockConsensusPort, - MockPeerToPeerPort, -}; -use fuel_core_types::fuel_types::BlockHeight; use test_case::test_case; use super::*; @@ -657,21 +654,3 @@ impl DefaultMocks for MockBlockImporterPort { executor } } - -pub(crate) fn empty_header(h: BlockHeight) -> SourcePeer { - let mut header = BlockHeader::default(); - header.consensus.height = h; - let transaction_tree = - fuel_core_types::fuel_merkle::binary::in_memory::MerkleTree::new(); - header.application.generated.transactions_root = transaction_tree.root().into(); - - let consensus = Consensus::default(); - let sealed = Sealed { - entity: header, - consensus, - }; - SourcePeer { - peer_id: vec![].into(), - data: sealed, - } -} diff --git a/crates/services/sync/src/lib.rs b/crates/services/sync/src/lib.rs index 8ff9cab3ca4..9087e9aadc4 100644 --- a/crates/services/sync/src/lib.rs +++ b/crates/services/sync/src/lib.rs @@ -7,7 +7,7 @@ pub mod import; pub mod ports; pub mod service; -mod state; +pub mod state; pub mod sync; mod tracing_helpers; diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs index fcbef6ecbec..bdd860212a4 100644 --- a/crates/services/sync/src/ports.rs +++ b/crates/services/sync/src/ports.rs @@ -16,7 +16,7 @@ use fuel_core_types::{ }; use std::ops::Range; -#[cfg_attr(test, mockall::automock)] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the network. pub trait PeerToPeerPort { @@ -37,7 +37,7 @@ pub trait PeerToPeerPort { ) -> anyhow::Result>>; } -#[cfg_attr(test, mockall::automock)] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the consensus service. pub trait ConsensusPort { @@ -47,7 +47,7 @@ pub trait ConsensusPort { async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()>; } -#[cfg_attr(test, mockall::automock)] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the block importer. pub trait BlockImporterPort { diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index 30cca1235bc..04617bb97a5 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -8,7 +8,7 @@ use futures::{ }; use crate::{ - import::empty_header, + import::test_helpers::empty_header, ports::{ MockBlockImporterPort, MockConsensusPort,