Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: create health mechanism P2P
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd committed Mar 1, 2024
1 parent 9f81725 commit 93d26bc
Show file tree
Hide file tree
Showing 25 changed files with 712 additions and 549 deletions.
1 change: 1 addition & 0 deletions crates/topos-config/src/edge/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl CommandConfig {
}

pub async fn spawn(self) -> Result<ExitStatus, std::io::Error> {
info!("Spawning Polygon Edge with args: {:?}", self.binary_path);
let mut command = Command::new(self.binary_path);
command.kill_on_drop(true);
command.args(self.args);
Expand Down
9 changes: 9 additions & 0 deletions crates/topos-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ pub(crate) mod grpc;
pub(crate) mod peer_info;
pub(crate) mod topos;

/// Health state tracked by a P2P behaviour.
///
/// A freshly created value is [`HealthStatus::Initializing`] (the `Default`).
///
/// NOTE(review): the exact rules for moving between states are decided by
/// the behaviours holding this value; they are not encoded in the type.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HealthStatus {
    /// Starting state, used until a behaviour reports something else.
    #[default]
    Initializing,
    /// The behaviour is considered healthy.
    Healthy,
    /// The behaviour is considered unhealthy.
    Unhealthy,
    /// Presumably an intermediate state between unhealthy and healthy —
    /// TODO confirm against the code that sets it.
    Recovering,
}

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "ComposedEvent")]
pub(crate) struct Behaviour {
Expand Down
35 changes: 14 additions & 21 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use crate::error::P2PError;
use crate::{config::DiscoveryConfig, error::CommandExecutionError};
Expand All @@ -18,6 +19,8 @@ use libp2p::{
use tokio::sync::oneshot;
use tracing::{debug, error, info};

use super::HealthStatus;

pub type PendingRecordRequest = oneshot::Sender<Result<Vec<Multiaddr>, CommandExecutionError>>;

/// DiscoveryBehaviour is responsible to discover and manage connections with peers
Expand All @@ -29,6 +32,8 @@ pub(crate) struct DiscoveryBehaviour {
pub(crate) current_bootstrap_query_id: Option<QueryId>,
/// The next bootstrap query interval used to schedule the next bootstrap query
pub(crate) next_bootstrap_query: Option<Pin<Box<tokio::time::Interval>>>,

pub(crate) health_status: HealthStatus,
}

impl DiscoveryBehaviour {
Expand Down Expand Up @@ -70,6 +75,7 @@ impl DiscoveryBehaviour {
inner: kademlia,
current_bootstrap_query_id: None,
next_bootstrap_query: Some(Box::pin(tokio::time::interval(config.bootstrap_interval))),
health_status: Default::default(),
}
}

Expand All @@ -79,33 +85,20 @@ impl DiscoveryBehaviour {
/// Then multiple random PeerId are created in order to randomly walk the network.
pub fn bootstrap(&mut self) -> Result<(), P2PError> {
if self.current_bootstrap_query_id.is_none() {
match self.inner.bootstrap() {
Ok(query_id) => {
info!("Started kademlia bootstrap with query_id: {query_id:?}");
self.current_bootstrap_query_id = Some(query_id);
}
Err(error) => {
error!("Unable to start kademlia bootstrap: {error:?}");
return Err(P2PError::BootstrapError(
"Unable to start kademlia bootstrap",
));
}
}
let query_id = self.inner.bootstrap()?;
info!("Started kademlia bootstrap query with query_id: {query_id:?}");
self.current_bootstrap_query_id = Some(query_id);
}

Ok(())
}

pub fn get_addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
if let Some(key_ref) = self.inner.kbucket(*peer_id) {
key_ref
.iter()
.filter(|e| e.node.key.preimage() == peer_id)
.map(|e| e.node.value.first().clone())
.collect()
} else {
Vec::new()
/// Replaces the scheduled bootstrap interval with a new one ticking every
/// `duration`.
///
/// Does nothing when no bootstrap interval is currently configured
/// (`next_bootstrap_query` is `None`).
///
/// # Errors
///
/// The current implementation never fails and always returns `Ok(())`.
pub fn change_interval(&mut self, duration: Duration) -> Result<(), P2PError> {
    if let Some(scheduled) = &mut self.next_bootstrap_query {
        // Overwrite the pinned interval in place instead of re-boxing it.
        let replacement = tokio::time::interval(duration);
        scheduled.set(replacement);
    }

    Ok(())
}
}

Expand Down
152 changes: 89 additions & 63 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, VecDeque},
env,
task::Poll,
time::Duration,
};

use libp2p::swarm::{ConnectionClosed, FromSwarm};
use libp2p::PeerId;
use libp2p::{
gossipsub::{self, IdentTopic, Message, MessageAuthenticity, MessageId},
gossipsub::{self, IdentTopic, Message, MessageAuthenticity},
identity::Keypair,
swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm},
};
use prost::Message as ProstMessage;
use topos_core::api::grpc::tce::v1::Batch;
use topos_metrics::{P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE};
use topos_metrics::P2P_GOSSIP_BATCH_SIZE;
use tracing::{debug, error};

