Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: add p2p layer health check #464

Merged
merged 5 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker_build_push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
workflow_file_name: topos:integration-tests.yml
ref: main
wait_interval: 60
client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }'
client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }'

frontend-erc20-e2e:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos-config/src/edge/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl CommandConfig {
}

pub async fn spawn(self) -> Result<ExitStatus, std::io::Error> {
info!("Spawning Polygon Edge with args: {:?}", self.binary_path);
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
let mut command = Command::new(self.binary_path);
command.kill_on_drop(true);
command.args(self.args);
Expand Down
12 changes: 12 additions & 0 deletions crates/topos-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ pub(crate) mod grpc;
pub(crate) mod peer_info;
pub(crate) mod topos;

/// Represents the health status of a behaviour inside the p2p layer
#[derive(Default, PartialEq, Eq)]
pub(crate) enum HealthStatus {
#[default]
Initializing,
Healthy,
Unhealthy,
Killing,
#[allow(unused)]
Recovering,
}

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "ComposedEvent")]
pub(crate) struct Behaviour {
Expand Down
49 changes: 27 additions & 22 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use crate::error::P2PError;
use crate::{config::DiscoveryConfig, error::CommandExecutionError};
Expand All @@ -18,6 +19,8 @@ use libp2p::{
use tokio::sync::oneshot;
use tracing::{debug, error, info};

use super::HealthStatus;

pub type PendingRecordRequest = oneshot::Sender<Result<Vec<Multiaddr>, CommandExecutionError>>;

/// DiscoveryBehaviour is responsible to discover and manage connections with peers
Expand All @@ -29,6 +32,8 @@ pub(crate) struct DiscoveryBehaviour {
pub(crate) current_bootstrap_query_id: Option<QueryId>,
/// The next bootstrap query interval used to schedule the next bootstrap query
pub(crate) next_bootstrap_query: Option<Pin<Box<tokio::time::Interval>>>,
/// The health status of the discovery behaviour
pub(crate) health_status: HealthStatus,
}

impl DiscoveryBehaviour {
Expand Down Expand Up @@ -69,7 +74,19 @@ impl DiscoveryBehaviour {
Self {
inner: kademlia,
current_bootstrap_query_id: None,
next_bootstrap_query: Some(Box::pin(tokio::time::interval(config.bootstrap_interval))),
// If the `discovery` behaviour is created without known_peers
// The bootstrap query interval is disabled only when the local
// node is a lonely bootnode, other nodes will join it.
next_bootstrap_query: if known_peers.is_empty() {
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
None
} else {
Some(Box::pin(tokio::time::interval(config.bootstrap_interval)))
},
health_status: if known_peers.is_empty() {
HealthStatus::Healthy
} else {
HealthStatus::Initializing
},
}
}

Expand All @@ -79,33 +96,21 @@ impl DiscoveryBehaviour {
/// Then multiple random PeerId are created in order to randomly walk the network.
pub fn bootstrap(&mut self) -> Result<(), P2PError> {
if self.current_bootstrap_query_id.is_none() {
match self.inner.bootstrap() {
Ok(query_id) => {
info!("Started kademlia bootstrap with query_id: {query_id:?}");
self.current_bootstrap_query_id = Some(query_id);
}
Err(error) => {
error!("Unable to start kademlia bootstrap: {error:?}");
return Err(P2PError::BootstrapError(
"Unable to start kademlia bootstrap",
));
}
}
let query_id = self.inner.bootstrap()?;
debug!("Started kademlia bootstrap query with query_id: {query_id:?}");
self.current_bootstrap_query_id = Some(query_id);
}

Ok(())
}

pub fn get_addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
if let Some(key_ref) = self.inner.kbucket(*peer_id) {
key_ref
.iter()
.filter(|e| e.node.key.preimage() == peer_id)
.map(|e| e.node.value.first().clone())
.collect()
} else {
Vec::new()
/// Change the interval of the next bootstrap queries
pub fn change_interval(&mut self, duration: Duration) -> Result<(), P2PError> {
if let Some(interval) = self.next_bootstrap_query.as_mut() {
interval.set(tokio::time::interval(duration));
}

Ok(())
}
}

Expand Down
165 changes: 100 additions & 65 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, VecDeque},
env,
task::Poll,
time::Duration,
};

use libp2p::swarm::{ConnectionClosed, FromSwarm};
use libp2p::PeerId;
use libp2p::{
gossipsub::{self, IdentTopic, Message, MessageAuthenticity, MessageId},
gossipsub::{self, IdentTopic, Message, MessageAuthenticity},
identity::Keypair,
swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm},
};
use prost::Message as ProstMessage;
use topos_core::api::grpc::tce::v1::Batch;
use topos_metrics::{P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE};
use topos_metrics::P2P_GOSSIP_BATCH_SIZE;
use tracing::{debug, error, warn};

use crate::error::P2PError;
use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;

pub struct Behaviour {
batch_size: usize,
gossipsub: gossipsub::Behaviour,
pending: HashMap<&'static str, VecDeque<Vec<u8>>>,
tick: tokio::time::Interval,
cache: HashSet<MessageId>,
/// List of connected peers per topic.
connected_peer: HashMap<&'static str, HashSet<PeerId>>,
/// The health status of the gossip behaviour
pub(crate) health_status: HealthStatus,
}

impl Behaviour {
Expand All @@ -48,18 +57,15 @@ impl Behaviour {
Ok(0)
}

pub fn subscribe(&mut self) -> Result<(), &'static str> {
pub fn subscribe(&mut self) -> Result<(), P2PError> {
self.gossipsub
.subscribe(&gossipsub::IdentTopic::new(TOPOS_GOSSIP))
.unwrap();
.subscribe(&gossipsub::IdentTopic::new(TOPOS_GOSSIP))?;

self.gossipsub
.subscribe(&gossipsub::IdentTopic::new(TOPOS_ECHO))
.unwrap();
.subscribe(&gossipsub::IdentTopic::new(TOPOS_ECHO))?;

self.gossipsub
.subscribe(&gossipsub::IdentTopic::new(TOPOS_READY))
.unwrap();
.subscribe(&gossipsub::IdentTopic::new(TOPOS_READY))?;

Ok(())
}
Expand Down Expand Up @@ -107,7 +113,9 @@ impl Behaviour {
.unwrap_or(Ok(100))
.unwrap(),
)),
cache: HashSet::new(),

connected_peer: Default::default(),
health_status: Default::default(),
}
}
}
Expand Down Expand Up @@ -148,6 +156,24 @@ impl NetworkBehaviour for Behaviour {
}

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
remaining_established,
..
}) = &event
{
debug!(
"Connection closed: {:?} {:?} {:?} {:?}",
peer_id, connection_id, endpoint, remaining_established
);

for (_, topic) in self.connected_peer.iter_mut() {
topic.remove(peer_id);
}
}

