From b6150fd374bff2f0b6a6ecfa5d5c5ff7d5535c14 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Thu, 23 Nov 2023 14:43:28 +0100 Subject: [PATCH 1/4] feat: select peer from p2p instead of gatekeeper Signed-off-by: Simon Paitrault --- crates/topos-p2p/Cargo.toml | 1 + crates/topos-p2p/src/client.rs | 10 ++ crates/topos-p2p/src/command.rs | 6 ++ crates/topos-p2p/src/error.rs | 3 + .../topos-p2p/src/runtime/handle_command.rs | 38 ++++++- crates/topos-p2p/src/tests/command/mod.rs | 1 + .../src/tests/command/random_peer.rs | 99 +++++++++++++++++++ crates/topos-p2p/src/tests/mod.rs | 1 + crates/topos-tce-synchronizer/src/builder.rs | 17 +--- .../src/checkpoints_collector/mod.rs | 18 ++-- crates/topos-tce/src/lib.rs | 1 - crates/topos-test-sdk/src/tce/synchronizer.rs | 1 - 12 files changed, 166 insertions(+), 30 deletions(-) create mode 100644 crates/topos-p2p/src/tests/command/mod.rs create mode 100644 crates/topos-p2p/src/tests/command/random_peer.rs diff --git a/crates/topos-p2p/Cargo.toml b/crates/topos-p2p/Cargo.toml index 66769469c..f45cf2f12 100644 --- a/crates/topos-p2p/Cargo.toml +++ b/crates/topos-p2p/Cargo.toml @@ -20,6 +20,7 @@ libp2p = { workspace = true, features = ["macros", "gossipsub", "tcp", "dns", "t pin-project = "1.1.3" libp2p-swarm-test = "0.2.0" prometheus-client.workspace = true +rand.workspace = true serde = { workspace = true, features = ["derive"] } smallvec = "1.11.1" thiserror.workspace = true diff --git a/crates/topos-p2p/src/client.rs b/crates/topos-p2p/src/client.rs index 137e9b687..68dfb57bd 100644 --- a/crates/topos-p2p/src/client.rs +++ b/crates/topos-p2p/src/client.rs @@ -36,6 +36,16 @@ impl NetworkClient { .await } + pub async fn random_known_peer(&self) -> Result { + let (sender, receiver) = oneshot::channel(); + Self::send_command_with_receiver( + &self.sender, + Command::RandomKnownPeer { sender }, + receiver, + ) + .await + } + pub async fn disconnect(&self) -> Result<(), P2PError> { let (sender, receiver) = oneshot::channel(); let command = Command::Disconnect { sender }; diff --git a/crates/topos-p2p/src/command.rs b/crates/topos-p2p/src/command.rs index ea7833686..94c5de62f 100644 --- a/crates/topos-p2p/src/command.rs +++ b/crates/topos-p2p/src/command.rs @@ -47,6 +47,11 @@ pub enum Command { id: uuid::Uuid, response: oneshot::Sender, }, + + /// Ask for a random known peer + RandomKnownPeer { + sender: oneshot::Sender>, + }, } impl Display for Command { @@ -54,6 +59,7 @@ impl Display for Command { match self { Command::StartListening { .. } => write!(f, "StartListening"), Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"), + Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"), Command::Disconnect { .. } => write!(f, "Disconnect"), Command::Gossip { .. } => write!(f, "GossipMessage"), Command::NewProxiedQuery { .. } => write!(f, "NewProxiedQuery"), diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 0c5026c7d..15a92f2ed 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -65,4 +65,7 @@ pub enum CommandExecutionError { #[error("Connection with a peer has failed")] ConnectionClosed, + + #[error("Internal error: {0}")] + Internal(String), } diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index b61fae0f7..30ae26a09 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -1,5 +1,11 @@ -use crate::{error::P2PError, protocol_name, Command, Runtime}; +use std::collections::hash_map::Entry; + +use crate::{ + error::{CommandExecutionError, P2PError}, + protocol_name, Command, Runtime, +}; use libp2p::{kad::record::Key, PeerId}; +use rand::{seq::SliceRandom, thread_rng}; use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL; use tracing::{debug, error, info, warn}; @@ -38,6 +44,36 @@ impl Runtime { warn!("Unable to notify ConnectedPeers response: initiator is dropped"); } } + Command::RandomKnownPeer { sender } => { + if self.peer_set.is_empty() { + sender.send(Err(P2PError::CommandError( + CommandExecutionError::Internal( + "Asked for one random peer but there is currently no known peer" + .to_string(), + ), + ))); + + return; + } + + let mut range: Vec = (0..(self.peer_set.len() as u32)).collect(); + range.shuffle(&mut thread_rng()); + + if sender + .send( + range + .first() + .and_then(|index| self.peer_set.iter().nth((*index) as usize).cloned()) + .ok_or(P2PError::CommandError(CommandExecutionError::Internal( + "Unable to find a random peer for RandomKnownPeer command" + .to_string(), + ))), + ) + .is_err() + { + warn!("Unable to notify RandomKnownPeer response: initiator is dropped"); + } + } Command::Disconnect { sender } if self.swarm.listeners().count() == 0 => { if sender.send(Err(P2PError::AlreadyDisconnected)).is_err() { diff --git a/crates/topos-p2p/src/tests/command/mod.rs b/crates/topos-p2p/src/tests/command/mod.rs new file mode 100644 index 000000000..b197e3223 --- /dev/null +++ b/crates/topos-p2p/src/tests/command/mod.rs @@ -0,0 +1 @@ +mod random_peer; diff --git a/crates/topos-p2p/src/tests/command/random_peer.rs b/crates/topos-p2p/src/tests/command/random_peer.rs new file mode 100644 index 000000000..f1c738b40 --- /dev/null +++ b/crates/topos-p2p/src/tests/command/random_peer.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use rstest::rstest; +use test_log::test; +use tokio::spawn; +use topos_test_sdk::tce::NodeConfig; + +use crate::error::P2PError; + +#[rstest] +#[test(tokio::test)] +#[timeout(Duration::from_secs(5))] +async fn no_random_peer() { + let local = NodeConfig::from_seed(1); + + let (client, _, runtime) = crate::network::builder() + .peer_key(local.keypair.clone()) + .exposed_addresses(local.addr.clone()) + .listen_addr(local.addr.clone()) + .build() + .await + .expect("Unable to create p2p network"); + + let mut runtime = runtime.bootstrap().await.unwrap(); + + spawn(runtime.run()); + + let result = client.random_known_peer().await; + + assert!(result.is_err()); + assert!(matches!( + result, + Err(P2PError::CommandError( + crate::error::CommandExecutionError::Internal(error) + )) if error == "Asked for one random peer but there is currently no known peer" + )); +} + +#[rstest] +#[test(tokio::test)] +#[timeout(Duration::from_secs(5))] +async fn return_a_peer() { + let local = NodeConfig::from_seed(1); + let expected = NodeConfig::from_seed(2); + let expected_peer_id = expected.keypair.public().to_peer_id(); + + let (client, _, runtime) = crate::network::builder() + .peer_key(local.keypair.clone()) + .exposed_addresses(local.addr.clone()) + .listen_addr(local.addr.clone()) + .build() + .await + .expect("Unable to create p2p network"); + + let mut runtime = runtime.bootstrap().await.unwrap(); + + runtime.peer_set.insert(expected_peer_id); + + spawn(runtime.run()); + + let result = client.random_known_peer().await; + + assert!(result.is_ok()); + assert!(matches!( + result, + Ok(peer) if peer == expected_peer_id + )); +} + +#[rstest] +#[test(tokio::test)] +#[timeout(Duration::from_secs(5))] +async fn return_a_random_peer_between_100() { + let local = NodeConfig::from_seed(1); + + let (client, _, runtime) = crate::network::builder() + .peer_key(local.keypair.clone()) + .exposed_addresses(local.addr.clone()) + .listen_addr(local.addr.clone()) + .build() + .await + .expect("Unable to create p2p network"); + + let mut runtime = runtime.bootstrap().await.unwrap(); + + for i in 2..=100 { + let peer = NodeConfig::from_seed(i); + runtime.peer_set.insert(peer.keypair.public().to_peer_id()); + } + + spawn(runtime.run()); + + let first_try = client.random_known_peer().await.unwrap(); + let second_try = client.random_known_peer().await.unwrap(); + let third_try = client.random_known_peer().await.unwrap(); + + assert!(first_try != second_try); + assert!(first_try != third_try); +} diff --git a/crates/topos-p2p/src/tests/mod.rs b/crates/topos-p2p/src/tests/mod.rs index 1a3b54a7b..9fcbc013b 100644 --- a/crates/topos-p2p/src/tests/mod.rs +++ b/crates/topos-p2p/src/tests/mod.rs @@ -1,3 +1,4 @@ mod behaviour; +mod command; mod dht; mod support; diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index 2337cf180..c0d2e5190 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -4,7 +4,6 @@ use tokio::{spawn, sync::mpsc}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use topos_p2p::NetworkClient; -use topos_tce_gatekeeper::GatekeeperClient; use topos_tce_storage::validator::ValidatorStore; use crate::{ @@ -15,7 +14,6 @@ use crate::{ }; pub struct SynchronizerBuilder { - gatekeeper_client: Option, network_client: Option, store: Option>, sync_interval_seconds: u64, @@ -28,7 +26,6 @@ pub struct SynchronizerBuilder { impl Default for SynchronizerBuilder { fn default() -> Self { Self { - gatekeeper_client: None, network_client: None, store: None, sync_interval_seconds: 1, @@ -64,13 +61,7 @@ impl SynchronizerBuilder { CheckpointsCollectorError::NoNetworkClient, ))?; }, - gatekeeper: if let Some(gatekeeper) = self.gatekeeper_client { - gatekeeper - } else { - return Err(SynchronizerError::CheckpointsCollectorError( - CheckpointsCollectorError::NoGatekeeperClient, - ))?; - }, + store: if let Some(store) = self.store { store } else { @@ -103,12 +94,6 @@ impl SynchronizerBuilder { self } - pub fn with_gatekeeper_client(mut self, gatekeeper_client: GatekeeperClient) -> Self { - self.gatekeeper_client = Some(gatekeeper_client); - - self - } - pub fn with_network_client(mut self, network_client: NetworkClient) -> Self { self.network_client = Some(network_client); diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs index 8a83fb8c0..6a893dcaf 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs @@ -25,7 +25,6 @@ use topos_core::{ }; use topos_p2p::{error::P2PError, NetworkClient, PeerId}; -use topos_tce_gatekeeper::GatekeeperClient; use topos_tce_storage::{errors::StorageError, store::ReadStore, validator::ValidatorStore}; use tracing::{debug, error, warn}; use uuid::Uuid; @@ -44,7 +43,6 @@ pub struct CheckpointSynchronizer { pub(crate) config: CheckpointsCollectorConfig, pub(crate) network: NetworkClient, - pub(crate) gatekeeper: GatekeeperClient, #[allow(unused)] pub(crate) store: Arc, @@ -98,7 +96,7 @@ impl IntoFuture for CheckpointSynchronizer { #[derive(Debug, thiserror::Error)] enum SyncError { - #[error("Unable to fetch target peer from gatekeeper")] + #[error("Unable to fetch target peer from network layer")] UnableToFetchTargetPeer, #[error("Unable to parse subnet id")] @@ -215,11 +213,10 @@ impl CheckpointSynchronizer { certificate_ids: Vec, ) -> Result, SyncError> { let target_peer = self - .gatekeeper - .get_random_peers(1) + .network + .random_known_peer() .await - .map_err(|_| SyncError::UnableToFetchTargetPeer) - .map(|peers| peers.last().cloned().ok_or(SyncError::NoPeerAvailable))??; + .map_err(|_| SyncError::UnableToFetchTargetPeer)?; let request_id: Option = Some(Uuid::new_v4().into()); let req = FetchCertificatesRequest { @@ -253,11 +250,10 @@ impl CheckpointSynchronizer { async fn initiate_request(&mut self) -> Result<(), SyncError> { // 1. Ask a random peer for the diff between local and its latest checkpoint let target_peer = self - .gatekeeper - .get_random_peers(1) + .network + .random_known_peer() .await - .map_err(|_| SyncError::UnableToFetchTargetPeer) - .map(|peers| peers.last().cloned().ok_or(SyncError::NoPeerAvailable))??; + .map_err(|_| SyncError::UnableToFetchTargetPeer)?; let diff = self.ask_for_checkpoint(target_peer).await?; diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index be173af61..16cf6cb14 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -164,7 +164,6 @@ pub async fn run( topos_tce_synchronizer::Synchronizer::builder() .with_shutdown(shutdown.0.child_token()) .with_store(validator_store.clone()) - .with_gatekeeper_client(gatekeeper_client.clone()) .with_network_client(network_client.clone()) .build()?; diff --git a/crates/topos-test-sdk/src/tce/synchronizer.rs b/crates/topos-test-sdk/src/tce/synchronizer.rs index 4b63133bd..8cdaa5735 100644 --- a/crates/topos-test-sdk/src/tce/synchronizer.rs +++ b/crates/topos-test-sdk/src/tce/synchronizer.rs @@ -23,7 +23,6 @@ pub async fn create_synchronizer( topos_tce_synchronizer::Synchronizer::builder() .with_shutdown(shutdown) .with_store(store) - .with_gatekeeper_client(gatekeeper_client) .with_network_client(network_client) .build() .expect("Can't create the Synchronizer"); From b1a6e780b484c6e4012396954ddcd36c3bfca0b0 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Thu, 30 Nov 2023 15:20:39 +0100 Subject: [PATCH 2/4] chore: bumping rust-toolchain version Signed-off-by: Simon Paitrault --- rust-toolchain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain b/rust-toolchain index 58ec84d32..cdf516eeb 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1,3 +1,3 @@ [toolchain] -channel = "1.71.1" +channel = "1.74.0" profile = "minimal" From d86f8efa7d2d9cbfb23dac147148c6438e3e2268 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Thu, 30 Nov 2023 21:13:21 +0100 Subject: [PATCH 3/4] chore: cleanup old command/event Signed-off-by: Simon Paitrault --- .../topos-p2p/src/runtime/handle_command.rs | 20 ++---- .../src/tests/command/random_peer.rs | 2 +- crates/topos-tce-gatekeeper/src/builder.rs | 23 +------ crates/topos-tce-gatekeeper/src/client.rs | 10 +-- crates/topos-tce-gatekeeper/src/lib.rs | 68 +------------------ crates/topos-tce-gatekeeper/src/tests.rs | 32 +-------- crates/topos-tce/src/lib.rs | 7 +- crates/topos-test-sdk/src/tce/gatekeeper.rs | 6 +- crates/topos-test-sdk/src/tce/mod.rs | 9 +-- 9 files changed, 18 insertions(+), 159 deletions(-) diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index 30ae26a09..6ca6d9ba6 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -5,7 +5,7 @@ use crate::{ protocol_name, Command, Runtime, }; use libp2p::{kad::record::Key, PeerId}; -use rand::{seq::SliceRandom, thread_rng}; +use rand::{seq::SliceRandom, thread_rng, Rng}; use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL; use tracing::{debug, error, info, warn}; @@ -56,19 +56,13 @@ impl Runtime { return; } - let mut range: Vec = (0..(self.peer_set.len() as u32)).collect(); - range.shuffle(&mut thread_rng()); - + let selected_peer: usize = thread_rng().gen_range(0..(self.peer_set.len())); if sender - .send( - range - .first() - .and_then(|index| self.peer_set.iter().nth((*index) as usize).cloned()) - .ok_or(P2PError::CommandError(CommandExecutionError::Internal( - "Unable to find a random peer for RandomKnownPeer command" - .to_string(), - ))), - ) + .send(self.peer_set.iter().nth(selected_peer).cloned().ok_or( + P2PError::CommandError(CommandExecutionError::Internal( + "Unable to find a random peer for RandomKnownPeer command".to_string(), + )), + )) .is_err() { warn!("Unable to notify RandomKnownPeer response: initiator is dropped"); diff --git a/crates/topos-p2p/src/tests/command/random_peer.rs b/crates/topos-p2p/src/tests/command/random_peer.rs index f1c738b40..f6f5c9ae5 100644 --- a/crates/topos-p2p/src/tests/command/random_peer.rs +++ b/crates/topos-p2p/src/tests/command/random_peer.rs @@ -70,7 +70,7 @@ async fn return_a_peer() { #[rstest] #[test(tokio::test)] #[timeout(Duration::from_secs(5))] -async fn return_a_random_peer_between_100() { +async fn return_a_random_peer_among_100() { let local = NodeConfig::from_seed(1); let (client, _, runtime) = crate::network::builder() diff --git a/crates/topos-tce-gatekeeper/src/builder.rs b/crates/topos-tce-gatekeeper/src/builder.rs index 969e77487..8b37c0810 100644 --- a/crates/topos-tce-gatekeeper/src/builder.rs +++ b/crates/topos-tce-gatekeeper/src/builder.rs @@ -7,27 +7,7 @@ use topos_p2p::PeerId; use crate::{client::GatekeeperClient, Gatekeeper, GatekeeperError}; #[derive(Default)] -pub struct GatekeeperBuilder { - local_peer_id: Option, - local_peer_list: Option>, -} - -impl GatekeeperBuilder { - pub fn local_peer_id(mut self, peer_id: PeerId) -> Self { - self.local_peer_id = Some(peer_id); - - self - } - - /// Start the Gatekeeper with a set of known peers in the network - /// The Sync service for example is making use of this list to ask for - /// random peers - pub fn peer_list(mut self, peer_list: Vec) -> Self { - self.local_peer_list = Some(peer_list); - - self - } -} +pub struct GatekeeperBuilder {} impl IntoFuture for GatekeeperBuilder { type Output = Result<(GatekeeperClient, Gatekeeper), GatekeeperError>; @@ -45,7 +25,6 @@ impl IntoFuture for GatekeeperBuilder { }, Gatekeeper { shutdown, - peer_list: self.local_peer_list.unwrap_or_default(), commands: commands_recv, ..Gatekeeper::default() }, diff --git a/crates/topos-tce-gatekeeper/src/client.rs b/crates/topos-tce-gatekeeper/src/client.rs index 835003924..1e240b4e8 100644 --- a/crates/topos-tce-gatekeeper/src/client.rs +++ b/crates/topos-tce-gatekeeper/src/client.rs @@ -1,4 +1,4 @@ -use crate::{GatekeeperCommand, GatekeeperError, GetAllPeers, GetAllSubnets, GetRandomPeers}; +use crate::{GatekeeperCommand, GatekeeperError, GetAllSubnets}; use tokio::sync::{mpsc, oneshot}; use topos_core::uci::SubnetId; use topos_p2p::PeerId; @@ -20,14 +20,6 @@ impl GatekeeperClient { Ok(receiver.await?) } - pub async fn get_random_peers(&self, number: usize) -> Result, GatekeeperError> { - GetRandomPeers { number }.send_to(&self.commands).await - } - - pub async fn get_all_peers(&self) -> Result, GatekeeperError> { - GetAllPeers.send_to(&self.commands).await - } - pub async fn get_all_subnets(&self) -> Result, GatekeeperError> { GetAllSubnets.send_to(&self.commands).await } diff --git a/crates/topos-tce-gatekeeper/src/lib.rs b/crates/topos-tce-gatekeeper/src/lib.rs index bebc21bfa..3ce6c1f65 100644 --- a/crates/topos-tce-gatekeeper/src/lib.rs +++ b/crates/topos-tce-gatekeeper/src/lib.rs @@ -26,7 +26,6 @@ pub struct Gatekeeper { pub(crate) commands: mpsc::Receiver, pub(crate) tick_duration: Duration, - peer_list: Vec, subnet_list: Vec, } @@ -40,21 +39,11 @@ impl Default for Gatekeeper { shutdown, commands: commands_recv, tick_duration, - peer_list: Vec::default(), subnet_list: Vec::default(), } } } -#[async_trait::async_trait] -impl CommandHandler for Gatekeeper { - type Error = GatekeeperError; - - async fn handle(&mut self, _command: GetAllPeers) -> Result, Self::Error> { - Ok(self.peer_list.clone()) - } -} - #[async_trait::async_trait] impl CommandHandler for Gatekeeper { type Error = GatekeeperError; @@ -64,39 +53,6 @@ impl CommandHandler for Gatekeeper { } } -#[async_trait::async_trait] -impl CommandHandler for Gatekeeper { - type Error = GatekeeperError; - - async fn handle( - &mut self, - GetRandomPeers { number }: GetRandomPeers, - ) -> Result, Self::Error> { - let peer_list_len = self.peer_list.len(); - - if number > peer_list_len { - return Err(GatekeeperError::InvalidCommand(format!( - "Asked for {number} random peers when the Gatekeeper have {peer_list_len}" - ))); - } - - let mut range: Vec = (0..(peer_list_len as u32)).collect(); - range.shuffle(&mut thread_rng()); - - let iterator = range.iter().take(number); - - let mut peers = Vec::new(); - - for index in iterator { - if let Some(peer) = self.peer_list.get(*index as usize) { - peers.push(*peer); - } - } - - Ok(peers) - } -} - impl IntoFuture for Gatekeeper { type Output = Result<(), GatekeeperError>; @@ -113,12 +69,6 @@ impl IntoFuture for Gatekeeper { break sender; } Some(command) = self.commands.recv() => match command { - GatekeeperCommand::GetAllPeers(command, response_channel) => { - _ = response_channel.send(self.handle(command).await) - }, - GatekeeperCommand::GetRandomPeers(command, response_channel) => { - _ = response_channel.send(self.handle(command).await) - }, GatekeeperCommand::GetAllSubnets(command, response_channel) => { _ = response_channel.send(self.handle(command).await) }, @@ -169,25 +119,9 @@ pub enum GatekeeperError { RegisterCommands!( name = GatekeeperCommand, error = GatekeeperError, - commands = [GetAllPeers, GetRandomPeers, GetAllSubnets] + commands = [GetAllSubnets] ); -#[derive(Debug)] -pub struct GetAllPeers; - -impl Command for GetAllPeers { - type Result = Vec; -} - -#[derive(Debug)] -pub struct GetRandomPeers { - number: usize, -} - -impl Command for GetRandomPeers { - type Result = Vec; -} - #[derive(Debug)] pub struct GetAllSubnets; diff --git a/crates/topos-tce-gatekeeper/src/tests.rs b/crates/topos-tce-gatekeeper/src/tests.rs index c1731e5a2..11df3639c 100644 --- a/crates/topos-tce-gatekeeper/src/tests.rs +++ b/crates/topos-tce-gatekeeper/src/tests.rs @@ -9,10 +9,7 @@ use crate::{client::GatekeeperClient, Gatekeeper}; #[test(tokio::test)] async fn can_start_and_stop() -> Result<(), Box> { - let peer_id = topos_p2p::utils::local_key_pair(Some(99)) - .public() - .to_peer_id(); - let (client, server) = Gatekeeper::builder().local_peer_id(peer_id).await?; + let (client, server) = Gatekeeper::builder().await?; let handler = spawn(server.into_future()); @@ -23,32 +20,9 @@ async fn can_start_and_stop() -> Result<(), Box> { Ok(()) } -#[rstest] -#[test(tokio::test)] -async fn can_fetch_full_or_partial_list(#[future] gatekeeper: GatekeeperClient) { - let gatekeeper = gatekeeper.await; - - assert_eq!(10, gatekeeper.get_all_peers().await.unwrap().len()); - - let first = gatekeeper.get_random_peers(5).await.unwrap(); - assert_eq!(5, first.len()); - - let second = gatekeeper.get_random_peers(5).await.unwrap(); - assert_eq!(5, second.len()); - - assert_ne!(first, second); -} - #[fixture] -async fn gatekeeper>(peer_list: Vec

) -> GatekeeperClient { - let peer_id = topos_p2p::utils::local_key_pair(Some(99)) - .public() - .to_peer_id(); - let (client, server) = Gatekeeper::builder() - .local_peer_id(peer_id) - .peer_list(peer_list.into_iter().map(|p| p.into()).collect()) - .await - .unwrap(); +async fn gatekeeper() -> GatekeeperClient { + let (client, server) = Gatekeeper::builder().await.unwrap(); spawn(server.into_future()); diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 16cf6cb14..516ba2819 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -71,7 +71,6 @@ pub async fn run( let mut boot_peers = config.boot_peers.clone(); boot_peers.retain(|(p, _)| *p != peer_id); - let peer_list = boot_peers.iter().map(|(p, _)| *p).collect::>(); debug!("Starting the Storage"); let path = if let StorageConfiguration::RocksDB(Some(ref path)) = config.storage { path @@ -130,10 +129,8 @@ pub async fn run( debug!("p2p network started"); debug!("Starting the gatekeeper"); - let (gatekeeper_client, gatekeeper_runtime) = topos_tce_gatekeeper::Gatekeeper::builder() - .local_peer_id(peer_id) - .peer_list(peer_list) - .await?; + let (gatekeeper_client, gatekeeper_runtime) = + topos_tce_gatekeeper::Gatekeeper::builder().await?; spawn(gatekeeper_runtime.into_future()); debug!("Gatekeeper started"); diff --git a/crates/topos-test-sdk/src/tce/gatekeeper.rs b/crates/topos-test-sdk/src/tce/gatekeeper.rs index dc0b2a38b..5c72b3b0f 100644 --- a/crates/topos-test-sdk/src/tce/gatekeeper.rs +++ b/crates/topos-test-sdk/src/tce/gatekeeper.rs @@ -8,13 +8,9 @@ use tokio::task::JoinHandle; use topos_tce_gatekeeper::GatekeeperClient; use topos_tce_gatekeeper::GatekeeperError; -pub async fn create_gatekeeper>( - peer_id: P, - peer_list: Vec

, +pub async fn create_gatekeeper( ) -> Result<(GatekeeperClient, JoinHandle>), Box> { let (gatekeeper_client, gatekeeper_runtime) = topos_tce_gatekeeper::Gatekeeper::builder() - .local_peer_id(peer_id.into()) - .peer_list(peer_list.into_iter().map(|p| p.into()).collect()) .await .expect("Can't create the Gatekeeper"); diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index 3a931bde9..6e02a7163 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -213,12 +213,6 @@ pub async fn start_node( let validator_store = create_validator_store(certificates, futures::future::ready(fullnode_store.clone())).await; - let known_peers = peers - .iter() - .map(|p| p.keypair.public().to_peer_id()) - .filter(|&p| p != peer_id) - .collect::>(); - let router = GrpcRouter::new(tonic::transport::Server::builder()).add_service( SynchronizerServiceServer::new(SynchronizerService { validator_store: validator_store.clone(), @@ -257,8 +251,7 @@ pub async fn start_node( ) .await; - let (gatekeeper_client, gatekeeper_join_handle) = - create_gatekeeper(peer_id, known_peers).await.unwrap(); + let (gatekeeper_client, gatekeeper_join_handle) = create_gatekeeper().await.unwrap(); let (synchronizer_stream, synchronizer_join_handle) = create_synchronizer( gatekeeper_client.clone(), From 2f445c70d671b5271f626cbb262fdb4b1a56f5b8 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Fri, 1 Dec 2023 20:46:13 +0100 Subject: [PATCH 4/4] chore: update after review Signed-off-by: Simon Paitrault --- crates/topos-p2p/src/error.rs | 6 +++--- crates/topos-p2p/src/runtime/handle_command.rs | 17 ++++++++--------- .../topos-p2p/src/tests/command/random_peer.rs | 4 ++-- crates/topos-tce-gatekeeper/src/lib.rs | 1 - 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 15a92f2ed..3ef76a25e 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -52,7 +52,7 @@ pub enum CommandExecutionError { UnableToSendCommand(Command), #[error("Unable to perform query: {0}")] - RequestOutbandFailure(#[from] OutboundFailure), + RequestOutboundFailure(#[from] OutboundFailure), #[error("Unable to receive expected response of a oneshot channel")] UnableToReceiveCommandResponse(#[from] oneshot::error::RecvError), @@ -66,6 +66,6 @@ pub enum CommandExecutionError { #[error("Connection with a peer has failed")] ConnectionClosed, - #[error("Internal error: {0}")] - Internal(String), + #[error("No known peer in the peer set")] + NoKnownPeer, } diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index 6ca6d9ba6..07aedbfef 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -47,10 +47,7 @@ impl Runtime { Command::RandomKnownPeer { sender } => { if self.peer_set.is_empty() { sender.send(Err(P2PError::CommandError( - CommandExecutionError::Internal( - "Asked for one random peer but there is currently no known peer" - .to_string(), - ), + CommandExecutionError::NoKnownPeer, ))); return; @@ -58,11 +55,13 @@ impl Runtime { let selected_peer: usize = thread_rng().gen_range(0..(self.peer_set.len())); if sender - .send(self.peer_set.iter().nth(selected_peer).cloned().ok_or( - P2PError::CommandError(CommandExecutionError::Internal( - "Unable to find a random peer for RandomKnownPeer command".to_string(), - )), - )) + .send( + self.peer_set + .iter() + .nth(selected_peer) + .cloned() + .ok_or(P2PError::CommandError(CommandExecutionError::NoKnownPeer)), + ) .is_err() { warn!("Unable to notify RandomKnownPeer response: initiator is dropped"); diff --git a/crates/topos-p2p/src/tests/command/random_peer.rs b/crates/topos-p2p/src/tests/command/random_peer.rs index f6f5c9ae5..2cea9e4de 100644 --- a/crates/topos-p2p/src/tests/command/random_peer.rs +++ b/crates/topos-p2p/src/tests/command/random_peer.rs @@ -31,8 +31,8 @@ async fn no_random_peer() { assert!(matches!( result, Err(P2PError::CommandError( - crate::error::CommandExecutionError::Internal(error) - )) if error == "Asked for one random peer but there is currently no known peer" + crate::error::CommandExecutionError::NoKnownPeer + )) )); } diff --git a/crates/topos-tce-gatekeeper/src/lib.rs b/crates/topos-tce-gatekeeper/src/lib.rs index 3ce6c1f65..9c5a1c7a6 100644 --- a/crates/topos-tce-gatekeeper/src/lib.rs +++ b/crates/topos-tce-gatekeeper/src/lib.rs @@ -92,7 +92,6 @@ impl IntoFuture for Gatekeeper { impl Gatekeeper { pub(crate) const DEFAULT_TICK_DURATION: u64 = 10; - #[allow(dead_code)] pub fn builder() -> GatekeeperBuilder { GatekeeperBuilder::default() }