From f84ce9b8ce3ed4e7b758854bd9ccb87c541bc30e Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Wed, 23 Aug 2023 22:13:33 -0400 Subject: [PATCH] test: Block Import benchmarks and test helpers (#1274) Related issues: - https://github.com/FuelLabs/fuel-core/issues/1167 This PR adds benchmarking for block synchronization. Benchmarks simulate the import by using mock services that introduce artificial delays. Benchmarks are provisioned by reusing existing test types and mock ports. This PR refactors the existing back pressure tests to extract its test structures so that they can be reused in benchmarks. In a follow up PR, I will be refactoring the block import to use a single buffer and asynchronous tasks to perform header downloads. --------- Co-authored-by: Brandon Kite --- CHANGELOG.md | 1 + Cargo.lock | 4 + benches/Cargo.toml | 11 + benches/benches/import.rs | 101 +++++++++ benches/src/import.rs | 80 +++++++ benches/src/lib.rs | 2 + crates/services/sync/Cargo.toml | 4 + crates/services/sync/src/import.rs | 25 ++- .../sync/src/import/back_pressure_tests.rs | 199 +----------------- .../services/sync/src/import/test_helpers.rs | 45 ++++ .../sync/src/import/test_helpers/counts.rs | 56 +++++ .../test_helpers/pressure_block_importer.rs | 40 ++++ .../import/test_helpers/pressure_consensus.rs | 37 ++++ .../test_helpers/pressure_peer_to_peer.rs | 82 ++++++++ crates/services/sync/src/import/tests.rs | 35 +-- crates/services/sync/src/lib.rs | 2 +- crates/services/sync/src/ports.rs | 6 +- crates/services/sync/src/service/tests.rs | 2 +- 18 files changed, 501 insertions(+), 231 deletions(-) create mode 100644 benches/benches/import.rs create mode 100644 benches/src/import.rs create mode 100644 crates/services/sync/src/import/test_helpers.rs create mode 100644 crates/services/sync/src/import/test_helpers/counts.rs create mode 100644 crates/services/sync/src/import/test_helpers/pressure_block_importer.rs create mode 100644 crates/services/sync/src/import/test_helpers/pressure_consensus.rs create mode 100644 crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 15ffd551b29..f2cf97220f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization. - [#1309](https://github.com/FuelLabs/fuel-core/pull/1309): Add documentation for running debug builds with CLion and Visual Studio Code. - [#1308](https://github.com/FuelLabs/fuel-core/pull/1308): Add support for loading .env files when compiling with the `env` feature. This allows users to conveniently supply CLI arguments in a secure and IDE-agnostic way. - [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions. diff --git a/Cargo.lock b/Cargo.lock index 1c8cbd1d73d..2cf3afcfea5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2659,13 +2659,17 @@ dependencies = [ name = "fuel-core-benches" version = "0.0.0" dependencies = [ + "anyhow", + "async-trait", "clap", "criterion", "ctrlc", "ed25519-dalek", "ethnum", "fuel-core", + "fuel-core-services", "fuel-core-storage", + "fuel-core-sync", "fuel-core-types", "p256 0.13.2", "rand 0.7.3", diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 4aeba3620ce..89e2c62e91b 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -6,6 +6,8 @@ publish = false version = "0.0.0" [dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } criterion = { version = "0.5", features = ["html_reports", "async", "async_tokio"] } ctrlc = "3.2.3" @@ -13,7 +15,9 @@ ed25519-dalek = "1.0" # TODO: upgrade to 2.0 when it's released, and remove rand ed25519-dalek_old_rand = { package = "rand", version = "0.7.3" } ethnum = "1.3" fuel-core = { path = "../crates/fuel-core", default-features = false, features = ["metrics", "rocksdb-production"] } +fuel-core-services = { path = "./../crates/services" } fuel-core-storage = { path = "./../crates/storage" } +fuel-core-sync = { path = "./../crates/services/sync", features = ["benchmarking"] } fuel-core-types = { path = "./../crates/types", features = ["test-helpers"] } p256 = { version = "0.13", default-features = false, features = ["digest", "ecdsa"] } rand = { workspace = true } @@ -23,6 +27,10 @@ serde_yaml = "0.9.13" tikv-jemallocator = { workspace = true } tokio = { workspace = true, features = ["full"] } +[[bench]] +harness = false +name = "import" + [[bench]] harness = false name = "state" @@ -31,6 +39,9 @@ name = "state" harness = false name = "vm" +[features] +default = ["fuel-core/rocksdb"] + [[bench]] harness = false name = "block_target_gas" diff --git a/benches/benches/import.rs b/benches/benches/import.rs new file mode 100644 index 00000000000..c1c1a85412f --- /dev/null +++ b/benches/benches/import.rs @@ -0,0 +1,101 @@ +use criterion::{ + criterion_group, + criterion_main, + measurement::WallTime, + BenchmarkGroup, + Criterion, +}; +use fuel_core_benches::import::{ + provision_import_test, + Durations, + PressureImport, + SharedCounts, +}; +use fuel_core_services::{ + SharedMutex, + StateWatcher, +}; +use fuel_core_sync::state::State; +use std::time::Duration; +use tokio::runtime::Runtime; + +async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) { + import.import(shutdown).await.unwrap(); +} + +fn name(n: u32, durations: Durations, buffer_size: usize) -> String { + format!( + "import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {sz}", + n = n, + d_h = durations.headers.as_millis(), + d_c = durations.consensus.as_millis(), + d_t = durations.transactions.as_millis(), + d_e = durations.executes.as_millis(), + sz = buffer_size + ) +} + +fn bench_imports(c: &mut Criterion) { + let bench_import = |group: &mut BenchmarkGroup, + n: u32, + durations: Durations, + batch_size: u32, + buffer_size: usize| { + let name = name(n, durations, buffer_size); + group.bench_function(name, move |b| { + let rt = Runtime::new().unwrap(); + b.to_async(&rt).iter_custom(|iters| async move { + let mut elapsed_time = Duration::default(); + for _ in 0..iters { + let shared_count = SharedCounts::new(Default::default()); + let state = State::new(None, n); + let shared_state = SharedMutex::new(state); + let (import, _tx, mut shutdown) = provision_import_test( + shared_count.clone(), + shared_state, + durations, + batch_size, + buffer_size, + buffer_size, + ); + import.notify_one(); + let start = std::time::Instant::now(); + execute_import(import, &mut shutdown).await; + elapsed_time += start.elapsed(); + } + elapsed_time + }) + }); + }; + + let mut group = c.benchmark_group("import"); + + let n = 100; + let durations = Durations { + headers: Duration::from_millis(5), + consensus: Duration::from_millis(5), + transactions: Duration::from_millis(5), + executes: Duration::from_millis(10), + }; + + // Header batch size = 10, header/txn buffer size = 10 + bench_import(&mut group, n, durations, 10, 10); + + // Header batch size = 20, header/txn buffer size = 10 + bench_import(&mut group, n, durations, 20, 10); + + // Header batch size = 50, header/txn buffer size = 10 + bench_import(&mut group, n, durations, 20, 10); + + // Header batch size = 10, header/txn buffer size = 20 + bench_import(&mut group, n, durations, 10, 20); + + // Header batch size = 10, header/txn buffer size = 50 + bench_import(&mut group, n, durations, 10, 50); + + // Header batch size = 50, header/txn buffer size = 50 + bench_import(&mut group, n, durations, 10, 20); +} + +criterion_group!(benches, bench_imports); +criterion_main!(benches); diff --git a/benches/src/import.rs b/benches/src/import.rs new file mode 100644 index 00000000000..3d79206bda5 --- /dev/null +++ b/benches/src/import.rs @@ -0,0 +1,80 @@ +use fuel_core_services::{ + SharedMutex, + StateWatcher, +}; +pub use fuel_core_sync::import::test_helpers::SharedCounts; +use fuel_core_sync::{ + import::{ + test_helpers::{ + PressureBlockImporter, + PressureConsensus, + PressurePeerToPeer, + }, + Import, + }, + state::State, + Config, +}; +use std::{ + sync::Arc, + time::Duration, +}; +use tokio::sync::{ + watch::Sender, + Notify, +}; + +pub type PressureImport = + Import; + +#[derive(Default, Clone, Copy)] +pub struct Durations { + pub headers: Duration, + pub consensus: Duration, + pub transactions: Duration, + pub executes: Duration, +} + +pub fn provision_import_test( + shared_count: SharedCounts, + shared_state: SharedMutex, + input: Durations, + header_batch_size: u32, + max_header_batch_requests: usize, + max_get_txns_requests: usize, +) -> ( + PressureImport, + Sender, + StateWatcher, +) { + let shared_notify = Arc::new(Notify::new()); + let params = Config { + max_header_batch_requests, + header_batch_size, + max_get_txns_requests, + }; + let p2p = Arc::new(PressurePeerToPeer::new( + shared_count.clone(), + [input.headers, input.transactions], + )); + let executor = Arc::new(PressureBlockImporter::new( + shared_count.clone(), + input.executes, + )); + let consensus = Arc::new(PressureConsensus::new( + shared_count.clone(), + input.consensus, + )); + + let (tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let watcher = shutdown.into(); + let import = Import::new( + shared_state, + shared_notify, + params, + p2p, + executor, + consensus, + ); + (import, tx, watcher) +} diff --git a/benches/src/lib.rs b/benches/src/lib.rs index 410c3fe231f..550cfe84c60 100644 --- a/benches/src/lib.rs +++ b/benches/src/lib.rs @@ -1,3 +1,5 @@ +pub mod import; + use fuel_core::database::vm_database::VmDatabase; pub use fuel_core::database::Database; use fuel_core_types::{ diff --git a/crates/services/sync/Cargo.toml b/crates/services/sync/Cargo.toml index a6cd607fb50..1b41e15436f 100644 --- a/crates/services/sync/Cargo.toml +++ b/crates/services/sync/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } fuel-core-services = { workspace = true } fuel-core-types = { workspace = true } futures = { workspace = true } +mockall = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } @@ -23,3 +24,6 @@ fuel-core-trace = { path = "../../trace" } fuel-core-types = { path = "../../types", features = ["test-helpers"] } mockall = { workspace = true } test-case = { workspace = true } + +[features] +benchmarking = ["dep:mockall"] diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index 2354df0bf8c..19658e220ee 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -41,8 +41,10 @@ use crate::{ }, }; -#[cfg(test)] -pub(crate) use tests::empty_header; +#[cfg(any(test, feature = "benchmarking"))] +/// Accessories for testing the sync. Available only when compiling under test +/// or benchmarking. +pub mod test_helpers; #[cfg(test)] mod tests; @@ -71,7 +73,9 @@ impl Default for Config { } } -pub(crate) struct Import { +/// The combination of shared state, configuration, and services that define +/// import behavior. +pub struct Import { /// Shared state between import and sync tasks. state: SharedMutex, /// Notify import when sync has new work. @@ -87,7 +91,9 @@ pub(crate) struct Import { } impl Import { - pub(crate) fn new( + /// Configure an import behavior from a shared state, configuration and + /// services that can be executed by an ImportTask. + pub fn new( state: SharedMutex, notify: Arc, params: Config, @@ -104,6 +110,11 @@ impl Import { consensus, } } + + /// Signal other asynchronous tasks that an import event has occurred. + pub fn notify_one(&self) { + self.notify.notify_one() + } } impl Import where @@ -112,10 +123,8 @@ where C: ConsensusPort + Send + Sync + 'static, { #[tracing::instrument(skip_all)] - pub(crate) async fn import( - &self, - shutdown: &mut StateWatcher, - ) -> anyhow::Result { + /// Execute imports until a shutdown is requested. + pub async fn import(&self, shutdown: &mut StateWatcher) -> anyhow::Result { self.import_inner(shutdown).await?; Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await) diff --git a/crates/services/sync/src/import/back_pressure_tests.rs b/crates/services/sync/src/import/back_pressure_tests.rs index 423286df914..6dea34a7678 100644 --- a/crates/services/sync/src/import/back_pressure_tests.rs +++ b/crates/services/sync/src/import/back_pressure_tests.rs @@ -1,29 +1,13 @@ -use std::{ - ops::Range, - time::Duration, +use std::time::Duration; + +use super::*; +use crate::import::test_helpers::{ + Count, + PressureBlockImporter, + PressureConsensus, + PressurePeerToPeer, + SharedCounts, }; - -use fuel_core_services::stream::BoxStream; -use fuel_core_types::{ - blockchain::primitives::{ - BlockId, - DaBlockHeight, - }, - fuel_tx::Transaction, -}; - -use crate::ports::{ - BlockImporterPort, - MockBlockImporterPort, - MockConsensusPort, - MockPeerToPeerPort, -}; - -use super::{ - tests::empty_header, - *, -}; -use fuel_core_types::fuel_types::BlockHeight; use test_case::test_case; #[derive(Default)] @@ -141,168 +125,3 @@ async fn test_back_pressure(input: Input, state: State, params: Config) -> Count import.import(&mut watcher).await.unwrap(); counts.apply(|c| c.max.clone()) } - -#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] -struct Count { - headers: usize, - transactions: usize, - consensus: usize, - executes: usize, - blocks: usize, -} - -#[derive(Debug, Default, PartialEq, Eq)] -struct Counts { - now: Count, - max: Count, -} - -type SharedCounts = SharedMutex; - -struct PressurePeerToPeer { - p2p: MockPeerToPeerPort, - durations: [Duration; 2], - counts: SharedCounts, -} - -struct PressureBlockImporter(MockBlockImporterPort, Duration, SharedCounts); - -struct PressureConsensus(MockConsensusPort, Duration, SharedCounts); - -#[async_trait::async_trait] -impl PeerToPeerPort for PressurePeerToPeer { - fn height_stream(&self) -> BoxStream { - self.p2p.height_stream() - } - - async fn get_sealed_block_headers( - &self, - block_height_range: Range, - ) -> anyhow::Result>>> { - self.counts.apply(|c| c.inc_headers()); - tokio::time::sleep(self.durations[0]).await; - self.counts.apply(|c| c.dec_headers()); - for _ in block_height_range.clone() { - self.counts.apply(|c| c.inc_blocks()); - } - self.p2p.get_sealed_block_headers(block_height_range).await - } - - async fn get_transactions( - &self, - block_id: SourcePeer, - ) -> anyhow::Result>> { - self.counts.apply(|c| c.inc_transactions()); - tokio::time::sleep(self.durations[1]).await; - self.counts.apply(|c| c.dec_transactions()); - self.p2p.get_transactions(block_id).await - } -} - -#[async_trait::async_trait] -impl BlockImporterPort for PressureBlockImporter { - fn committed_height_stream(&self) -> BoxStream { - self.0.committed_height_stream() - } - - async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> { - self.2.apply(|c| c.inc_executes()); - tokio::time::sleep(self.1).await; - self.2.apply(|c| { - c.dec_executes(); - c.dec_blocks(); - }); - self.0.execute_and_commit(block).await - } -} - -#[async_trait::async_trait] -impl ConsensusPort for PressureConsensus { - fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result { - self.0.check_sealed_header(header) - } - - async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()> { - self.2.apply(|c| c.inc_consensus()); - tokio::time::sleep(self.1).await; - self.2.apply(|c| c.dec_consensus()); - self.0.await_da_height(da_height).await - } -} - -impl PressurePeerToPeer { - fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { - let mut mock = MockPeerToPeerPort::default(); - mock.expect_get_sealed_block_headers().returning(|range| { - Ok(Some( - range - .clone() - .map(BlockHeight::from) - .map(empty_header) - .collect(), - )) - }); - mock.expect_get_transactions() - .returning(|_| Ok(Some(vec![]))); - Self { - p2p: mock, - durations: delays, - counts, - } - } -} - -impl PressureBlockImporter { - fn new(counts: SharedCounts, delays: Duration) -> Self { - let mut mock = MockBlockImporterPort::default(); - mock.expect_execute_and_commit().returning(move |_| Ok(())); - Self(mock, delays, counts) - } -} - -impl PressureConsensus { - fn new(counts: SharedCounts, delays: Duration) -> Self { - let mut mock = MockConsensusPort::default(); - mock.expect_await_da_height().returning(|_| Ok(())); - mock.expect_check_sealed_header().returning(|_| Ok(true)); - Self(mock, delays, counts) - } -} - -impl Counts { - fn inc_headers(&mut self) { - self.now.headers += 1; - self.max.headers = self.max.headers.max(self.now.headers); - } - fn dec_headers(&mut self) { - self.now.headers -= 1; - } - fn inc_transactions(&mut self) { - self.now.transactions += 1; - self.max.transactions = self.max.transactions.max(self.now.transactions); - } - fn dec_transactions(&mut self) { - self.now.transactions -= 1; - } - fn inc_consensus(&mut self) { - self.now.consensus += 1; - self.max.consensus = self.max.consensus.max(self.now.consensus); - } - fn dec_consensus(&mut self) { - self.now.consensus -= 1; - } - fn inc_executes(&mut self) { - self.now.executes += 1; - self.max.executes = self.max.executes.max(self.now.executes); - } - fn dec_executes(&mut self) { - self.now.executes -= 1; - } - fn inc_blocks(&mut self) { - self.now.blocks += 1; - self.max.blocks = self.max.blocks.max(self.now.blocks); - } - fn dec_blocks(&mut self) { - self.now.blocks -= 1; - } -} diff --git a/crates/services/sync/src/import/test_helpers.rs b/crates/services/sync/src/import/test_helpers.rs new file mode 100644 index 00000000000..939dbd7befb --- /dev/null +++ b/crates/services/sync/src/import/test_helpers.rs @@ -0,0 +1,45 @@ +#![allow(missing_docs)] + +mod counts; +mod pressure_block_importer; +mod pressure_consensus; +mod pressure_peer_to_peer; + +use fuel_core_types::{ + blockchain::{ + consensus::{ + Consensus, + Sealed, + }, + header::BlockHeader, + SealedBlockHeader, + }, + fuel_types::BlockHeight, + services::p2p::SourcePeer, +}; + +pub use counts::{ + Count, + SharedCounts, +}; +pub use pressure_block_importer::PressureBlockImporter; +pub use pressure_consensus::PressureConsensus; +pub use pressure_peer_to_peer::PressurePeerToPeer; + +pub fn empty_header(h: BlockHeight) -> SourcePeer { + let mut header = BlockHeader::default(); + header.consensus.height = h; + let transaction_tree = + fuel_core_types::fuel_merkle::binary::in_memory::MerkleTree::new(); + header.application.generated.transactions_root = transaction_tree.root().into(); + + let consensus = Consensus::default(); + let sealed = Sealed { + entity: header, + consensus, + }; + SourcePeer { + peer_id: vec![].into(), + data: sealed, + } +} diff --git a/crates/services/sync/src/import/test_helpers/counts.rs b/crates/services/sync/src/import/test_helpers/counts.rs new file mode 100644 index 00000000000..d98e75ddd30 --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/counts.rs @@ -0,0 +1,56 @@ +use fuel_core_services::SharedMutex; + +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct Count { + pub headers: usize, + pub transactions: usize, + pub consensus: usize, + pub executes: usize, + pub blocks: usize, +} + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct Counts { + pub now: Count, + pub max: Count, +} + +pub type SharedCounts = SharedMutex; + +impl Counts { + pub fn inc_headers(&mut self) { + self.now.headers += 1; + self.max.headers = self.max.headers.max(self.now.headers); + } + pub fn dec_headers(&mut self) { + self.now.headers -= 1; + } + pub fn inc_transactions(&mut self) { + self.now.transactions += 1; + self.max.transactions = self.max.transactions.max(self.now.transactions); + } + pub fn dec_transactions(&mut self) { + self.now.transactions -= 1; + } + pub fn inc_consensus(&mut self) { + self.now.consensus += 1; + self.max.consensus = self.max.consensus.max(self.now.consensus); + } + pub fn dec_consensus(&mut self) { + self.now.consensus -= 1; + } + pub fn inc_executes(&mut self) { + self.now.executes += 1; + self.max.executes = self.max.executes.max(self.now.executes); + } + pub fn dec_executes(&mut self) { + self.now.executes -= 1; + } + pub fn inc_blocks(&mut self) { + self.now.blocks += 1; + self.max.blocks = self.max.blocks.max(self.now.blocks); + } + pub fn dec_blocks(&mut self) { + self.now.blocks -= 1; + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_block_importer.rs b/crates/services/sync/src/import/test_helpers/pressure_block_importer.rs new file mode 100644 index 00000000000..217e3fd1c9f --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/pressure_block_importer.rs @@ -0,0 +1,40 @@ +use crate::{ + import::test_helpers::SharedCounts, + ports::{ + BlockImporterPort, + MockBlockImporterPort, + }, +}; +use fuel_core_services::stream::BoxStream; +use fuel_core_types::{ + blockchain::SealedBlock, + fuel_types::BlockHeight, +}; +use std::time::Duration; + +pub struct PressureBlockImporter(MockBlockImporterPort, Duration, SharedCounts); + +#[async_trait::async_trait] +impl BlockImporterPort for PressureBlockImporter { + fn committed_height_stream(&self) -> BoxStream { + self.0.committed_height_stream() + } + + async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> { + self.2.apply(|c| c.inc_executes()); + tokio::time::sleep(self.1).await; + self.2.apply(|c| { + c.dec_executes(); + c.dec_blocks(); + }); + self.0.execute_and_commit(block).await + } +} + +impl PressureBlockImporter { + pub fn new(counts: SharedCounts, delays: Duration) -> Self { + let mut mock = MockBlockImporterPort::default(); + mock.expect_execute_and_commit().returning(move |_| Ok(())); + Self(mock, delays, counts) + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_consensus.rs b/crates/services/sync/src/import/test_helpers/pressure_consensus.rs new file mode 100644 index 00000000000..441dca03563 --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/pressure_consensus.rs @@ -0,0 +1,37 @@ +use crate::{ + import::test_helpers::counts::SharedCounts, + ports::{ + ConsensusPort, + MockConsensusPort, + }, +}; +use fuel_core_types::blockchain::{ + primitives::DaBlockHeight, + SealedBlockHeader, +}; +use std::time::Duration; + +pub struct PressureConsensus(MockConsensusPort, Duration, SharedCounts); + +#[async_trait::async_trait] +impl ConsensusPort for PressureConsensus { + fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result { + self.0.check_sealed_header(header) + } + + async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()> { + self.2.apply(|c| c.inc_consensus()); + tokio::time::sleep(self.1).await; + self.2.apply(|c| c.dec_consensus()); + self.0.await_da_height(da_height).await + } +} + +impl PressureConsensus { + pub fn new(counts: SharedCounts, delays: Duration) -> Self { + let mut mock = MockConsensusPort::default(); + mock.expect_await_da_height().returning(|_| Ok(())); + mock.expect_check_sealed_header().returning(|_| Ok(true)); + Self(mock, delays, counts) + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs new file mode 100644 index 00000000000..b0d5b93b946 --- /dev/null +++ b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs @@ -0,0 +1,82 @@ +use crate::{ + import::test_helpers::{ + empty_header, + SharedCounts, + }, + ports::{ + MockPeerToPeerPort, + PeerToPeerPort, + }, +}; +use fuel_core_services::stream::BoxStream; +use fuel_core_types::{ + blockchain::{ + primitives::BlockId, + SealedBlockHeader, + }, + fuel_tx::Transaction, + fuel_types::BlockHeight, + services::p2p::SourcePeer, +}; +use std::{ + ops::Range, + time::Duration, +}; + +pub struct PressurePeerToPeer { + p2p: MockPeerToPeerPort, + durations: [Duration; 2], + counts: SharedCounts, +} + +#[async_trait::async_trait] +impl PeerToPeerPort for PressurePeerToPeer { + fn height_stream(&self) -> BoxStream { + self.p2p.height_stream() + } + + async fn get_sealed_block_headers( + &self, + block_height_range: Range, + ) -> anyhow::Result>>> { + self.counts.apply(|c| c.inc_headers()); + tokio::time::sleep(self.durations[0]).await; + self.counts.apply(|c| c.dec_headers()); + for _ in block_height_range.clone() { + self.counts.apply(|c| c.inc_blocks()); + } + self.p2p.get_sealed_block_headers(block_height_range).await + } + + async fn get_transactions( + &self, + block_id: SourcePeer, + ) -> anyhow::Result>> { + self.counts.apply(|c| c.inc_transactions()); + tokio::time::sleep(self.durations[1]).await; + self.counts.apply(|c| c.dec_transactions()); + self.p2p.get_transactions(block_id).await + } +} + +impl PressurePeerToPeer { + pub fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { + let mut mock = MockPeerToPeerPort::default(); + mock.expect_get_sealed_block_headers().returning(|range| { + Ok(Some( + range + .clone() + .map(BlockHeight::from) + .map(empty_header) + .collect(), + )) + }); + mock.expect_get_transactions() + .returning(|_| Ok(Some(vec![]))); + Self { + p2p: mock, + durations: delays, + counts, + } + } +} diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index 308d58e5347..796dc70094c 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -1,16 +1,13 @@ #![allow(non_snake_case)] -use fuel_core_types::blockchain::{ - consensus::Consensus, - header::BlockHeader, +use crate::{ + import::test_helpers::empty_header, + ports::{ + MockBlockImporterPort, + MockConsensusPort, + MockPeerToPeerPort, + }, }; - -use crate::ports::{ - MockBlockImporterPort, - MockConsensusPort, - MockPeerToPeerPort, -}; -use fuel_core_types::fuel_types::BlockHeight; use test_case::test_case; use super::*; @@ -657,21 +654,3 @@ impl DefaultMocks for MockBlockImporterPort { executor } } - -pub(crate) fn empty_header(h: BlockHeight) -> SourcePeer { - let mut header = BlockHeader::default(); - header.consensus.height = h; - let transaction_tree = - fuel_core_types::fuel_merkle::binary::in_memory::MerkleTree::new(); - header.application.generated.transactions_root = transaction_tree.root().into(); - - let consensus = Consensus::default(); - let sealed = Sealed { - entity: header, - consensus, - }; - SourcePeer { - peer_id: vec![].into(), - data: sealed, - } -} diff --git a/crates/services/sync/src/lib.rs b/crates/services/sync/src/lib.rs index 8ff9cab3ca4..9087e9aadc4 100644 --- a/crates/services/sync/src/lib.rs +++ b/crates/services/sync/src/lib.rs @@ -7,7 +7,7 @@ pub mod import; pub mod ports; pub mod service; -mod state; +pub mod state; pub mod sync; mod tracing_helpers; diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs index fcbef6ecbec..bdd860212a4 100644 --- a/crates/services/sync/src/ports.rs +++ b/crates/services/sync/src/ports.rs @@ -16,7 +16,7 @@ use fuel_core_types::{ }; use std::ops::Range; -#[cfg_attr(test, mockall::automock)] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the network. pub trait PeerToPeerPort { @@ -37,7 +37,7 @@ pub trait PeerToPeerPort { ) -> anyhow::Result>>; } -#[cfg_attr(test, mockall::automock)] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the consensus service. pub trait ConsensusPort { @@ -47,7 +47,7 @@ pub trait ConsensusPort { async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()>; } -#[cfg_attr(test, mockall::automock)] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the block importer. pub trait BlockImporterPort { diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index 30cca1235bc..04617bb97a5 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -8,7 +8,7 @@ use futures::{ }; use crate::{ - import::empty_header, + import::test_helpers::empty_header, ports::{ MockBlockImporterPort, MockConsensusPort,