self.gossipsub.on_swarm_event(event)
}

Expand Down Expand Up @@ -185,9 +211,69 @@ impl NetworkBehaviour for Behaviour {
}
}

let event = match self.gossipsub.poll(cx) {
match self.gossipsub.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(event)) => Some(event),
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
gossipsub::Event::Message {
propagation_source,
message_id,
message:
Message {
source,
data,
topic,
..
},
} => match topic.as_str() {
TOPOS_GOSSIP => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
topic: TOPOS_GOSSIP,
message: data,
source,
},
)))
}
TOPOS_ECHO => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
topic: TOPOS_ECHO,
message: data,
source,
},
)))
}
TOPOS_READY => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
topic: TOPOS_READY,
message: data,
source,
},
)))
}
_ => {}
},
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!("Subscribed to {:?} with {peer_id}", topic);

// If the behaviour isn't already healthy we check if this event
// triggers a switch to healthy
if self.health_status != HealthStatus::Healthy
&& self.gossipsub.topics().all(|topic| {
self.gossipsub.mesh_peers(topic).peekable().peek().is_some()
})
{
self.health_status = HealthStatus::Healthy;
}
}
gossipsub::Event::Unsubscribed { peer_id, topic } => {
debug!("Unsubscribed from {:?} with {peer_id}", topic);
}
gossipsub::Event::GossipsubNotSupported { peer_id } => {
debug!("Gossipsub not supported by {:?}", peer_id);
}
},
Poll::Ready(ToSwarm::ListenOn { opts }) => {
return Poll::Ready(ToSwarm::ListenOn { opts })
}
Expand Down Expand Up @@ -226,57 +312,6 @@ impl NetworkBehaviour for Behaviour {
}
Poll::Ready(event) => {
warn!("Unhandled event in gossip behaviour: {:?}", event);
None
}
};

if let Some(gossipsub::Event::Message { ref message_id, .. }) = event {
if self.cache.contains(message_id) {
P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL.inc();
}
}

if let Some(gossipsub::Event::Message {
propagation_source,
message_id,
message:
Message {
source,
data,
sequence_number,
topic,
},
}) = event
{
match topic.as_str() {
TOPOS_GOSSIP => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent {
topic: TOPOS_GOSSIP,
message: data,
source,
},
)))
}
TOPOS_ECHO => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent {
topic: TOPOS_ECHO,
message: data,
source,
},
)))
}
TOPOS_READY => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent {
topic: TOPOS_READY,
message: data,
source,
},
)))
}
_ => {}
}
}

Expand Down
7 changes: 0 additions & 7 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ impl NetworkClient {
.await
}

pub async fn disconnect(&self) -> Result<(), P2PError> {
let (sender, receiver) = oneshot::channel();
let command = Command::Disconnect { sender };

Self::send_command_with_receiver(&self.sender, command, receiver).await
}

pub fn publish<T: std::fmt::Debug + prost::Message + 'static>(
&self,
topic: &'static str,
Expand Down
Loading
Loading