Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: fixing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd committed Mar 12, 2024
1 parent 442a900 commit 3f3afaf
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 37 deletions.
1 change: 1 addition & 0 deletions crates/topos-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) enum HealthStatus {
Initializing,
Healthy,
Unhealthy,
Killing,
#[allow(unused)]
Recovering,
}
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ impl DiscoveryBehaviour {
Self {
inner: kademlia,
current_bootstrap_query_id: None,
// If the `discovery` behaviour is created without known_peers
// The bootstrap query interval is disabled only when the local
// node is a lonely bootnode, other nodes will join it.
next_bootstrap_query: if known_peers.is_empty() {
None
} else {
Expand Down
9 changes: 8 additions & 1 deletion crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,14 @@ impl NetworkBehaviour for Behaviour {
},
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!("Subscribed to {:?} with {peer_id}", topic);
if self.health_status != HealthStatus::Healthy {

// If the behaviour isn't already healthy we check if this event
// triggers a switch to healthy
if self.health_status != HealthStatus::Healthy
&& self.gossipsub.topics().all(|topic| {
self.gossipsub.mesh_peers(topic).peekable().peek().is_some()
})
{
self.health_status = HealthStatus::Healthy;
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/topos-p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ impl Default for DiscoveryConfig {
replication_interval: Some(Duration::from_secs(10)),
publication_interval: Some(Duration::from_secs(10)),
provider_publication_interval: Some(Duration::from_secs(10)),
bootstrap_interval: Duration::from_secs(Self::BOOTSTRAP_INTERVAL),
fast_bootstrap_interval: Duration::from_secs(Self::FAST_BOOTSTRAP_INTERVAL),
bootstrap_interval: Self::BOOTSTRAP_INTERVAL,
fast_bootstrap_interval: Self::FAST_BOOTSTRAP_INTERVAL,
}
}
}

impl DiscoveryConfig {
/// Default bootstrap interval in seconds
pub const BOOTSTRAP_INTERVAL: u64 = 60;
pub const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(60);
/// Default fast bootstrap interval in seconds
pub const FAST_BOOTSTRAP_INTERVAL: u64 = 5;
pub const FAST_BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5);

pub fn with_replication_factor(mut self, replication_factor: NonZeroUsize) -> Self {
self.replication_factor = replication_factor;
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub enum P2PError {

#[error("Unable to create gRPC client")]
UnableToCreateGrpcClient(#[from] OutboundConnectionError),

#[error("Gossip topics subscription failed")]
GossipTopicSubscriptionFailure,
}

#[derive(Error, Debug)]
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ pub enum Event {
Healthy,
/// An event emitted when the p2p layer becomes unhealthy
Unhealthy,
/// An event emitted when the p2p layer is shutting down
Killing,
}

impl From<&HealthStatus> for Event {
fn from(value: &HealthStatus) -> Self {
match value {
HealthStatus::Healthy => Event::Healthy,
HealthStatus::Killing => Event::Killing,
_ => Event::Unhealthy,
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ impl
if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
error!("Unable to subscribe to gossipsub topic: {}", error);
// TODO: Deal with initial subscribe error

return Err(P2PError::GossipTopicSubscriptionFailure);
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct Runtime {
pub(crate) listening_on: Vec<Multiaddr>,
pub(crate) public_addresses: Vec<Multiaddr>,

/// Boot peers to connect used to bootstrap the p2p layer
/// Well-known or pre-configured bootnodes to connect to in order to bootstrap the p2p layer
pub(crate) boot_peers: Vec<PeerId>,

/// Contains current listenerId of the swarm
Expand All @@ -44,6 +44,7 @@ pub struct Runtime {
/// Internal health state of the p2p layer
pub(crate) health_state: HealthState,

/// Health status of the p2p layer
pub(crate) health_status: HealthStatus,
}

Expand Down Expand Up @@ -153,6 +154,7 @@ impl Runtime {
let discovery = &behaviours.discovery.health_status;

let new_status = match (discovery, gossipsub) {
(HealthStatus::Killing, _) | (_, HealthStatus::Killing) => HealthStatus::Killing,
(HealthStatus::Initializing, _) | (_, HealthStatus::Initializing) => {
HealthStatus::Initializing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use topos_test_sdk::{
};
use uuid::Uuid;

mod mock;

use crate::SynchronizerService;

#[rstest]
Expand Down

This file was deleted.

0 comments on commit 3f3afaf

Please sign in to comment.