Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: refactor peer selection for synchronization #382

Merged
merged 4 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/topos-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ libp2p = { workspace = true, features = ["macros", "gossipsub", "tcp", "dns", "t
pin-project = "1.1.3"
libp2p-swarm-test = "0.2.0"
prometheus-client.workspace = true
rand.workspace = true
serde = { workspace = true, features = ["derive"] }
smallvec = "1.11.1"
thiserror.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ impl NetworkClient {
.await
}

pub async fn random_known_peer(&self) -> Result<PeerId, P2PError> {
let (sender, receiver) = oneshot::channel();
Self::send_command_with_receiver(
&self.sender,
Command::RandomKnownPeer { sender },
receiver,
)
.await
}

pub async fn disconnect(&self) -> Result<(), P2PError> {
let (sender, receiver) = oneshot::channel();
let command = Command::Disconnect { sender };
Expand Down
6 changes: 6 additions & 0 deletions crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ pub enum Command {
id: uuid::Uuid,
response: oneshot::Sender<OutboundConnection>,
},

/// Ask for a random known peer
RandomKnownPeer {
sender: oneshot::Sender<Result<PeerId, P2PError>>,
},
}

impl Display for Command {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Command::StartListening { .. } => write!(f, "StartListening"),
Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"),
Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"),
Command::Disconnect { .. } => write!(f, "Disconnect"),
Command::Gossip { .. } => write!(f, "GossipMessage"),
Command::NewProxiedQuery { .. } => write!(f, "NewProxiedQuery"),
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ pub enum CommandExecutionError {

#[error("Connection with a peer has failed")]
ConnectionClosed,

Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
#[error("Internal error: {0}")]
Internal(String),
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
}
32 changes: 31 additions & 1 deletion crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use crate::{error::P2PError, protocol_name, Command, Runtime};
use std::collections::hash_map::Entry;

use crate::{
error::{CommandExecutionError, P2PError},
protocol_name, Command, Runtime,
};
use libp2p::{kad::record::Key, PeerId};
use rand::{seq::SliceRandom, thread_rng, Rng};
use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL;
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -38,6 +44,30 @@ impl Runtime {
warn!("Unable to notify ConnectedPeers response: initiator is dropped");
}
}
Command::RandomKnownPeer { sender } => {
if self.peer_set.is_empty() {
sender.send(Err(P2PError::CommandError(
CommandExecutionError::Internal(
"Asked for one random peer but there is currently no known peer"
.to_string(),
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
),
)));

return;
}

let selected_peer: usize = thread_rng().gen_range(0..(self.peer_set.len()));
if sender
.send(self.peer_set.iter().nth(selected_peer).cloned().ok_or(
P2PError::CommandError(CommandExecutionError::Internal(
"Unable to find a random peer for RandomKnownPeer command".to_string(),
)),
))
.is_err()
{
warn!("Unable to notify RandomKnownPeer response: initiator is dropped");
}
}

Command::Disconnect { sender } if self.swarm.listeners().count() == 0 => {
if sender.send(Err(P2PError::AlreadyDisconnected)).is_err() {
Expand Down
1 change: 1 addition & 0 deletions crates/topos-p2p/src/tests/command/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod random_peer;
99 changes: 99 additions & 0 deletions crates/topos-p2p/src/tests/command/random_peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::time::Duration;

use rstest::rstest;
use test_log::test;
use tokio::spawn;
use topos_test_sdk::tce::NodeConfig;

use crate::error::P2PError;

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn no_random_peer() {
let local = NodeConfig::from_seed(1);

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.build()
.await
.expect("Unable to create p2p network");

let mut runtime = runtime.bootstrap().await.unwrap();

spawn(runtime.run());

let result = client.random_known_peer().await;

assert!(result.is_err());
assert!(matches!(
result,
Err(P2PError::CommandError(
crate::error::CommandExecutionError::Internal(error)
)) if error == "Asked for one random peer but there is currently no known peer"
));
}

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn return_a_peer() {
let local = NodeConfig::from_seed(1);
let expected = NodeConfig::from_seed(2);
let expected_peer_id = expected.keypair.public().to_peer_id();

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.build()
.await
.expect("Unable to create p2p network");

let mut runtime = runtime.bootstrap().await.unwrap();

runtime.peer_set.insert(expected_peer_id);

spawn(runtime.run());

let result = client.random_known_peer().await;

assert!(result.is_ok());
assert!(matches!(
result,
Ok(peer) if peer == expected_peer_id
));
}

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn return_a_random_peer_among_100() {
let local = NodeConfig::from_seed(1);

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.build()
.await
.expect("Unable to create p2p network");

let mut runtime = runtime.bootstrap().await.unwrap();

for i in 2..=100 {
let peer = NodeConfig::from_seed(i);
runtime.peer_set.insert(peer.keypair.public().to_peer_id());
}

spawn(runtime.run());

let first_try = client.random_known_peer().await.unwrap();
let second_try = client.random_known_peer().await.unwrap();
let third_try = client.random_known_peer().await.unwrap();

assert!(first_try != second_try);
assert!(first_try != third_try);
}
1 change: 1 addition & 0 deletions crates/topos-p2p/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod behaviour;
mod command;
mod dht;
mod support;
23 changes: 1 addition & 22 deletions crates/topos-tce-gatekeeper/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,7 @@ use topos_p2p::PeerId;
use crate::{client::GatekeeperClient, Gatekeeper, GatekeeperError};

#[derive(Default)]
pub struct GatekeeperBuilder {
local_peer_id: Option<PeerId>,
local_peer_list: Option<Vec<PeerId>>,
}

impl GatekeeperBuilder {
pub fn local_peer_id(mut self, peer_id: PeerId) -> Self {
self.local_peer_id = Some(peer_id);

self
}

/// Start the Gatekeeper with a set of known peers in the network
/// The Sync service for example is making use of this list to ask for
/// random peers
pub fn peer_list(mut self, peer_list: Vec<PeerId>) -> Self {
self.local_peer_list = Some(peer_list);

self
}
}
pub struct GatekeeperBuilder {}
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved

impl IntoFuture for GatekeeperBuilder {
type Output = Result<(GatekeeperClient, Gatekeeper), GatekeeperError>;
Expand All @@ -45,7 +25,6 @@ impl IntoFuture for GatekeeperBuilder {
},
Gatekeeper {
shutdown,
peer_list: self.local_peer_list.unwrap_or_default(),
commands: commands_recv,
..Gatekeeper::default()
},
Expand Down
10 changes: 1 addition & 9 deletions crates/topos-tce-gatekeeper/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{GatekeeperCommand, GatekeeperError, GetAllPeers, GetAllSubnets, GetRandomPeers};
use crate::{GatekeeperCommand, GatekeeperError, GetAllSubnets};
use tokio::sync::{mpsc, oneshot};
use topos_core::uci::SubnetId;
use topos_p2p::PeerId;
Expand All @@ -20,14 +20,6 @@ impl GatekeeperClient {
Ok(receiver.await?)
}

pub async fn get_random_peers(&self, number: usize) -> Result<Vec<PeerId>, GatekeeperError> {
GetRandomPeers { number }.send_to(&self.commands).await
}

pub async fn get_all_peers(&self) -> Result<Vec<PeerId>, GatekeeperError> {
GetAllPeers.send_to(&self.commands).await
}

pub async fn get_all_subnets(&self) -> Result<Vec<SubnetId>, GatekeeperError> {
GetAllSubnets.send_to(&self.commands).await
}
Expand Down
68 changes: 1 addition & 67 deletions crates/topos-tce-gatekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub struct Gatekeeper {
pub(crate) commands: mpsc::Receiver<GatekeeperCommand>,
pub(crate) tick_duration: Duration,

peer_list: Vec<PeerId>,
subnet_list: Vec<SubnetId>,
}

Expand All @@ -40,21 +39,11 @@ impl Default for Gatekeeper {
shutdown,
commands: commands_recv,
tick_duration,
peer_list: Vec::default(),
subnet_list: Vec::default(),
}
}
}

#[async_trait::async_trait]
impl CommandHandler<GetAllPeers> for Gatekeeper {
type Error = GatekeeperError;

async fn handle(&mut self, _command: GetAllPeers) -> Result<Vec<PeerId>, Self::Error> {
Ok(self.peer_list.clone())
}
}

#[async_trait::async_trait]
impl CommandHandler<GetAllSubnets> for Gatekeeper {
type Error = GatekeeperError;
Expand All @@ -64,39 +53,6 @@ impl CommandHandler<GetAllSubnets> for Gatekeeper {
}
}

#[async_trait::async_trait]
impl CommandHandler<GetRandomPeers> for Gatekeeper {
type Error = GatekeeperError;

async fn handle(
&mut self,
GetRandomPeers { number }: GetRandomPeers,
) -> Result<Vec<PeerId>, Self::Error> {
let peer_list_len = self.peer_list.len();

if number > peer_list_len {
return Err(GatekeeperError::InvalidCommand(format!(
"Asked for {number} random peers when the Gatekeeper have {peer_list_len}"
)));
}

let mut range: Vec<u32> = (0..(peer_list_len as u32)).collect();
range.shuffle(&mut thread_rng());

let iterator = range.iter().take(number);

let mut peers = Vec::new();

for index in iterator {
if let Some(peer) = self.peer_list.get(*index as usize) {
peers.push(*peer);
}
}

Ok(peers)
}
}

impl IntoFuture for Gatekeeper {
type Output = Result<(), GatekeeperError>;

Expand All @@ -113,12 +69,6 @@ impl IntoFuture for Gatekeeper {
break sender;
}
Some(command) = self.commands.recv() => match command {
GatekeeperCommand::GetAllPeers(command, response_channel) => {
_ = response_channel.send(self.handle(command).await)
},
GatekeeperCommand::GetRandomPeers(command, response_channel) => {
_ = response_channel.send(self.handle(command).await)
},
GatekeeperCommand::GetAllSubnets(command, response_channel) => {
_ = response_channel.send(self.handle(command).await)
},
Expand Down Expand Up @@ -169,25 +119,9 @@ pub enum GatekeeperError {
RegisterCommands!(
name = GatekeeperCommand,
error = GatekeeperError,
commands = [GetAllPeers, GetRandomPeers, GetAllSubnets]
commands = [GetAllSubnets]
);

#[derive(Debug)]
pub struct GetAllPeers;

impl Command for GetAllPeers {
type Result = Vec<PeerId>;
}

#[derive(Debug)]
pub struct GetRandomPeers {
number: usize,
}

impl Command for GetRandomPeers {
type Result = Vec<PeerId>;
}

#[derive(Debug)]
pub struct GetAllSubnets;

Expand Down
Loading
Loading