diff --git a/Cargo.lock b/Cargo.lock index 7a869119287..e06ef4f3e8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1532,6 +1532,18 @@ dependencies = [ "instant", ] +[[package]] +name = "filetime" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0408e2626025178a6a7f7ffc05a25bc47103229f19c113755de7bf63816290c" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.2.10", + "winapi", +] + [[package]] name = "fixed-hash" version = "0.7.0" @@ -2553,8 +2565,11 @@ version = "0.0.0" dependencies = [ "actix", "actix-rt", + "actix-web", "anyhow", "clap 3.1.6", + "criterion", + "flate2", "futures", "near-actix-test-utils", "near-chain", @@ -2574,6 +2589,7 @@ dependencies = [ "rand 0.7.3", "rayon", "regex", + "tar", "tempfile", "tracing", ] @@ -5392,6 +5408,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b55807c0344e1e6c04d7c965f5289c39a8d94ae23ed5c0b57aabac549f871c6" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "target-lexicon" version = "0.10.0" @@ -6593,6 +6620,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" +[[package]] +name = "xattr" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d1526bbe5aaeb5eb06885f4d987bcdfa5e23187055de9b83fe00156a821fabc" +dependencies = [ + "libc", +] + [[package]] name = "xshell" version = "0.2.1" diff --git a/test-utils/actix-test-utils/src/lib.rs b/test-utils/actix-test-utils/src/lib.rs index 58378ceb99c..051c3174160 100644 --- a/test-utils/actix-test-utils/src/lib.rs +++ b/test-utils/actix-test-utils/src/lib.rs @@ -55,7 +55,7 @@ macro_rules! handle_interrupt { Poll::Pending }) }.fuse() => panic!("SIGINT received"), - _ = $future.fuse() => {}, + output = $future.fuse() => output, } } }; @@ -64,19 +64,15 @@ macro_rules! handle_interrupt { #[inline] pub fn spawn_interruptible( f: F, -) -> actix_rt::task::JoinHandle<()> { +) -> actix_rt::task::JoinHandle { actix_rt::spawn(handle_interrupt!(f)) } // Number of actix instances that are currently running. pub(crate) static ACTIX_INSTANCES_COUNTER: Lazy> = Lazy::new(|| (Mutex::new(0))); -pub fn run_actix(f: F) { +pub fn setup_actix() -> actix_rt::SystemRunner { static SET_PANIC_HOOK: std::sync::Once = std::sync::Once::new(); - { - let mut value = ACTIX_INSTANCES_COUNTER.lock().unwrap(); - *value += 1; - } // This is a workaround to make actix/tokio runtime stop when a task panics. // See: https://github.com/actix/actix-net/issues/80 @@ -108,9 +104,24 @@ pub fn run_actix(f: F) { }) .expect("failed to spawn SIGINT handler thread"); }); + actix_rt::System::new() +} + +pub fn block_on_interruptible( + sys: &actix_rt::SystemRunner, + f: F, +) -> F::Output { + sys.block_on(handle_interrupt!(f)) +} + +pub fn run_actix(f: F) { + { + let mut value = ACTIX_INSTANCES_COUNTER.lock().unwrap(); + *value += 1; + } - let sys = actix_rt::System::new(); - sys.block_on(handle_interrupt!(f)); + let sys = setup_actix(); + block_on_interruptible(&sys, f); sys.run().unwrap(); { diff --git a/tools/mock_node/Cargo.toml b/tools/mock_node/Cargo.toml index 53d3ae9d7ac..0226f712a6d 100644 --- a/tools/mock_node/Cargo.toml +++ b/tools/mock_node/Cargo.toml @@ -14,12 +14,15 @@ name = "start_mock_node" [dependencies] actix = "0.13.0" actix-rt = "2" +actix-web = "=4.0.0-beta.6" anyhow = "1.0.55" clap = { version = "3.1.6", features = ["derive"] } +flate2 = "1.0.22" futures = "0.3" rand = "0.7" rayon = "1.5" regex = "1" +tar = "0.4.38" tempfile = "3" tracing = "0.1.13" @@ -39,6 +42,13 @@ near-primitives = { path = "../../core/primitives" } near-logger-utils = { path = "../../test-utils/logger" } nearcore = { path = "../../nearcore" } +[dev-dependencies] +criterion = { version = "0.3.5", default_features = false, features = ["html_reports", "cargo_bench_support"] } + +[[bench]] +name = "sync" +harness = false + [features] default = ["mock_node"] test_features = ["nearcore/test_features"] diff --git a/tools/mock_node/benches/README.md b/tools/mock_node/benches/README.md new file mode 100644 index 00000000000..9985bb5a2ab --- /dev/null +++ b/tools/mock_node/benches/README.md @@ -0,0 +1,36 @@ +The benchmarks in this directory use the mock node framework to define +benchmarks that measure the time taken to sync from an empty home dir +to a particular height in the chain defined by the sample home +directory archives included here. To run all the benchmarks: + +```shell +$ cargo bench -p mock_node +``` + +This will take quite a while though, as each iteration of the +benchmark `mock_node_sync_full` takes several minutes, and it's run 10 +times. To run just the quicker one: + +```shell +$ cargo bench -p mock_node -- mock_node_sync_empty +``` + +You can pretty easily define and run your own benchmark based on some +other source home directory by creating a gzipped tar archive and +moving it to, say, `tools/mock_node/benches/foo.tar.gz`, and modifying +the code like so: + +```diff +--- a/tools/mock_node/benches/sync.rs ++++ b/tools/mock_node/benches/sync.rs +@@ -123,5 +123,9 @@ fn sync_full_chunks(c: &mut Criterion) { + do_bench(c, "./benches/full.tar.gz", Some(100)) + } + +-criterion_group!(benches, sync_empty_chunks, sync_full_chunks); ++fn sync_foo_chunks(c: &mut Criterion) { ++ do_bench(c, "./benches/foo.tar.gz", Some(123)) ++} ++ ++criterion_group!(benches, sync_empty_chunks, sync_full_chunks, sync_foo_chunks); +``` \ No newline at end of file diff --git a/tools/mock_node/benches/empty.tar.gz b/tools/mock_node/benches/empty.tar.gz new file mode 100644 index 00000000000..8d3802316b7 Binary files /dev/null and b/tools/mock_node/benches/empty.tar.gz differ diff --git a/tools/mock_node/benches/full.tar.gz b/tools/mock_node/benches/full.tar.gz new file mode 100644 index 00000000000..31528c43063 Binary files /dev/null and b/tools/mock_node/benches/full.tar.gz differ diff --git a/tools/mock_node/benches/sync.rs b/tools/mock_node/benches/sync.rs new file mode 100644 index 00000000000..945049a6e64 --- /dev/null +++ b/tools/mock_node/benches/sync.rs @@ -0,0 +1,148 @@ +#[macro_use] +extern crate criterion; + +use actix::System; +use anyhow::anyhow; +use criterion::Criterion; +use flate2::read::GzDecoder; +use mock_node::setup::{setup_mock_node, MockNetworkMode}; +use mock_node::GetChainTargetBlockHeight; +use near_actix_test_utils::{block_on_interruptible, setup_actix}; +use near_chain_configs::GenesisValidationMode; +use near_client::GetBlock; +use near_crypto::{InMemorySigner, KeyType}; +use near_primitives::types::BlockHeight; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; +use tar::Archive; + +// The point of this is to return this struct from the benchmark +// function so that we can run the code in its drop() function after the +// benchmark has been measured. It would be possible to just run all +// that code in the benchmark function, but we don't really want to +// include that in the measurements. +struct Sys { + sys: Option, + servers: Vec<(&'static str, actix_web::dev::Server)>, +} + +impl Drop for Sys { + fn drop(&mut self) { + // TODO: we only have to do this because shutdown is not well handled right now. Ideally we would not have + // to tear down the whole system, and could just stop the client actor/view client actors each time. + let system = System::current(); + let sys = self.sys.take().unwrap(); + + sys.block_on(async move { + futures::future::join_all(self.servers.iter().map(|(_name, server)| async move { + server.stop(true).await; + })) + .await; + }); + system.stop(); + sys.run().unwrap(); + near_store::db::RocksDB::block_until_all_instances_are_dropped(); + } +} + +/// "./benches/empty.tar.gz" -> "empty" +fn test_name(home_archive: &str) -> &str { + Path::new(home_archive.strip_suffix(".tar.gz").unwrap()).file_name().unwrap().to_str().unwrap() +} + +/// "./benches/empty.tar.gz" -> "/tmp/near_mock_node_sync_empty" +fn extracted_path(home_archive: &str) -> anyhow::Result { + if !home_archive.ends_with(".tar.gz") { + return Err(anyhow!("{} doesn't end with .tar.gz", home_archive)); + } + let mut ret = PathBuf::from("/tmp"); + let dir_name = String::from("near_mock_node_sync_") + test_name(home_archive); + ret.push(dir_name); + Ok(ret) +} + +// Expects home_archive to be a gzipped tar archive and extracts +// it to a /tmp directory if not already extracted. +fn extract_home(home_archive: &str) -> anyhow::Result { + let extracted = extracted_path(home_archive)?; + if extracted.exists() { + return Ok(extracted); + } + + let tar_gz = File::open(home_archive)?; + let tar = GzDecoder::new(tar_gz); + let mut archive = Archive::new(tar); + archive.unpack(&extracted)?; + Ok(extracted) +} + +// Sets up a mock node with the extracted contents of `home_archive` serving as the equivalent +// to the `chain_history_home_dir` argument to the `start_mock_node` tool, and measures the time +// taken to sync to target_height. +fn do_bench(c: &mut Criterion, home_archive: &str, target_height: Option) { + let name = String::from("mock_node_sync_") + test_name(home_archive); + let mut group = c.benchmark_group(name.clone()); + // The default of 100 is way too big for the longer running ones, and 10 is actually the minimum allowed. + group.sample_size(10); + group.bench_function(name, |bench| { + bench.iter_with_setup(|| { + let home = extract_home(home_archive).unwrap(); + let mut near_config = nearcore::config::load_config(home.as_path(), GenesisValidationMode::Full) + .unwrap_or_else(|e| panic!("Error loading config: {:#}", e)); + near_config.validator_signer = None; + near_config.client_config.min_num_peers = 1; + let signer = InMemorySigner::from_random("mock_node".parse().unwrap(), KeyType::ED25519); + near_config.network_config.public_key = signer.public_key; + near_config.network_config.secret_key = signer.secret_key; + near_config.client_config.tracked_shards = + (0..near_config.genesis.config.shard_layout.num_shards()).collect(); + (setup_actix(), near_config, home) + }, + |(sys, near_config, home)| { + let tempdir = tempfile::Builder::new().prefix("mock_node").tempdir().unwrap(); + let servers = block_on_interruptible(&sys, async move { + let (mock_network, _client, view_client, servers) = setup_mock_node( + tempdir.path(), + home.as_path(), + near_config, + MockNetworkMode::NoNewBlocks, + Duration::from_millis(100), + None, + target_height, + false, + ); + let target_height = + mock_network.send(GetChainTargetBlockHeight).await.unwrap(); + + let started = Instant::now(); + loop { + // TODO: make it so that we can just get notified when syncing has finished instead + // of asking over and over. + if let Ok(Ok(block)) = view_client.send(GetBlock::latest()).await { + if block.header.height >= target_height { + break; + } + } + if started.elapsed() > Duration::from_secs(target_height * 5) { + panic!("mock_node sync bench timed out with home dir {:?}, target height {:?}", &home, target_height); + } + } + servers + }); + Sys{sys: Some(sys), servers: servers.unwrap()} + }) + }); + group.finish(); +} + +fn sync_empty_chunks(c: &mut Criterion) { + do_bench(c, "./benches/empty.tar.gz", Some(100)) +} + +fn sync_full_chunks(c: &mut Criterion) { + do_bench(c, "./benches/full.tar.gz", Some(100)) +} + +criterion_group!(benches, sync_empty_chunks, sync_full_chunks); +criterion_main!(benches); diff --git a/tools/mock_node/src/main.rs b/tools/mock_node/src/main.rs index cd53a555c56..799178dcdb5 100644 --- a/tools/mock_node/src/main.rs +++ b/tools/mock_node/src/main.rs @@ -88,7 +88,7 @@ fn main() { args.client_home_dir.unwrap_or(String::from(tempdir.path().to_str().unwrap())); let network_delay = Duration::from_millis(args.network_delay); run_actix(async move { - let (mock_network, _client, view_client) = setup_mock_node( + let (mock_network, _client, view_client, _) = setup_mock_node( Path::new(&client_home_dir), home_dir, near_config, diff --git a/tools/mock_node/src/setup.rs b/tools/mock_node/src/setup.rs index 35283ae3a34..4bb66ef8da0 100644 --- a/tools/mock_node/src/setup.rs +++ b/tools/mock_node/src/setup.rs @@ -127,6 +127,9 @@ impl FromStr for MockNetworkMode { /// `target_height`: height that the simulated peers will produce blocks until. If None, will /// use the height from the chain head in storage /// `in_memory_storage`: if true, make client use in memory storage instead of rocksdb +/// +/// Returns an actix::Addr handle to each of the actors spawned, plus a Vec of Servers representing +/// the ports that the mock node is currently listening on. pub fn setup_mock_node( client_home_dir: &Path, network_home_dir: &Path, @@ -136,7 +139,12 @@ pub fn setup_mock_node( client_start_height: Option, target_height: Option, in_memory_storage: bool, -) -> (Addr, Addr, Addr) { +) -> ( + Addr, + Addr, + Addr, + Option>, +) { let parent_span = tracing::debug_span!(target: "mock_node", "setup_mock_node").entered(); let client_runtime = setup_runtime(client_home_dir, &config, in_memory_storage); let mock_network_runtime = setup_runtime(network_home_dir, &config, false); @@ -319,19 +327,23 @@ pub fn setup_mock_node( !archival, ) }); + // for some reason, with "test_features", start_http requires PeerManagerActor, // we are not going to run start_mock_network with test_features, so let's disable that for now #[cfg(not(feature = "test_features"))] - if let Some(rpc_config) = config.rpc_config { + let server = config.rpc_config.map(|rpc_config| { near_jsonrpc::start_http( rpc_config, config.genesis.config, client_actor.clone(), view_client.clone(), - ); - } + ) + }); + #[cfg(feature = "test_features")] + let server = None; + network_adapter.set_recipient(mock_network_actor.clone().recipient()); - (mock_network_actor, client_actor, view_client) + (mock_network_actor, client_actor, view_client, server) } #[cfg(test)] @@ -448,7 +460,7 @@ mod test { near_config1.client_config.tracked_shards = (0..near_config1.genesis.config.shard_layout.num_shards()).collect(); run_actix(async move { - let (_mock_network, _client, view_client) = setup_mock_node( + let (_mock_network, _client, view_client, _) = setup_mock_node( dir1.path().clone(), dir.path().clone(), near_config1,