diff --git a/Cargo.lock b/Cargo.lock index 1bbc0b9c0..62324c828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7192,10 +7192,14 @@ version = "0.1.0" dependencies = [ "byteorder", "eth-keystore", + "ethers", "hex", "keccak-hash", + "rstest", "secp256k1", "thiserror", + "topos-core", + "topos-tce-transport", ] [[package]] @@ -7314,6 +7318,7 @@ dependencies = [ "clap 4.4.4", "cucumber", "futures", + "hex", "hyper", "libp2p", "opentelemetry", @@ -7331,6 +7336,7 @@ dependencies = [ "tokio-util", "tonic 0.9.2", "topos-core", + "topos-crypto", "topos-metrics", "topos-p2p", "topos-tce-api", @@ -7399,6 +7405,7 @@ dependencies = [ "criterion", "env_logger 0.10.0", "futures", + "hex", "lazy_static", "rand", "rand_core 0.6.4", @@ -7409,6 +7416,7 @@ dependencies = [ "tokio", "tokio-stream", "topos-core", + "topos-crypto", "topos-metrics", "topos-p2p", "topos-tce-storage", @@ -7531,8 +7539,11 @@ name = "topos-tce-transport" version = "0.1.0" dependencies = [ "clap 4.4.4", + "hex", "serde", + "thiserror", "topos-core", + "topos-crypto", "topos-p2p", "tracing", ] @@ -7553,6 +7564,7 @@ name = "topos-test-sdk" version = "0.1.0" dependencies = [ "futures", + "hex", "lazy_static", "libp2p", "proc_macro_sdk", @@ -7563,6 +7575,7 @@ dependencies = [ "tokio-util", "tonic 0.9.2", "topos-core", + "topos-crypto", "topos-p2p", "topos-tce", "topos-tce-api", diff --git a/crates/topos-crypto/Cargo.toml b/crates/topos-crypto/Cargo.toml index 513d902dc..c5b4c4413 100644 --- a/crates/topos-crypto/Cargo.toml +++ b/crates/topos-crypto/Cargo.toml @@ -12,6 +12,13 @@ secp256k1.workspace = true byteorder.workspace = true hex.workspace = true thiserror.workspace = true +ethers.workspace = true keccak-hash = "0.10.0" eth-keystore = "0.5.0" + +[dev-dependencies] +rstest.workspace = true +topos-tce-transport = { path = "../topos-tce-transport" } +topos-core = { path = "../topos-core" } +ethers.workspace = true diff --git a/crates/topos-crypto/src/lib.rs b/crates/topos-crypto/src/lib.rs index a14b004aa..e56aaa0e7 100644 --- a/crates/topos-crypto/src/lib.rs +++ b/crates/topos-crypto/src/lib.rs @@ -3,6 +3,7 @@ use thiserror::Error; pub mod hash; pub mod keys; pub mod keystore; +pub mod messages; pub mod signatures; #[derive(Debug, Error)] diff --git a/crates/topos-crypto/src/messages.rs b/crates/topos-crypto/src/messages.rs new file mode 100644 index 000000000..eeada9f2f --- /dev/null +++ b/crates/topos-crypto/src/messages.rs @@ -0,0 +1,50 @@ +use ethers::signers::Signer; +use ethers::signers::{LocalWallet, WalletError}; +use ethers::types::{RecoveryMessage, SignatureError}; +use ethers::utils::hash_message; +use std::sync::Arc; +use thiserror::Error; + +pub use ethers::types::{Address, Signature, H160}; + +#[derive(Error, Debug)] +pub enum MessageSignerError { + #[error("Unable to parse private key")] + PrivateKeyParsing, +} + +#[derive(Debug)] +pub struct MessageSigner { + pub public_address: Address, + wallet: LocalWallet, +} + +impl MessageSigner { + pub fn new(private_key: &str) -> Result, MessageSignerError> { + let wallet: LocalWallet = private_key + .parse() + .map_err(|_| MessageSignerError::PrivateKeyParsing)?; + + Ok(Arc::new(Self { + public_address: wallet.address(), + wallet, + })) + } + + pub fn sign_message(&self, payload: &[u8]) -> Result { + let hash = hash_message(payload); + + LocalWallet::sign_hash(&self.wallet, hash) + } + + pub fn verify_signature( + &self, + signature: Signature, + payload: &[u8], + public_key: Address, + ) -> Result<(), SignatureError> { + let message: RecoveryMessage = payload.into(); + + signature.verify(message, public_key) + } +} diff --git a/crates/topos-crypto/tests/messages.rs b/crates/topos-crypto/tests/messages.rs new file mode 100644 index 000000000..05f961102 --- /dev/null +++ b/crates/topos-crypto/tests/messages.rs @@ -0,0 +1,63 @@ +use rstest::*; +use topos_core::uci::CertificateId; +use topos_crypto::messages::MessageSigner; +use topos_tce_transport::ValidatorId; + +#[rstest] +pub fn test_signing_messages() { + let message_signer_sender = + MessageSigner::new("122f3ae6ade1fd136b292cea4f6243c7811160352c8821528547a1fe7c459daf") + .unwrap(); + let validator_id_sender = ValidatorId::from(message_signer_sender.public_address); + let certificate_id = CertificateId::from_array([0u8; 32]); + + let mut payload = Vec::new(); + payload.extend_from_slice(certificate_id.as_array()); + payload.extend_from_slice(validator_id_sender.as_bytes()); + + let signature = message_signer_sender + .sign_message(&payload) + .expect("Cannot create Signature"); + + let message_signer_receiver = + MessageSigner::new("a2e33a9bad88f7b7568228f51d5274c471a9217162d46f1533b6a290f0be1baf") + .unwrap(); + + let verify = message_signer_receiver.verify_signature( + signature, + &payload, + validator_id_sender.address(), + ); + + assert!(verify.is_ok()); +} + +#[rstest] +pub fn fails_to_verify_with_own_public_address() { + let message_signer_sender = + MessageSigner::new("122f3ae6ade1fd136b292cea4f6243c7811160352c8821528547a1fe7c459daf") + .unwrap(); + let validator_id_sender = ValidatorId::from(message_signer_sender.public_address); + let certificate_id = CertificateId::from_array([0u8; 32]); + + let mut payload = Vec::new(); + payload.extend_from_slice(certificate_id.as_array()); + payload.extend_from_slice(validator_id_sender.as_bytes()); + + let signature = message_signer_sender + .sign_message(&payload) + .expect("Cannot create Signature"); + + let message_signer_receiver = + MessageSigner::new("a2e33a9bad88f7b7568228f51d5274c471a9217162d46f1533b6a290f0be1baf") + .unwrap(); + let validator_id_receiver = ValidatorId::from(message_signer_receiver.public_address); + + let verify = message_signer_receiver.verify_signature( + signature, + &payload, + validator_id_receiver.address(), + ); + + assert!(verify.is_err()); +} diff --git a/crates/topos-sequencer-subnet-client/src/lib.rs b/crates/topos-sequencer-subnet-client/src/lib.rs index 07f390ba2..9762d9ce9 100644 --- a/crates/topos-sequencer-subnet-client/src/lib.rs +++ b/crates/topos-sequencer-subnet-client/src/lib.rs @@ -3,7 +3,7 @@ pub mod subnet_contract; use crate::subnet_contract::{create_topos_core_contract_from_json, get_block_events}; use ethers::abi::ethabi::ethereum_types::{H160, U256}; use ethers::core::k256::ecdsa::SigningKey; -use ethers::prelude::Wallet; +use ethers::signers::Wallet; use ethers::types::TransactionReceipt; use ethers::{ abi::Token, diff --git a/crates/topos-sequencer-subnet-client/src/subnet_contract.rs b/crates/topos-sequencer-subnet-client/src/subnet_contract.rs index 7b3722fd9..8adbf74bb 100644 --- a/crates/topos-sequencer-subnet-client/src/subnet_contract.rs +++ b/crates/topos-sequencer-subnet-client/src/subnet_contract.rs @@ -1,7 +1,7 @@ use crate::{Error, SubnetEvent}; use ethers::abi::ethabi::ethereum_types::{H160, U64}; use ethers::contract::ContractError; -use ethers::prelude::LocalWallet; +use ethers::signers::LocalWallet; use ethers::{ prelude::abigen, providers::{Middleware, Provider, Ws}, diff --git a/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs b/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs index c25f2d0f1..6e0acea7a 100644 --- a/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs +++ b/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs @@ -3,8 +3,6 @@ use dockertest::{ Composition, DockerTest, Image, LogAction, LogOptions, LogPolicy, LogSource, PullPolicy, Source, }; -use ethers::prelude::Block; -use ethers::types::H256; use ethers::{ abi::{ethabi::ethereum_types::U256, Address}, contract::abigen, @@ -14,6 +12,7 @@ use ethers::{ prelude::Wallet, providers::{Http, Middleware, Provider}, signers::{LocalWallet, Signer}, + types::{Block, H256}, }; use rstest::*; use serial_test::serial; diff --git a/crates/topos-tce-api/tests/runtime.rs b/crates/topos-tce-api/tests/runtime.rs index 48eb19ffb..09a007fe9 100644 --- a/crates/topos-tce-api/tests/runtime.rs +++ b/crates/topos-tce-api/tests/runtime.rs @@ -29,7 +29,7 @@ use topos_test_sdk::storage::{create_fullnode_store, storage_client}; use topos_test_sdk::tce::public_api::{broadcast_stream, create_public_api, PublicApiContext}; #[rstest] -#[timeout(Duration::from_secs(1))] +#[timeout(Duration::from_secs(4))] #[test(tokio::test)] async fn runtime_can_dispatch_a_cert( #[future] create_public_api: (PublicApiContext, impl Stream), @@ -104,7 +104,7 @@ async fn runtime_can_dispatch_a_cert( } #[rstest] -#[timeout(Duration::from_secs(2))] +#[timeout(Duration::from_secs(4))] #[test(tokio::test)] async fn can_catchup_with_old_certs( #[with(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 15)] @@ -191,7 +191,7 @@ async fn can_catchup_with_old_certs( } #[rstest] -#[timeout(Duration::from_secs(2))] +#[timeout(Duration::from_secs(4))] #[test(tokio::test)] async fn can_catchup_with_old_certs_with_position( broadcast_stream: broadcast::Receiver, @@ -323,7 +323,7 @@ async fn can_catchup_with_old_certs_with_position( async fn can_listen_for_multiple_subnet_id() {} #[rstest] -#[timeout(Duration::from_secs(2))] +#[timeout(Duration::from_secs(4))] #[test(tokio::test)] async fn boots_healthy_graphql_server( broadcast_stream: broadcast::Receiver, @@ -369,7 +369,7 @@ async fn boots_healthy_graphql_server( } #[rstest] -#[timeout(Duration::from_secs(2))] +#[timeout(Duration::from_secs(4))] #[test(tokio::test)] async fn graphql_server_enables_cors( broadcast_stream: broadcast::Receiver, @@ -438,7 +438,7 @@ async fn graphql_server_enables_cors( } #[rstest] -#[timeout(Duration::from_secs(2))] +#[timeout(Duration::from_secs(4))] #[test(tokio::test)] async fn can_query_graphql_endpoint_for_certificates( broadcast_stream: broadcast::Receiver, diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index d7e93a5a1..4b5fe2e29 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -18,12 +18,12 @@ tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true, features = ["sync"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } tracing.workspace = true - tce_transport = { package = "topos-tce-transport", path = "../topos-tce-transport"} topos-core = { workspace = true, features = ["uci"] } topos-p2p = { path = "../topos-p2p/"} topos-metrics = { path = "../topos-metrics/" } topos-tce-storage = { path = "../topos-tce-storage/" } +topos-crypto = { path = "../topos-crypto" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] } @@ -31,7 +31,7 @@ test-log.workspace = true env_logger.workspace = true rstest.workspace = true rand.workspace = true - +hex.workspace = true topos-test-sdk = { path = "../topos-test-sdk/" } [features] diff --git a/crates/topos-tce-broadcast/benches/task_manager.rs b/crates/topos-tce-broadcast/benches/task_manager.rs index 7b3e9eb2e..4a99ee11f 100644 --- a/crates/topos-tce-broadcast/benches/task_manager.rs +++ b/crates/topos-tce-broadcast/benches/task_manager.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use std::sync::Arc; -use tce_transport::ReliableBroadcastParams; +use tce_transport::{ReliableBroadcastParams, ValidatorId}; use tokio::sync::{broadcast, mpsc, oneshot}; +use topos_crypto::messages::MessageSigner; use topos_tce_broadcast::double_echo::DoubleEcho; use topos_tce_broadcast::sampler::SubscriptionsView; use topos_tce_storage::validator::ValidatorStore; @@ -9,6 +10,7 @@ use topos_test_sdk::certificates::create_certificate_chain; use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1}; const CHANNEL_SIZE: usize = 256_000; +const PRIVATE_KEY: &str = "47d361f6becb933a77d7e01dee7b1c1859b656adbd8428bf7bf9519503e5d5d6"; struct TceParams { nb_peers: usize, @@ -34,13 +36,21 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc }, }; + let message_signer: Arc = MessageSigner::new(PRIVATE_KEY).unwrap(); + + let mut validators = HashSet::new(); + let validator_id = ValidatorId::from(message_signer.public_address); + validators.insert(validator_id); + let mut double_echo = DoubleEcho::new( params.broadcast_params, + validator_id, + message_signer.clone(), + validators, task_manager_message_sender.clone(), cmd_receiver, event_sender, double_echo_shutdown_receiver, - 0, validator_store, broadcast_sender, ); @@ -93,12 +103,24 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc } for cert in &certificates { + let mut payload = Vec::new(); + payload.extend_from_slice(cert.certificate.id.as_array()); + payload.extend_from_slice(validator_id.as_bytes()); + for p in &double_echo_selected_echo { - double_echo.handle_echo(*p, cert.certificate.id).await; + let signature = message_signer.sign_message(&payload).unwrap(); + + double_echo + .handle_echo(*p, cert.certificate.id, validator_id, signature) + .await; } for p in &double_echo_selected_ready { - double_echo.handle_ready(*p, cert.certificate.id).await; + let signature = message_signer.sign_message(&payload).unwrap(); + + double_echo + .handle_ready(*p, cert.certificate.id, validator_id, signature) + .await; } } diff --git a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs index 0b9d8d062..25c686cdf 100644 --- a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs +++ b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs @@ -1,8 +1,10 @@ +use crate::sampler::SubscriptionsView; +use std::sync::Arc; use std::{collections::HashSet, time}; - -use tce_transport::ProtocolEvents; +use tce_transport::{ProtocolEvents, ValidatorId}; use tokio::sync::mpsc; use topos_core::uci::Certificate; +use topos_crypto::messages::MessageSigner; use topos_metrics::DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL; use topos_p2p::PeerId; use topos_tce_storage::{ @@ -10,9 +12,6 @@ use topos_tce_storage::{ Position, }; use tracing::{debug, info, warn}; - -use crate::sampler::SubscriptionsView; - mod status; pub use status::Status; @@ -22,9 +21,11 @@ pub struct BroadcastState { subscriptions_view: SubscriptionsView, status: Status, pub(crate) certificate: Certificate, + validator_id: ValidatorId, echo_threshold: usize, ready_threshold: usize, delivery_threshold: usize, + message_signer: Arc, event_sender: mpsc::Sender, delivery_time: time::Instant, readies: HashSet, @@ -32,22 +33,27 @@ pub struct BroadcastState { } impl BroadcastState { + #[allow(clippy::too_many_arguments)] pub fn new( certificate: Certificate, + validator_id: ValidatorId, echo_threshold: usize, ready_threshold: usize, delivery_threshold: usize, event_sender: mpsc::Sender, subscriptions_view: SubscriptionsView, need_gossip: bool, + message_signer: Arc, ) -> Self { let mut state = Self { subscriptions_view, status: Status::Pending, certificate, + validator_id, echo_threshold, ready_threshold, delivery_threshold, + message_signer, event_sender, delivery_time: time::Instant::now(), readies: HashSet::new(), @@ -114,8 +120,14 @@ impl BroadcastState { // any Echo or Ready messages // Sending our Echo message if let Status::Pending = self.status { - _ = self.event_sender.try_send(ProtocolEvents::Echo { + let mut payload = Vec::new(); + payload.extend_from_slice(self.certificate.id.as_array()); + payload.extend_from_slice(self.validator_id.as_bytes()); + + let _ = self.event_sender.try_send(ProtocolEvents::Echo { certificate_id: self.certificate.id, + signature: self.message_signer.sign_message(&payload).ok()?, + validator_id: self.validator_id, }); self.status = Status::EchoSent; @@ -132,8 +144,14 @@ impl BroadcastState { // If the status was EchoSent, we update it to ReadySent // If the status was Delivered, we update it to DeliveredWithReadySent if !self.status.is_ready_sent() && self.reached_ready_threshold() { + let mut payload = Vec::new(); + payload.extend_from_slice(self.certificate.id.as_array()); + payload.extend_from_slice(self.validator_id.as_bytes()); + let event = ProtocolEvents::Ready { certificate_id: self.certificate.id, + signature: self.message_signer.sign_message(&payload).ok()?, + validator_id: self.validator_id, }; if let Err(e) = self.event_sender.try_send(event) { warn!("Error sending Ready message: {}", e); diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 5af5494d4..5b5604800 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -2,10 +2,10 @@ use crate::TaskStatus; use crate::{DoubleEchoCommand, SubscriptionsView}; use std::collections::HashSet; use std::sync::Arc; -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams, ValidatorId}; use tokio::sync::{broadcast, mpsc, oneshot}; use topos_core::uci::{Certificate, CertificateId}; - +use topos_crypto::messages::{MessageSigner, Signature}; use topos_p2p::PeerId; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; @@ -28,6 +28,12 @@ pub struct DoubleEcho { task_manager_message_sender: mpsc::Sender, /// The overview of the network, which holds echo and ready subscriptions and the network size pub subscriptions: SubscriptionsView, + /// Local node ValidatorId + pub validator_id: ValidatorId, + /// Keypair to sign and verify ECHO and READY messages + pub message_signer: Arc, + /// List of approved validators through smart contract and/or genesis + pub validators: HashSet, pub validator_store: Arc, pub broadcast_sender: broadcast::Sender, } @@ -38,16 +44,21 @@ impl DoubleEcho { #[allow(clippy::too_many_arguments)] pub fn new( params: ReliableBroadcastParams, + validator_id: ValidatorId, + message_signer: Arc, + validators: HashSet, task_manager_message_sender: mpsc::Sender, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, shutdown: mpsc::Receiver>, - _pending_certificate_count: usize, validator_store: Arc, broadcast_sender: broadcast::Sender, ) -> Self { Self { params, + validator_id, + message_signer, + validators, task_manager_message_sender, command_receiver, event_sender, @@ -72,7 +83,9 @@ impl DoubleEcho { task_completion_sender, subscriptions_view_receiver, self.event_sender.clone(), + self.validator_id, self.params.clone(), + self.message_signer.clone(), self.validator_store.clone(), self.broadcast_sender.clone(), ); @@ -95,6 +108,8 @@ impl DoubleEcho { task_completion_sender, subscriptions_view_receiver, self.event_sender.clone(), + self.validator_id, + self.message_signer.clone(), self.params.clone(), self.validator_store.clone(), ); @@ -145,8 +160,38 @@ impl DoubleEcho { command if self.subscriptions.is_some() => { match command { - DoubleEchoCommand::Echo { from_peer, certificate_id } => self.handle_echo(from_peer, certificate_id).await, - DoubleEchoCommand::Ready { from_peer, certificate_id } => self.handle_ready(from_peer, certificate_id).await, + DoubleEchoCommand::Echo { from_peer, certificate_id, validator_id, signature } => { + // Check if source is part of known_validators + if !self.validators.contains(&validator_id) { + return error!("ECHO message comes from non-validator: {}", validator_id); + } + + let mut payload = Vec::new(); + payload.extend_from_slice(certificate_id.as_array()); + payload.extend_from_slice(validator_id.as_bytes()); + + if let Err(e) = self.message_signer.verify_signature(signature, &payload, validator_id.address()) { + return error!("ECHO messag signature cannot be verified from: {}", e); + } + + self.handle_echo(from_peer, certificate_id, validator_id, signature).await + }, + DoubleEchoCommand::Ready { from_peer, certificate_id, validator_id, signature } => { + // Check if source is part of known_validators + if !self.validators.contains(&validator_id) { + return error!("READY message comes from non-validator: {}", validator_id); + } + + let mut payload = Vec::new(); + payload.extend_from_slice(certificate_id.as_array()); + payload.extend_from_slice(validator_id.as_bytes()); + + if let Err(e) = self.message_signer.verify_signature(signature, &payload, validator_id.address()) { + return error!("READY message signature cannot be verified from: {}", e); + } + + self.handle_ready(from_peer, certificate_id, validator_id, signature).await + }, _ => {} } @@ -167,7 +212,7 @@ impl DoubleEcho { warn!("Break the tokio loop for the double echo"); break None; } - }; + } }; if let Some(sender) = shutdowned { @@ -259,25 +304,41 @@ impl DoubleEcho { } impl DoubleEcho { - pub async fn handle_echo(&mut self, from_peer: PeerId, certificate_id: CertificateId) { + pub async fn handle_echo( + &mut self, + from_peer: PeerId, + certificate_id: CertificateId, + validator_id: ValidatorId, + signature: Signature, + ) { if self.delivered_certificates.get(&certificate_id).is_none() { let _ = self .task_manager_message_sender .send(DoubleEchoCommand::Echo { from_peer, + validator_id, certificate_id, + signature, }) .await; } } - pub async fn handle_ready(&mut self, from_peer: PeerId, certificate_id: CertificateId) { + pub async fn handle_ready( + &mut self, + from_peer: PeerId, + certificate_id: CertificateId, + validator_id: ValidatorId, + signature: Signature, + ) { if self.delivered_certificates.get(&certificate_id).is_none() { let _ = self .task_manager_message_sender .send(DoubleEchoCommand::Ready { from_peer, + validator_id, certificate_id, + signature, }) .await; } diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index 30bf8a2b7..4daa3f3e6 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -3,22 +3,19 @@ //! Abstracted from actual transport implementation. //! Abstracted from actual storage implementation. +use double_echo::DoubleEcho; +use futures::Stream; +use sampler::SampleType; use std::collections::HashSet; use std::sync::Arc; - -use sampler::SampleType; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams, ValidatorId}; use thiserror::Error; use tokio::spawn; -use tokio_stream::wrappers::ReceiverStream; - -use futures::Stream; use tokio::sync::mpsc::Sender; use tokio::sync::{broadcast, mpsc, oneshot}; - -use double_echo::DoubleEcho; -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; - +use tokio_stream::wrappers::ReceiverStream; use topos_core::uci::{Certificate, CertificateId}; +use topos_crypto::messages::{MessageSigner, Signature}; use topos_metrics::DOUBLE_ECHO_COMMAND_CHANNEL_CAPACITY_TOTAL; use topos_p2p::PeerId; use topos_tce_storage::types::CertificateDeliveredWithPositions; @@ -54,6 +51,9 @@ pub enum TaskStatus { /// Configuration of TCE implementation pub struct ReliableBroadcastConfig { pub tce_params: ReliableBroadcastParams, + pub validator_id: ValidatorId, + pub validators: HashSet, + pub message_signer: Arc, } #[derive(Debug)] @@ -84,22 +84,26 @@ pub enum DoubleEchoCommand { /// When echo reply received Echo { from_peer: PeerId, + validator_id: ValidatorId, certificate_id: CertificateId, + signature: Signature, }, /// When ready reply received Ready { from_peer: PeerId, + validator_id: ValidatorId, certificate_id: CertificateId, + signature: Signature, }, } /// Thread safe client to the protocol aggregate #[derive(Clone, Debug)] pub struct ReliableBroadcastClient { - command_sender: mpsc::Sender, - pub(crate) subscriptions_view_sender: mpsc::Sender, - pub(crate) double_echo_shutdown_channel: mpsc::Sender>, + command_sender: Sender, + pub(crate) subscriptions_view_sender: Sender, + pub(crate) double_echo_shutdown_channel: Sender>, } impl ReliableBroadcastClient { @@ -109,7 +113,6 @@ impl ReliableBroadcastClient { /// Aggregate is spawned as new task. pub async fn new( config: ReliableBroadcastConfig, - _local_peer_id: String, validator_store: Arc, broadcast_sender: broadcast::Sender, ) -> (Self, impl Stream) { @@ -123,15 +126,15 @@ impl ReliableBroadcastClient { let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(*constant::BROADCAST_TASK_MANAGER_CHANNEL_SIZE); - let pending_certificate_count = validator_store.count_pending_certificates().unwrap_or(0); - let double_echo = DoubleEcho::new( config.tce_params, + config.validator_id, + config.message_signer, + config.validators, task_manager_message_sender, command_receiver, event_sender, double_echo_shutdown_receiver, - pending_certificate_count, validator_store.clone(), broadcast_sender, ); @@ -223,6 +226,9 @@ pub enum Errors { #[error("Requested digest not found for certificate {0:?}")] DigestNotFound(CertificateId), + #[error("Cannot create public address from private key")] + ProducePublicAddress, + #[error("Unable to execute shutdown for the reliable broadcast: {0}")] ShutdownCommunication(mpsc::error::SendError>), } diff --git a/crates/topos-tce-broadcast/src/sampler/mod.rs b/crates/topos-tce-broadcast/src/sampler/mod.rs index c7bf40a51..fe8474cf4 100644 --- a/crates/topos-tce-broadcast/src/sampler/mod.rs +++ b/crates/topos-tce-broadcast/src/sampler/mod.rs @@ -34,15 +34,4 @@ impl SubscriptionsView { pub fn is_none(&self) -> bool { self.echo.is_empty() && self.ready.is_empty() } - - /// Current view of subscriptions of the node, which is initially the whole network - pub fn get_subscriptions(&self) -> Vec { - self.echo - .iter() - .chain(self.ready.iter()) - .cloned() - .collect::>() - .into_iter() - .collect() - } } diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs b/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs index cc9f735b3..98187b06e 100644 --- a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs @@ -1,10 +1,9 @@ use std::collections::HashMap; use std::sync::Arc; - +use tce_transport::{ProtocolEvents, ReliableBroadcastParams, ValidatorId}; use tokio::{spawn, sync::mpsc}; - -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; use topos_core::uci::CertificateId; +use topos_crypto::messages::MessageSigner; use tracing::warn; pub mod task; @@ -33,17 +32,22 @@ pub struct TaskManager { pub event_sender: mpsc::Sender, pub tasks: HashMap, pub buffered_messages: HashMap>, + pub validator_id: ValidatorId, + pub message_signer: Arc, pub thresholds: ReliableBroadcastParams, pub shutdown_sender: mpsc::Sender<()>, pub validator_store: Arc, } impl TaskManager { + #[allow(clippy::too_many_arguments)] pub fn new( message_receiver: mpsc::Receiver, notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, subscription_view_receiver: mpsc::Receiver, event_sender: mpsc::Sender, + validator_id: ValidatorId, + message_signer: Arc, thresholds: ReliableBroadcastParams, validator_store: Arc, ) -> (Self, mpsc::Receiver<()>) { @@ -62,6 +66,8 @@ impl TaskManager { event_sender, tasks: HashMap::new(), buffered_messages: Default::default(), + validator_id, + message_signer, thresholds, shutdown_sender, validator_store, @@ -93,12 +99,14 @@ impl TaskManager { std::collections::hash_map::Entry::Vacant(entry) => { let broadcast_state = BroadcastState::new( cert.clone(), + self.validator_id, self.thresholds.echo_threshold, self.thresholds.ready_threshold, self.thresholds.delivery_threshold, self.event_sender.clone(), self.subscriptions.clone(), need_gossip, + self.message_signer.clone(), ); let (task, task_context) = Task::new(cert.id, self.task_completion_sender.clone(), broadcast_state, self.validator_store.clone()); diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs b/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs index 50b09b4b0..72f7a463d 100644 --- a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::future::IntoFuture; use std::pin::Pin; use std::sync::Arc; -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams, ValidatorId}; use tokio::sync::broadcast; use tokio::{spawn, sync::mpsc}; use topos_core::uci::CertificateId; @@ -25,6 +25,7 @@ use crate::sampler::SubscriptionsView; use crate::DoubleEchoCommand; use crate::TaskStatus; use task::{Task, TaskContext}; +use topos_crypto::messages::MessageSigner; type RunningTasks = FuturesUnordered + Send + 'static>>>; @@ -39,10 +40,12 @@ pub struct TaskManager { pub subscriptions: SubscriptionsView, pub event_sender: mpsc::Sender, pub tasks: HashMap, + pub message_signer: Arc, #[allow(clippy::type_complexity)] pub running_tasks: RunningTasks, pub buffered_messages: HashMap>, pub thresholds: ReliableBroadcastParams, + pub validator_id: ValidatorId, pub shutdown_sender: mpsc::Sender<()>, pub validator_store: Arc, pub broadcast_sender: broadcast::Sender, @@ -56,7 +59,9 @@ impl TaskManager { task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, subscription_view_receiver: mpsc::Receiver, event_sender: mpsc::Sender, + validator_id: ValidatorId, thresholds: ReliableBroadcastParams, + message_signer: Arc, validator_store: Arc, broadcast_sender: broadcast::Sender, ) -> (Self, mpsc::Receiver<()>) { @@ -72,6 +77,8 @@ impl TaskManager { tasks: HashMap::new(), running_tasks: FuturesUnordered::new(), buffered_messages: Default::default(), + validator_id, + message_signer, thresholds, shutdown_sender, validator_store, @@ -108,12 +115,14 @@ impl TaskManager { std::collections::hash_map::Entry::Vacant(entry) => { let broadcast_state = BroadcastState::new( cert.clone(), + self.validator_id, self.thresholds.echo_threshold, self.thresholds.ready_threshold, self.thresholds.delivery_threshold, self.event_sender.clone(), self.subscriptions.clone(), need_gossip, + self.message_signer.clone(), ); let (task, task_context) = Task::new( diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index 9419b8b08..d51de8aa6 100644 --- a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -4,13 +4,13 @@ use rstest::*; use std::collections::HashSet; use std::time::Duration; use tce_transport::ReliableBroadcastParams; -use topos_test_sdk::storage::create_validator_store; - use tokio::sync::mpsc::Receiver; - +use topos_crypto::messages::MessageSigner; use topos_test_sdk::constants::*; +use topos_test_sdk::storage::create_validator_store; const CHANNEL_SIZE: usize = 10; +const PRIVATE_KEY: &str = "47d361f6becb933a77d7e01dee7b1c1859b656adbd8428bf7bf9519503e5d5d6"; #[fixture] fn small_config() -> TceParams { @@ -57,14 +57,22 @@ async fn create_context(params: TceParams, folder_name: &'static str) -> (Double mpsc::channel::>(1); let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE); + let message_signer = MessageSigner::new(PRIVATE_KEY).unwrap(); + + let mut validators = HashSet::new(); + let validator_id = ValidatorId::from(message_signer.public_address); + validators.insert(validator_id); + let (broadcast_sender, broadcast_receiver) = broadcast::channel(CHANNEL_SIZE); let mut double_echo = DoubleEcho::new( params.broadcast_params, + validator_id, + message_signer, + validators, task_manager_message_sender.clone(), cmd_receiver, event_sender, double_echo_shutdown_receiver, - 0, validator_store, broadcast_sender, ); @@ -111,8 +119,19 @@ async fn reach_echo_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) .cloned() .collect::>(); + let message_signer = MessageSigner::new(PRIVATE_KEY).unwrap(); + let validator_id = ValidatorId::from(message_signer.public_address); + + let mut payload = Vec::new(); + payload.extend_from_slice(cert.id.as_array()); + payload.extend_from_slice(validator_id.as_bytes()); + + let signature = message_signer.sign_message(&payload).unwrap(); + for p in selected { - double_echo.handle_echo(p, cert.id).await; + double_echo + .handle_echo(p, cert.id, validator_id, signature) + .await; } } @@ -125,8 +144,20 @@ async fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) .cloned() .collect::>(); + let message_signer = MessageSigner::new(PRIVATE_KEY).unwrap(); + + let validator_id = ValidatorId::from(message_signer.public_address); + + let mut payload = Vec::new(); + payload.extend_from_slice(cert.id.as_array()); + payload.extend_from_slice(validator_id.as_bytes()); + + let signature = message_signer.sign_message(&payload).unwrap(); + for p in selected { - double_echo.handle_ready(p, cert.id).await; + double_echo + .handle_ready(p, cert.id, validator_id, signature) + .await; } } @@ -139,8 +170,19 @@ async fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certifica .cloned() .collect::>(); + let message_signer = MessageSigner::new(PRIVATE_KEY).unwrap(); + let validator_id = ValidatorId::from(message_signer.public_address); + + let mut payload = Vec::new(); + payload.extend_from_slice(cert.id.as_array()); + payload.extend_from_slice(validator_id.as_bytes()); + + let signature = message_signer.sign_message(&payload).unwrap(); + for p in selected { - double_echo.handle_ready(p, cert.id).await; + double_echo + .handle_ready(p, cert.id, validator_id, signature) + .await; } } @@ -202,7 +244,7 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) #[case(medium_config())] #[tokio::test] #[trace] -#[timeout(Duration::from_secs(1))] +#[timeout(Duration::from_secs(4))] async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { let (mut double_echo, mut ctx) = create_context(params, "trigger_ready_when_reached_enough_ready").await; diff --git a/crates/topos-tce-gatekeeper/src/client.rs b/crates/topos-tce-gatekeeper/src/client.rs index a74d40517..115c6031f 100644 --- a/crates/topos-tce-gatekeeper/src/client.rs +++ b/crates/topos-tce-gatekeeper/src/client.rs @@ -1,12 +1,11 @@ +use crate::{ + GatekeeperCommand, GatekeeperError, GetAllPeers, GetAllSubnets, GetRandomPeers, PushPeerList, +}; use async_trait::async_trait; use tokio::sync::{mpsc, oneshot}; use topos_core::uci::SubnetId; use topos_p2p::PeerId; -use crate::{ - GatekeeperCommand, GatekeeperError, GetAllPeers, GetAllSubnets, GetRandomPeers, PushPeerList, -}; - #[async_trait] pub trait GatekeeperClient: Send + Sync + 'static { async fn get_random_peers(&self, number: usize) -> Result, GatekeeperError>; diff --git a/crates/topos-tce-transport/Cargo.toml b/crates/topos-tce-transport/Cargo.toml index bd67800aa..bf42e89d8 100644 --- a/crates/topos-tce-transport/Cargo.toml +++ b/crates/topos-tce-transport/Cargo.toml @@ -13,7 +13,10 @@ path = 'src/lib.rs' clap.workspace = true serde = { workspace = true, features = ["rc"] } tracing.workspace = true - +thiserror.workspace = true +hex.workspace = true topos-core = { workspace = true, features = ["uci"] } topos-p2p = { path = "../topos-p2p/"} +topos-crypto = { path = "../topos-crypto" } + diff --git a/crates/topos-tce-transport/src/lib.rs b/crates/topos-tce-transport/src/lib.rs index e635f81ed..f9ca66e78 100644 --- a/crates/topos-tce-transport/src/lib.rs +++ b/crates/topos-tce-transport/src/lib.rs @@ -2,7 +2,10 @@ //! use clap::Parser; use serde::{Deserialize, Serialize}; +use std::str::FromStr; +use thiserror::Error; use topos_core::uci::{Certificate, CertificateId}; +use topos_crypto::messages::{Address, Signature, H160}; use topos_p2p::PeerId; #[derive(Parser, Clone, Debug, Default, Deserialize, Serialize)] @@ -55,9 +58,17 @@ pub enum TceCommands { /// Received G-set message OnGossip { cert: Certificate }, /// When echo reply received - OnEcho { certificate_id: CertificateId }, + OnEcho { + certificate_id: CertificateId, + signature: Signature, + validator_id: ValidatorId, + }, /// When ready reply received - OnReady { certificate_id: CertificateId }, + OnReady { + certificate_id: CertificateId, + signature: Signature, + validator_id: ValidatorId, + }, /// Given peer replied ok to the double echo request OnDoubleEchoOk {}, } @@ -101,10 +112,14 @@ pub enum ProtocolEvents { /// Indicates that 'echo' message broadcasting is required Echo { certificate_id: CertificateId, + signature: Signature, + validator_id: ValidatorId, }, /// Indicates that 'ready' message broadcasting is required Ready { certificate_id: CertificateId, + signature: Signature, + validator_id: ValidatorId, }, /// For simulation purpose, for now only caused by ill-formed sampling Die, @@ -112,3 +127,46 @@ pub enum ProtocolEvents { /// Stable Sample StableSample, } + +#[derive(Debug, Error)] +pub enum ValidatorIdConversionError { + #[error("Failed to parse address string as H160")] + ParseError, + #[error("Failed to convert byte array into H160")] + InvalidByteLength, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +pub struct ValidatorId(H160); + +impl ValidatorId { + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } + + pub fn address(&self) -> Address { + self.0 + } +} + +impl From for ValidatorId { + fn from(address: H160) -> Self { + ValidatorId(address) + } +} + +impl TryFrom<&str> for ValidatorId { + type Error = ValidatorIdConversionError; + + fn try_from(address: &str) -> Result { + H160::from_str(address) + .map_err(|_| ValidatorIdConversionError::ParseError) + .map(ValidatorId) + } +} + +impl std::fmt::Display for ValidatorId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "0x{}", hex::encode(self.0)) + } +} diff --git a/crates/topos-tce/Cargo.toml b/crates/topos-tce/Cargo.toml index 9937b316e..0012756ed 100644 --- a/crates/topos-tce/Cargo.toml +++ b/crates/topos-tce/Cargo.toml @@ -12,6 +12,7 @@ workspace = true async-trait.workspace = true bincode.workspace = true clap.workspace = true +hex.workspace = true futures.workspace = true opentelemetry.workspace = true prometheus-client.workspace = true @@ -34,6 +35,7 @@ tce_transport = { package = "topos-tce-transport", path = "../topos-tce-transpor topos-p2p = { path = "../topos-p2p" } topos-metrics = { path = "../topos-metrics" } topos-tce-api = { path = "../topos-tce-api"} +topos-crypto = { path = "../topos-crypto" } topos-tce-broadcast = { path = "../topos-tce-broadcast" } topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper" } topos-tce-storage = { package = "topos-tce-storage", path = "../topos-tce-storage" } diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index 14f15d96b..507ffa1f6 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -57,14 +57,19 @@ impl AppContext { } }); } - - TceCommands::OnEcho { certificate_id } => { + TceCommands::OnEcho { + certificate_id, + signature, + validator_id, + } => { let channel = self.tce_cli.get_double_echo_channel(); spawn(async move { if let Err(e) = channel .send(DoubleEchoCommand::Echo { from_peer: from, + signature, certificate_id, + validator_id, }) .await { @@ -72,13 +77,19 @@ impl AppContext { } }); } - TceCommands::OnReady { certificate_id } => { + TceCommands::OnReady { + certificate_id, + validator_id, + signature, + } => { let channel = self.tce_cli.get_double_echo_channel(); spawn(async move { if let Err(e) = channel .send(DoubleEchoCommand::Ready { from_peer: from, + validator_id, certificate_id, + signature, }) .await { diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 9850641ed..a6d013f50 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -35,9 +35,17 @@ impl AppContext { } } - ProtocolEvents::Echo { certificate_id } => { + ProtocolEvents::Echo { + certificate_id, + signature, + validator_id, + } => { // Send echo message - let data = NetworkMessage::from(TceCommands::OnEcho { certificate_id }); + let data = NetworkMessage::from(TceCommands::OnEcho { + certificate_id, + signature, + validator_id, + }); if let Err(e) = self .network_client @@ -48,8 +56,16 @@ impl AppContext { } } - ProtocolEvents::Ready { certificate_id } => { - let data = NetworkMessage::from(TceCommands::OnReady { certificate_id }); + ProtocolEvents::Ready { + certificate_id, + signature, + validator_id, + } => { + let data = NetworkMessage::from(TceCommands::OnReady { + certificate_id, + signature, + validator_id, + }); if let Err(e) = self .network_client diff --git a/crates/topos-tce/src/config.rs b/crates/topos-tce/src/config.rs index f964d3256..fbc505cc0 100644 --- a/crates/topos-tce/src/config.rs +++ b/crates/topos-tce/src/config.rs @@ -1,8 +1,9 @@ +use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use std::time::Duration; -use tce_transport::ReliableBroadcastParams; +use tce_transport::{ReliableBroadcastParams, ValidatorId}; use topos_p2p::{Multiaddr, PeerId}; pub use crate::AppContext; @@ -16,8 +17,10 @@ pub enum AuthKey { #[derive(Debug)] pub struct TceConfiguration { pub auth_key: Option, + pub signing_key: Option, pub tce_params: ReliableBroadcastParams, pub boot_peers: Vec<(PeerId, Multiaddr)>, + pub validators: HashSet, pub api_addr: SocketAddr, pub graphql_api_addr: SocketAddr, pub metrics_api_addr: SocketAddr, diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index aaa727165..77b96e0e3 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -1,14 +1,14 @@ -use std::{future::IntoFuture, sync::Arc}; - use config::TceConfiguration; use futures::StreamExt; use opentelemetry::global; +use std::{future::IntoFuture, sync::Arc}; use tokio::{ spawn, sync::{broadcast, mpsc}, }; use tokio_stream::wrappers::BroadcastStream; use tokio_util::sync::CancellationToken; +use topos_crypto::messages::MessageSigner; use topos_p2p::{ utils::{local_key_pair, local_key_pair_from_slice}, Multiaddr, @@ -22,7 +22,6 @@ use topos_tce_storage::{ Connection, RocksDBStorage, }; use tracing::{debug, warn}; - mod app_context; pub mod config; pub mod events; @@ -47,9 +46,22 @@ pub async fn run( None => local_key_pair(None), }; + let message_signer = match &config.signing_key { + Some(AuthKey::PrivateKey(pk)) => { + let bytes = pk.to_vec(); + let bytes_str = std::str::from_utf8(&bytes)?; + MessageSigner::new(bytes_str)? + } + _ => return Err(Box::try_from("Error, no singing key".to_string()).unwrap()), + }; + + let public_address = message_signer.public_address.to_string(); + + warn!("Public node address: {public_address}"); + let peer_id = key.public().to_peer_id(); - warn!("I am {}", peer_id); + warn!("I am {peer_id}"); tracing::Span::current().record("peer_id", &peer_id.to_string()); @@ -135,15 +147,19 @@ pub async fn run( debug!("Storage started"); debug!("Starting reliable broadcast"); + let (tce_cli, tce_stream) = ReliableBroadcastClient::new( ReliableBroadcastConfig { tce_params: config.tce_params.clone(), + validator_id: message_signer.public_address.into(), + validators: config.validators.clone(), + message_signer, }, - peer_id.to_string(), validator_store.clone(), broadcast_sender, ) .await; + debug!("Reliable broadcast started"); debug!("Starting the Synchronizer"); diff --git a/crates/topos-test-sdk/Cargo.toml b/crates/topos-test-sdk/Cargo.toml index 37271937d..2ad620006 100644 --- a/crates/topos-test-sdk/Cargo.toml +++ b/crates/topos-test-sdk/Cargo.toml @@ -12,12 +12,13 @@ topos-core = { workspace = true, features = ["uci"] } topos-p2p = { path = "../topos-p2p/" } topos-tce = { path = "../topos-tce/", optional = true } topos-tce-api = { path = "../topos-tce-api/", optional = true } +topos-crypto = { path = "../topos-crypto/" } topos-tce-storage = { path = "../topos-tce-storage/" } topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper/", optional = true } topos-tce-synchronizer = { path = "../topos-tce-synchronizer/", optional = true } topos-tce-broadcast = { path = "../topos-tce-broadcast/", optional = true } topos-tce-transport = { path = "../topos-tce-transport/", optional = true } - +hex.workspace = true futures.workspace = true lazy_static = { version = "1.4.0" } libp2p.workspace = true diff --git a/crates/topos-test-sdk/build.rs b/crates/topos-test-sdk/build.rs index 66827817b..9b34944c2 100644 --- a/crates/topos-test-sdk/build.rs +++ b/crates/topos-test-sdk/build.rs @@ -6,7 +6,7 @@ fn main() { ) .expect("Unable to build PathBuf for topos-test-sdk"); - path.push("../../target/tmp/"); + path.push("./../../target/tmp/"); let path = path.as_path(); println!( "cargo:rustc-env=TOPOS_TEST_SDK_TMP={}", diff --git a/crates/topos-test-sdk/src/storage/mod.rs b/crates/topos-test-sdk/src/storage/mod.rs index 3830ff26e..91d0a978a 100644 --- a/crates/topos-test-sdk/src/storage/mod.rs +++ b/crates/topos-test-sdk/src/storage/mod.rs @@ -54,7 +54,7 @@ pub fn create_folder(folder_name: &str) -> PathBuf { let mut rng = rand::thread_rng(); temp_dir.push(format!( - "./{}/data_{}_{}/rocksdb", + "{}/data_{}_{}/rocksdb", folder_name, time.as_nanos(), rng.gen::() diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index d9d586016..8e1b2fab4 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -182,7 +182,6 @@ pub async fn start_node( let (sender, receiver) = broadcast::channel(100); let (tce_cli, tce_stream) = create_reliable_broadcast_client( create_reliable_broadcast_params(peers.len()), - config.keypair.public().to_peer_id().to_string(), validator_store.clone(), sender, ) diff --git a/crates/topos-test-sdk/src/tce/protocol.rs b/crates/topos-test-sdk/src/tce/protocol.rs index b0df07cdb..20fdf3584 100644 --- a/crates/topos-test-sdk/src/tce/protocol.rs +++ b/crates/topos-test-sdk/src/tce/protocol.rs @@ -1,25 +1,37 @@ -use std::sync::Arc; - use futures::Stream; - +use std::collections::HashSet; +use std::sync::Arc; use tokio::sync::broadcast; +use topos_crypto::messages::MessageSigner; use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig}; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use topos_tce_transport::{ProtocolEvents, ReliableBroadcastParams}; +use topos_tce_transport::{ProtocolEvents, ReliableBroadcastParams, ValidatorId}; + +const PRIVATE_KEY: &str = "47d361f6becb933a77d7e01dee7b1c1859b656adbd8428bf7bf9519503e5d5d6"; pub async fn create_reliable_broadcast_client( tce_params: ReliableBroadcastParams, - peer_id: String, storage: Arc, sender: broadcast::Sender, ) -> ( ReliableBroadcastClient, impl Stream + Unpin, ) { - let config = ReliableBroadcastConfig { tce_params }; + let message_signer = MessageSigner::new(PRIVATE_KEY).unwrap(); + + let mut validators = HashSet::new(); + let validator_id = ValidatorId::from(message_signer.public_address); + validators.insert(validator_id); + + let config = ReliableBroadcastConfig { + tce_params, + validator_id, + validators, + message_signer, + }; - ReliableBroadcastClient::new(config, peer_id, storage, sender).await + ReliableBroadcastClient::new(config, storage, sender).await } pub fn create_reliable_broadcast_params(number_of_nodes: usize) -> ReliableBroadcastParams { diff --git a/crates/topos/src/components/node/services.rs b/crates/topos/src/components/node/services.rs index 166019284..4992d4486 100644 --- a/crates/topos/src/components/node/services.rs +++ b/crates/topos/src/components/node/services.rs @@ -87,7 +87,9 @@ pub(crate) fn spawn_tce_process( .into_iter() .chain(config.parse_boot_peers()) .collect::>(), + validators: genesis.validators(), auth_key: keys.network.map(AuthKey::PrivateKey), + signing_key: keys.validator.map(AuthKey::PrivateKey), tce_addr: format!("/ip4/{}", config.libp2p_api_addr.ip()), tce_local_port: config.libp2p_api_addr.port(), tce_params: ReliableBroadcastParams::new(genesis.validator_count()), diff --git a/crates/topos/src/components/tce/commands/run.rs b/crates/topos/src/components/tce/commands/run.rs index 9a3d6cea1..059c62058 100644 --- a/crates/topos/src/components/tce/commands/run.rs +++ b/crates/topos/src/components/tce/commands/run.rs @@ -1,8 +1,9 @@ use clap::Args; use serde::Serialize; +use std::collections::HashSet; use std::net::SocketAddr; use topos_p2p::{Multiaddr, PeerId}; -use topos_tce_transport::ReliableBroadcastParams; +use topos_tce_transport::{ReliableBroadcastParams, ValidatorId}; #[derive(Args, Debug, Serialize)] #[command(about = "Run a full TCE instance")] @@ -12,6 +13,11 @@ pub struct Run { #[arg(long, default_value = "", env = "TCE_BOOT_PEERS")] pub boot_peers: String, + /// Validator nodes to connect to, list of Ethereum addresses, space separated, + /// quoted list like --validators='0xfd530a60b4b4cf799d74' + #[arg(long, default_value = "", env = "TCE_VALIDATORS", default_value = "")] + pub validators: String, + /// Advertised (externally visible) , /// if empty this machine ip address(es) are used #[arg(long, env = "TCE_EXT_HOST", default_value = "/ip4/0.0.0.0")] @@ -33,6 +39,10 @@ pub struct Run { #[clap(long, env = "TCE_LOCAL_KS")] pub local_key_seed: Option, + /// Local peer secret key seed (optional, used for testing) + #[clap(long, env = "TCE_LOCAL_VPK")] + pub local_validator_private_key: Option, + /// Storage database path, if not set RAM storage is used #[clap(long, default_value = "./default_db/", env = "TCE_DB_PATH")] pub db_path: Option, @@ -86,4 +96,20 @@ impl Run { }) .collect() } + + pub fn parse_validators(&self) -> HashSet { + if !self.validators.is_empty() { + return self + .validators + .split(&[',', ' ']) + .map(|address| { + ValidatorId::try_from(address).unwrap_or_else(|error| { + panic!("Failed to convert address to ValidatorId: {:?}", error) + }) + }) + .collect(); + } + + HashSet::new() + } } diff --git a/crates/topos/src/components/tce/mod.rs b/crates/topos/src/components/tce/mod.rs index ab0ae6d82..60fe888f9 100644 --- a/crates/topos/src/components/tce/mod.rs +++ b/crates/topos/src/components/tce/mod.rs @@ -89,10 +89,15 @@ pub(crate) async fn handle_command( Some(TceCommands::Run(cmd)) => { let config = TceConfiguration { boot_peers: cmd.parse_boot_peers(), + validators: cmd.parse_validators(), auth_key: cmd .local_key_seed .clone() .map(|s| AuthKey::Seed(s.as_bytes().to_vec())), + signing_key: cmd + .local_validator_private_key + .clone() + .map(|s| AuthKey::PrivateKey(s.as_bytes().to_vec())), tce_addr: cmd.tce_ext_host, tce_local_port: cmd.tce_local_port, tce_params: cmd.tce_params, diff --git a/crates/topos/src/config/genesis/mod.rs b/crates/topos/src/config/genesis/mod.rs index 6d6b85b61..2a070d46d 100644 --- a/crates/topos/src/config/genesis/mod.rs +++ b/crates/topos/src/config/genesis/mod.rs @@ -1,8 +1,11 @@ use rlp::Rlp; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::{fs, path::PathBuf}; use serde_json::Value; use topos_p2p::{Multiaddr, PeerId}; +use topos_tce_transport::ValidatorId; #[cfg(test)] pub(crate) mod tests; @@ -13,6 +16,11 @@ pub struct Genesis { pub json: Value, } +#[derive(Debug, Serialize, Deserialize)] +pub enum Error { + ParseValidators, +} + impl Genesis { pub fn new(path: PathBuf) -> Self { let genesis_file = fs::File::open(&path).expect("opened file"); @@ -58,7 +66,7 @@ impl Genesis { /// The `extraData` is patted with 32 bytes, and the validators are RLP encoded. /// Each validator is 20 bytes, with a SEAL at the end of the whole list (8 bytes) #[allow(dead_code)] - pub fn validators(&self) -> Vec { + pub fn validators(&self) -> HashSet { let extra_data = self.json["genesis"]["extraData"] .as_str() .unwrap() @@ -81,14 +89,18 @@ impl Genesis { // Get the first Rlp item (index 0) and iterate over its items let first_item = rlp.at(0).expect("Failed to get first RLP item"); - let mut validator_public_keys = Vec::new(); + let mut validator_public_keys = HashSet::new(); for i in 0..first_item.item_count().unwrap() { let validator_data = first_item.at(i).expect("Failed to get RLP item").data(); if let Ok(validator_data) = validator_data { let public_key = validator_data.to_vec(); let address = format!("0x{}", hex::encode(&public_key[1..=20])); - validator_public_keys.push(address); + validator_public_keys.insert( + ValidatorId::try_from(address.as_str()).unwrap_or_else(|error| { + panic!("Failed to convert address to ValidatorId: {:?}", error) + }), + ); } } diff --git a/crates/topos/src/config/genesis/tests.rs b/crates/topos/src/config/genesis/tests.rs index 3f2f8d996..a6b79c70f 100644 --- a/crates/topos/src/config/genesis/tests.rs +++ b/crates/topos/src/config/genesis/tests.rs @@ -1,5 +1,6 @@ use rstest::fixture; use rstest::rstest; +use topos_tce_transport::ValidatorId; use super::Genesis; @@ -31,9 +32,14 @@ pub fn test_parse_bootnodes(genesis: &Genesis) { pub fn test_extract_validators(genesis: &Genesis) { let validators = genesis.validators(); + let first = ValidatorId::try_from("0x100d617e4392c02b31bdce650b26b6c0c3e04f95").unwrap(); + let second = ValidatorId::try_from("0x92183cff18a1328e7d791d607589a15d9eee4bc4").unwrap(); + let third = ValidatorId::try_from("0xb4973cdb10894d1d1547673bd758589034c2bba5").unwrap(); + let fourth = ValidatorId::try_from("0xc16d83893cb61872206d4e271b813015d3242d94").unwrap(); + assert_eq!(validators.len(), 4); - assert_eq!(validators[0], "0x100d617e4392c02b31bdce650b26b6c0c3e04f95"); - assert_eq!(validators[1], "0x92183cff18a1328e7d791d607589a15d9eee4bc4"); - assert_eq!(validators[2], "0xb4973cdb10894d1d1547673bd758589034c2bba5"); - assert_eq!(validators[3], "0xc16d83893cb61872206d4e271b813015d3242d94"); + assert_eq!(validators.get(&first), Some(&first)); + assert_eq!(validators.get(&second), Some(&second)); + assert_eq!(validators.get(&third), Some(&third)); + assert_eq!(validators.get(&fourth), Some(&fourth)); } diff --git a/crates/topos/tests/snapshots/tce__help_display.snap b/crates/topos/tests/snapshots/tce__help_display.snap index 810244a09..60707477e 100644 --- a/crates/topos/tests/snapshots/tce__help_display.snap +++ b/crates/topos/tests/snapshots/tce__help_display.snap @@ -13,6 +13,8 @@ Options: Defines the verbosity level --home Home directory for the configuration [env: TOPOS_HOME=] [default: /home/runner/.config/topos] + --validators + Validator nodes to connect to, list of Ethereum addresses, space separated, quoted list like --validators='0xfd530a60b4b4cf799d74' [env: TCE_VALIDATORS=] [default: ] --tce-ext-host Advertised (externally visible) , if empty this machine ip address(es) are used [env: TCE_EXT_HOST=] [default: /ip4/0.0.0.0] --tce-local-port @@ -23,6 +25,8 @@ Options: WebAPI port [env: TCE_WEB_API_PORT=] [default: 8080] --local-key-seed Local peer secret key seed (optional, used for testing) [env: TCE_LOCAL_KS=] + --local-validator-private-key + Local peer secret key seed (optional, used for testing) [env: TCE_LOCAL_VPK=] --db-path Storage database path, if not set RAM storage is used [env: TCE_DB_PATH=] [default: ./default_db/] --api-addr diff --git a/tools/init.sh b/tools/init.sh index 5ef6b1b8d..b6ddd47fa 100755 --- a/tools/init.sh +++ b/tools/init.sh @@ -16,6 +16,17 @@ NODE_LIST_PATH=/tmp/shared/peer_nodes.json NODE="http://$HOSTNAME:1340" TCE_EXT_HOST="/dns4/$HOSTNAME" + +# List of validator public keys +public_keys=( + # public address for 122f3ae6ade1fd136b292cea4f6243c7811160352c8821528547a1fe7c459daf + "0xfa1dbe573e7ab7eb4fb772a349129cf0833f1154" + # public address for a2e33a9bad88f7b7568228f51d5274c471a9217162d46f1533b6a290f0be1baf + "0xfba339119da6dd78814055e50b5ae0ad4e880181" +) + +TCE_VALIDATORS=$(IFS=,; echo "${public_keys[*]}") + case "$1" in "boot") @@ -41,6 +52,8 @@ case "$1" in echo "TCE_LOCAL_KS: $HOSTNAME" export TCE_LOCAL_KS=$HOSTNAME + export TCE_LOCAL_VPK=122f3ae6ade1fd136b292cea4f6243c7811160352c8821528547a1fe7c459daf + export TCE_VALIDATORS export TCE_EXT_HOST exec "$TOPOS_BIN" "${@:2}" @@ -73,6 +86,8 @@ case "$1" in ) 201>"${PEER_LIST_PATH}.lock" export TCE_LOCAL_KS=$HOSTNAME + export TCE_LOCAL_VPK=a2e33a9bad88f7b7568228f51d5274c471a9217162d46f1533b6a290f0be1baf + export TCE_VALIDATORS export TCE_EXT_HOST until [ -f "$NODE_LIST_PATH" ]