use crate::error::P2PError;
use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;

pub struct Behaviour {
batch_size: usize,
gossipsub: gossipsub::Behaviour,
pending: HashMap<&'static str, VecDeque<Vec<u8>>>,
tick: tokio::time::Interval,
cache: HashSet<MessageId>,
connected_peer: HashMap<&'static str, HashSet<PeerId>>,
pub(crate) healthy_status: HealthStatus,
}

impl Behaviour {
Expand All @@ -48,18 +55,15 @@ impl Behaviour {
Ok(0)
}

pub fn subscribe(&mut self) -> Result<(), &'static str> {
pub fn subscribe(&mut self) -> Result<(), P2PError> {
self.gossipsub
.subscribe(&gossipsub::IdentTopic::new(TOPOS_GOSSIP))
.unwrap();
.subscribe(&gossipsub::IdentTopic::new(TOPOS_GOSSIP))?;

self.gossipsub
.subscribe(&gossipsub::IdentTopic::new(TOPOS_ECHO))
.unwrap();
.subscribe(&gossipsub::IdentTopic::new(TOPOS_ECHO))?;

self.gossipsub
.subscribe(&gossipsub::IdentTopic::new(TOPOS_READY))
.unwrap();
.subscribe(&gossipsub::IdentTopic::new(TOPOS_READY))?;

Ok(())
}
Expand Down Expand Up @@ -107,7 +111,9 @@ impl Behaviour {
.unwrap_or(Ok(100))
.unwrap(),
)),
cache: HashSet::new(),

connected_peer: Default::default(),
healthy_status: Default::default(),
}
}
}
Expand Down Expand Up @@ -148,6 +154,25 @@ impl NetworkBehaviour for Behaviour {
}

/// Handles a swarm event before handing it to the inner gossipsub behaviour.
///
/// When a connection is closed, the peer is removed from every per-topic
/// peer set in `connected_peer`. All events, including `ConnectionClosed`,
/// are then forwarded unchanged to `gossipsub`.
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
    if let FromSwarm::ConnectionClosed(ConnectionClosed {
        peer_id,
        connection_id,
        endpoint,
        remaining_established,
        ..
    }) = &event
    {
        debug!(
            "Connection closed: {:?} {:?} {:?} {:?}",
            peer_id, connection_id, endpoint, remaining_established
        );

        // NOTE(review): the peer is dropped even when `remaining_established`
        // is non-zero, i.e. other connections to it may still be open —
        // confirm this is intended.
        // Only the peer sets are needed, so iterate the map values directly.
        for peers in self.connected_peer.values_mut() {
            peers.remove(peer_id);
        }
    }

    self.gossipsub.on_swarm_event(event)
}

