Skip to content

Commit

Permalink
test: Block Import benchmarks and test helpers (#1274)
Browse files Browse the repository at this point in the history
Related issues:
 - #1167

This PR adds benchmarking for block synchronization. 

Benchmarks simulate the import by using mock services that introduce
artificial delays. Benchmarks are provisioned by reusing existing test
types and mock ports. This PR refactors the existing back pressure tests
to extract its test structures so that they can be reused in benchmarks.

In a follow up PR, I will be refactoring the block import to use a
single buffer and asynchronous tasks to perform header downloads.

---------

Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
  • Loading branch information
Brandon Vrooman and Voxelot authored Aug 24, 2023
1 parent 3e635f2 commit f84ce9b
Show file tree
Hide file tree
Showing 18 changed files with 501 additions and 231 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization.
- [#1309](https://github.com/FuelLabs/fuel-core/pull/1309): Add documentation for running debug builds with CLion and Visual Studio Code.
- [#1308](https://github.com/FuelLabs/fuel-core/pull/1308): Add support for loading .env files when compiling with the `env` feature. This allows users to conveniently supply CLI arguments in a secure and IDE-agnostic way.
- [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions.
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ publish = false
version = "0.0.0"

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
criterion = { version = "0.5", features = ["html_reports", "async", "async_tokio"] }
ctrlc = "3.2.3"
ed25519-dalek = "1.0" # TODO: upgrade to 2.0 when it's released, and remove rand below
ed25519-dalek_old_rand = { package = "rand", version = "0.7.3" }
ethnum = "1.3"
fuel-core = { path = "../crates/fuel-core", default-features = false, features = ["metrics", "rocksdb-production"] }
fuel-core-services = { path = "./../crates/services" }
fuel-core-storage = { path = "./../crates/storage" }
fuel-core-sync = { path = "./../crates/services/sync", features = ["benchmarking"] }
fuel-core-types = { path = "./../crates/types", features = ["test-helpers"] }
p256 = { version = "0.13", default-features = false, features = ["digest", "ecdsa"] }
rand = { workspace = true }
Expand All @@ -23,6 +27,10 @@ serde_yaml = "0.9.13"
tikv-jemallocator = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[[bench]]
harness = false
name = "import"

[[bench]]
harness = false
name = "state"
Expand All @@ -31,6 +39,9 @@ name = "state"
harness = false
name = "vm"

[features]
default = ["fuel-core/rocksdb"]

[[bench]]
harness = false
name = "block_target_gas"
101 changes: 101 additions & 0 deletions benches/benches/import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use criterion::{
criterion_group,
criterion_main,
measurement::WallTime,
BenchmarkGroup,
Criterion,
};
use fuel_core_benches::import::{
provision_import_test,
Durations,
PressureImport,
SharedCounts,
};
use fuel_core_services::{
SharedMutex,
StateWatcher,
};
use fuel_core_sync::state::State;
use std::time::Duration;
use tokio::runtime::Runtime;

async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) {
import.import(shutdown).await.unwrap();
}

fn name(n: u32, durations: Durations, buffer_size: usize) -> String {
format!(
"import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {sz}",
n = n,
d_h = durations.headers.as_millis(),
d_c = durations.consensus.as_millis(),
d_t = durations.transactions.as_millis(),
d_e = durations.executes.as_millis(),
sz = buffer_size
)
}

fn bench_imports(c: &mut Criterion) {
let bench_import = |group: &mut BenchmarkGroup<WallTime>,
n: u32,
durations: Durations,
batch_size: u32,
buffer_size: usize| {
let name = name(n, durations, buffer_size);
group.bench_function(name, move |b| {
let rt = Runtime::new().unwrap();
b.to_async(&rt).iter_custom(|iters| async move {
let mut elapsed_time = Duration::default();
for _ in 0..iters {
let shared_count = SharedCounts::new(Default::default());
let state = State::new(None, n);
let shared_state = SharedMutex::new(state);
let (import, _tx, mut shutdown) = provision_import_test(
shared_count.clone(),
shared_state,
durations,
batch_size,
buffer_size,
buffer_size,
);
import.notify_one();
let start = std::time::Instant::now();
execute_import(import, &mut shutdown).await;
elapsed_time += start.elapsed();
}
elapsed_time
})
});
};

let mut group = c.benchmark_group("import");

let n = 100;
let durations = Durations {
headers: Duration::from_millis(5),
consensus: Duration::from_millis(5),
transactions: Duration::from_millis(5),
executes: Duration::from_millis(10),
};

// Header batch size = 10, header/txn buffer size = 10
bench_import(&mut group, n, durations, 10, 10);

// Header batch size = 20, header/txn buffer size = 10
bench_import(&mut group, n, durations, 20, 10);

// Header batch size = 50, header/txn buffer size = 10
bench_import(&mut group, n, durations, 20, 10);

// Header batch size = 10, header/txn buffer size = 20
bench_import(&mut group, n, durations, 10, 20);

// Header batch size = 10, header/txn buffer size = 50
bench_import(&mut group, n, durations, 10, 50);

// Header batch size = 50, header/txn buffer size = 50
bench_import(&mut group, n, durations, 10, 20);
}

criterion_group!(benches, bench_imports);
criterion_main!(benches);
80 changes: 80 additions & 0 deletions benches/src/import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use fuel_core_services::{
SharedMutex,
StateWatcher,
};
pub use fuel_core_sync::import::test_helpers::SharedCounts;
use fuel_core_sync::{
import::{
test_helpers::{
PressureBlockImporter,
PressureConsensus,
PressurePeerToPeer,
},
Import,
},
state::State,
Config,
};
use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::{
watch::Sender,
Notify,
};

pub type PressureImport =
Import<PressurePeerToPeer, PressureBlockImporter, PressureConsensus>;

#[derive(Default, Clone, Copy)]
pub struct Durations {
pub headers: Duration,
pub consensus: Duration,
pub transactions: Duration,
pub executes: Duration,
}

pub fn provision_import_test(
shared_count: SharedCounts,
shared_state: SharedMutex<State>,
input: Durations,
header_batch_size: u32,
max_header_batch_requests: usize,
max_get_txns_requests: usize,
) -> (
PressureImport,
Sender<fuel_core_services::State>,
StateWatcher,
) {
let shared_notify = Arc::new(Notify::new());
let params = Config {
max_header_batch_requests,
header_batch_size,
max_get_txns_requests,
};
let p2p = Arc::new(PressurePeerToPeer::new(
shared_count.clone(),
[input.headers, input.transactions],
));
let executor = Arc::new(PressureBlockImporter::new(
shared_count.clone(),
input.executes,
));
let consensus = Arc::new(PressureConsensus::new(
shared_count.clone(),
input.consensus,
));

let (tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started);
let watcher = shutdown.into();
let import = Import::new(
shared_state,
shared_notify,
params,
p2p,
executor,
consensus,
);
(import, tx, watcher)
}
2 changes: 2 additions & 0 deletions benches/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod import;

use fuel_core::database::vm_database::VmDatabase;
pub use fuel_core::database::Database;
use fuel_core_types::{
Expand Down
4 changes: 4 additions & 0 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async-trait = { workspace = true }
fuel-core-services = { workspace = true }
fuel-core-types = { workspace = true }
futures = { workspace = true }
mockall = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

Expand All @@ -23,3 +24,6 @@ fuel-core-trace = { path = "../../trace" }
fuel-core-types = { path = "../../types", features = ["test-helpers"] }
mockall = { workspace = true }
test-case = { workspace = true }

[features]
benchmarking = ["dep:mockall"]
25 changes: 17 additions & 8 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ use crate::{
},
};

#[cfg(test)]
pub(crate) use tests::empty_header;
#[cfg(any(test, feature = "benchmarking"))]
/// Accessories for testing the sync. Available only when compiling under test
/// or benchmarking.
pub mod test_helpers;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -71,7 +73,9 @@ impl Default for Config {
}
}

pub(crate) struct Import<P, E, C> {
/// The combination of shared state, configuration, and services that define
/// import behavior.
pub struct Import<P, E, C> {
/// Shared state between import and sync tasks.
state: SharedMutex<State>,
/// Notify import when sync has new work.
Expand All @@ -87,7 +91,9 @@ pub(crate) struct Import<P, E, C> {
}

impl<P, E, C> Import<P, E, C> {
pub(crate) fn new(
/// Configure an import behavior from a shared state, configuration and
/// services that can be executed by an ImportTask.
pub fn new(
state: SharedMutex<State>,
notify: Arc<Notify>,
params: Config,
Expand All @@ -104,6 +110,11 @@ impl<P, E, C> Import<P, E, C> {
consensus,
}
}

/// Signal other asynchronous tasks that an import event has occurred.
pub fn notify_one(&self) {
self.notify.notify_one()
}
}
impl<P, E, C> Import<P, E, C>
where
Expand All @@ -112,10 +123,8 @@ where
C: ConsensusPort + Send + Sync + 'static,
{
#[tracing::instrument(skip_all)]
pub(crate) async fn import(
&self,
shutdown: &mut StateWatcher,
) -> anyhow::Result<bool> {
/// Execute imports until a shutdown is requested.
pub async fn import(&self, shutdown: &mut StateWatcher) -> anyhow::Result<bool> {
self.import_inner(shutdown).await?;

Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await)
Expand Down
Loading

0 comments on commit f84ce9b

Please sign in to comment.