Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use the mock_node framework in a benchmark #6778

Merged
merged 9 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 20 additions & 9 deletions test-utils/actix-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ macro_rules! handle_interrupt {
Poll::Pending
})
}.fuse() => panic!("SIGINT received"),
_ = $future.fuse() => {},
output = $future.fuse() => output,
}
}
};
Expand All @@ -64,19 +64,15 @@ macro_rules! handle_interrupt {
#[inline]
pub fn spawn_interruptible<F: std::future::Future + 'static>(
f: F,
) -> actix_rt::task::JoinHandle<()> {
) -> actix_rt::task::JoinHandle<F::Output> {
actix_rt::spawn(handle_interrupt!(f))
}

// Number of actix instances that are currently running.
pub(crate) static ACTIX_INSTANCES_COUNTER: Lazy<Mutex<usize>> = Lazy::new(|| (Mutex::new(0)));

pub fn run_actix<F: std::future::Future>(f: F) {
pub fn setup_actix() -> actix_rt::SystemRunner {
static SET_PANIC_HOOK: std::sync::Once = std::sync::Once::new();
{
let mut value = ACTIX_INSTANCES_COUNTER.lock().unwrap();
*value += 1;
}

// This is a workaround to make actix/tokio runtime stop when a task panics.
// See: https://github.com/actix/actix-net/issues/80
Expand Down Expand Up @@ -108,9 +104,24 @@ pub fn run_actix<F: std::future::Future>(f: F) {
})
.expect("failed to spawn SIGINT handler thread");
});
actix_rt::System::new()
}

pub fn block_on_interruptible<F: std::future::Future>(
sys: &actix_rt::SystemRunner,
f: F,
) -> F::Output {
sys.block_on(handle_interrupt!(f))
}