Expand Down Expand Up @@ -188,7 +213,58 @@ impl NetworkBehaviour for Behaviour {

let event = match self.gossipsub.poll(cx, params) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(event)) => Some(event),
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
gossipsub::Event::Message {
propagation_source,
message_id,
message:
Message {
source,
data,
topic,
..
},
} => match topic.as_str() {
TOPOS_GOSSIP => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
topic: TOPOS_GOSSIP,
message: data,
source,
},
)))
}
TOPOS_ECHO => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
topic: TOPOS_ECHO,
message: data,
source,
},
)))
}
TOPOS_READY => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
topic: TOPOS_READY,
message: data,
source,
},
)))
}
_ => {}
},
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!("Subscribed to {:?} with {peer_id}", topic);
self.healthy_status = HealthStatus::Healthy;
}
gossipsub::Event::Unsubscribed { peer_id, topic } => {
debug!("Unsubscribed from {:?} with {peer_id}", topic);
}
gossipsub::Event::GossipsubNotSupported { peer_id } => {
debug!("Gossipsub not supported by {:?}", peer_id);
}
},
Poll::Ready(ToSwarm::ListenOn { opts }) => {
return Poll::Ready(ToSwarm::ListenOn { opts })
}
Expand Down Expand Up @@ -227,56 +303,6 @@ impl NetworkBehaviour for Behaviour {
}
};

if let Some(gossipsub::Event::Message { ref message_id, .. }) = event {
if self.cache.contains(message_id) {
P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL.inc();
}
}

if let Some(gossipsub::Event::Message {
propagation_source,
message_id,
message:
Message {
source,
data,
sequence_number,
topic,
},
}) = event
{
match topic.as_str() {
TOPOS_GOSSIP => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent {
topic: TOPOS_GOSSIP,
message: data,
source,
},
)))
}
TOPOS_ECHO => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent {
topic: TOPOS_ECHO,
message: data,
source,
},
)))
}
TOPOS_READY => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent {
topic: TOPOS_READY,
message: data,
source,
},
)))
}
_ => {}
}
}

Poll::Pending
}
}
7 changes: 0 additions & 7 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ impl NetworkClient {
.await
}

pub async fn disconnect(&self) -> Result<(), P2PError> {
let (sender, receiver) = oneshot::channel();
let command = Command::Disconnect { sender };

Self::send_command_with_receiver(&self.sender, command, receiver).await
}

pub fn publish<T: std::fmt::Debug + prost::Message + 'static>(
&self,
topic: &'static str,
Expand Down
19 changes: 2 additions & 17 deletions crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::fmt::Display;

use libp2p::{Multiaddr, PeerId};
use libp2p::PeerId;
use tokio::sync::oneshot;

use crate::{
behaviour::grpc::connection::OutboundConnection,
error::{CommandExecutionError, P2PError},
};
use crate::{behaviour::grpc::connection::OutboundConnection, error::P2PError};

#[derive(Debug)]
pub enum Command {
Expand All @@ -15,16 +12,6 @@ pub enum Command {
sender: oneshot::Sender<Result<Vec<PeerId>, P2PError>>,
},

/// Disconnect the node
Disconnect {
sender: oneshot::Sender<Result<(), P2PError>>,
},

/// Try to discover a peer based on its PeerId
Discover {
to: PeerId,
sender: oneshot::Sender<Result<Vec<Multiaddr>, CommandExecutionError>>,
},
Gossip {
topic: &'static str,
data: Vec<u8>,
Expand Down Expand Up @@ -52,10 +39,8 @@ impl Display for Command {
match self {
Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"),
Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"),
Command::Disconnect { .. } => write!(f, "Disconnect"),
Command::Gossip { .. } => write!(f, "GossipMessage"),
Command::NewProxiedQuery { .. } => write!(f, "NewProxiedQuery"),
Command::Discover { to, .. } => write!(f, "Discover(to: {to})"),
}
}
}
Loading

0 comments on commit 93d26bc

Please sign in to comment.