Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use the mock_node framework in a benchmark #6778

Merged
merged 9 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ rusty-tags.vi
costs-*.txt
names-to-stats.txt
data_dump_*.bin

# benchmark sample home directories
tools/mock_node/benches/empty/
tools/mock_node/benches/full/
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 20 additions & 9 deletions test-utils/actix-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ macro_rules! handle_interrupt {
Poll::Pending
})
}.fuse() => panic!("SIGINT received"),
_ = $future.fuse() => {},
output = $future.fuse() => output,
}
}
};
Expand All @@ -64,19 +64,15 @@ macro_rules! handle_interrupt {
#[inline]
pub fn spawn_interruptible<F: std::future::Future + 'static>(
f: F,
) -> actix_rt::task::JoinHandle<()> {
) -> actix_rt::task::JoinHandle<F::Output> {
actix_rt::spawn(handle_interrupt!(f))
}

// Number of actix instances that are currently running.
pub(crate) static ACTIX_INSTANCES_COUNTER: Lazy<Mutex<usize>> = Lazy::new(|| (Mutex::new(0)));

pub fn run_actix<F: std::future::Future>(f: F) {
pub fn setup_actix() -> actix_rt::SystemRunner {
static SET_PANIC_HOOK: std::sync::Once = std::sync::Once::new();
{
let mut value = ACTIX_INSTANCES_COUNTER.lock().unwrap();
*value += 1;
}

// This is a workaround to make actix/tokio runtime stop when a task panics.
// See: https://github.com/actix/actix-net/issues/80
Expand Down Expand Up @@ -108,9 +104,24 @@ pub fn run_actix<F: std::future::Future>(f: F) {
})
.expect("failed to spawn SIGINT handler thread");
});
actix_rt::System::new()
}

pub fn block_on_interruptible<F: std::future::Future>(
sys: &actix_rt::SystemRunner,
f: F,
) -> F::Output {
sys.block_on(handle_interrupt!(f))
}