pub fn run_actix<F: std::future::Future>(f: F) {
{
let mut value = ACTIX_INSTANCES_COUNTER.lock().unwrap();
*value += 1;
}

let sys = actix_rt::System::new();
sys.block_on(handle_interrupt!(f));
let sys = setup_actix();
block_on_interruptible(&sys, f);
sys.run().unwrap();

{
Expand Down
10 changes: 10 additions & 0 deletions tools/mock_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ name = "start_mock_node"
[dependencies]
actix = "0.13.0"
actix-rt = "2"
actix-web = "=4.0.0-beta.6"
anyhow = "1.0.55"
clap = { version = "3.1.6", features = ["derive"] }
flate2 = "1.0.22"
futures = "0.3"
rand = "0.7"
rayon = "1.5"
regex = "1"
tar = "0.4.38"
tempfile = "3"
tracing = "0.1.13"

Expand All @@ -39,6 +42,13 @@ near-primitives = { path = "../../core/primitives" }
near-logger-utils = { path = "../../test-utils/logger" }
nearcore = { path = "../../nearcore" }

[dev-dependencies]
criterion = { version = "0.3.5", default_features = false, features = ["html_reports", "cargo_bench_support"] }

[[bench]]
name = "sync"
harness = false

[features]
default = ["mock_node"]
test_features = ["nearcore/test_features"]
Expand Down
36 changes: 36 additions & 0 deletions tools/mock_node/benches/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
The benchmarks in this directory use the mock node framework to define
benchmarks that measure the time taken to sync from an empty home dir
to a particular height in the chain defined by the sample home
directory archives included here. To run all the benchmarks:

```shell
$ cargo bench -p mock_node
```

This will take quite a while though, as each iteration of the
benchmark `mock_node_sync_full` takes several minutes, and it's run 10
times. To run just the quicker one:

```shell
$ cargo bench -p mock_node -- mock_node_sync_empty
```

You can pretty easily define and run your own benchmark based on some
other source home directory by creating a gzipped tar archive and
moving it to, say, `tools/mock_node/benches/foo.tar.gz`, and modifying
the code like so:

```diff
--- a/tools/mock_node/benches/sync.rs
+++ b/tools/mock_node/benches/sync.rs
@@ -123,5 +123,9 @@ fn sync_full_chunks(c: &mut Criterion) {
do_bench(c, "./benches/full.tar.gz", Some(100))
}

-criterion_group!(benches, sync_empty_chunks, sync_full_chunks);
+fn sync_foo_chunks(c: &mut Criterion) {
+ do_bench(c, "./benches/foo.tar.gz", Some(123))
+}
+
+criterion_group!(benches, sync_empty_chunks, sync_full_chunks, sync_foo_chunks);
```
Binary file added tools/mock_node/benches/empty.tar.gz
Binary file not shown.
Binary file added tools/mock_node/benches/full.tar.gz
Binary file not shown.
148 changes: 148 additions & 0 deletions tools/mock_node/benches/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#[macro_use]
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
extern crate criterion;

use actix::System;
use anyhow::anyhow;
use criterion::Criterion;
use flate2::read::GzDecoder;
use mock_node::setup::{setup_mock_node, MockNetworkMode};
use mock_node::GetChainTargetBlockHeight;
use near_actix_test_utils::{block_on_interruptible, setup_actix};
use near_chain_configs::GenesisValidationMode;
use near_client::GetBlock;
use near_crypto::{InMemorySigner, KeyType};
use near_primitives::types::BlockHeight;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tar::Archive;

// The point of this is to return this struct from the benchmark
// function so that we can run the code in its drop() function after the
// benchmark has been measured. It would be possible to just run all
// that code in the benchmark function, but we don't really want to
// include that in the measurements.
struct Sys {
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
sys: Option<actix_rt::SystemRunner>,
servers: Vec<(&'static str, actix_web::dev::Server)>,
}

impl Drop for Sys {
fn drop(&mut self) {
// TODO: we only have to do this because shutdown is not well handled right now. Ideally we would not have
// to tear down the whole system, and could just stop the client actor/view client actors each time.
let system = System::current();
let sys = self.sys.take().unwrap();

sys.block_on(async move {
futures::future::join_all(self.servers.iter().map(|(_name, server)| async move {
server.stop(true).await;
}))
.await;
});
system.stop();
sys.run().unwrap();
near_store::db::RocksDB::block_until_all_instances_are_dropped();
}
}

/// "./benches/empty.tar.gz" -> "empty"
fn test_name(home_archive: &str) -> &str {
Path::new(home_archive.strip_suffix(".tar.gz").unwrap()).file_name().unwrap().to_str().unwrap()
}

/// "./benches/empty.tar.gz" -> "/tmp/near_mock_node_sync_empty"
fn extracted_path(home_archive: &str) -> anyhow::Result<PathBuf> {
if !home_archive.ends_with(".tar.gz") {
return Err(anyhow!("{} doesn't end with .tar.gz", home_archive));
}
let mut ret = PathBuf::from("/tmp");
let dir_name = String::from("near_mock_node_sync_") + test_name(home_archive);
ret.push(dir_name);
Ok(ret)
}

// Expects home_archive to be a gzipped tar archive and extracts
// it to a /tmp directory if not already extracted.
fn extract_home(home_archive: &str) -> anyhow::Result<PathBuf> {
let extracted = extracted_path(home_archive)?;
if extracted.exists() {
return Ok(extracted);
}

let tar_gz = File::open(home_archive)?;
let tar = GzDecoder::new(tar_gz);
let mut archive = Archive::new(tar);
archive.unpack(&extracted)?;
Ok(extracted)
}

// Sets up a mock node with the extracted contents of `home_archive` serving as the equivalent
// to the `chain_history_home_dir` argument to the `start_mock_node` tool, and measures the time
// taken to sync to target_height.
fn do_bench(c: &mut Criterion, home_archive: &str, target_height: Option<BlockHeight>) {
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
let name = String::from("mock_node_sync_") + test_name(home_archive);
let mut group = c.benchmark_group(name.clone());
// The default of 100 is way too big for the longer running ones, and 10 is actually the minimum allowed.
group.sample_size(10);
group.bench_function(name, |bench| {
bench.iter_with_setup(|| {
let home = extract_home(home_archive).unwrap();
let mut near_config = nearcore::config::load_config(home.as_path(), GenesisValidationMode::Full)
.unwrap_or_else(|e| panic!("Error loading config: {:#}", e));
near_config.validator_signer = None;
near_config.client_config.min_num_peers = 1;
let signer = InMemorySigner::from_random("mock_node".parse().unwrap(), KeyType::ED25519);
near_config.network_config.public_key = signer.public_key;
near_config.network_config.secret_key = signer.secret_key;
near_config.client_config.tracked_shards =
(0..near_config.genesis.config.shard_layout.num_shards()).collect();
(setup_actix(), near_config, home)
},
|(sys, near_config, home)| {
let tempdir = tempfile::Builder::new().prefix("mock_node").tempdir().unwrap();
let servers = block_on_interruptible(&sys, async move {
let (mock_network, _client, view_client, servers) = setup_mock_node(
tempdir.path(),
home.as_path(),
near_config,
MockNetworkMode::NoNewBlocks,
Duration::from_millis(100),
None,
target_height,
false,
);
let target_height =
mock_network.send(GetChainTargetBlockHeight).await.unwrap();

let started = Instant::now();
loop {
// TODO: make it so that we can just get notified when syncing has finished instead
// of asking over and over.
if let Ok(Ok(block)) = view_client.send(GetBlock::latest()).await {
if block.header.height >= target_height {
break;
}
}
if started.elapsed() > Duration::from_secs(target_height * 5) {
panic!("mock_node sync bench timed out with home dir {:?}, target height {:?}", &home, target_height);
}
}
servers
});
Sys{sys: Some(sys), servers: servers.unwrap()}
})
});
group.finish();
}

fn sync_empty_chunks(c: &mut Criterion) {
do_bench(c, "./benches/empty.tar.gz", Some(100))
}

fn sync_full_chunks(c: &mut Criterion) {
do_bench(c, "./benches/full.tar.gz", Some(100))
}

criterion_group!(benches, sync_empty_chunks, sync_full_chunks);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion tools/mock_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ fn main() {
args.client_home_dir.unwrap_or(String::from(tempdir.path().to_str().unwrap()));
let network_delay = Duration::from_millis(args.network_delay);
run_actix(async move {
let (mock_network, _client, view_client) = setup_mock_node(
let (mock_network, _client, view_client, _) = setup_mock_node(
Path::new(&client_home_dir),
home_dir,
near_config,
Expand Down
Loading