From c41a51a2ff86198f44dd6b24ff534507a17cf519 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Thu, 29 Feb 2024 15:57:02 +0100 Subject: [PATCH 1/8] fix: revert update smart contract event (#470) --- crates/topos-sequencer-subnet-client/src/lib.rs | 6 +----- crates/topos-sequencer-subnet-client/src/subnet_contract.rs | 6 ++---- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/crates/topos-sequencer-subnet-client/src/lib.rs b/crates/topos-sequencer-subnet-client/src/lib.rs index da5e74157..6f988baf1 100644 --- a/crates/topos-sequencer-subnet-client/src/lib.rs +++ b/crates/topos-sequencer-subnet-client/src/lib.rs @@ -36,11 +36,7 @@ pub type Hash = String; /// Event collected from the sending subnet #[derive(Debug, Clone, Serialize, Deserialize)] pub enum SubnetEvent { - CrossSubnetMessageSent { - target_subnet_id: SubnetId, - source_subnet_id: SubnetId, - nonce: u64, - }, + CrossSubnetMessageSent { target_subnet_id: SubnetId }, } #[derive(Default, Debug, Clone, Serialize, Deserialize)] diff --git a/crates/topos-sequencer-subnet-client/src/subnet_contract.rs b/crates/topos-sequencer-subnet-client/src/subnet_contract.rs index 63207026c..43dc27a3e 100644 --- a/crates/topos-sequencer-subnet-client/src/subnet_contract.rs +++ b/crates/topos-sequencer-subnet-client/src/subnet_contract.rs @@ -12,7 +12,7 @@ use tracing::info; abigen!( IToposCore, - "npm:@topos-protocol/topos-smart-contracts@latest/artifacts/contracts/interfaces/IToposCore.\ + "npm:@topos-protocol/topos-smart-contracts@3.2.0/artifacts/contracts/interfaces/IToposCore.\ sol/IToposCore.json" ); @@ -60,11 +60,9 @@ pub(crate) async fn get_block_events( ); result.push(SubnetEvent::CrossSubnetMessageSent { target_subnet_id: f.target_subnet_id.into(), - source_subnet_id: f.source_subnet_id.into(), - nonce: f.nonce.as_u64(), }) } else { - // Ignored other events until we need them + // Ignored for now other events Upgraded, CertStored } } From d8db631d970d6b855e5a47f0c561abe8bab9832d Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Fri, 1 Mar 2024 21:18:36 +0100 Subject: [PATCH 2/8] feat: introduce topos-node crate (#459) --- Cargo.lock | 61 +++++ Cargo.toml | 2 +- crates/topos-config/src/edge/command.rs | 4 +- crates/topos-node/Cargo.toml | 76 ++++++ crates/topos-node/build.rs | 23 ++ crates/topos-node/src/lib.rs | 221 ++++++++++++++++++ crates/topos-node/src/main.rs | 4 + .../services => topos-node/src}/process.rs | 0 crates/topos-tce-broadcast/src/tests/mod.rs | 11 +- crates/topos/Cargo.toml | 1 + crates/topos/src/components/node/mod.rs | 181 +------------- crates/topos/src/components/node/services.rs | 1 - crates/topos/tests/config.rs | 55 +++-- 13 files changed, 445 insertions(+), 195 deletions(-) create mode 100644 crates/topos-node/Cargo.toml create mode 100644 crates/topos-node/build.rs create mode 100644 crates/topos-node/src/lib.rs create mode 100644 crates/topos-node/src/main.rs rename crates/{topos/src/components/node/services => topos-node/src}/process.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 0a2ab77b3..00ec1ba51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7711,6 +7711,7 @@ dependencies = [ "topos-certificate-spammer", "topos-config", "topos-core", + "topos-node", "topos-p2p", "topos-sequencer", "topos-tce", @@ -7879,6 +7880,66 @@ dependencies = [ "prometheus", ] +[[package]] +name = "topos-node" +version = "0.1.0" +dependencies = [ + "assert_cmd", + "async-stream", + "async-trait", + "clap 4.4.18", + "dirs", + "env_logger 0.10.2", + "flate2", + "futures", + "hex", + "insta", + "libp2p", + "once_cell", + "openssl", + "opentelemetry", + "opentelemetry-otlp", + "predicates 3.1.0", + "rand", + "regex", + "reqwest", + "rlp", + "rstest", + "serde", + "serde_json", + "serial_test", + "sysinfo", + "tar", + "tempfile", + "test-log", + "thiserror", + "tokio", + "tokio-util", + "toml 0.7.8", + "tonic 0.10.2", + "topos-certificate-spammer", + "topos-config", + "topos-core", + "topos-p2p", + "topos-sequencer", + "topos-tce", + "topos-tce-api", + "topos-tce-broadcast", + "topos-tce-gatekeeper", + "topos-tce-storage", + "topos-tce-synchronizer", + "topos-telemetry", + "topos-test-sdk", + "topos-wallet", + "tower", + "tracing", + "tracing-log", + "tracing-opentelemetry", + "tracing-subscriber", + "url", + "uuid 1.7.0", +] + [[package]] name = "topos-p2p" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cbc89c01b..75dfd2dd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -default-members = ["crates/topos"] +default-members = ["crates/topos", "crates/topos-node"] members = [ "crates/*" ] diff --git a/crates/topos-config/src/edge/command.rs b/crates/topos-config/src/edge/command.rs index 383fa98b2..ce5e8a4c2 100644 --- a/crates/topos-config/src/edge/command.rs +++ b/crates/topos-config/src/edge/command.rs @@ -70,8 +70,8 @@ impl CommandConfig { let mut child = command .stderr(Stdio::piped()) - .stdout(Stdio::inherit()) - .stdin(Stdio::inherit()) + .stdout(Stdio::piped()) + .stdin(Stdio::piped()) .spawn()?; if let Some(pid) = child.id() { diff --git a/crates/topos-node/Cargo.toml b/crates/topos-node/Cargo.toml new file mode 100644 index 000000000..5962cd6bd --- /dev/null +++ b/crates/topos-node/Cargo.toml @@ -0,0 +1,76 @@ +[package] +name = "topos-node" +version = "0.1.0" +edition = "2021" + +description = "Runtime crate of a topos-node" + +[lints] +workspace = true + +[dependencies] +topos-config = { path = "../topos-config/" } +topos-tce = { path = "../topos-tce/" } +topos-p2p = { path = "../topos-p2p" } +topos-sequencer = { path = "../topos-sequencer" } +topos-core = { workspace = true, features = ["api"] } +topos-certificate-spammer = { path = "../topos-certificate-spammer" } +topos-tce-broadcast = { path = "../topos-tce-broadcast", optional = true } +topos-wallet = { path = "../topos-wallet" } +topos-telemetry = { path = "../topos-telemetry/", features = ["tracing"] } + +async-stream.workspace = true +async-trait.workspace = true +clap.workspace = true +hex.workspace = true +futures.workspace = true +opentelemetry.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio = { workspace = true, features = ["full"] } +tokio-util.workspace = true +tonic.workspace = true +tower.workspace = true +tracing = { workspace = true, features = ["log"] } +tracing-opentelemetry.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter", "json", "ansi", "fmt"] } +uuid.workspace = true +rand.workspace = true +reqwest.workspace = true +thiserror.workspace = true +opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics", "tls-roots"] } +dirs = "5.0" +tracing-log = { version = "0.1.3", features = ["env_logger"] } +tar = "0.4.38" +flate2 ="1.0.26" +url = "2.3.1" +once_cell = "1.17.1" +regex = "1" +rlp = "0.5.1" +openssl = { version = "0.10.61", features = ["vendored"] } + +[dev-dependencies] +toml = "0.7.4" +topos-tce-broadcast = { path = "../topos-tce-broadcast" } +topos-tce-synchronizer = { path = "../topos-tce-synchronizer" } +topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper" } +topos-tce-api = { path = "../topos-tce-api" } +topos-tce-storage = { path = "../topos-tce-storage" } +topos-test-sdk = { path = "../topos-test-sdk" } +serde.workspace = true +serde_json.workspace = true +test-log.workspace = true +env_logger.workspace = true +rand.workspace = true +futures.workspace = true +libp2p = { workspace = true, features = ["identify"] } +assert_cmd = "2.0.6" +insta = { version = "1.21", features = ["json", "redactions"] } +rstest = { workspace = true, features = ["async-timeout"] } +tempfile = "3.8.0" +predicates = "3.0.3" +sysinfo = "0.29.11" +serial_test = {version = "0.9.0"} + +[features] +default = [] diff --git a/crates/topos-node/build.rs b/crates/topos-node/build.rs new file mode 100644 index 000000000..ceded3d53 --- /dev/null +++ b/crates/topos-node/build.rs @@ -0,0 +1,23 @@ +use std::process::Command; + +const DEFAULT_VERSION: &str = "detached"; + +fn main() { + // Set TOPOS_VERSION to HEAD short commit hash unless it's already set + if std::option_env!("TOPOS_VERSION").is_none() { + let output = Command::new("git") + .args(["rev-parse", "--short", "HEAD"]) + .output() + .expect("failed to access the HEAD commit hash"); + + let git_hash = String::from_utf8(output.stdout).unwrap(); + + let topos_version = if git_hash.is_empty() { + DEFAULT_VERSION + } else { + git_hash.as_str() + }; + + println!("cargo:rustc-env=TOPOS_VERSION={topos_version}"); + } +} diff --git a/crates/topos-node/src/lib.rs b/crates/topos-node/src/lib.rs new file mode 100644 index 000000000..748a3e4cb --- /dev/null +++ b/crates/topos-node/src/lib.rs @@ -0,0 +1,221 @@ +//! Temporary lib exposition for backward topos CLI compatibility +use std::{path::PathBuf, process::ExitStatus}; + +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use opentelemetry::{global, sdk::metrics::controllers::BasicController}; +use process::Errors; +use tokio::{ + signal::{self, unix::SignalKind}, + sync::mpsc, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use topos_config::{ + edge::command::BINARY_NAME, + genesis::Genesis, + node::{NodeConfig, NodeRole}, +}; +use topos_telemetry::tracing::setup_tracing; +use topos_wallet::SecretManager; +use tracing::{error, info}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +mod process; + +#[allow(clippy::too_many_arguments)] +pub async fn start( + verbose: u8, + no_color: bool, + otlp_agent: Option, + otlp_service_name: Option, + no_edge_process: bool, + node_path: PathBuf, + home: PathBuf, + config: NodeConfig, +) -> Result<(), Box> { + println!( + "โš™๏ธ Reading the configuration from {}/config.toml", + node_path.display() + ); + + // Load genesis pointed by the local config + let genesis_file_path = home + .join("subnet") + .join(config.base.subnet.clone()) + .join("genesis.json"); + + let genesis = match Genesis::new(genesis_file_path.clone()) { + Ok(genesis) => genesis, + Err(_) => { + println!( + "Could not load genesis.json file on path {} \n Please make sure to have a valid \ + genesis.json file for your subnet in the {}/subnet/{} folder.", + genesis_file_path.display(), + home.display(), + &config.base.subnet + ); + std::process::exit(1); + } + }; + + // Get secrets + let keys = match &config.base.secrets_config { + Some(secrets_config) => SecretManager::from_aws(secrets_config), + None => SecretManager::from_fs(node_path.clone()), + }; + + // Setup instrumentation if both otlp agent and otlp service name + // are provided as arguments + let basic_controller = setup_tracing( + verbose, + no_color, + otlp_agent, + otlp_service_name, + env!("TOPOS_VERSION"), + )?; + + info!( + "๐Ÿงข New joiner: {} for the \"{}\" subnet as {:?}", + config.base.name, config.base.subnet, config.base.role + ); + + let shutdown_token = CancellationToken::new(); + let shutdown_trigger = shutdown_token.clone(); + + let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); + + let mut processes = spawn_processes( + no_edge_process, + config, + node_path, + home, + genesis, + shutdown_sender, + keys, + shutdown_token, + ); + + let mut sigterm_stream = signal::unix::signal(SignalKind::terminate())?; + + tokio::select! { + _ = sigterm_stream.recv() => { + info!("Received SIGTERM, shutting down application..."); + shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; + } + _ = signal::ctrl_c() => { + info!("Received ctrl_c, shutting down application..."); + shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; + } + Some(result) = processes.next() => { + shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; + processes.clear(); + match result { + Ok(Ok(status)) => { + if let Some(0) = status.code() { + info!("Terminating with success error code"); + } else { + info!("Terminating with error status: {:?}", status); + std::process::exit(1); + } + } + Ok(Err(e)) => { + error!("Terminating with error: {e}"); + std::process::exit(1); + } + Err(e) => { + error!("Terminating with error: {e}"); + std::process::exit(1); + } + } + } + }; + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn spawn_processes( + no_edge_process: bool, + config: NodeConfig, + node_path: PathBuf, + edge_path: PathBuf, + genesis: Genesis, + shutdown_sender: mpsc::Sender<()>, + keys: SecretManager, + shutdown_token: CancellationToken, +) -> FuturesUnordered>> { + let processes = FuturesUnordered::new(); + + // Edge node + if no_edge_process { + info!("Using external edge node, skip running of local edge instance...") + } else if let Some(edge_config) = config.edge { + let data_dir = node_path.clone(); + info!( + "Spawning edge process with genesis file: {}, data directory: {}, additional edge \ + arguments: {:?}", + genesis.path.display(), + data_dir.display(), + edge_config.args + ); + processes.push(process::spawn_edge_process( + edge_path.join(BINARY_NAME), + data_dir, + genesis.path.clone(), + edge_config.args, + )); + } else { + error!("Missing edge configuration, could not run edge node!"); + std::process::exit(1); + } + + // Sequencer + if matches!(config.base.role, NodeRole::Sequencer) { + let sequencer_config = config + .sequencer + .clone() + .expect("valid sequencer configuration"); + info!( + "Running sequencer with configuration {:?}", + sequencer_config + ); + processes.push(process::spawn_sequencer_process( + sequencer_config, + &keys, + (shutdown_token.clone(), shutdown_sender.clone()), + )); + } + + // TCE + if config.base.subnet == "topos" { + info!("Running topos TCE service...",); + processes.push(process::spawn_tce_process( + config.tce.unwrap(), + keys, + genesis, + (shutdown_token.clone(), shutdown_sender.clone()), + )); + } + + drop(shutdown_sender); + processes +} + +async fn shutdown( + basic_controller: Option, + trigger: CancellationToken, + mut termination: mpsc::Receiver<()>, +) { + trigger.cancel(); + // Wait that all sender get dropped + info!("Waiting that all components dropped"); + let _ = termination.recv().await; + info!("Shutdown procedure finished, exiting..."); + // Shutdown tracing + global::shutdown_tracer_provider(); + if let Some(basic_controller) = basic_controller { + if let Err(e) = basic_controller.stop(&tracing::Span::current().context()) { + error!("Error stopping tracing: {e}"); + } + } +} diff --git a/crates/topos-node/src/main.rs b/crates/topos-node/src/main.rs new file mode 100644 index 000000000..1b854721d --- /dev/null +++ b/crates/topos-node/src/main.rs @@ -0,0 +1,4 @@ +#[tokio::main] +async fn main() -> Result<(), Box> { + Ok(()) +} diff --git a/crates/topos/src/components/node/services/process.rs b/crates/topos-node/src/process.rs similarity index 100% rename from crates/topos/src/components/node/services/process.rs rename to crates/topos-node/src/process.rs diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index ed10a2871..112e8df07 100644 --- a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -1,9 +1,18 @@ use crate::double_echo::*; -use crate::*; +use crate::event::ProtocolEvents; use rstest::*; +use std::collections::HashSet; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Receiver; +use tokio::sync::{broadcast, mpsc, oneshot}; +use topos_config::tce::broadcast::ReliableBroadcastParams; +use topos_core::uci::Certificate; +use topos_crypto::messages::MessageSigner; +use topos_crypto::validator_id::ValidatorId; +use topos_tce_storage::types::CertificateDeliveredWithPositions; +use topos_tce_storage::validator::ValidatorStore; use topos_test_sdk::constants::*; use topos_test_sdk::storage::create_validator_store; diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index ebf00e859..f3b2dfbca 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" workspace = true [dependencies] +topos-node = { path = "../topos-node/" } topos-config = { path = "../topos-config/" } topos-tce = { path = "../topos-tce/" } topos-p2p = { path = "../topos-p2p" } diff --git a/crates/topos/src/components/node/mod.rs b/crates/topos/src/components/node/mod.rs index 4f7014775..66123c510 100644 --- a/crates/topos/src/components/node/mod.rs +++ b/crates/topos/src/components/node/mod.rs @@ -1,28 +1,17 @@ -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use opentelemetry::global; -use opentelemetry::sdk::metrics::controllers::BasicController; use std::{ fs::{create_dir_all, remove_dir_all, OpenOptions}, io::Write, }; use std::{path::Path, sync::Arc}; -use tokio::{ - signal::{self, unix::SignalKind}, - sync::{mpsc, Mutex}, -}; -use tokio_util::sync::CancellationToken; +use tokio::sync::Mutex; use tonic::transport::{Channel, Endpoint}; -use topos_telemetry::tracing::setup_tracing; use tower::Service; -use tracing::{error, info}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::error; use self::commands::{NodeCommand, NodeCommands}; -use topos_config::{edge::command::BINARY_NAME, genesis::Genesis, Config}; -use topos_config::{node::NodeConfig, node::NodeRole}; +use topos_config::node::NodeConfig; +use topos_config::{edge::command::BINARY_NAME, Config}; use topos_core::api::grpc::tce::v1::console_service_client::ConsoleServiceClient; -use topos_wallet::SecretManager; pub(crate) mod commands; pub(crate) mod services; @@ -146,6 +135,7 @@ pub(crate) async fn handle_command( let node_path = home.join("node").join(name); let config_path = node_path.join("config.toml"); + // TODO: Move this to `topos-node` when migrated if !Path::new(&config_path).exists() { println!( "Please run 'topos node init --name {name}' to create a config file first for \ @@ -155,145 +145,17 @@ pub(crate) async fn handle_command( } let config = NodeConfig::new(&node_path, Some(command)); - println!( - "โš™๏ธ Reading the configuration from {}/config.toml", - node_path.display() - ); - - // Load genesis pointed by the local config - let genesis_file_path = home - .join("subnet") - .join(config.base.subnet.clone()) - .join("genesis.json"); - let genesis = match Genesis::new(genesis_file_path.clone()) { - Ok(genesis) => genesis, - Err(_) => { - println!( - "Could not load genesis.json file on path {} \n Please make sure to have \ - a valid genesis.json file for your subnet in the {}/subnet/{} folder.", - genesis_file_path.display(), - home.display(), - &config.base.subnet - ); - std::process::exit(1); - } - }; - - // Get secrets - let keys = match &config.base.secrets_config { - Some(secrets_config) => SecretManager::from_aws(secrets_config), - None => SecretManager::from_fs(node_path.clone()), - }; - - info!( - "๐Ÿงข New joiner: {} for the \"{}\" subnet as {:?}", - config.base.name, config.base.subnet, config.base.role - ); - - let shutdown_token = CancellationToken::new(); - let shutdown_trigger = shutdown_token.clone(); - - // Setup instrumentation if both otlp agent and otlp service name - // are provided as arguments - let basic_controller = setup_tracing( + topos_node::start( verbose, no_color, cmd_cloned.otlp_agent, cmd_cloned.otlp_service_name, - env!("TOPOS_VERSION"), - )?; - - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - let mut processes = FuturesUnordered::new(); - - // Edge node - if cmd_cloned.no_edge_process { - info!("Using external edge node, skip running of local edge instance...") - } else if let Some(edge_config) = config.edge { - let data_dir = node_path.clone(); - info!( - "Spawning edge process with genesis file: {}, data directory: {}, additional \ - edge arguments: {:?}", - genesis.path.display(), - data_dir.display(), - edge_config.args - ); - processes.push(services::process::spawn_edge_process( - edge_path.join(BINARY_NAME), - data_dir, - genesis.path.clone(), - edge_config.args, - )); - } else { - error!("Missing edge configuration, could not run edge node!"); - std::process::exit(1); - } - - // Sequencer - if matches!(config.base.role, NodeRole::Sequencer) { - let sequencer_config = config - .sequencer - .clone() - .expect("valid sequencer configuration"); - info!( - "Running sequencer with configuration {:?}", - sequencer_config - ); - processes.push(services::process::spawn_sequencer_process( - sequencer_config, - &keys, - (shutdown_token.clone(), shutdown_sender.clone()), - )); - } - - // TCE - if config.base.subnet == "topos" { - info!("Running topos TCE service...",); - processes.push(services::process::spawn_tce_process( - config.tce.unwrap(), - keys, - genesis, - (shutdown_token.clone(), shutdown_sender.clone()), - )); - } - - drop(shutdown_sender); - - let mut sigterm_stream = signal::unix::signal(SignalKind::terminate())?; - - tokio::select! { - _ = sigterm_stream.recv() => { - info!("Received SIGTERM, shutting down application..."); - shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; - } - _ = signal::ctrl_c() => { - info!("Received ctrl_c, shutting down application..."); - shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; - } - Some(result) = processes.next() => { - shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; - processes.clear(); - match result { - Ok(Ok(status)) => { - if let Some(0) = status.code() { - info!("Terminating with success error code"); - } else { - info!("Terminating with error status: {:?}", status); - std::process::exit(1); - } - } - Ok(Err(e)) => { - error!("Terminating with error: {e}"); - std::process::exit(1); - } - Err(e) => { - error!("Terminating with error: {e}"); - std::process::exit(1); - } - } - } - }; + cmd_cloned.no_edge_process, + node_path, + home, + config, + ) + .await?; Ok(()) } @@ -317,22 +179,3 @@ fn setup_console_tce_grpc(endpoint: &str) -> Arc, - trigger: CancellationToken, - mut termination: mpsc::Receiver<()>, -) { - trigger.cancel(); - // Wait that all sender get dropped - info!("Waiting that all components dropped"); - let _ = termination.recv().await; - info!("Shutdown procedure finished, exiting..."); - // Shutdown tracing - global::shutdown_tracer_provider(); - if let Some(basic_controller) = basic_controller { - if let Err(e) = basic_controller.stop(&tracing::Span::current().context()) { - error!("Error stopping tracing: {e}"); - } - } -} diff --git a/crates/topos/src/components/node/services.rs b/crates/topos/src/components/node/services.rs index 102a1bd4c..3d7dcd739 100644 --- a/crates/topos/src/components/node/services.rs +++ b/crates/topos/src/components/node/services.rs @@ -1,2 +1 @@ -pub(crate) mod process; pub(crate) mod status; diff --git a/crates/topos/tests/config.rs b/crates/topos/tests/config.rs index 33e619906..270249010 100644 --- a/crates/topos/tests/config.rs +++ b/crates/topos/tests/config.rs @@ -2,7 +2,6 @@ use assert_cmd::prelude::*; use regex::Regex; use std::path::PathBuf; use std::process::{Command, Stdio}; -use std::time::Duration; use tempfile::tempdir; use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; @@ -15,12 +14,17 @@ use crate::utils::setup_polygon_edge; mod utils; mod serial_integration { + use rstest::rstest; + use sysinfo::{Pid, PidExt, ProcessExt, Signal, System, SystemExt}; + use super::*; + #[rstest] #[tokio::test] - async fn handle_command_init() -> Result<(), Box> { - let tmp_home_dir = tempdir()?; - let path = setup_polygon_edge(tmp_home_dir.path().to_str().unwrap()).await; + async fn handle_command_init( + #[from(create_folder)] home: PathBuf, + ) -> Result<(), Box> { + let path = setup_polygon_edge(home.to_str().unwrap()).await; let mut cmd = Command::cargo_bin("topos")?; cmd.arg("node") @@ -28,15 +32,13 @@ mod serial_integration { .arg(path) .arg("init") .arg("--home") - .arg(tmp_home_dir.path().to_str().unwrap()); + .arg(home.to_str().unwrap()); let output = cmd.assert().success(); let result: &str = std::str::from_utf8(&output.get_output().stdout)?; assert!(result.contains("Created node config file")); - let home = PathBuf::from(tmp_home_dir.path()); - // Verification: check that the config file was created let config_path = home.join("node").join("default").join("config.toml"); assert!(config_path.exists()); @@ -246,12 +248,13 @@ mod serial_integration { Ok(()) } /// Test node up running from config file + #[rstest] #[test_log::test(tokio::test)] - async fn command_node_up() -> Result<(), Box> { - let tmp_home_dir = tempdir()?; - + async fn command_node_up( + #[from(create_folder)] tmp_home_dir: PathBuf, + ) -> Result<(), Box> { // Create config file - let node_up_home_env = tmp_home_dir.path().to_str().unwrap(); + let node_up_home_env = tmp_home_dir.to_str().unwrap(); let node_edge_path_env = setup_polygon_edge(node_up_home_env).await; let node_up_name_env = "TEST_NODE_UP"; let node_up_role_env = "full-node"; @@ -330,13 +333,10 @@ mod serial_integration { /// Test node up running from config file #[rstest::rstest] - #[timeout(Duration::from_secs(30))] #[test_log::test(tokio::test)] async fn command_node_up_with_old_config( - create_folder: PathBuf, + #[from(create_folder)] tmp_home_dir: PathBuf, ) -> Result<(), Box> { - let tmp_home_dir = create_folder; - // Create config file let node_up_home_env = tmp_home_dir.to_str().unwrap(); let node_edge_path_env = setup_polygon_edge(node_up_home_env).await; @@ -418,14 +418,27 @@ mod serial_integration { .stdout(Stdio::piped()); let cmd = tokio::process::Command::from(cmd).spawn().unwrap(); - let output = tokio::time::timeout(Duration::from_secs(10), cmd.wait_with_output()).await; - let stdout = output??.stdout; - let stdout = String::from_utf8_lossy(&stdout); + let pid = cmd.id().unwrap(); + let _ = tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + let s = System::new_all(); + if let Some(process) = s.process(Pid::from_u32(pid)) { + if process.kill_with(Signal::Term).is_none() { + eprintln!("This signal isn't supported on this platform"); + } + } - println!("STDOUT: {}", stdout); - let reg = Regex::new(r#"Local node is listening on "\/ip4\/.*\/tcp\/9091\/p2p\/"#).unwrap(); - assert!(reg.is_match(&stdout)); + if let Ok(output) = cmd.wait_with_output().await { + assert!(output.status.success()); + let stdout = output.unwrap().unwrap().stdout; + let stdout = String::from_utf8_lossy(&stdout); + let reg = + Regex::new(r#"Local node is listening on "\/ip4\/.*\/tcp\/9091\/p2p\/"#).unwrap(); + assert!(reg.is_match(&stdout)); + } else { + panic!("Failed to shutdown gracefully"); + } // Cleanup std::fs::remove_dir_all(node_up_home_env)?; From 82917405e7bf102c06194caadc973f57ac735649 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Tue, 5 Mar 2024 03:42:26 -0400 Subject: [PATCH 3/8] fix: update mio (#473) --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00ec1ba51..bc2c1e2f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4557,9 +4557,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", From b2e4cf88ac0c0b2ee92b7ef120a4c4e97493150c Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 5 Mar 2024 08:42:55 +0100 Subject: [PATCH 4/8] chore: removing cache_size (#472) Signed-off-by: Simon Paitrault --- crates/topos-p2p/src/behaviour/peer_info.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/topos-p2p/src/behaviour/peer_info.rs b/crates/topos-p2p/src/behaviour/peer_info.rs index 3e7049ffe..e2a3e80fc 100644 --- a/crates/topos-p2p/src/behaviour/peer_info.rs +++ b/crates/topos-p2p/src/behaviour/peer_info.rs @@ -12,8 +12,7 @@ pub struct PeerInfoBehaviour { impl PeerInfoBehaviour { pub(crate) fn new(identify_protocol: &'static str, peer_key: &Keypair) -> PeerInfoBehaviour { let ident_config = IdentifyConfig::new(identify_protocol.to_string(), peer_key.public()) - .with_push_listen_addr_updates(true) - .with_cache_size(0); + .with_push_listen_addr_updates(true); let identify = Identify::new(ident_config); From cc0c7b538d9f6b91c184db10eedd9d94c4f368fb Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 5 Mar 2024 10:53:31 +0100 Subject: [PATCH 5/8] refactor: update error management and config/process (#460) --- crates/topos-config/src/genesis/mod.rs | 18 +++- crates/topos-config/src/genesis/tests.rs | 2 +- crates/topos-config/src/node.rs | 83 +++++++++++++-- crates/topos-node/src/lib.rs | 127 +++++++++++++---------- crates/topos-telemetry/src/tracing.rs | 3 +- crates/topos/src/components/node/mod.rs | 22 ++-- crates/topos/tests/config.rs | 25 ++--- 7 files changed, 182 insertions(+), 98 deletions(-) diff --git a/crates/topos-config/src/genesis/mod.rs b/crates/topos-config/src/genesis/mod.rs index ab0db46d0..21ad50442 100644 --- a/crates/topos-config/src/genesis/mod.rs +++ b/crates/topos-config/src/genesis/mod.rs @@ -8,12 +8,13 @@ use topos_core::types::ValidatorId; use topos_p2p::{Multiaddr, PeerId}; use tracing::info; +use crate::node::NodeConfig; + #[cfg(test)] pub(crate) mod tests; /// From the Edge format pub struct Genesis { - pub path: PathBuf, pub json: Value, } @@ -21,19 +22,20 @@ pub struct Genesis { pub enum Error { #[error("Failed to parse validators")] ParseValidators, + #[error("Invalid genesis file on path {0}: {1}")] InvalidGenesisFile(String, String), } impl Genesis { - pub fn new(path: PathBuf) -> Result { + pub fn new(path: &PathBuf) -> Result { info!("Reading subnet genesis file {}", path.display()); - let genesis_file = fs::File::open(&path) + let genesis_file = fs::File::open(path) .map_err(|e| Error::InvalidGenesisFile(path.display().to_string(), e.to_string()))?; let json: Value = serde_json::from_reader(genesis_file).expect("genesis json parsed"); - Ok(Self { path, json }) + Ok(Self { json }) } // TODO: parse directly with serde @@ -103,3 +105,11 @@ impl Genesis { ) } } + +impl TryFrom<&NodeConfig> for Genesis { + type Error = Error; + + fn try_from(config: &NodeConfig) -> Result { + Genesis::new(&config.edge_path) + } +} diff --git a/crates/topos-config/src/genesis/tests.rs b/crates/topos-config/src/genesis/tests.rs index a0e769ab3..f4cd6ffe3 100644 --- a/crates/topos-config/src/genesis/tests.rs +++ b/crates/topos-config/src/genesis/tests.rs @@ -14,7 +14,7 @@ macro_rules! test_case { #[fixture] #[once] pub fn genesis() -> Genesis { - Genesis::new(test_case!("genesis-example.json").into()) + Genesis::new(&test_case!("genesis-example.json").into()) .expect("Expected valid test genesis file") } diff --git a/crates/topos-config/src/node.rs b/crates/topos-config/src/node.rs index 4297d6434..40f99a1bb 100644 --- a/crates/topos-config/src/node.rs +++ b/crates/topos-config/src/node.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use figment::{ providers::{Format, Toml}, @@ -6,6 +6,8 @@ use figment::{ }; use serde::{Deserialize, Serialize}; +use topos_wallet::SecretManager; +use tracing::{debug, error}; use crate::{ base::BaseConfig, edge::EdgeConfig, load_config, sequencer::SequencerConfig, tce::TceConfig, @@ -26,28 +28,86 @@ pub struct NodeConfig { pub tce: Option, pub sequencer: Option, pub edge: Option, + + #[serde(skip)] + pub home_path: PathBuf, + + #[serde(skip)] + pub node_path: PathBuf, + + #[serde(skip)] + pub edge_path: PathBuf, } impl NodeConfig { - pub fn new(home: &Path, config: Option) -> Self { - let base = load_config::(home, config); + /// Try to create a new node config struct from the given home path and node name. + /// It expects a config file to be present in the node's folder. + /// + /// This `config.toml` can be generated using: `topos node init` command + pub fn try_from( + home_path: &Path, + node_name: &str, + config: Option, + ) -> Result { + let node_path = home_path.join("node").join(node_name); + let config_path = node_path.join("config.toml"); + + // TODO: Move this to `topos-node` when migrated + if !Path::new(&config_path).exists() { + error!( + "Please run 'topos node init --name {node_name}' to create a config file first \ + for {node_name}." + ); + std::process::exit(1); + } + + Ok(Self::build_config(node_path, home_path, config)) + } + + /// Create a new node config struct from the given home path and node name. + /// + /// It doesn't check the existence of the config file. + /// It's useful for creating a config file for a new node, relying on the default values. + pub fn create(home_path: &Path, node_name: &str, config: Option) -> Self { + let node_path = home_path.join("node").join(node_name); + + Self::build_config(node_path, home_path, config) + } + + /// Common function to build a node config struct from the given home path and node name. + fn build_config(node_path: PathBuf, home_path: &Path, config: Option) -> Self { + let node_folder = node_path.as_path(); + let base = load_config::(node_folder, config); + + // Load genesis pointed by the local config + let edge_path = home_path + .join("subnet") + .join(base.subnet.clone()) + .join("genesis.json"); let mut config = NodeConfig { + node_path: node_path.to_path_buf(), + edge_path, + home_path: home_path.to_path_buf(), base: base.clone(), sequencer: base .need_sequencer() - .then(|| load_config::(home, None)), + .then(|| load_config::(node_folder, None)), tce: base .need_tce() - .then(|| load_config::(home, None)), + .then(|| load_config::(node_folder, None)), edge: base .need_edge() - .then(|| load_config::(home, None)), + .then(|| load_config::(node_folder, None)), }; // Make the TCE DB path relative to the folder if let Some(config) = config.tce.as_mut() { - config.db_path = home.join(&config.db_path); + config.db_path = node_folder.join(&config.db_path); + debug!( + "Maked TCE DB path relative to the node folder -> {:?}", + config.db_path + ); } config @@ -71,3 +131,12 @@ impl Config for NodeConfig { "default".to_string() } } + +impl From<&NodeConfig> for SecretManager { + fn from(val: &NodeConfig) -> Self { + match val.base.secrets_config.as_ref() { + Some(secrets_config) => SecretManager::from_aws(secrets_config), + None => SecretManager::from_fs(val.node_path.clone()), + } + } +} diff --git a/crates/topos-node/src/lib.rs b/crates/topos-node/src/lib.rs index 748a3e4cb..3b9b6a5a3 100644 --- a/crates/topos-node/src/lib.rs +++ b/crates/topos-node/src/lib.rs @@ -1,5 +1,5 @@ //! Temporary lib exposition for backward topos CLI compatibility -use std::{path::PathBuf, process::ExitStatus}; +use std::process::ExitStatus; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -18,53 +18,43 @@ use topos_config::{ }; use topos_telemetry::tracing::setup_tracing; use topos_wallet::SecretManager; -use tracing::{error, info}; +use tracing::{debug, error, info}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::util::TryInitError; mod process; -#[allow(clippy::too_many_arguments)] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + GenesisFile(#[from] topos_config::genesis::Error), + + #[error("Unable to setup tracing logger: {0}")] + Tracing(#[from] TryInitError), + + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error( + "The role in the config file expect to have a sequencer config defined, none was found" + )] + MissingSequencerConfig, + + #[error("An Edge config was expected to be found in the config file")] + MissingEdgeConfig, + + #[error("A TCE config was expected to be found in the config file")] + MissingTCEConfig, +} + pub async fn start( verbose: u8, no_color: bool, otlp_agent: Option, otlp_service_name: Option, no_edge_process: bool, - node_path: PathBuf, - home: PathBuf, config: NodeConfig, -) -> Result<(), Box> { - println!( - "โš™๏ธ Reading the configuration from {}/config.toml", - node_path.display() - ); - - // Load genesis pointed by the local config - let genesis_file_path = home - .join("subnet") - .join(config.base.subnet.clone()) - .join("genesis.json"); - - let genesis = match Genesis::new(genesis_file_path.clone()) { - Ok(genesis) => genesis, - Err(_) => { - println!( - "Could not load genesis.json file on path {} \n Please make sure to have a valid \ - genesis.json file for your subnet in the {}/subnet/{} folder.", - genesis_file_path.display(), - home.display(), - &config.base.subnet - ); - std::process::exit(1); - } - }; - - // Get secrets - let keys = match &config.base.secrets_config { - Some(secrets_config) => SecretManager::from_aws(secrets_config), - None => SecretManager::from_fs(node_path.clone()), - }; - +) -> Result<(), Error> { // Setup instrumentation if both otlp agent and otlp service name // are provided as arguments let basic_controller = setup_tracing( @@ -75,6 +65,29 @@ pub async fn start( env!("TOPOS_VERSION"), )?; + info!( + "โš™๏ธ Read the configuration from {}/config.toml", + config.node_path.display() + ); + + debug!("TceConfig: {:?}", config); + + let config_ref = &config; + let genesis: Genesis = config_ref.try_into().map_err(|error| { + info!( + "Could not load genesis.json file on path {} \n Please make sure to have a valid \ + genesis.json file for your subnet in the {}/subnet/{} folder.", + config.edge_path.display(), + config.home_path.display(), + &config.base.subnet + ); + + error + })?; + + // Get secrets + let keys: SecretManager = config_ref.into(); + info!( "๐Ÿงข New joiner: {} for the \"{}\" subnet as {:?}", config.base.name, config.base.subnet, config.base.role @@ -88,13 +101,11 @@ pub async fn start( let mut processes = spawn_processes( no_edge_process, config, - node_path, - home, genesis, shutdown_sender, keys, shutdown_token, - ); + )?; let mut sigterm_stream = signal::unix::signal(SignalKind::terminate())?; @@ -130,51 +141,51 @@ pub async fn start( } } }; + Ok(()) } -#[allow(clippy::too_many_arguments)] fn spawn_processes( no_edge_process: bool, - config: NodeConfig, - node_path: PathBuf, - edge_path: PathBuf, + mut config: NodeConfig, genesis: Genesis, shutdown_sender: mpsc::Sender<()>, keys: SecretManager, shutdown_token: CancellationToken, -) -> FuturesUnordered>> { +) -> Result>>, Error> { let processes = FuturesUnordered::new(); // Edge node if no_edge_process { info!("Using external edge node, skip running of local edge instance...") - } else if let Some(edge_config) = config.edge { - let data_dir = node_path.clone(); + } else { + let edge_config = config.edge.take().ok_or(Error::MissingEdgeConfig)?; + + let data_dir = config.node_path.clone(); + info!( "Spawning edge process with genesis file: {}, data directory: {}, additional edge \ arguments: {:?}", - genesis.path.display(), + config.edge_path.display(), data_dir.display(), edge_config.args ); + processes.push(process::spawn_edge_process( - edge_path.join(BINARY_NAME), + config.home_path.join(BINARY_NAME), data_dir, - genesis.path.clone(), + config.edge_path.clone(), edge_config.args, )); - } else { - error!("Missing edge configuration, could not run edge node!"); - std::process::exit(1); } // Sequencer if matches!(config.base.role, NodeRole::Sequencer) { let sequencer_config = config .sequencer - .clone() - .expect("valid sequencer configuration"); + .take() + .ok_or(Error::MissingSequencerConfig)?; + info!( "Running sequencer with configuration {:?}", sequencer_config @@ -188,9 +199,11 @@ fn spawn_processes( // TCE if config.base.subnet == "topos" { + let tce_config = config.tce.ok_or(Error::MissingTCEConfig)?; info!("Running topos TCE service...",); + processes.push(process::spawn_tce_process( - config.tce.unwrap(), + tce_config, keys, genesis, (shutdown_token.clone(), shutdown_sender.clone()), @@ -198,7 +211,7 @@ fn spawn_processes( } drop(shutdown_sender); - processes + Ok(processes) } async fn shutdown( diff --git a/crates/topos-telemetry/src/tracing.rs b/crates/topos-telemetry/src/tracing.rs index 4adf2725e..c8da45cdb 100644 --- a/crates/topos-telemetry/src/tracing.rs +++ b/crates/topos-telemetry/src/tracing.rs @@ -6,6 +6,7 @@ use opentelemetry::{global, KeyValue}; use opentelemetry_otlp::{SpanExporterBuilder, WithExportConfig}; use std::time::Duration; use tracing::Level; +use tracing_subscriber::util::TryInitError; use tracing_subscriber::{ prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, }; @@ -66,7 +67,7 @@ pub fn setup_tracing( otlp_agent: Option, otlp_service_name: Option, version: &'static str, -) -> Result, Box> { +) -> Result, TryInitError> { let mut layers = Vec::new(); let ansi = !no_color; diff --git a/crates/topos/src/components/node/mod.rs b/crates/topos/src/components/node/mod.rs index 66123c510..410791548 100644 --- a/crates/topos/src/components/node/mod.rs +++ b/crates/topos/src/components/node/mod.rs @@ -5,6 +5,7 @@ use std::{ use std::{path::Path, sync::Arc}; use tokio::sync::Mutex; use tonic::transport::{Channel, Endpoint}; +use topos_telemetry::tracing::setup_tracing; use tower::Service; use tracing::error; @@ -40,12 +41,13 @@ pub(crate) async fn handle_command( match subcommands { Some(NodeCommands::Init(cmd)) => { let cmd = *cmd; - let name = cmd.name.as_ref().expect("No name or default was given"); + let name = cmd.name.clone().expect("No name or default was given"); + _ = setup_tracing(verbose, no_color, None, None, env!("TOPOS_VERSION")); // Construct path to node config // will be $TOPOS_HOME/node/default/ with no given name // and $TOPOS_HOME/node// with a given name - let node_path = home.join("node").join(name); + let node_path = home.join("node").join(&name); // If the folders don't exist yet, create it create_dir_all(&node_path).expect("failed to create node folder"); @@ -93,7 +95,7 @@ pub(crate) async fn handle_command( } } - let node_config = NodeConfig::new(&node_path, Some(cmd)); + let node_config = NodeConfig::create(&home, &name, Some(cmd)); // Creating the TOML output let config_toml = match node_config.to_toml() { @@ -132,27 +134,15 @@ pub(crate) async fn handle_command( .name .as_ref() .expect("No name or default was given for node"); - let node_path = home.join("node").join(name); - let config_path = node_path.join("config.toml"); - // TODO: Move this to `topos-node` when migrated - if !Path::new(&config_path).exists() { - println!( - "Please run 'topos node init --name {name}' to create a config file first for \ - {name}." - ); - std::process::exit(1); - } + let config = NodeConfig::try_from(&home, name, Some(command))?; - let config = NodeConfig::new(&node_path, Some(command)); topos_node::start( verbose, no_color, cmd_cloned.otlp_agent, cmd_cloned.otlp_service_name, cmd_cloned.no_edge_process, - node_path, - home, config, ) .await?; diff --git a/crates/topos/tests/config.rs b/crates/topos/tests/config.rs index 270249010..3ac3c53c8 100644 --- a/crates/topos/tests/config.rs +++ b/crates/topos/tests/config.rs @@ -147,12 +147,16 @@ mod serial_integration { } /// Test node init env arguments + #[rstest] #[tokio::test] - async fn command_init_precedence_env() -> Result<(), Box> { - let tmp_home_directory = tempdir()?; - + async fn command_init_precedence_env( + create_folder: PathBuf, + ) -> Result<(), Box> { + let tmp_home_directory = create_folder; // Test node init with env variables - let node_init_home_env = tmp_home_directory.path().to_str().unwrap(); + let node_init_home_env = tmp_home_directory + .to_str() + .expect("path names are valid utf-8"); let node_edge_path_env = setup_polygon_edge(node_init_home_env).await; let node_init_name_env = "TEST_NODE_ENV"; let node_init_role_env = "full-node"; @@ -182,7 +186,6 @@ mod serial_integration { assert!(config_path.exists()); // Check if config file params are according to env params let config_contents = std::fs::read_to_string(&config_path).unwrap(); - println!("{:#?}", config_contents); assert!(config_contents.contains("name = \"TEST_NODE_ENV\"")); assert!(config_contents.contains("role = \"fullnode\"")); assert!(config_contents.contains("subnet = \"topos-env\"")); @@ -193,17 +196,17 @@ mod serial_integration { /// Test node cli arguments precedence over env arguments #[tokio::test] async fn command_init_precedence_cli_env() -> Result<(), Box> { - let tmp_home_dir_env = tempdir()?; - let tmp_home_dir_cli = tempdir()?; + let tmp_home_dir_env = create_folder("command_init_precedence_cli_env"); + let tmp_home_dir_cli = create_folder("command_init_precedence_cli_env"); // Test node init with both cli and env flags // Cli arguments should take precedence over env variables - let node_init_home_env = tmp_home_dir_env.path().to_str().unwrap(); + let node_init_home_env = tmp_home_dir_env.to_str().unwrap(); let node_edge_path_env = setup_polygon_edge(node_init_home_env).await; let node_init_name_env = "TEST_NODE_ENV"; let node_init_role_env = "full-node"; let node_init_subnet_env = "topos-env"; - let node_init_home_cli = tmp_home_dir_cli.path().to_str().unwrap(); + let node_init_home_cli = tmp_home_dir_cli.to_str().unwrap(); let node_edge_path_cli = node_edge_path_env.clone(); let node_init_name_cli = "TEST_NODE_CLI"; let node_init_role_cli = "sequencer"; @@ -359,7 +362,6 @@ mod serial_integration { let home = PathBuf::from(node_up_home_env); // Verification: check that the config file was created let config_path = home.join("node").join(node_up_name_env).join("config.toml"); - println!("config path {:?}", config_path); assert!(config_path.exists()); let mut file = OpenOptions::new() @@ -394,7 +396,6 @@ mod serial_integration { // Generate polygon edge genesis file let polygon_edge_bin = format!("{}/polygon-edge", node_edge_path_env); - println!("polygon_edge_bin {:?}", polygon_edge_bin); utils::generate_polygon_edge_genesis_file( &polygon_edge_bin, node_up_home_env, @@ -430,7 +431,7 @@ mod serial_integration { if let Ok(output) = cmd.wait_with_output().await { assert!(output.status.success()); - let stdout = output.unwrap().unwrap().stdout; + let stdout = output.stdout; let stdout = String::from_utf8_lossy(&stdout); let reg = From 2c73f0bae1dc25aad504d45dcc789360cce7dbaa Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Tue, 5 Mar 2024 08:00:30 -0400 Subject: [PATCH 6/8] feat: terminate stream if client is dropping the connection (#463) --- crates/topos-tce-api/src/stream/mod.rs | 29 ++++++++++++-- crates/topos-tce-api/src/stream/tests.rs | 49 +++++++++++++++++++++++- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/crates/topos-tce-api/src/stream/mod.rs b/crates/topos-tce-api/src/stream/mod.rs index 6a7b92506..0cef0d097 100644 --- a/crates/topos-tce-api/src/stream/mod.rs +++ b/crates/topos-tce-api/src/stream/mod.rs @@ -150,14 +150,37 @@ impl Stream { } } - Some(_stream_packet) = self.inbound_stream.next() => { - + // We currently open the stream, but no other message from the client is getting processed. + // We are using this open connection to communicate `delivered_certificates` to the client. + Some(stream_packet) = self.inbound_stream.next() => { + match stream_packet { + Ok((_request_id, _message)) => { + trace!("Received message from stream_id: {:?}", self.stream_id); + } + Err(error) => { + match error.kind { + StreamErrorKind::StreamClosed => { + warn!("Stream {} closed", self.stream_id); + return Err(StreamError::new(self.stream_id, StreamErrorKind::StreamClosed)); + } + _ => { + // We are not handling specific errors for now. + // If the sequencer is closing the connection, we are receiving a + // StreamErrorKind::TransportError. + error!( "Stream error: {:?}", error); + return Err(StreamError::new(self.stream_id, error.kind)); + + } + + } + } + } } - // For graceful shutdown in case streams are closed else => break, } } + Ok(self.stream_id) } } diff --git a/crates/topos-tce-api/src/stream/tests.rs b/crates/topos-tce-api/src/stream/tests.rs index 4eb2b9d9d..3b09c15dc 100644 --- a/crates/topos-tce-api/src/stream/tests.rs +++ b/crates/topos-tce-api/src/stream/tests.rs @@ -200,9 +200,54 @@ async fn resuming_one_subscription() {} #[ignore = "not yet implemented"] async fn resuming_all_subscription() {} +#[rstest] #[test(tokio::test)] -#[ignore = "not yet implemented"] -async fn closing_client_stream() {} +async fn closing_client_stream() -> Result<(), Box> { + let (mut tx, stream, mut context) = StreamBuilder::default().build(); + + let join = spawn(stream.run()); + + let msg: WatchCertificatesRequest = GrpcOpenStream { + target_checkpoint: Some(TargetCheckpoint { + target_subnet_ids: vec![TARGET_SUBNET_ID_1.into()], + positions: vec![], + }), + source_checkpoint: None, + } + .into(); + + _ = tx.send_data(encode(&msg)?).await; + + let expected_stream_id = context.stream_id; + + wait_for_command!( + context.runtime_receiver, + matches: InternalRuntimeCommand::Register { stream_id, sender, .. } if stream_id == expected_stream_id => { + sender.send(Ok(())) + } + ); + + let msg = context.stream_receiver.recv().await; + + assert!( + matches!( + msg, + Some(Ok((_, OutboundMessage::StreamOpened(StreamOpened { ref subnet_ids })))) if subnet_ids == &[TARGET_SUBNET_ID_1], + ), + "Expected StreamOpened, received: {msg:?}" + ); + + tx.abort(); + + let result = join.await?; + + assert!( + matches!(result, Err(StreamError { stream_id, kind: StreamErrorKind::Transport(_)}) if stream_id == context.stream_id), + "Doesn't match {result:?}", + ); + + Ok(()) +} #[test(tokio::test)] #[ignore = "not yet implemented"] From 8044310b8ee330d5a14d509137dc4243cb2c2372 Mon Sep 17 00:00:00 2001 From: Monir Hadji Date: Tue, 5 Mar 2024 14:30:21 +0100 Subject: [PATCH 7/8] chore: refactor logs and fix typo (#465) --- crates/topos-certificate-spammer/src/lib.rs | 14 ++--- crates/topos-core/src/api/graphql/subnet.rs | 4 +- crates/topos-p2p/src/error.rs | 2 +- crates/topos-sequencer/src/app_context.rs | 8 +-- .../src/double_echo/broadcast_state.rs | 54 ++++++++++--------- .../src/double_echo/mod.rs | 4 +- .../src/task_manager/mod.rs | 19 +++---- .../src/task_manager/task.rs | 16 +++--- crates/topos-tce-storage/src/validator/mod.rs | 10 ++-- crates/topos-tce/src/app_context.rs | 4 +- crates/topos-tce/src/app_context/api.rs | 4 +- crates/topos-tce/src/app_context/network.rs | 33 +++++------- crates/topos-tce/src/app_context/protocol.rs | 6 +-- crates/topos/tests/node.rs | 2 +- 14 files changed, 86 insertions(+), 94 deletions(-) diff --git a/crates/topos-certificate-spammer/src/lib.rs b/crates/topos-certificate-spammer/src/lib.rs index b0847894e..4e62dbaab 100644 --- a/crates/topos-certificate-spammer/src/lib.rs +++ b/crates/topos-certificate-spammer/src/lib.rs @@ -80,7 +80,7 @@ async fn open_target_node_connection( Ok(value) => value, Err(e) => { error!( - "Unable to create TCE client for node {}, error details: {}", + "Unable to create TCE client for node {}: {}", &tce_address, e ); return Err(Error::TCENodeConnection(e)); @@ -90,10 +90,7 @@ async fn open_target_node_connection( match tce_client.open_stream(Vec::new()).await { Ok(_) => {} Err(e) => { - error!( - "Unable to connect to node {}, error details: {}", - &tce_address, e - ); + error!("Unable to connect to node {}: {}", &tce_address, e); return Err(Error::TCENodeConnection(e)); } } @@ -150,7 +147,7 @@ async fn close_target_node_connections( { info!("Closing connection to target node {}", target_node.address); if let Err(e) = target_node.shutdown().await { - error!("Error shutting down connection {e}"); + error!("Failed to close stream with {}: {e}", target_node.address); } } } @@ -176,10 +173,7 @@ async fn send_new_certificate(tce_client: &mut TceClient, cert: Certificate) { .instrument(Span::current()) .await { - error!( - "failed to pass certificate to tce client, error details: {}", - e - ); + error!("Failed to send the Certificate to the TCE client: {}", e); } } diff --git a/crates/topos-core/src/api/graphql/subnet.rs b/crates/topos-core/src/api/graphql/subnet.rs index 2d9107528..e588d11e1 100644 --- a/crates/topos-core/src/api/graphql/subnet.rs +++ b/crates/topos-core/src/api/graphql/subnet.rs @@ -1,7 +1,7 @@ use async_graphql::NewType; use serde::{Deserialize, Serialize}; use std::str::FromStr; -use tracing::{error, warn}; +use tracing::error; use super::errors::GraphQLServerError; @@ -30,7 +30,7 @@ impl PartialEq for SubnetId { if let Ok(current) = crate::uci::SubnetId::from_str(&self.0) { other.as_array().eq(current.as_array()) } else { - warn!("Unexpected parsing error for subnet id during comparaison"); + error!("Failed to parse the subnet id {} during comparison", self.0); false } } diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 44b939dcf..e9ac60fb7 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -24,7 +24,7 @@ pub enum P2PError { #[error(transparent)] CommandError(#[from] CommandExecutionError), - #[error("An error occured on the Transport layer: {0}")] + #[error("An error occurred on the Transport layer: {0}")] TransportError(#[from] TransportError), #[error("Unable to receive expected response of a oneshot channel")] diff --git a/crates/topos-sequencer/src/app_context.rs b/crates/topos-sequencer/src/app_context.rs index 3ff343556..55e540bb6 100644 --- a/crates/topos-sequencer/src/app_context.rs +++ b/crates/topos-sequencer/src/app_context.rs @@ -64,14 +64,14 @@ impl AppContext { match tce_evt { TceProxyEvent::TceServiceFailure | TceProxyEvent::WatchCertificatesChannelFailed => { // Unrecoverable failure in interaction with the TCE. Sequencer needs to be restarted - warn!( + error!( "Unrecoverable failure in sequencer <-> tce interaction. Shutting down sequencer \ sequencer..." ); if let Err(e) = self.shutdown().await { - warn!("Error happened during shutdown: {e:?}"); + warn!("Failed to shutdown: {e:?}"); } - warn!("Shutdown finished, restarting sequencer..."); + info!("Shutdown finished, restarting sequencer..."); return AppContextStatus::Restarting; }, _ => self.on_tce_proxy_event(tce_evt).await, @@ -82,7 +82,7 @@ impl AppContext { _ = shutdown.0.cancelled() => { info!("Shutting down Sequencer app context..."); if let Err(e) = self.shutdown().await { - error!("Error shutting down Sequencer app context: {e}"); + error!("Failed to shutdown the Sequencer app context: {e}"); } // Drop the sender to notify the Sequencer termination drop(shutdown.1); diff --git a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs index cfc762785..3b0892b79 100644 --- a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs +++ b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs @@ -12,7 +12,7 @@ use topos_core::{ }; use topos_crypto::messages::MessageSigner; use topos_metrics::DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, trace}; mod status; pub use status::Status; @@ -66,7 +66,10 @@ impl BroadcastState { }); if need_gossip { - warn!("๐Ÿ“ฃ Gossiping the Certificate {}", &state.certificate.id); + debug!( + "๐Ÿ“ฃ Gossiping the Certificate {} from the source subnet {}", + &state.certificate.id, &state.certificate.source_subnet_id + ); let _ = state.event_sender.try_send(ProtocolEvents::Gossip { cert: state.certificate.clone(), }); @@ -118,7 +121,7 @@ impl BroadcastState { } fn update_status(&mut self) -> Option { - // Nothing happened yet, we're in the initial state and didn't Procced + // Nothing happened yet, we're in the initial state and didn't process // any Echo or Ready messages // Sending our Echo message if let Status::Pending = self.status { @@ -133,9 +136,10 @@ impl BroadcastState { }); self.status = Status::EchoSent; - debug!( - "๐Ÿ“ Certificate {} is now {}", - &self.certificate.id, self.status + trace!( + "Certificate {} is now {}", + &self.certificate.id, + self.status ); return Some(self.status); } @@ -156,14 +160,15 @@ impl BroadcastState { validator_id: self.validator_id, }; if let Err(e) = self.event_sender.try_send(event) { - warn!("Error sending Ready message: {}", e); + error!("Failed to send the Ready message: {}", e); } self.status = self.status.ready_sent(); - debug!( - "๐Ÿ“ Certificate {} is now {}", - &self.certificate.id, self.status + trace!( + "Certificate {} is now {}", + &self.certificate.id, + self.status ); return Some(self.status); } @@ -173,9 +178,10 @@ impl BroadcastState { if !self.status.is_delivered() && self.reached_delivery_threshold() { self.status = self.status.delivered(); - debug!( - "๐Ÿ“ Certificate {} is now {}", - &self.certificate.id, self.status + trace!( + "Certificate {} is now {}", + &self.certificate.id, + self.status ); // Calculate delivery time let from = self.delivery_time; @@ -183,15 +189,10 @@ impl BroadcastState { let d = duration; info!( - "Certificate {} got delivered in {:?}", + "๐Ÿ“ Certificate delivered {} with broadcast duration: {:?}", self.certificate.id, d ); - debug!( - "๐Ÿ“ Accepted[{}]\t Delivery time: {:?}", - &self.certificate.id, d - ); - DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL.inc(); return Some(self.status); @@ -220,9 +221,11 @@ impl BroadcastState { None => false, }; - debug!( - "๐Ÿ“ Certificate {} reached Echo threshold: {} and Ready threshold: {}", - &self.certificate.id, reached_echo_threshold, reached_ready_threshold + trace!( + "Certificate {} reached Echo threshold: {} and Ready threshold: {}", + &self.certificate.id, + reached_echo_threshold, + reached_ready_threshold ); // If reached any of the Echo or Ready thresholds, I send the Ready reached_echo_threshold || reached_ready_threshold @@ -239,9 +242,10 @@ impl BroadcastState { None => false, }; - debug!( - "๐Ÿ“ Certificate {} reached Delivery threshold: {}", - &self.certificate.id, delivery_threshold + trace!( + "Certificate {} reached Delivery threshold: {}", + &self.certificate.id, + delivery_threshold ); delivery_threshold diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 702e67596..b3ac469bf 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -184,7 +184,7 @@ impl DoubleEcho { } else => { - warn!("Break the tokio loop for the double echo"); + debug!("Break the tokio loop for the double echo"); break None; } } @@ -194,7 +194,7 @@ impl DoubleEcho { info!("Shutting down p2p double echo..."); _ = sender.send(()); } else { - warn!("Shutting down p2p double echo due to error..."); + debug!("Shutting down p2p double echo due to error..."); } } } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index c5f4921c6..edf569f52 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -22,9 +22,7 @@ use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; use topos_tce_storage::PendingCertificateId; -use tracing::debug; -use tracing::error; -use tracing::warn; +use tracing::{debug, error, info, trace, warn}; pub mod task; @@ -55,7 +53,6 @@ pub struct TaskManager { pub validator_id: ValidatorId, pub validator_store: Arc, pub broadcast_sender: broadcast::Sender, - pub latest_pending_id: PendingCertificateId, } @@ -108,7 +105,7 @@ impl TaskManager { } } Err(error) => { - error!("Error while fetching pending certificates: {:?}", error); + error!("Failed to fetch the pending certificates: {:?}", error); } } } @@ -136,7 +133,7 @@ impl TaskManager { }; } DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { - debug!("Received broadcast message for certificate {} ", cert.id); + trace!("Received broadcast message for certificate {} ", cert.id); self.create_task(cert, need_gossip) } @@ -146,25 +143,25 @@ impl TaskManager { Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { - debug!("Task for certificate {} finished successfully", certificate_id); + trace!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); } else { - debug!("Task for certificate {} finished unsuccessfully", certificate_id); + error!("Task for certificate {} finished unsuccessfully", certificate_id); } self.next_pending_certificate(); } _ = shutdown_receiver.cancelled() => { - warn!("Task Manager shutting down"); + info!("Task Manager shutting down"); - warn!("There are still {} active tasks", self.tasks.len()); + debug!("Remaining active tasks: {:?}", self.tasks.len()); if !self.tasks.is_empty() { debug!("Certificates still in broadcast: {:?}", self.tasks.keys()); } - warn!("There are still {} buffered messages", self.buffered_messages.len()); + warn!("Remaining buffered messages: {}", self.buffered_messages.len()); for task in self.tasks.iter() { task.1.shutdown_sender.send(()).await.unwrap(); } diff --git a/crates/topos-tce-broadcast/src/task_manager/task.rs b/crates/topos-tce-broadcast/src/task_manager/task.rs index dd96e52b5..ad707080f 100644 --- a/crates/topos-tce-broadcast/src/task_manager/task.rs +++ b/crates/topos-tce-broadcast/src/task_manager/task.rs @@ -9,7 +9,7 @@ use topos_tce_storage::errors::StorageError; use topos_tce_storage::store::{ReadStore, WriteStore}; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use tracing::warn; +use tracing::{debug, error}; use crate::double_echo::broadcast_state::{BroadcastState, Status}; use crate::{DoubleEchoCommand, TaskStatus}; @@ -88,9 +88,11 @@ impl IntoFuture for Task { Err(_) => return (self.certificate_id, TaskStatus::Failure), }; - warn!( - "Expected position for {} is {:?}", - self.certificate_id, expected_position + debug!( + "Expected position for Certificate {} is {:?} for the subnet {}", + self.certificate_id, + expected_position, + self.broadcast_state.certificate.source_subnet_id ); self.broadcast_state.expected_position = Some(expected_position); @@ -107,7 +109,7 @@ impl IntoFuture for Task { return (self.certificate_id, TaskStatus::Success); } Err(error) => { - tracing::error!("Unable to persist one delivered certificate: {:?}", error); + error!("Unable to persist one delivered certificate: {:?}", error); return (self.certificate_id, TaskStatus::Failure); } } @@ -123,7 +125,7 @@ impl IntoFuture for Task { return (self.certificate_id, TaskStatus::Success); } Err(error) => { - tracing::error!("Unable to persist one delivered certificate: {:?}", error); + error!("Unable to persist one delivered certificate: {:?}", error); return (self.certificate_id, TaskStatus::Failure); } } @@ -133,7 +135,7 @@ impl IntoFuture for Task { } } _ = self.shutdown_receiver.recv() => { - warn!("Received shutdown, shutting down task {:?}", self.certificate_id); + debug!("Received shutdown, shutting down task {:?}", self.certificate_id); return (self.certificate_id, TaskStatus::Failure) } } diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index d9fcc137a..1000e7f59 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -12,7 +12,7 @@ //! different pending pools and to manage them but also to access the [`FullNodeStore`] in order to //! persist or update [`Certificate`] or `streams`. //! -//! Pending pools and their behavior is decribed in the [`ValidatorPendingTables`] documentation. +//! Pending pools and their behavior are described in the [`ValidatorPendingTables`] documentation. //! use std::{ collections::HashMap, @@ -56,7 +56,7 @@ mod tables; /// /// The key point is that the [`ValidatorStore`] is managing the different pending pools using a [`ValidatorPendingTables`]. /// -/// Pending pools and how they behave is decribed in the [`ValidatorPendingTables`] documentation. +/// Pending pools and how they behave are described in the [`ValidatorPendingTables`] documentation. /// pub struct ValidatorStore { pub(crate) pending_tables: ValidatorPendingTables, @@ -96,7 +96,7 @@ impl ValidatorStore { .count_pending_certificates()? .try_into() .map_err(|error| { - warn!("Failed to convert estimate-num-keys to i64: {}", error); + error!("Failed to convert estimate-num-keys to i64: {}", error); StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( "Failed to convert estimate-num-keys to i64", )) @@ -106,7 +106,7 @@ impl ValidatorStore { .count_precedence_pool_certificates()? .try_into() .map_err(|error| { - warn!("Failed to convert estimate-num-keys to i64: {}", error); + error!("Failed to convert estimate-num-keys to i64: {}", error); StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( "Failed to convert estimate-num-keys to i64", )) @@ -360,7 +360,7 @@ impl ValidatorStore { .await?; debug!( - "Certificate Sync: unverified proof as been removed for {}", + "Certificate Sync: unverified proof has been removed for {}", certificate_id ); self.fullnode_store diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index 19a1ae1b0..deb282c38 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -107,7 +107,7 @@ impl AppContext { if let Some(timer) = self.delivery_latency.remove(&certificate_id) { let duration = timer.stop_and_record(); - warn!("Certificate delivered {} in {}s", certificate_id, duration); + info!("Certificate {} delivered with total latency: {}s", certificate_id, duration); } } @@ -135,7 +135,7 @@ impl AppContext { info!("Shutting down TCE app context..."); if let Err(e) = self.shutdown().await { - error!("Error shutting down TCE app context: {e}"); + error!("Failed to shutdown the TCE app context: {e}"); } // Drop the sender to notify the TCE termination drop(shutdown.1); diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index 1d84ade6d..a645a3822 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -43,7 +43,7 @@ impl AppContext { InternalStorageError::CertificateAlreadyPending, )) => { debug!( - "Certificate {} has been already added to the pending pool, skipping", + "Certificate {} has already been added to the pending pool, skipping", certificate.id ); sender.send(Ok(PendingResult::AlreadyPending)) @@ -52,7 +52,7 @@ impl AppContext { InternalStorageError::CertificateAlreadyExists, )) => { debug!( - "Certificate {} has been already delivered, skipping", + "Certificate {} has already been delivered, skipping", certificate.id ); sender.send(Ok(PendingResult::AlreadyDelivered)) diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index f93a05116..32d8a078a 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -7,7 +7,7 @@ use tokio::spawn; use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_p2p::Event as NetEvent; use topos_tce_broadcast::DoubleEchoCommand; -use tracing::{debug, error, trace}; +use tracing::{debug, error, info, trace}; use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; use topos_core::uci; @@ -36,8 +36,8 @@ impl AppContext { { entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); } - debug!( - "Received certificate {} from Gossip message from {}", + info!( + "Received certificate {} from GossipSub from {}", cert.id, from ); @@ -81,7 +81,7 @@ impl AppContext { } } Err(e) => { - error!("Error converting received certificate {e}"); + error!("Failed to parse the received Certificate: {e}"); } }, double_echo_request::Request::Echo(Echo { @@ -93,25 +93,21 @@ impl AppContext { spawn(async move { let certificate_id = certificate_id.clone().try_into().map_err(|e| { error!( - "Invalid certificate id, could not process Echo message: {e}, \ - certificate_id: {certificate_id}" + "Failed to parse the CertificateId {certificate_id} from Echo: {e}" ); e }); let validator_id = validator_id.clone().try_into().map_err(|e| { - error!( - "Invalid validator id, could not process Echo message: {e}, \ - validator_id: {validator_id}" - ); + error!("Failed to parse the ValidatorId {validator_id} from Echo: {e}"); e }); if let (Ok(certificate_id), Ok(validator_id)) = (certificate_id, validator_id) { - debug!( + trace!( "Received Echo message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from {from}", + validator_id: {validator_id} from: {from}", certificate_id = certificate_id, validator_id = validator_id ); @@ -123,7 +119,7 @@ impl AppContext { }) .await { - error!("Unable to pass received Echo message, {:?}", e); + error!("Unable to pass received Echo message: {:?}", e); } } else { error!("Unable to process Echo message due to invalid data"); @@ -139,24 +135,23 @@ impl AppContext { spawn(async move { let certificate_id = certificate_id.clone().try_into().map_err(|e| { error!( - "Invalid certificate id, could not process Ready message: {e}, \ - certificate_id: {certificate_id}" + "Failed to parse the CertificateId {certificate_id} from Ready: \ + {e}" ); e }); let validator_id = validator_id.clone().try_into().map_err(|e| { error!( - "Invalid validator id, could not process Ready message: {e}, \ - validator_id: {validator_id}" + "Failed to parse the ValidatorId {validator_id} from Ready: {e}" ); e }); if let (Ok(certificate_id), Ok(validator_id)) = (certificate_id, validator_id) { - debug!( + trace!( "Received Ready message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from {from}", + validator_id: {validator_id} from: {from}", certificate_id = certificate_id, validator_id = validator_id ); diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 57dd40068..278a13c0a 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -26,7 +26,7 @@ impl AppContext { .publish(topos_p2p::TOPOS_GOSSIP, request) .await { - error!("Unable to send Gossip due to error: {e}"); + error!("Unable to send Gossip: {e}"); } } @@ -49,7 +49,7 @@ impl AppContext { .publish(topos_p2p::TOPOS_ECHO, request) .await { - error!("Unable to send Echo due to error: {e}"); + error!("Unable to send Echo: {e}"); } } @@ -71,7 +71,7 @@ impl AppContext { .publish(topos_p2p::TOPOS_READY, request) .await { - error!("Unable to send Ready due to error: {e}"); + error!("Unable to send Ready: {e}"); } } ProtocolEvents::BroadcastFailed { certificate_id } => { diff --git a/crates/topos/tests/node.rs b/crates/topos/tests/node.rs index 5d70cd1dc..f7a15aedd 100644 --- a/crates/topos/tests/node.rs +++ b/crates/topos/tests/node.rs @@ -91,7 +91,7 @@ mod serial_integration { if let Ok(code) = cmd.wait().await { assert!(code.success()); } else { - panic!("Failed to gracefull shutdown"); + panic!("Failed to shutdown gracefully"); } // Cleanup From 213b8d482cf6e08ec0f1cae0e9dfd981b156a98d Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 5 Mar 2024 18:22:14 +0100 Subject: [PATCH 8/8] refactor: store instantiation (#461) Signed-off-by: Simon Paitrault --- crates/topos-tce-storage/src/epoch/mod.rs | 6 ++-- crates/topos-tce-storage/src/epoch/tables.rs | 11 ++++--- crates/topos-tce-storage/src/fullnode/mod.rs | 18 ++++++++++- crates/topos-tce-storage/src/index/mod.rs | 6 ++-- .../src/tests/support/mod.rs | 10 +++---- crates/topos-tce-storage/src/validator/mod.rs | 11 +++++-- .../topos-tce-storage/src/validator/tables.rs | 10 +++---- crates/topos-tce/src/lib.rs | 30 +++---------------- crates/topos-test-sdk/src/storage/mod.rs | 15 +++++----- 9 files changed, 59 insertions(+), 58 deletions(-) diff --git a/crates/topos-tce-storage/src/epoch/mod.rs b/crates/topos-tce-storage/src/epoch/mod.rs index ec9f4f0e4..1704a705d 100644 --- a/crates/topos-tce-storage/src/epoch/mod.rs +++ b/crates/topos-tce-storage/src/epoch/mod.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::path::Path; use std::sync::Arc; use std::{collections::HashMap, sync::RwLock}; @@ -23,7 +23,7 @@ pub struct ValidatorPerEpochStore { } impl ValidatorPerEpochStore { - pub fn new(epoch_id: EpochId, path: PathBuf) -> Result, StorageError> { + pub fn new(epoch_id: EpochId, path: &Path) -> Result, StorageError> { let tables: ValidatorPerEpochTables = ValidatorPerEpochTables::open(epoch_id, path); let store = ArcSwap::from(Arc::new(Self { epoch_id, @@ -42,7 +42,7 @@ pub struct EpochValidatorsStore { } impl EpochValidatorsStore { - pub fn new(path: PathBuf) -> Result, StorageError> { + pub fn new(path: &Path) -> Result, StorageError> { let tables = EpochValidatorsTables::open(path); let store = Arc::new(Self { tables, diff --git a/crates/topos-tce-storage/src/epoch/tables.rs b/crates/topos-tce-storage/src/epoch/tables.rs index cfef710b2..00628c94b 100644 --- a/crates/topos-tce-storage/src/epoch/tables.rs +++ b/crates/topos-tce-storage/src/epoch/tables.rs @@ -1,4 +1,4 @@ -use std::{fs::create_dir_all, path::PathBuf}; +use std::{fs::create_dir_all, path::Path}; use rocksdb::ColumnFamilyDescriptor; use topos_core::uci::CertificateId; @@ -19,8 +19,8 @@ pub struct EpochValidatorsTables { } impl EpochValidatorsTables { - pub(crate) fn open(mut path: PathBuf) -> Self { - path.push("validators"); + pub(crate) fn open(path: &Path) -> Self { + let path = path.join("validators"); let mut options = rocksdb::Options::default(); options.create_if_missing(true); let db = init_db(&path, options).unwrap_or_else(|_| panic!("Cannot open DB at {:?}", path)); @@ -42,9 +42,8 @@ pub struct ValidatorPerEpochTables { } impl ValidatorPerEpochTables { - pub(crate) fn open(epoch_id: EpochId, mut path: PathBuf) -> Self { - path.push("epochs"); - path.push(epoch_id.to_string()); + pub(crate) fn open(epoch_id: EpochId, path: &Path) -> Self { + let path = path.join("epochs").join(epoch_id.to_string()); if !path.exists() { warn!("Path {:?} does not exist, creating it", path); create_dir_all(&path).expect("Cannot create ValidatorPerEpochTables directory"); diff --git a/crates/topos-tce-storage/src/fullnode/mod.rs b/crates/topos-tce-storage/src/fullnode/mod.rs index 600bfa625..ff668d6dd 100644 --- a/crates/topos-tce-storage/src/fullnode/mod.rs +++ b/crates/topos-tce-storage/src/fullnode/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, path::Path, sync::Arc}; use arc_swap::ArcSwap; use async_trait::async_trait; @@ -51,6 +51,22 @@ pub struct FullNodeStore { } impl FullNodeStore { + /// Try to create a new instance of [`FullNodeStore`] based on the given path + pub fn new(path: &Path) -> Result, StorageError> { + let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(path)); + let index_tables = Arc::new(IndexTables::open(path)); + + let validators_store = EpochValidatorsStore::new(path)?; + + let epoch_store = ValidatorPerEpochStore::new(0, path)?; + + FullNodeStore::open( + epoch_store, + validators_store, + perpetual_tables, + index_tables, + ) + } pub fn open( epoch_store: ArcSwap, validators_store: Arc, diff --git a/crates/topos-tce-storage/src/index/mod.rs b/crates/topos-tce-storage/src/index/mod.rs index ed5b1c75d..f754c3d66 100644 --- a/crates/topos-tce-storage/src/index/mod.rs +++ b/crates/topos-tce-storage/src/index/mod.rs @@ -1,4 +1,4 @@ -use std::{fs::create_dir_all, path::PathBuf}; +use std::{fs::create_dir_all, path::Path}; use rocksdb::ColumnFamilyDescriptor; use topos_core::{ @@ -27,8 +27,8 @@ pub struct IndexTables { } impl IndexTables { - pub fn open(mut path: PathBuf) -> Self { - path.push("index"); + pub fn open(path: &Path) -> Self { + let path = path.join("index"); if !path.exists() { warn!("Path {:?} does not exist, creating it", path); create_dir_all(&path).expect("Cannot create IndexTables directory"); diff --git a/crates/topos-tce-storage/src/tests/support/mod.rs b/crates/topos-tce-storage/src/tests/support/mod.rs index 61c960b48..b5f436cb5 100644 --- a/crates/topos-tce-storage/src/tests/support/mod.rs +++ b/crates/topos-tce-storage/src/tests/support/mod.rs @@ -40,14 +40,14 @@ pub(crate) fn database_name() -> &'static str { #[fixture] pub(crate) fn store() -> Arc { let temp_dir = create_folder::default(); - let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(temp_dir.clone())); - let index_tables = Arc::new(IndexTables::open(temp_dir.clone())); + let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(&temp_dir)); + let index_tables = Arc::new(IndexTables::open(&temp_dir)); let participants_store = - EpochValidatorsStore::new(temp_dir.clone()).expect("Unable to create Participant store"); + EpochValidatorsStore::new(&temp_dir).expect("Unable to create Participant store"); let epoch_store = - ValidatorPerEpochStore::new(0, temp_dir.clone()).expect("Unable to create Per epoch store"); + ValidatorPerEpochStore::new(0, &temp_dir).expect("Unable to create Per epoch store"); let store = FullNodeStore::open( epoch_store, @@ -57,7 +57,7 @@ pub(crate) fn store() -> Arc { ) .expect("Unable to create full node store"); - ValidatorStore::open(temp_dir, store).unwrap() + ValidatorStore::open(&temp_dir, store).unwrap() } #[fixture] diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index 1000e7f59..272a17c9d 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -16,7 +16,7 @@ //! use std::{ collections::HashMap, - path::PathBuf, + path::Path, sync::{atomic::Ordering, Arc}, }; @@ -64,9 +64,16 @@ pub struct ValidatorStore { } impl ValidatorStore { + /// Try to create a new instance of [`ValidatorStore`] based on the given path + pub fn new(path: &Path) -> Result, StorageError> { + let fullnode_store = FullNodeStore::new(path)?; + + Self::open(path, fullnode_store) + } + /// Open a [`ValidatorStore`] at the given `path` and using the given [`FullNodeStore`] pub fn open( - path: PathBuf, + path: &Path, fullnode_store: Arc, ) -> Result, StorageError> { let pending_tables: ValidatorPendingTables = ValidatorPendingTables::open(path); diff --git a/crates/topos-tce-storage/src/validator/tables.rs b/crates/topos-tce-storage/src/validator/tables.rs index c6fa1193f..5770b72b2 100644 --- a/crates/topos-tce-storage/src/validator/tables.rs +++ b/crates/topos-tce-storage/src/validator/tables.rs @@ -1,6 +1,6 @@ use std::{ fs::create_dir_all, - path::PathBuf, + path::Path, sync::atomic::{AtomicU64, Ordering}, }; @@ -62,8 +62,8 @@ pub struct ValidatorPendingTables { impl ValidatorPendingTables { /// Open the [`ValidatorPendingTables`] at the given path. - pub fn open(mut path: PathBuf) -> Self { - path.push("pending"); + pub fn open(path: &Path) -> Self { + let path = path.join("pending"); if !path.exists() { warn!("Path {:?} does not exist, creating it", path); create_dir_all(&path).expect("Cannot create ValidatorPendingTables directory"); @@ -125,8 +125,8 @@ pub struct ValidatorPerpetualTables { } impl ValidatorPerpetualTables { - pub fn open(mut path: PathBuf) -> Self { - path.push("perpetual"); + pub fn open(path: &Path) -> Self { + let path = path.join("perpetual"); if !path.exists() { warn!("Path {:?} does not exist, creating it", path); create_dir_all(&path).expect("Cannot create ValidatorPerpetualTables directory"); diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 1e7e96489..34fe92d62 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -16,14 +16,7 @@ use topos_p2p::{ GrpcContext, GrpcRouter, }; use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig}; -use topos_tce_storage::{ - epoch::{EpochValidatorsStore, ValidatorPerEpochStore}, - fullnode::FullNodeStore, - index::IndexTables, - store::ReadStore, - validator::{ValidatorPerpetualTables, ValidatorStore}, - StorageClient, -}; +use topos_tce_storage::{store::ReadStore, validator::ValidatorStore, StorageClient}; use topos_tce_synchronizer::SynchronizerService; use tracing::{debug, info, warn}; @@ -108,26 +101,11 @@ pub async fn run( ))); }; - let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(path.clone())); - let index_tables = Arc::new(IndexTables::open(path.clone())); - - let validators_store = EpochValidatorsStore::new(path.clone()) - .map_err(|error| format!("Unable to create EpochValidators store: {error}"))?; - - let epoch_store = ValidatorPerEpochStore::new(0, path.clone()) - .map_err(|error| format!("Unable to create Per epoch store: {error}"))?; - - let fullnode_store = FullNodeStore::open( - epoch_store, - validators_store, - perpetual_tables, - index_tables, - ) - .map_err(|error| format!("Unable to create Fullnode store: {error}"))?; - - let validator_store = ValidatorStore::open(path.clone(), fullnode_store.clone()) + let validator_store = ValidatorStore::new(path) .map_err(|error| format!("Unable to create validator store: {error}"))?; + let fullnode_store = validator_store.get_fullnode_store(); + let certificates_synced = fullnode_store .count_certificates_delivered() .map_err(|error| format!("Unable to count certificates delivered: {error}"))?; diff --git a/crates/topos-test-sdk/src/storage/mod.rs b/crates/topos-test-sdk/src/storage/mod.rs index 704e8f70f..f8416809e 100644 --- a/crates/topos-test-sdk/src/storage/mod.rs +++ b/crates/topos-test-sdk/src/storage/mod.rs @@ -36,7 +36,7 @@ pub async fn create_validator_store( let fullnode_store = create_fullnode_store.await; let store = - ValidatorStore::open(temp_dir, fullnode_store).expect("Unable to create validator store"); + ValidatorStore::open(&temp_dir, fullnode_store).expect("Unable to create validator store"); store .insert_certificates_delivered(&certificates) @@ -49,21 +49,22 @@ pub async fn create_validator_store( pub async fn create_validator_store_with_fullnode( fullnode_store: Arc, ) -> Arc { - ValidatorStore::open(create_folder::default(), fullnode_store) + ValidatorStore::open(&create_folder::default(), fullnode_store) .expect("Unable to create validator store") } + #[fixture(certificates = Vec::new())] pub async fn create_fullnode_store(certificates: Vec) -> Arc { let temp_dir = create_folder::default(); - let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(temp_dir.clone())); - let index_tables = Arc::new(IndexTables::open(temp_dir.clone())); + let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(&temp_dir)); + let index_tables = Arc::new(IndexTables::open(&temp_dir)); - let validators_store = EpochValidatorsStore::new(temp_dir.clone()) - .expect("Unable to create EpochValidators store"); + let validators_store = + EpochValidatorsStore::new(&temp_dir).expect("Unable to create EpochValidators store"); let epoch_store = - ValidatorPerEpochStore::new(0, temp_dir).expect("Unable to create Per epoch store"); + ValidatorPerEpochStore::new(0, &temp_dir).expect("Unable to create Per epoch store"); let store = FullNodeStore::open( epoch_store,