pub fn run_actix<F: std::future::Future>(f: F) {
{
let mut value = ACTIX_INSTANCES_COUNTER.lock().unwrap();
*value += 1;
}

let sys = actix_rt::System::new();
sys.block_on(handle_interrupt!(f));
let sys = setup_actix();
block_on_interruptible(&sys, f);
sys.run().unwrap();

{
Expand Down
10 changes: 10 additions & 0 deletions tools/mock_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ name = "start_mock_node"
[dependencies]
actix = "0.13.0"
actix-rt = "2"
actix-web = "=4.0.0-beta.6"
anyhow = "1.0.55"
clap = { version = "3.1.6", features = ["derive"] }
flate2 = "1.0.22"
futures = "0.3"
rand = "0.7"
rayon = "1.5"
regex = "1"
tar = "0.4.38"
tempfile = "3"
tracing = "0.1.13"

Expand All @@ -39,6 +42,13 @@ near-primitives = { path = "../../core/primitives" }
near-logger-utils = { path = "../../test-utils/logger" }
nearcore = { path = "../../nearcore" }

[dev-dependencies]
criterion = { version = "0.3.5", default_features = false, features = ["html_reports", "cargo_bench_support"] }

[[bench]]
name = "sync"
harness = false

[features]
default = ["mock_node"]
test_features = ["nearcore/test_features"]
Expand Down
Binary file added tools/mock_node/benches/empty.tar.gz
Binary file not shown.
Binary file added tools/mock_node/benches/full.tar.gz
Binary file not shown.
127 changes: 127 additions & 0 deletions tools/mock_node/benches/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#[macro_use]
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
extern crate criterion;

use actix::System;
use anyhow::anyhow;
use criterion::Criterion;
use flate2::read::GzDecoder;
use mock_node::setup::{setup_mock_node, MockNetworkMode};
use mock_node::GetChainTargetBlockHeight;
use near_actix_test_utils::{block_on_interruptible, setup_actix};
use near_chain_configs::GenesisValidationMode;
use near_client::GetBlock;
use near_crypto::{InMemorySigner, KeyType};
use near_primitives::types::BlockHeight;
use std::fs::File;
use std::path::Path;
use std::time::{Duration, Instant};
use tar::Archive;

struct Sys {
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
sys: Option<actix_rt::SystemRunner>,
servers: Vec<(&'static str, actix_web::dev::Server)>,
}

impl Drop for Sys {
fn drop(&mut self) {
// TODO: we only have to do this because shutdown is not well handled right now. Ideally we would not have
// to tear down the whole system, and could just stop the client actor/view client actors each time.
let system = System::current();
let sys = self.sys.take().unwrap();

sys.block_on(async move {
futures::future::join_all(self.servers.iter().map(|(_name, server)| async move {
server.stop(true).await;
}))
.await;
});
system.stop();
sys.run().unwrap();
near_store::db::RocksDB::block_until_all_instances_are_dropped();
}
}

fn extract_home(home_archive: &str) -> anyhow::Result<&Path> {
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
let len = home_archive.len();
if len <= 7 || &home_archive[len - 7..] != ".tar.gz" {
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
return Err(anyhow!("{} doesn't end with .tar.gz", home_archive));
}
let extracted = Path::new(&home_archive[..len - 7]);
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
if extracted.exists() {
return Ok(extracted);
}

let tar_gz = File::open(home_archive)?;
let tar = GzDecoder::new(tar_gz);
let mut archive = Archive::new(tar);
archive.unpack(extracted)?;
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
Ok(extracted)
}

fn do_bench(c: &mut Criterion, home_archive: &str, target_height: Option<BlockHeight>) {
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
let home_dir = extract_home(home_archive).unwrap();
let mut near_config = nearcore::config::load_config(home_dir, GenesisValidationMode::Full)
.unwrap_or_else(|e| panic!("Error loading config: {:#}", e));
near_config.validator_signer = None;
near_config.client_config.min_num_peers = 1;
let signer = InMemorySigner::from_random("mock_node".parse().unwrap(), KeyType::ED25519);
near_config.network_config.public_key = signer.public_key;
near_config.network_config.secret_key = signer.secret_key;
near_config.client_config.tracked_shards =
(0..near_config.genesis.config.shard_layout.num_shards()).collect();

let name = String::from("mock_node_sync_") + home_dir.file_name().unwrap().to_str().unwrap();
let mut group = c.benchmark_group(name.clone());
group.sample_size(10);
group.bench_function(name, |bench| {
bench.iter_with_setup(|| {
setup_actix()
},
|sys| {
let tempdir = tempfile::Builder::new().prefix("mock_node").tempdir().unwrap();
let near_config = near_config.clone();
let servers = block_on_interruptible(&sys, async move {
let (mock_network, _client, view_client, servers) = setup_mock_node(
tempdir.path(),
home_dir,
near_config,
MockNetworkMode::NoNewBlocks,
Duration::from_millis(100),
None,
target_height,
false,
);
let target_height =
mock_network.send(GetChainTargetBlockHeight).await.unwrap();

let started = Instant::now();
loop {
// TODO: make it so that we can just get notified when syncing has finished instead
// of asking over and over.
if let Ok(Ok(block)) = view_client.send(GetBlock::latest()).await {
if block.header.height >= target_height {
break;
}
}
if started.elapsed() > Duration::from_secs(target_height * 5) {
panic!("mock_node sync bench timed out with home dir {:?}, target height {:?}", home_dir, target_height);
}
}
servers
});
Sys{sys: Some(sys), servers: servers.unwrap()}
})
});
group.finish();
}

fn sync_empty_chunks(c: &mut Criterion) {
do_bench(c, "./benches/empty.tar.gz", Some(100))
}

fn sync_full_chunks(c: &mut Criterion) {
do_bench(c, "./benches/full.tar.gz", Some(100))
}

criterion_group!(benches, sync_empty_chunks, sync_full_chunks);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion tools/mock_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ fn main() {
args.client_home_dir.unwrap_or(String::from(tempdir.path().to_str().unwrap()));
let network_delay = Duration::from_millis(args.network_delay);
run_actix(async move {
let (mock_network, _client, view_client) = setup_mock_node(
let (mock_network, _client, view_client, _) = setup_mock_node(
Path::new(&client_home_dir),
home_dir,
near_config,
Expand Down
21 changes: 15 additions & 6 deletions tools/mock_node/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ pub fn setup_mock_node(
client_start_height: Option<BlockHeight>,
target_height: Option<BlockHeight>,
in_memory_storage: bool,
) -> (Addr<MockPeerManagerActor>, Addr<ClientActor>, Addr<ViewClientActor>) {
) -> (
Addr<MockPeerManagerActor>,
Addr<ClientActor>,
Addr<ViewClientActor>,
Option<Vec<(&'static str, actix_web::dev::Server)>>,
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
) {
let client_runtime = setup_runtime(client_home_dir, &config, in_memory_storage);
let mock_network_runtime = setup_runtime(network_home_dir, &config, false);

Expand Down Expand Up @@ -310,19 +315,23 @@ pub fn setup_mock_node(
!archival,
)
});

// for some reason, with "test_features", start_http requires PeerManagerActor,
// we are not going to run start_mock_network with test_features, so let's disable that for now
#[cfg(not(feature = "test_features"))]
if let Some(rpc_config) = config.rpc_config {
let server = config.rpc_config.map(|rpc_config| {
near_jsonrpc::start_http(
rpc_config,
config.genesis.config,
client_actor.clone(),
view_client.clone(),
);
}
)
});
#[cfg(feature = "test_features")]
let server = None;

network_adapter.set_recipient(mock_network_actor.clone().recipient());
(mock_network_actor, client_actor, view_client)
(mock_network_actor, client_actor, view_client, server)
}

#[cfg(test)]
Expand Down Expand Up @@ -439,7 +448,7 @@ mod test {
near_config1.client_config.tracked_shards =
(0..near_config1.genesis.config.shard_layout.num_shards()).collect();
run_actix(async move {
let (_mock_network, _client, view_client) = setup_mock_node(
let (_mock_network, _client, view_client, _) = setup_mock_node(
dir1.path().clone(),
dir.path().clone(),
near_config1,
Expand Down