diff --git a/crates/topos-p2p/src/behaviour.rs b/crates/topos-p2p/src/behaviour.rs index 798c43fa8..75b5e6c35 100644 --- a/crates/topos-p2p/src/behaviour.rs +++ b/crates/topos-p2p/src/behaviour.rs @@ -15,6 +15,7 @@ pub(crate) enum HealthStatus { Initializing, Healthy, Unhealthy, + Killing, #[allow(unused)] Recovering, } diff --git a/crates/topos-p2p/src/behaviour/discovery.rs b/crates/topos-p2p/src/behaviour/discovery.rs index 8f195d327..fd2893c36 100644 --- a/crates/topos-p2p/src/behaviour/discovery.rs +++ b/crates/topos-p2p/src/behaviour/discovery.rs @@ -74,6 +74,9 @@ impl DiscoveryBehaviour { Self { inner: kademlia, current_bootstrap_query_id: None, + // If the `discovery` behaviour is created without `known_peers`, + // the bootstrap query interval is disabled. This only happens when the + // local node is a lonely bootnode; other nodes will join it. next_bootstrap_query: if known_peers.is_empty() { None } else { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index c981fcd19..3c0a69218 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -257,7 +257,14 @@ impl NetworkBehaviour for Behaviour { }, gossipsub::Event::Subscribed { peer_id, topic } => { debug!("Subscribed to {:?} with {peer_id}", topic); - if self.health_status != HealthStatus::Healthy { + + // If the behaviour isn't already healthy we check if this event + // triggers a switch to healthy + if self.health_status != HealthStatus::Healthy + && self.gossipsub.topics().all(|topic| { + self.gossipsub.mesh_peers(topic).peekable().peek().is_some() + }) + { self.health_status = HealthStatus::Healthy; } } diff --git a/crates/topos-p2p/src/config.rs b/crates/topos-p2p/src/config.rs index 3fb46b733..0574aa23c 100644 --- a/crates/topos-p2p/src/config.rs +++ b/crates/topos-p2p/src/config.rs @@ -48,17 +48,17 @@ impl Default for DiscoveryConfig { replication_interval: Some(Duration::from_secs(10)), publication_interval: 
Some(Duration::from_secs(10)), provider_publication_interval: Some(Duration::from_secs(10)), - bootstrap_interval: Duration::from_secs(Self::BOOTSTRAP_INTERVAL), - fast_bootstrap_interval: Duration::from_secs(Self::FAST_BOOTSTRAP_INTERVAL), + bootstrap_interval: Self::BOOTSTRAP_INTERVAL, + fast_bootstrap_interval: Self::FAST_BOOTSTRAP_INTERVAL, } } } impl DiscoveryConfig { /// Default bootstrap interval in seconds - pub const BOOTSTRAP_INTERVAL: u64 = 60; + pub const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(60); /// Default fast bootstrap interval in seconds - pub const FAST_BOOTSTRAP_INTERVAL: u64 = 5; + pub const FAST_BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5); pub fn with_replication_factor(mut self, replication_factor: NonZeroUsize) -> Self { self.replication_factor = replication_factor; diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 09a728bbf..3fe69f0f9 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -46,6 +46,9 @@ pub enum P2PError { #[error("Unable to create gRPC client")] UnableToCreateGrpcClient(#[from] OutboundConnectionError), + + #[error("Gossip topics subscription failed")] + GossipTopicSubscriptionFailure, } #[derive(Error, Debug)] diff --git a/crates/topos-p2p/src/event.rs b/crates/topos-p2p/src/event.rs index 5f641729f..c311d8977 100644 --- a/crates/topos-p2p/src/event.rs +++ b/crates/topos-p2p/src/event.rs @@ -54,12 +54,15 @@ pub enum Event { Healthy, /// An event emitted when the p2p layer becomes unhealthy Unhealthy, + /// An event emitted when the p2p layer is shutting down + Killing, } impl From<&HealthStatus> for Event { fn from(value: &HealthStatus) -> Self { match value { HealthStatus::Healthy => Event::Healthy, + HealthStatus::Killing => Event::Killing, _ => Event::Unhealthy, } } diff --git a/crates/topos-p2p/src/runtime/handle_event.rs b/crates/topos-p2p/src/runtime/handle_event.rs index 411d40252..91ebbad92 100644 --- 
a/crates/topos-p2p/src/runtime/handle_event.rs +++ b/crates/topos-p2p/src/runtime/handle_event.rs @@ -126,7 +126,8 @@ impl if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size { if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() { error!("Unable to subscribe to gossipsub topic: {}", error); - // TODO: Deal with initial subscribe error + + return Err(P2PError::GossipTopicSubscriptionFailure); } } } diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index 69ed30592..8e25dfff5 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -29,7 +29,7 @@ pub struct Runtime { pub(crate) listening_on: Vec, pub(crate) public_addresses: Vec, - /// Boot peers to connect used to bootstrap the p2p layer + /// Well-known or pre-configured bootnodes to connect to in order to bootstrap the p2p layer pub(crate) boot_peers: Vec, /// Contains current listenerId of the swarm @@ -44,6 +44,7 @@ pub struct Runtime { /// Internal health state of the p2p layer pub(crate) health_state: HealthState, + /// Health status of the p2p layer pub(crate) health_status: HealthStatus, } @@ -153,6 +154,7 @@ impl Runtime { let discovery = &behaviours.discovery.health_status; let new_status = match (discovery, gossipsub) { + (HealthStatus::Killing, _) | (_, HealthStatus::Killing) => HealthStatus::Killing, (HealthStatus::Initializing, _) | (_, HealthStatus::Initializing) => { HealthStatus::Initializing } diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs index 5857dd91e..c6ebb71cb 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs @@ -16,8 +16,6 @@ use topos_test_sdk::{ }; use uuid::Uuid; -mod mock; - use crate::SynchronizerService; #[rstest] diff --git 
a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration/mock.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration/mock.rs deleted file mode 100644 index 5b9158a0b..000000000 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration/mock.rs +++ /dev/null @@ -1,28 +0,0 @@ -use async_trait::async_trait; -use tonic::Request; -use topos_core::api::grpc::tce::v1::{ - synchronizer_service_server::SynchronizerService, CheckpointRequest, FetchCertificatesRequest, -}; - -struct MockSynchronizerServer {} - -#[async_trait] -impl SynchronizerService for MockSynchronizerServer { - async fn fetch_checkpoint( - &self, - _request: Request, - ) -> Result, tonic::Status> - { - todo!() - } - - async fn fetch_certificates( - &self, - _request: Request, - ) -> Result< - tonic::Response, - tonic::Status, - > { - todo!() - } -}