From 4c16cfbeaee60f62dd33dc9106cde013a02ffd41 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Wed, 21 Feb 2024 10:11:29 +0100 Subject: [PATCH 1/5] feat: create health mechanism P2P Signed-off-by: Simon Paitrault --- Cargo.lock | 1 + crates/topos-config/src/edge/command.rs | 1 + crates/topos-p2p/src/behaviour.rs | 11 + crates/topos-p2p/src/behaviour/discovery.rs | 36 +-- crates/topos-p2p/src/behaviour/gossip.rs | 158 ++++++---- crates/topos-p2p/src/client.rs | 7 - crates/topos-p2p/src/command.rs | 19 +- crates/topos-p2p/src/config.rs | 6 + crates/topos-p2p/src/error.rs | 26 +- crates/topos-p2p/src/event.rs | 29 +- crates/topos-p2p/src/network.rs | 10 +- .../topos-p2p/src/runtime/handle_command.rs | 54 +--- crates/topos-p2p/src/runtime/handle_event.rs | 82 ++++- .../src/runtime/handle_event/discovery.rs | 151 ++++++---- .../src/runtime/handle_event/gossipsub.rs | 8 +- .../src/runtime/handle_event/grpc.rs | 6 +- .../src/runtime/handle_event/peer_info.rs | 6 +- crates/topos-p2p/src/runtime/mod.rs | 132 ++++++-- .../src/tests/command/random_peer.rs | 15 +- crates/topos-tce-api/src/runtime/builder.rs | 10 +- crates/topos-tce-api/src/runtime/sync_task.rs | 8 +- crates/topos-tce-api/src/stream/mod.rs | 1 + .../tests/grpc/certificate_precedence.rs | 12 +- crates/topos-tce-api/tests/runtime.rs | 13 +- .../benches/double_echo.rs | 2 +- crates/topos-tce-broadcast/src/lib.rs | 8 +- crates/topos-tce-broadcast/src/tests/mod.rs | 2 +- crates/topos-tce-proxy/Cargo.toml | 1 + crates/topos-tce-proxy/tests/tce_tests.rs | 68 ++--- crates/topos-tce-synchronizer/src/builder.rs | 4 +- .../src/checkpoints_collector/tests.rs | 8 +- .../tests/integration.rs | 9 +- crates/topos-tce/Cargo.toml | 2 + crates/topos-tce/src/app_context/network.rs | 281 +++++++++--------- crates/topos-tce/src/lib.rs | 70 +++-- crates/topos-test-sdk/src/storage/mod.rs | 16 +- crates/topos-test-sdk/src/tce/mod.rs | 68 +++-- crates/topos-test-sdk/src/tce/p2p.rs | 46 ++- crates/topos-test-sdk/src/tce/protocol.rs | 5 +- crates/topos-test-sdk/src/tce/public_api.rs | 2 + crates/topos-test-sdk/src/tce/synchronizer.rs | 3 +- crates/topos/tests/cert_delivery.rs | 6 +- crates/topos/tests/config.rs | 48 +-- 43 files changed, 845 insertions(+), 606 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c3afd32e..4359e1d62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8263,6 +8263,7 @@ dependencies = [ "futures", "hex", "hyper 0.14.28", + "libp2p", "opentelemetry", "rstest", "serde", diff --git a/crates/topos-config/src/edge/command.rs b/crates/topos-config/src/edge/command.rs index ce5e8a4c2..33c0026e3 100644 --- a/crates/topos-config/src/edge/command.rs +++ b/crates/topos-config/src/edge/command.rs @@ -64,6 +64,7 @@ impl CommandConfig { } pub async fn spawn(self) -> Result { + info!("Spawning Polygon Edge with args: {:?}", self.binary_path); let mut command = Command::new(self.binary_path); command.kill_on_drop(true); command.args(self.args); diff --git a/crates/topos-p2p/src/behaviour.rs b/crates/topos-p2p/src/behaviour.rs index c87635edb..798c43fa8 100644 --- a/crates/topos-p2p/src/behaviour.rs +++ b/crates/topos-p2p/src/behaviour.rs @@ -8,6 +8,17 @@ pub(crate) mod grpc; pub(crate) mod peer_info; pub(crate) mod topos; +/// Represents the health status of a behaviour inside the p2p layer +#[derive(Default, PartialEq, Eq)] +pub(crate) enum HealthStatus { + #[default] + Initializing, + Healthy, + Unhealthy, + #[allow(unused)] + Recovering, +} + #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "ComposedEvent")] pub(crate) struct Behaviour { diff 
--git a/crates/topos-p2p/src/behaviour/discovery.rs b/crates/topos-p2p/src/behaviour/discovery.rs index af6344ecc..5a6c51d22 100644 --- a/crates/topos-p2p/src/behaviour/discovery.rs +++ b/crates/topos-p2p/src/behaviour/discovery.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::pin::Pin; use std::task::Poll; +use std::time::Duration; use crate::error::P2PError; use crate::{config::DiscoveryConfig, error::CommandExecutionError}; @@ -18,6 +19,8 @@ use libp2p::{ use tokio::sync::oneshot; use tracing::{debug, error, info}; +use super::HealthStatus; + pub type PendingRecordRequest = oneshot::Sender, CommandExecutionError>>; /// DiscoveryBehaviour is responsible to discover and manage connections with peers @@ -29,6 +32,8 @@ pub(crate) struct DiscoveryBehaviour { pub(crate) current_bootstrap_query_id: Option, /// The next bootstrap query interval used to schedule the next bootstrap query pub(crate) next_bootstrap_query: Option>>, + /// The health status of the discovery behaviour + pub(crate) health_status: HealthStatus, } impl DiscoveryBehaviour { @@ -70,6 +75,7 @@ impl DiscoveryBehaviour { inner: kademlia, current_bootstrap_query_id: None, next_bootstrap_query: Some(Box::pin(tokio::time::interval(config.bootstrap_interval))), + health_status: Default::default(), } } @@ -79,33 +85,21 @@ impl DiscoveryBehaviour { /// Then multiple random PeerId are created in order to randomly walk the network. pub fn bootstrap(&mut self) -> Result<(), P2PError> { if self.current_bootstrap_query_id.is_none() { - match self.inner.bootstrap() { - Ok(query_id) => { - info!("Started kademlia bootstrap with query_id: {query_id:?}"); - self.current_bootstrap_query_id = Some(query_id); - } - Err(error) => { - error!("Unable to start kademlia bootstrap: {error:?}"); - return Err(P2PError::BootstrapError( - "Unable to start kademlia bootstrap", - )); - } - } + let query_id = self.inner.bootstrap()?; + debug!("Started kademlia bootstrap query with query_id: {query_id:?}"); + self.current_bootstrap_query_id = Some(query_id); } Ok(()) } - pub fn get_addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - if let Some(key_ref) = self.inner.kbucket(*peer_id) { - key_ref - .iter() - .filter(|e| e.node.key.preimage() == peer_id) - .map(|e| e.node.value.first().clone()) - .collect() - } else { - Vec::new() + /// Change the interval of the next bootstrap queries + pub fn change_interval(&mut self, duration: Duration) -> Result<(), P2PError> { + if let Some(interval) = self.next_bootstrap_query.as_mut() { + interval.set(tokio::time::interval(duration)); } + + Ok(()) } } diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index d01c3bb75..2f1d0621e 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -1,24 +1,30 @@ use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, VecDeque}, env, task::Poll, time::Duration, }; +use libp2p::swarm::{ConnectionClosed, FromSwarm}; +use libp2p::PeerId; use libp2p::{ - gossipsub::{self, IdentTopic, Message, MessageAuthenticity, MessageId}, + gossipsub::{self, IdentTopic, Message, MessageAuthenticity}, identity::Keypair, swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm}, }; use prost::Message as ProstMessage; use topos_core::api::grpc::tce::v1::Batch; -use topos_metrics::{P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE}; -use tracing::{debug, error, 
warn}; +use topos_metrics::P2P_GOSSIP_BATCH_SIZE; +use tracing::{debug, error}; +use crate::error::P2PError; use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY}; +use super::HealthStatus; + const MAX_BATCH_SIZE: usize = 10; pub struct Behaviour { @@ -26,7 +32,10 @@ pub struct Behaviour { gossipsub: gossipsub::Behaviour, pending: HashMap<&'static str, VecDeque>>, tick: tokio::time::Interval, - cache: HashSet, + /// List of connected peers per topics + connected_peer: HashMap<&'static str, HashSet>, + /// The health status of the gossip behaviour + pub(crate) health_status: HealthStatus, } impl Behaviour { @@ -48,18 +57,15 @@ impl Behaviour { Ok(0) } - pub fn subscribe(&mut self) -> Result<(), &'static str> { + pub fn subscribe(&mut self) -> Result<(), P2PError> { self.gossipsub - .subscribe(&gossipsub::IdentTopic::new(TOPOS_GOSSIP)) - .unwrap(); + .subscribe(&gossipsub::IdentTopic::new(TOPOS_GOSSIP))?; self.gossipsub - .subscribe(&gossipsub::IdentTopic::new(TOPOS_ECHO)) - .unwrap(); + .subscribe(&gossipsub::IdentTopic::new(TOPOS_ECHO))?; self.gossipsub - .subscribe(&gossipsub::IdentTopic::new(TOPOS_READY)) - .unwrap(); + .subscribe(&gossipsub::IdentTopic::new(TOPOS_READY))?; Ok(()) } @@ -107,7 +113,9 @@ impl Behaviour { .unwrap_or(Ok(100)) .unwrap(), )), - cache: HashSet::new(), + + connected_peer: Default::default(), + health_status: Default::default(), } } } @@ -148,6 +156,24 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + if let FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + endpoint, + remaining_established, + .. + }) = &event + { + debug!( + "Connection closed: {:?} {:?} {:?} {:?}", + peer_id, connection_id, endpoint, remaining_established + ); + + for (_, topic) in self.connected_peer.iter_mut() { + topic.remove(peer_id); + } + } + self.gossipsub.on_swarm_event(event) } @@ -185,9 +211,60 @@ impl NetworkBehaviour for Behaviour { } } - let event = match self.gossipsub.poll(cx) { + match self.gossipsub.poll(cx) { Poll::Pending => return Poll::Pending, - Poll::Ready(ToSwarm::GenerateEvent(event)) => Some(event), + Poll::Ready(ToSwarm::GenerateEvent(event)) => match event { + gossipsub::Event::Message { + propagation_source, + message_id, + message: + Message { + source, + data, + topic, + .. 
+ }, + } => match topic.as_str() { + TOPOS_GOSSIP => { + return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( + crate::event::GossipEvent::Message { + topic: TOPOS_GOSSIP, + message: data, + source, + }, + ))) + } + TOPOS_ECHO => { + return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( + crate::event::GossipEvent::Message { + topic: TOPOS_ECHO, + message: data, + source, + }, + ))) + } + TOPOS_READY => { + return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( + crate::event::GossipEvent::Message { + topic: TOPOS_READY, + message: data, + source, + }, + ))) + } + _ => {} + }, + gossipsub::Event::Subscribed { peer_id, topic } => { + debug!("Subscribed to {:?} with {peer_id}", topic); + self.health_status = HealthStatus::Healthy; + } + gossipsub::Event::Unsubscribed { peer_id, topic } => { + debug!("Unsubscribed from {:?} with {peer_id}", topic); + } + gossipsub::Event::GossipsubNotSupported { peer_id } => { + debug!("Gossipsub not supported by {:?}", peer_id); + } + }, Poll::Ready(ToSwarm::ListenOn { opts }) => { return Poll::Ready(ToSwarm::ListenOn { opts }) } @@ -226,57 +303,6 @@ impl NetworkBehaviour for Behaviour { } Poll::Ready(event) => { warn!("Unhandled event in gossip behaviour: {:?}", event); - None - } - }; - - if let Some(gossipsub::Event::Message { ref message_id, .. }) = event { - if self.cache.contains(message_id) { - P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL.inc(); - } - } - - if let Some(gossipsub::Event::Message { - propagation_source, - message_id, - message: - Message { - source, - data, - sequence_number, - topic, - }, - }) = event - { - match topic.as_str() { - TOPOS_GOSSIP => { - return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent { - topic: TOPOS_GOSSIP, - message: data, - source, - }, - ))) - } - TOPOS_ECHO => { - return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent { - topic: TOPOS_ECHO, - message: data, - source, - }, - ))) - } - TOPOS_READY => { - return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent { - topic: TOPOS_READY, - message: data, - source, - }, - ))) - } - _ => {} } } diff --git a/crates/topos-p2p/src/client.rs b/crates/topos-p2p/src/client.rs index 1e2a1f667..f33eee9e0 100644 --- a/crates/topos-p2p/src/client.rs +++ b/crates/topos-p2p/src/client.rs @@ -39,13 +39,6 @@ impl NetworkClient { .await } - pub async fn disconnect(&self) -> Result<(), P2PError> { - let (sender, receiver) = oneshot::channel(); - let command = Command::Disconnect { sender }; - - Self::send_command_with_receiver(&self.sender, command, receiver).await - } - pub fn publish( &self, topic: &'static str, diff --git a/crates/topos-p2p/src/command.rs b/crates/topos-p2p/src/command.rs index 40cf5ac94..63432c42c 100644 --- a/crates/topos-p2p/src/command.rs +++ b/crates/topos-p2p/src/command.rs @@ -1,12 +1,9 @@ use std::fmt::Display; -use libp2p::{Multiaddr, PeerId}; +use libp2p::PeerId; use tokio::sync::oneshot; -use crate::{ - behaviour::grpc::connection::OutboundConnection, - error::{CommandExecutionError, P2PError}, -}; +use crate::{behaviour::grpc::connection::OutboundConnection, error::P2PError}; #[derive(Debug)] pub enum Command { @@ -15,16 +12,6 @@ pub enum Command { sender: oneshot::Sender, P2PError>>, }, - /// Disconnect the node - Disconnect { - sender: oneshot::Sender>, - }, - - /// Try to discover a peer based on its PeerId - Discover { - to: PeerId, - sender: oneshot::Sender, CommandExecutionError>>, - }, 
Gossip { topic: &'static str, data: Vec, @@ -52,10 +39,8 @@ impl Display for Command { match self { Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"), Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"), - Command::Disconnect { .. } => write!(f, "Disconnect"), Command::Gossip { .. } => write!(f, "GossipMessage"), Command::NewProxiedQuery { .. } => write!(f, "NewProxiedQuery"), - Command::Discover { to, .. } => write!(f, "Discover(to: {to})"), } } } diff --git a/crates/topos-p2p/src/config.rs b/crates/topos-p2p/src/config.rs index 38bd340c2..7603037a2 100644 --- a/crates/topos-p2p/src/config.rs +++ b/crates/topos-p2p/src/config.rs @@ -32,6 +32,9 @@ pub struct DiscoveryConfig { pub provider_publication_interval: Option, /// Interval at which the node will send bootstrap query to the network pub bootstrap_interval: Duration, + /// Interval at which the node will send fast bootstrap query to the network + /// Mostly used when the node is bootstrapping and failed to connect to boot peers + pub fast_bootstrap_interval: Duration, } impl Default for DiscoveryConfig { @@ -42,6 +45,7 @@ impl Default for DiscoveryConfig { publication_interval: Some(Duration::from_secs(10)), provider_publication_interval: Some(Duration::from_secs(10)), bootstrap_interval: Duration::from_secs(Self::BOOTSTRAP_INTERVAL), + fast_bootstrap_interval: Duration::from_secs(Self::FAST_BOOTSTRAP_INTERVAL), } } } @@ -49,6 +53,8 @@ impl Default for DiscoveryConfig { impl DiscoveryConfig { /// Default bootstrap interval in seconds pub const BOOTSTRAP_INTERVAL: u64 = 60; + /// Default fast bootstrap interval in seconds + pub const FAST_BOOTSTRAP_INTERVAL: u64 = 5; pub fn with_replication_factor(mut self, replication_factor: NonZeroUsize) -> Self { self.replication_factor = replication_factor; diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index e9ac60fb7..09a728bbf 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -1,7 +1,8 @@ use std::io; use libp2p::{ - noise::Error as NoiseError, request_response::OutboundFailure, PeerId, TransportError, + gossipsub::SubscriptionError, kad::NoKnownPeers, noise::Error as NoiseError, + request_response::OutboundFailure, TransportError, }; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; @@ -10,23 +11,24 @@ use crate::{behaviour::grpc::error::OutboundConnectionError, command::Command}; #[derive(Error, Debug)] pub enum P2PError { - #[error("Can't dial on self")] - CantDialSelf, - #[error("Already dialed {0}")] - AlreadyDialed(PeerId), - #[error("Already disconnected")] - AlreadyDisconnected, - #[error("Error during dialling")] - DialError, #[error("Unable build a network: peer_key missing")] MissingPeerKey, + #[error("Unable to reach any bootnode")] + UnableToReachBootnode, + + #[error("The handle on the runtime failed")] + JoinHandleFailure, + #[error(transparent)] CommandError(#[from] CommandExecutionError), #[error("An error occurred on the Transport layer: {0}")] TransportError(#[from] TransportError), + #[error("An error occured trying to subscribe to gossip topic: {0}")] + SubscriptionError(#[from] SubscriptionError), + #[error("Unable to receive expected response of a oneshot channel")] OneshotReceiveError(#[from] oneshot::error::RecvError), @@ -36,14 +38,14 @@ pub enum P2PError { #[error("Error during bootstrap phase: {0}")] BootstrapError(&'static str), + #[error("Kademlia bootstrap query error: {0}")] + KademliaBootstrapError(#[from] NoKnownPeers), + #[error("Unable to execute shutdown on the p2p 
runtime: {0}")] ShutdownCommunication(mpsc::error::SendError>), #[error("Unable to create gRPC client")] UnableToCreateGrpcClient(#[from] OutboundConnectionError), - - #[error("Public addresses is empty")] - MissingPublicAddresses, } #[derive(Error, Debug)] diff --git a/crates/topos-p2p/src/event.rs b/crates/topos-p2p/src/event.rs index cb0dc02b9..b20822263 100644 --- a/crates/topos-p2p/src/event.rs +++ b/crates/topos-p2p/src/event.rs @@ -1,12 +1,16 @@ use libp2p::{identify, kad, PeerId}; -use crate::behaviour::grpc; +use crate::behaviour::{grpc, HealthStatus}; +/// Represents the events that the Gossip protocol can emit #[derive(Debug)] -pub struct GossipEvent { - pub source: Option, - pub topic: &'static str, - pub message: Vec, +pub enum GossipEvent { + /// A message has been received from a peer on one of the subscribed topics + Message { + source: Option, + topic: &'static str, + message: Vec, + }, } #[derive(Debug)] @@ -42,7 +46,22 @@ impl From for ComposedEvent { } } +/// Represents the events that the p2p layer can emit #[derive(Debug)] pub enum Event { + /// An event emitted when a gossip message is received Gossip { from: PeerId, data: Vec }, + /// An event emitted when the p2p layer becomes healthy + Healthy, + /// An event emitted when the p2p layer becomes unhealthy + Unhealthy, +} + +impl From<&HealthStatus> for Event { + fn from(value: &HealthStatus) -> Self { + match value { + HealthStatus::Healthy => Event::Healthy, + _ => Event::Unhealthy, + } + } } diff --git a/crates/topos-p2p/src/network.rs b/crates/topos-p2p/src/network.rs index 256903882..12c7fd0db 100644 --- a/crates/topos-p2p/src/network.rs +++ b/crates/topos-p2p/src/network.rs @@ -1,6 +1,8 @@ use super::{Behaviour, Event, NetworkClient, Runtime}; use crate::{ - behaviour::{discovery::DiscoveryBehaviour, gossip, grpc, peer_info::PeerInfoBehaviour}, + behaviour::{ + discovery::DiscoveryBehaviour, gossip, grpc, peer_info::PeerInfoBehaviour, HealthStatus, + }, config::{DiscoveryConfig, NetworkConfig}, constants::{ self, COMMAND_STREAM_BUFFER_SIZE, DISCOVERY_PROTOCOL, EVENT_STREAM_BUFFER, @@ -191,6 +193,7 @@ impl<'a> NetworkBuilder<'a> { swarm, config: self.config, peer_set: self.known_peers.iter().map(|(p, _)| *p).collect(), + boot_peers: self.known_peers.iter().map(|(p, _)| *p).collect(), command_receiver, event_sender, local_peer_id: peer_id, @@ -199,6 +202,11 @@ impl<'a> NetworkBuilder<'a> { active_listeners: HashSet::new(), pending_record_requests: HashMap::new(), shutdown, + state_machine: crate::runtime::StateMachine { + connected_to_bootpeer_retry_count: 3, + ..Default::default() + }, + health_status: HealthStatus::Initializing, }, )) } diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index 67e8d52a8..864712473 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -3,9 +3,10 @@ use crate::{ protocol_name, Command, Runtime, }; use libp2p::{kad::RecordKey, PeerId}; + use rand::{thread_rng, Rng}; use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, warn}; impl Runtime { pub(crate) async fn handle_command(&mut self, command: Command) { @@ -61,57 +62,6 @@ impl Runtime { } } - Command::Disconnect { sender } if self.swarm.listeners().count() == 0 => { - if sender.send(Err(P2PError::AlreadyDisconnected)).is_err() { - warn!( - reason = %P2PError::AlreadyDisconnected, - "Unable to notify Disconnection failure: initiator is 
dropped", - ); - } - } - - Command::Disconnect { sender } => { - // TODO: Listeners must be handled by topos behaviour not discovery - let listeners = self.active_listeners.iter().cloned().collect::>(); - - listeners.iter().for_each(|listener| { - self.swarm.remove_listener(*listener); - }); - - let peers: Vec = self.swarm.connected_peers().cloned().collect(); - - for peer_id in peers { - if self.swarm.disconnect_peer_id(peer_id).is_err() { - info!("Peer {peer_id} wasn't connected during Disconnection command"); - } - } - - if sender.send(Ok(())).is_err() { - warn!("Unable to notify Disconnection: initiator is dropped",); - } - } - - Command::Discover { to, sender } => { - let behaviour = self.swarm.behaviour_mut(); - let addr = behaviour.discovery.get_addresses_of_peer(&to); - - info!("Checking if we know {to} -> KAD {:?}", addr); - if addr.is_empty() { - info!("We don't know {to}, fetching its Multiaddr from DHT"); - let query_id = behaviour - .discovery - .inner - .get_record(RecordKey::new(&to.to_string())); - - debug!("Created a get_record query {query_id:?} for discovering {to}"); - if let Some(id) = self.pending_record_requests.insert(query_id, sender) { - warn!("Discover request {id:?} was overwritten by {query_id:?}"); - } - } else { - _ = sender.send(Ok(addr)); - } - } - Command::Gossip { topic, data: message, diff --git a/crates/topos-p2p/src/runtime/handle_event.rs b/crates/topos-p2p/src/runtime/handle_event.rs index 94122e7a6..27fc4486d 100644 --- a/crates/topos-p2p/src/runtime/handle_event.rs +++ b/crates/topos-p2p/src/runtime/handle_event.rs @@ -1,36 +1,40 @@ use libp2p::{multiaddr::Protocol, swarm::SwarmEvent}; use tracing::{debug, error, info, warn}; -use crate::{event::ComposedEvent, Event, Runtime}; +use crate::{error::P2PError, event::ComposedEvent, Event, Runtime}; mod discovery; mod gossipsub; mod grpc; mod peer_info; +pub type EventResult = Result<(), P2PError>; + #[async_trait::async_trait] pub(crate) trait EventHandler { - async fn handle(&mut self, event: T); + async fn handle(&mut self, event: T) -> EventResult; } #[async_trait::async_trait] impl EventHandler for Runtime { - async fn handle(&mut self, event: Event) { + async fn handle(&mut self, event: Event) -> EventResult { if let Err(error) = self.event_sender.try_send(event) { warn!(reason = %error, "Unable to send NetworkEvent event to outer stream"); } + + Ok(()) } } #[async_trait::async_trait] impl EventHandler for Runtime { - async fn handle(&mut self, event: ComposedEvent) { + async fn handle(&mut self, event: ComposedEvent) -> EventResult { match event { ComposedEvent::Kademlia(event) => self.handle(event).await, ComposedEvent::PeerInfo(event) => self.handle(event).await, ComposedEvent::Gossipsub(event) => self.handle(event).await, ComposedEvent::Grpc(event) => self.handle(event).await, - ComposedEvent::Void => (), + ComposedEvent::Void => Ok(()), } } } @@ -51,11 +55,26 @@ impl EventHandler> for Runtime { self.active_listeners.insert(listener_id); } + SwarmEvent::OutgoingConnectionError { - peer_id, - error, connection_id, - } => { + peer_id: Some(peer_id), + error, + } if self + .state_machine + .successfully_connect_to_bootpeer + .is_none() + && self.state_machine.dialed_bootpeer.contains(&connection_id) => + { + warn!("Unable to connect to bootpeer {peer_id}: {error:?}"); + self.state_machine.dialed_bootpeer.remove(&connection_id); + if self.state_machine.dialed_bootpeer.is_empty() { + // We tried to connect to all bootnode without success + error!("Unable to connect to any bootnode"); + } + } + + 
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { if let Some(peer_id) = peer_id { error!( "OutgoingConnectionError peer_id: {peer_id} | error: {error:?} | \ @@ -66,6 +85,26 @@ impl EventHandler> for Runtime { "OutgoingConnectionError for unknown peer | error: {error:?} | \ connection_id: {connection_id}" ); + error!("OutgoingConnectionError {error:?}"); + } + } + + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + endpoint, + num_established, + concurrent_dial_errors, + established_in, + } if self.state_machine.dialed_bootpeer.contains(&connection_id) => { + info!("Successfully connect to bootpeer {peer_id}"); + if self + .state_machine + .successfully_connect_to_bootpeer + .is_none() + { + self.state_machine.successfully_connect_to_bootpeer = Some(connection_id); + _ = self.state_machine.dialed_bootpeer.remove(&connection_id); } } @@ -76,6 +115,13 @@ impl EventHandler> for Runtime { "Connection established with peer {peer_id} as {:?}", endpoint.to_endpoint() ); + + if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size { + if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() { + error!("Unable to subscribe to gossipsub topic: {}", error); + // TODO: Deal with initial subscribe error + } + } } incoming_connection_error @ SwarmEvent::IncomingConnectionError { .. } => { @@ -108,6 +154,14 @@ impl EventHandler> for Runtime { debug!("ConnectionClosed {peer_id} because of {cause:?}"); } + SwarmEvent::Dialing { + peer_id: Some(ref peer_id), + connection_id, + } if self.boot_peers.contains(peer_id) => { + info!("Dialing bootpeer {peer_id} on connection: {connection_id}"); + self.state_machine.dialed_bootpeer.insert(connection_id); + } + SwarmEvent::Dialing { peer_id, connection_id, @@ -115,9 +169,7 @@ impl EventHandler> for Runtime { debug!("Dialing peer_id: {peer_id:?} | connection_id: {connection_id}"); } - SwarmEvent::Behaviour(event) => { - self.handle(event).await; - } + SwarmEvent::Behaviour(event) => self.handle(event).await?, SwarmEvent::ExpiredListenAddr { listener_id, @@ -131,5 +183,13 @@ impl EventHandler> for Runtime { warn!("Unhandled SwarmEvent: {:?}", event); } } + + let behaviour = self.swarm.behaviour(); + + if let Some(event) = self.healthy_status_changed() { + _ = self.event_sender.send(Event::Healthy).await; + } + + Ok(()) } } diff --git a/crates/topos-p2p/src/runtime/handle_event/discovery.rs b/crates/topos-p2p/src/runtime/handle_event/discovery.rs index 8aeac681f..7ecf0aac1 100644 --- a/crates/topos-p2p/src/runtime/handle_event/discovery.rs +++ b/crates/topos-p2p/src/runtime/handle_event/discovery.rs @@ -1,16 +1,13 @@ -use libp2p::{ - kad::{Event, GetRecordOk, QueryResult}, - Multiaddr, -}; +use libp2p::kad::{BootstrapOk, BootstrapResult, Event, QueryResult}; use tracing::{debug, error, warn}; -use crate::{error::CommandExecutionError, Runtime}; +use crate::{behaviour::HealthStatus, error::P2PError, Runtime}; -use super::EventHandler; +use super::{EventHandler, EventResult}; #[async_trait::async_trait] impl EventHandler> for Runtime { - async fn handle(&mut self, event: Box) { + async fn handle(&mut self, event: Box) -> EventResult { match *event { Event::InboundRequest { request } => { // warn!("InboundRequest {:?}", request); @@ -35,75 +32,107 @@ impl EventHandler> for Runtime { } Event::OutboundQueryProgressed { - result: QueryResult::Bootstrap(res), id, - .. 
- } => { - debug!("BootstrapResult query: {id:?}, {res:?}"); + result: + QueryResult::Bootstrap(BootstrapResult::Ok(BootstrapOk { + peer, + num_remaining, + })), + stats, + step, + } if num_remaining == 0 + && self.swarm.behaviour().discovery.health_status == HealthStatus::Initializing => + { + warn!( + "Bootstrap query finished but unable to connect to bootnode {:?} {:?}", + stats, step + ); + + let behaviour = self.swarm.behaviour_mut(); + + behaviour.discovery.health_status = HealthStatus::Unhealthy; + _ = behaviour + .discovery + .change_interval(self.config.discovery.fast_bootstrap_interval); } Event::OutboundQueryProgressed { - result: QueryResult::PutRecord(Err(e)), id, - .. - } => { - error!("PutRecord Failed query_id: {id:?}, error: {e:?}"); + result: + QueryResult::Bootstrap(BootstrapResult::Ok(BootstrapOk { + peer, + num_remaining, + })), + stats, + step, + } if num_remaining == 0 + && self + .state_machine + .successfully_connect_to_bootpeer + .is_none() + && self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy => + { + warn!( + "Bootstrap query finished but unable to connect to bootnode {:?} {:?}", + stats, step + ); + match self + .state_machine + .connected_to_bootpeer_retry_count + .checked_sub(1) + { + None => { + error!("Unable to connect to bootnode, stopping"); + + return Err(P2PError::UnableToReachBootnode); + } + Some(new) => { + self.state_machine.connected_to_bootpeer_retry_count = new; + } + } + } + Event::OutboundQueryProgressed { + id, + result: + QueryResult::Bootstrap(BootstrapResult::Ok(BootstrapOk { + peer, + num_remaining, + })), + stats, + step, + } if num_remaining == 0 + && self + .state_machine + .successfully_connect_to_bootpeer + .is_some() + && self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy => + { + warn!( + "Bootstrap query finished with bootnode {:?} {:?}", + stats, step + ); + + let behaviour = self.swarm.behaviour_mut(); + behaviour.discovery.health_status = HealthStatus::Healthy; + _ = behaviour + .discovery + .change_interval(self.config.discovery.bootstrap_interval); } Event::OutboundQueryProgressed { - result: QueryResult::GetRecord(res), + result: QueryResult::Bootstrap(res), id, .. - } => match res { - Ok(GetRecordOk::FoundRecord(result)) => { - debug!("GetRecordOk query: {id:?}, {result:?}"); - if let Some(sender) = self.pending_record_requests.remove(&id) { - if let Ok(addr) = Multiaddr::try_from(result.record.value.clone()) { - if let Some(peer_id) = result.record.publisher { - if !sender.is_closed() { - debug!("Adding {peer_id:?} address {addr:?} to DHT"); - self.swarm - .behaviour_mut() - .discovery - .inner - .add_address(&peer_id, addr.clone()); - - if sender.send(Ok(vec![addr.clone()])).is_err() { - // TODO: Hash the QueryId - warn!( - "Could not notify Record query ({id:?}) response \ - because initiator is dropped" - ); - } - } - } - } - } - } - - Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates }) => {} - - Err(error) => { - if let Some(sender) = self.pending_record_requests.remove(&id) { - if sender - .send(Err(CommandExecutionError::DHTGetRecordFailed)) - .is_err() - { - // TODO: Hash the QueryId - warn!( - "Could not notify Record query ({id:?}) response because \ - initiator is dropped" - ); - } - } - warn!("GetRecordError query_id: {id:?}, error: {error:?}"); - } - }, + } => { + debug!("BootstrapResult query: {id:?}, {res:?}"); + } Event::OutboundQueryProgressed { id, result, stats, .. 
} => {} Event::ModeChanged { new_mode } => {} } + + Ok(()) } } diff --git a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs index 3fc056cfb..ddb2aa9ff 100644 --- a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs +++ b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs @@ -9,12 +9,12 @@ use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOS use prost::Message; use topos_core::api::grpc::tce::v1::Batch; -use super::EventHandler; +use super::{EventHandler, EventResult}; #[async_trait::async_trait] impl EventHandler for Runtime { - async fn handle(&mut self, event: GossipEvent) { - if let GossipEvent { + async fn handle(&mut self, event: GossipEvent) -> EventResult { + if let GossipEvent::Message { source: Some(source), message, topic, @@ -70,5 +70,7 @@ impl EventHandler for Runtime { } } } + + Ok(()) } } diff --git a/crates/topos-p2p/src/runtime/handle_event/grpc.rs b/crates/topos-p2p/src/runtime/handle_event/grpc.rs index 9fa870bc3..a17c83a6a 100644 --- a/crates/topos-p2p/src/runtime/handle_event/grpc.rs +++ b/crates/topos-p2p/src/runtime/handle_event/grpc.rs @@ -1,8 +1,10 @@ use crate::{behaviour::grpc, Runtime}; -use super::EventHandler; +use super::{EventHandler, EventResult}; #[async_trait::async_trait] impl EventHandler for Runtime { - async fn handle(&mut self, _event: grpc::Event) {} + async fn handle(&mut self, _event: grpc::Event) -> EventResult { + Ok(()) + } } diff --git a/crates/topos-p2p/src/runtime/handle_event/peer_info.rs b/crates/topos-p2p/src/runtime/handle_event/peer_info.rs index 8660a04d4..4de3348fd 100644 --- a/crates/topos-p2p/src/runtime/handle_event/peer_info.rs +++ b/crates/topos-p2p/src/runtime/handle_event/peer_info.rs @@ -3,11 +3,11 @@ use tracing::info; use crate::{constants::PEER_INFO_PROTOCOL, Runtime}; -use super::EventHandler; +use super::{EventHandler, EventResult}; #[async_trait::async_trait] impl EventHandler> for Runtime { - async fn handle(&mut self, event: Box) { + async fn handle(&mut self, event: Box) -> EventResult { if let IdentifyEvent::Received { peer_id, info, .. 
} = *event { let IdentifyInfo { protocol_version, @@ -34,5 +34,7 @@ impl EventHandler> for Runtime { } } } + + Ok(()) } } diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index f2276096f..a42076030 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -1,13 +1,22 @@ use std::collections::{HashMap, HashSet}; use crate::{ - behaviour::discovery::PendingRecordRequest, config::NetworkConfig, - runtime::handle_event::EventHandler, Behaviour, Command, Event, + behaviour::{discovery::PendingRecordRequest, HealthStatus}, + config::NetworkConfig, + error::P2PError, + runtime::handle_event::EventHandler, + Behaviour, Command, Event, }; -use libp2p::{core::transport::ListenerId, kad::QueryId, Multiaddr, PeerId, Swarm}; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamExt; -use tracing::{debug, error, info}; +use libp2p::{ + core::transport::ListenerId, kad::QueryId, swarm::ConnectionId, Multiaddr, PeerId, Swarm, +}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_stream::{Stream, StreamExt}; +use tracing::{debug, error, info, Instrument}; pub struct Runtime { pub(crate) config: NetworkConfig, @@ -20,6 +29,9 @@ pub struct Runtime { pub(crate) listening_on: Vec, pub(crate) public_addresses: Vec, + /// Boot peers to connect used to bootstrap the p2p layer + pub(crate) boot_peers: Vec, + /// Contains current listenerId of the swarm pub active_listeners: HashSet, @@ -28,60 +40,99 @@ pub struct Runtime { /// Shutdown signal receiver from the client pub(crate) shutdown: mpsc::Receiver>, + + /// Internal state machine of the p2p layer + pub(crate) state_machine: StateMachine, + + pub(crate) health_status: HealthStatus, } mod handle_command; mod handle_event; +/// Internal state machine of the p2p layer +/// +/// This struct may change in the future to be more flexible and to handle more +/// complex state transitions/representation. +#[derive(Default)] +pub(crate) struct StateMachine { + /// Indicates if the node has external addresses configured + pub(crate) has_external_addresses: bool, + /// Indicates if the node is listening on any address + pub(crate) is_listening: bool, + /// List the boot peers that the node has tried to connect to + pub(crate) dialed_bootpeer: HashSet, + /// Indicates if the node has successfully connected to a boot peer + pub(crate) successfully_connect_to_bootpeer: Option, + /// Track the number of retries to connect to boot peers + pub(crate) connected_to_bootpeer_retry_count: usize, +} + impl Runtime { /// Bootstrap the p2p layer runtime with the given configuration. /// This method will configure, launch and start queries. /// The result of this call is a p2p layer bootstrap but it doesn't mean it is /// ready. 
- pub async fn bootstrap(&mut self) -> Result<(), Box> { + pub async fn bootstrap + Unpin + Send>( + mut self, + event_stream: &mut S, + ) -> Result>, P2PError> { debug!("Added public addresses: {:?}", self.public_addresses); for address in &self.public_addresses { self.swarm.add_external_address(address.clone()); + self.state_machine.has_external_addresses = true; } - debug!("Starting to listen on {:?}", self.listening_on); + for addr in &self.listening_on { if let Err(error) = self.swarm.listen_on(addr.clone()) { error!("Couldn't start listening on {} because of {error:?}", addr); - return Err(Box::new(error)); + return Err(P2PError::TransportError(error)); } - } - if !self.peer_set.is_empty() { - debug!( - "{} Connected to some peers, start a bootstrap query", - self.local_peer_id - ); - self.swarm.behaviour_mut().discovery.bootstrap()?; + self.state_machine.is_listening = true; } - while let Some(event) = self.swarm.next().await { - self.handle(event).await; + let mut handle = spawn(self.run().in_current_span()); - if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size { - break; + // Wait for first healthy + loop { + tokio::select! { + result = &mut handle => { + match result { + Ok(Ok(_)) => info!("P2P layer has been shutdown"), + Ok(Err(error)) => { + error!("P2P layer has failed with error: {:?}", error); + + return Err(error); + } + Err(_) => { + error!("P2P layer has failed in an unexpected way."); + return Err(P2PError::JoinHandleFailure); + } + } + } + Some(event) = event_stream.next() => { + if let Event::Healthy = event { + info!("P2P layer is healthy"); + break; + } + } } } - let gossipsub = &mut self.swarm.behaviour_mut().gossipsub; - - gossipsub.subscribe()?; - - Ok(()) + Ok(handle) } /// Run p2p runtime - pub async fn run(mut self) -> Result<(), ()> { + pub async fn run(mut self) -> Result<(), P2PError> { let shutdowned: Option> = loop { tokio::select! { - Some(event) = self.swarm.next() => self.handle(event).await, - Some(command) = self.command_receiver.recv() => self.handle_command(command).await, + Some(event) = self.swarm.next() => { + self.handle(event).in_current_span().await? 
+ }, + Some(command) = self.command_receiver.recv() => self.handle_command(command).in_current_span().await, shutdown = self.shutdown.recv() => { break shutdown; } @@ -95,4 +146,29 @@ impl Runtime { Ok(()) } + + pub(crate) fn healthy_status_changed(&mut self) -> Option { + let behaviours = self.swarm.behaviour(); + let gossipsub = &behaviours.gossipsub.health_status; + let discovery = &behaviours.discovery.health_status; + + let new_status = match (discovery, gossipsub) { + (HealthStatus::Initializing, _) | (_, HealthStatus::Initializing) => { + HealthStatus::Initializing + } + (HealthStatus::Unhealthy, _) | (_, HealthStatus::Unhealthy) => HealthStatus::Unhealthy, + (HealthStatus::Recovering, _) | (_, HealthStatus::Recovering) => { + HealthStatus::Recovering + } + (HealthStatus::Healthy, HealthStatus::Healthy) => HealthStatus::Healthy, + }; + + if self.health_status != new_status { + self.health_status = new_status; + + Some((&self.health_status).into()) + } else { + None + } + } } diff --git a/crates/topos-p2p/src/tests/command/random_peer.rs b/crates/topos-p2p/src/tests/command/random_peer.rs index b50ba3875..02c6382db 100644 --- a/crates/topos-p2p/src/tests/command/random_peer.rs +++ b/crates/topos-p2p/src/tests/command/random_peer.rs @@ -13,7 +13,7 @@ use crate::error::P2PError; async fn no_random_peer() { let local = NodeConfig::from_seed(1); - let (client, _, mut runtime) = crate::network::builder() + let (client, stream, runtime) = crate::network::builder() .minimum_cluster_size(0) .peer_key(local.keypair.clone()) .public_addresses(&[local.addr.clone()]) @@ -24,9 +24,7 @@ async fn no_random_peer() { .await .expect("Unable to create p2p network"); - runtime.bootstrap().await.unwrap(); - - spawn(runtime.run()); + tokio::spawn(runtime.run()); let result = client.random_known_peer().await; @@ -47,7 +45,7 @@ async fn return_a_peer() { let expected = NodeConfig::from_seed(2); let expected_peer_id = expected.keypair.public().to_peer_id(); - let (client, _, mut runtime) = crate::network::builder() + let (client, stream, mut runtime) = crate::network::builder() .minimum_cluster_size(0) .peer_key(local.keypair.clone()) .public_addresses(vec![local.addr.clone()]) @@ -56,10 +54,7 @@ async fn return_a_peer() { .await .expect("Unable to create p2p network"); - runtime.bootstrap().await.unwrap(); - runtime.peer_set.insert(expected_peer_id); - spawn(runtime.run()); let result = client.random_known_peer().await; @@ -77,7 +72,7 @@ async fn return_a_peer() { async fn return_a_random_peer_among_100() { let local = NodeConfig::from_seed(1); - let (client, _, mut runtime) = crate::network::builder() + let (client, stream, mut runtime) = crate::network::builder() .minimum_cluster_size(0) .peer_key(local.keypair.clone()) .public_addresses(vec![local.addr.clone()]) @@ -86,8 +81,6 @@ async fn return_a_random_peer_among_100() { .await .expect("Unable to create p2p network"); - runtime.bootstrap().await.unwrap(); - for i in 2..=100 { let peer = NodeConfig::from_seed(i); runtime.peer_set.insert(peer.keypair.public().to_peer_id()); diff --git a/crates/topos-tce-api/src/runtime/builder.rs b/crates/topos-tce-api/src/runtime/builder.rs index 15ef7aad1..be7563ffd 100644 --- a/crates/topos-tce-api/src/runtime/builder.rs +++ b/crates/topos-tce-api/src/runtime/builder.rs @@ -9,6 +9,7 @@ use topos_core::api::grpc::tce::v1::StatusResponse; use topos_tce_storage::{ types::CertificateDeliveredWithPositions, validator::ValidatorStore, StorageClient, }; +use tracing::Instrument; use crate::{ constants::CHANNEL_SIZE, 
graphql::builder::ServerBuilder as GraphQLBuilder, @@ -102,12 +103,13 @@ impl RuntimeBuilder { .command_sender(internal_runtime_command_sender.clone()) .serve_addr(self.grpc_socket_addr) .build() + .in_current_span() .await; let (command_sender, runtime_command_receiver) = mpsc::channel(CHANNEL_SIZE); let (shutdown_channel, shutdown_receiver) = mpsc::channel::>(1); - let grpc_handler = spawn(grpc); + let grpc_handler = spawn(grpc.in_current_span()); let graphql_handler = if let Some(graphql_addr) = self.graphql_socket_addr { tracing::info!("Serving GraphQL on {}", graphql_addr); @@ -121,7 +123,8 @@ impl RuntimeBuilder { ) .runtime(internal_runtime_command_sender.clone()) .serve_addr(Some(graphql_addr)) - .build(); + .build() + .in_current_span(); spawn(graphql.await) } else { spawn(async move { @@ -135,7 +138,8 @@ impl RuntimeBuilder { let metrics_server = MetricsBuilder::default() .serve_addr(Some(metrics_addr)) - .build(); + .build() + .in_current_span(); spawn(metrics_server.await) } else { spawn(async move { diff --git a/crates/topos-tce-api/src/runtime/sync_task.rs b/crates/topos-tce-api/src/runtime/sync_task.rs index 1f51fe385..d0c5ccec7 100644 --- a/crates/topos-tce-api/src/runtime/sync_task.rs +++ b/crates/topos-tce-api/src/runtime/sync_task.rs @@ -60,7 +60,7 @@ pub(crate) struct SyncTask { /// last certificate id delivered to the stream pub(crate) target_subnet_stream_positions: TargetSubnetStreamPositions, /// The connection to the database layer through a StorageClient - pub(crate) storage: StorageClient, + pub(crate) store: StorageClient, /// The notifier is used to send certificates to the stream pub(crate) notifier: Sender, /// If a new stream is registered with the same Uuid, the sync task will be cancelled @@ -80,7 +80,7 @@ impl SyncTask { status: SyncTaskStatus::Running, stream_id, target_subnet_stream_positions, - storage, + store: storage, notifier, cancel_token, } @@ -103,7 +103,7 @@ impl IntoFuture for SyncTask { return (self.stream_id, self.status); } let source_subnet_list = self - .storage + .store .get_target_source_subnet_list(*target_subnet_id) .await; @@ -136,7 +136,7 @@ impl IntoFuture for SyncTask { return (self.stream_id, self.status); } if let Ok(certificates_with_positions) = self - .storage + .store .fetch_certificates(FetchCertificatesFilter::Target { target_stream_position: CertificateTargetStreamPosition { target_subnet_id: *target_subnet_id, diff --git a/crates/topos-tce-api/src/stream/mod.rs b/crates/topos-tce-api/src/stream/mod.rs index 0cef0d097..58ab21759 100644 --- a/crates/topos-tce-api/src/stream/mod.rs +++ b/crates/topos-tce-api/src/stream/mod.rs @@ -175,6 +175,7 @@ impl Stream { } } } + } else => break, diff --git a/crates/topos-tce-api/tests/grpc/certificate_precedence.rs b/crates/topos-tce-api/tests/grpc/certificate_precedence.rs index b11252cb0..bbe80438a 100644 --- a/crates/topos-tce-api/tests/grpc/certificate_precedence.rs +++ b/crates/topos-tce-api/tests/grpc/certificate_precedence.rs @@ -16,12 +16,12 @@ use topos_tce_storage::validator::ValidatorStore; #[rstest] #[test(tokio::test)] async fn fetch_latest_pending_certificates() { - let fullnode_store = create_fullnode_store(vec![]).await; + let fullnode_store = create_fullnode_store(&[]).await; let validator_store: Arc = - create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await; + create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await; let (api_context, _) = create_public_api( - storage_client(vec![]), + storage_client(&[]), 
broadcast_stream(), futures::future::ready(validator_store.clone()), ) @@ -62,12 +62,12 @@ async fn fetch_latest_pending_certificates() { #[rstest] #[test(tokio::test)] async fn fetch_latest_pending_certificates_with_conflicts() { - let fullnode_store = create_fullnode_store(vec![]).await; + let fullnode_store = create_fullnode_store(&[]).await; let validator_store: Arc = - create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await; + create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await; let (api_context, _) = create_public_api( - storage_client(vec![]), + storage_client(&[]), broadcast_stream(), futures::future::ready(validator_store.clone()), ) diff --git a/crates/topos-tce-api/tests/runtime.rs b/crates/topos-tce-api/tests/runtime.rs index ce451386a..cb7ceeab2 100644 --- a/crates/topos-tce-api/tests/runtime.rs +++ b/crates/topos-tce-api/tests/runtime.rs @@ -120,7 +120,7 @@ async fn can_catchup_with_old_certs( #[from(create_certificate_chain)] certificates: Vec, ) { - let storage_client = storage_client::partial_1(certificates.clone()); + let storage_client = storage_client::partial_1(&certificates[..]); let (mut api_context, _) = create_public_api::partial_1(storage_client).await; let mut client = api_context.api_client; @@ -214,7 +214,7 @@ async fn can_catchup_with_old_certs_with_position( let fullnode_store = create_fullnode_store::default().await; let store = create_validator_store( - certificates.clone(), + &certificates[..], futures::future::ready(fullnode_store.clone()), ) .await; @@ -337,7 +337,7 @@ async fn boots_healthy_graphql_server( let fullnode_store = create_fullnode_store::default().await; let store = create_validator_store( - certificates.clone(), + &certificates[..], futures::future::ready(fullnode_store.clone()), ) .await; @@ -380,7 +380,7 @@ async fn graphql_server_enables_cors( let fullnode_store = create_fullnode_store::default().await; let store = create_validator_store( - certificates.clone(), + &certificates[..], futures::future::ready(fullnode_store.clone()), ) .await; @@ -450,7 +450,7 @@ async fn can_query_graphql_endpoint_for_certificates( let fullnode_store = create_fullnode_store::default().await; let store = create_validator_store( - certificates.clone(), + &certificates[..], futures::future::ready(fullnode_store.clone()), ) .await; @@ -635,8 +635,7 @@ async fn check_storage_pool_stats( let fullnode_store = create_fullnode_store::default().await; - let store = - create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await; + let store = create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await; STORAGE_PENDING_POOL_COUNT.set(10); STORAGE_PRECEDENCE_POOL_COUNT.set(200); diff --git a/crates/topos-tce-broadcast/benches/double_echo.rs b/crates/topos-tce-broadcast/benches/double_echo.rs index 26fe78b91..840751199 100644 --- a/crates/topos-tce-broadcast/benches/double_echo.rs +++ b/crates/topos-tce-broadcast/benches/double_echo.rs @@ -9,7 +9,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { .build() .unwrap(); - let store = runtime.block_on(async { create_validator_store::partial_1(vec![]).await }); + let store = runtime.block_on(async { create_validator_store::partial_1(&[]).await }); c.bench_function("double_echo", |b| { b.to_async(FuturesExecutor).iter(|| async { diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index 0ae98ee3d..9e407c178 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ 
b/crates/topos-tce-broadcast/src/lib.rs @@ -48,7 +48,7 @@ use topos_core::uci::{Certificate, CertificateId}; use topos_crypto::messages::{MessageSigner, Signature}; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use tracing::{debug, error}; +use tracing::{debug, error, Instrument}; pub use topos_core::uci; @@ -143,7 +143,11 @@ impl ReliableBroadcastClient { broadcast_sender, ); - spawn(double_echo.run(task_manager_message_receiver)); + spawn( + double_echo + .run(task_manager_message_receiver) + .in_current_span(), + ); ( Self { diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index 112e8df07..840f57bc8 100644 --- a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -58,7 +58,7 @@ struct Context { } async fn create_context(params: TceParams) -> (DoubleEcho, Context) { - let validator_store = create_validator_store::partial_1(vec![]).await; + let validator_store = create_validator_store::default().await; let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE); let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) = diff --git a/crates/topos-tce-proxy/Cargo.toml b/crates/topos-tce-proxy/Cargo.toml index f762b0ef5..c899eec7c 100644 --- a/crates/topos-tce-proxy/Cargo.toml +++ b/crates/topos-tce-proxy/Cargo.toml @@ -39,6 +39,7 @@ opentelemetry.workspace = true base64ct.workspace = true [dev-dependencies] +libp2p.workspace = true topos-tce = { path = "../topos-tce" } rstest = { workspace = true, features = ["async-timeout"] } test-log.workspace = true diff --git a/crates/topos-tce-proxy/tests/tce_tests.rs b/crates/topos-tce-proxy/tests/tce_tests.rs index 0f3593f0e..64f5e4027 100644 --- a/crates/topos-tce-proxy/tests/tce_tests.rs +++ b/crates/topos-tce-proxy/tests/tce_tests.rs @@ -23,23 +23,22 @@ use topos_core::uci::SUBNET_ID_LENGTH; use topos_tce_proxy::client::{TceClient, TceClientBuilder}; use topos_tce_proxy::worker::TceProxyWorker; use topos_tce_proxy::{TceProxyCommand, TceProxyConfig, TceProxyEvent}; +use topos_test_sdk::tce::{start_node, NodeConfig}; use tracing::{debug, error, info, warn}; use topos_test_sdk::{ certificates::create_certificate_chain, constants::*, - tce::{start_node, TceContext}, + tce::{create_network, TceContext}, }; pub const SOURCE_SUBNET_ID_1_NUMBER_OF_PREFILLED_CERTIFICATES: usize = 15; pub const SOURCE_SUBNET_ID_2_NUMBER_OF_PREFILLED_CERTIFICATES: usize = 10; -#[rstest] #[test(tokio::test)] -async fn test_tce_submit_certificate( - #[future] start_node: TceContext, -) -> Result<(), Box> { - let mut context = start_node.await; +async fn test_tce_submit_certificate() -> Result<(), Box> { + let mut network = create_network(3, &[]).await; + let context = network.values_mut().last().unwrap(); let source_subnet_id: SubnetId = SOURCE_SUBNET_ID_1.into(); let prev_certificate_id: CertificateId = CERTIFICATE_ID_1.into(); @@ -77,12 +76,10 @@ async fn test_tce_submit_certificate( Ok(()) } -#[rstest] #[test(tokio::test)] -async fn test_tce_watch_certificates( - #[future] start_node: TceContext, -) -> Result<(), Box> { - let mut context = start_node.await; +async fn test_tce_watch_certificates() -> Result<(), Box> { + let mut network = create_network(3, &[]).await; + let context = network.values_mut().last().unwrap(); let source_subnet_id: SubnetId = SubnetId { value: [1u8; SUBNET_ID_LENGTH].to_vec(), @@ -143,12 +140,10 @@ async 
fn test_tce_watch_certificates( Ok(()) } -#[rstest] #[test(tokio::test)] -async fn test_tce_get_source_head_certificate( - #[future] start_node: TceContext, -) -> Result<(), Box> { - let mut context = start_node.await; +async fn test_tce_get_source_head_certificate() -> Result<(), Box> { + let mut network = create_network(3, &[]).await; + let context = network.values_mut().last().unwrap(); let source_subnet_id: SubnetId = SOURCE_SUBNET_ID_1.into(); let default_cert_id: CertificateId = PREV_CERTIFICATE_ID.into(); @@ -246,12 +241,10 @@ async fn test_tce_get_source_head_certificate( Ok(()) } -#[rstest] #[test(tokio::test)] -async fn test_tce_get_last_pending_certificates( - #[future] start_node: TceContext, -) -> Result<(), Box> { - let mut context = start_node.await; +async fn test_tce_get_last_pending_certificates() -> Result<(), Box> { + let mut network = create_network(3, &[]).await; + let context = network.values_mut().last().unwrap(); let source_subnet_id: SubnetId = SOURCE_SUBNET_ID_1.into(); let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 10); @@ -345,11 +338,10 @@ async fn test_tce_get_last_pending_certificates( #[timeout(Duration::from_secs(300))] async fn test_tce_open_stream_with_checkpoint( input_certificates: Vec, - #[with(input_certificates.clone())] - #[future] - start_node: TceContext, ) -> Result<(), Box> { - let mut context = start_node.await; + let mut network = create_network(3, &input_certificates[..]).await; + + let context = network.values_mut().last().unwrap(); let source_subnet_id_1: SubnetId = SubnetId { value: SOURCE_SUBNET_ID_1.into(), @@ -537,12 +529,9 @@ fn input_certificates() -> Vec { certificates } -#[rstest] #[test(tokio::test)] -async fn test_tce_proxy_submit_certificate( - #[future] start_node: TceContext, -) -> Result<(), Box> { - let mut context = start_node.await; +async fn test_tce_proxy_submit_certificate() -> Result<(), Box> { + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let source_subnet_id = SOURCE_SUBNET_ID_1; let target_subnet_stream_positions = Vec::new(); @@ -662,12 +651,10 @@ async fn create_tce_client( Ok((tce_client, receiving_certificate_stream)) } -#[rstest] #[test(tokio::test)] async fn test_tce_client_submit_and_get_last_pending_certificate( - #[future] start_node: TceContext, ) -> Result<(), Box> { - let mut context = start_node.await; + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let mut certificates = Vec::new(); certificates.append(&mut create_certificate_chain( @@ -720,12 +707,9 @@ async fn test_tce_client_submit_and_get_last_pending_certificate( Ok(()) } -#[rstest] #[test(tokio::test)] -async fn test_tce_client_get_empty_history_source_head( - #[future] start_node: TceContext, -) -> Result<(), Box> { - let mut context = start_node.await; +async fn test_tce_client_get_empty_history_source_head() -> Result<(), Box> { + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let (mut tce_client, _receiving_certificate_stream) = create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?; @@ -754,11 +738,9 @@ async fn test_tce_client_get_empty_history_source_head( #[test(tokio::test)] async fn test_tce_client_get_source_head( input_certificates: Vec, - #[with(input_certificates.clone())] - #[future] - start_node: TceContext, ) -> Result<(), Box> { - let mut context = start_node.await; + let mut network = create_network(3, &input_certificates[..]).await; + let context = 
network.values_mut().last().unwrap(); // Tce is prefilled with delivered certificates let source_subnet_id_1_prefilled_certificates = @@ -816,7 +798,7 @@ async fn test_tce_client_get_source_head( #[timeout(Duration::from_secs(30))] async fn test_tce_client_submit_and_get_certificate_delivered( ) -> Result<(), Box> { - let peers_context = topos_test_sdk::tce::create_network(5, vec![]).await; + let peers_context = topos_test_sdk::tce::create_network(5, &[]).await; let mut peers = peers_context.into_iter(); let mut sending_tce: TceContext = peers.next().expect("valid peer 1").1; let mut receiving_tce: TceContext = peers.next().expect("valid peer 2").1; diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index 63b92b688..a7b70e08a 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -5,6 +5,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use topos_p2p::NetworkClient; use topos_tce_storage::validator::ValidatorStore; +use tracing::Instrument; use crate::{ checkpoints_collector::{CheckpointSynchronizer, CheckpointsCollectorError}, @@ -72,7 +73,8 @@ impl SynchronizerBuilder { shutdown: shutdown.child_token(), events: sync_events, } - .into_future(), + .into_future() + .in_current_span(), ); Ok(( diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs index 6e7925a90..882c8e5cb 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs @@ -65,7 +65,7 @@ async fn check_fetch_certificates() { create_certificate_chain(subnet, &[topos_test_sdk::constants::TARGET_SUBNET_ID_1], 1); let boot_node = NodeConfig::from_seed(1); - let cluster = create_network(5, certificates.clone()).await; + let cluster = create_network(5, &certificates[..]).await; let boot_node = cluster .get(&boot_node.keypair.public().to_peer_id()) .unwrap() @@ -78,9 +78,9 @@ async fn check_fetch_certificates() { ..Default::default() }; - let fullnode_store = create_fullnode_store(vec![]).await; + let fullnode_store = create_fullnode_store(&[]).await; let validator_store = - create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await; + create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await; let router = GrpcRouter::new(tonic::transport::Server::builder()).add_service( SynchronizerServiceServer::new(SynchronizerService { @@ -89,7 +89,7 @@ async fn check_fetch_certificates() { ); let (client, _, _) = cfg - .bootstrap(&[boot_node.clone()], Some(router)) + .bootstrap(&[cfg.clone(), boot_node.clone()], Some(router)) .await .unwrap(); diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs index 25dd85100..8ac29619a 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs @@ -20,14 +20,14 @@ use crate::SynchronizerService; #[rstest] #[test(tokio::test)] -#[timeout(Duration::from_secs(5))] +#[timeout(Duration::from_secs(10))] async fn network_test() { let subnet = topos_test_sdk::constants::SOURCE_SUBNET_ID_1; let certificates: Vec = create_certificate_chain(subnet, &[topos_test_sdk::constants::TARGET_SUBNET_ID_1], 1); let boot_node = 
NodeConfig::from_seed(1); - let cluster = create_network(5, certificates.clone()).await; + let cluster = create_network(5, &certificates[..]).await; let boot_node = cluster .get(&boot_node.keypair.public().to_peer_id()) .unwrap() @@ -40,7 +40,10 @@ async fn network_test() { ..Default::default() }; - let (client, _, _) = cfg.bootstrap(&[boot_node.clone()], None).await.unwrap(); + let (client, _, _) = cfg + .bootstrap(&[cfg.clone(), boot_node.clone()], None) + .await + .unwrap(); use topos_core::api::grpc::shared::v1::Uuid as APIUuid; diff --git a/crates/topos-tce/Cargo.toml b/crates/topos-tce/Cargo.toml index f679c2701..de5b6d275 100644 --- a/crates/topos-tce/Cargo.toml +++ b/crates/topos-tce/Cargo.toml @@ -9,6 +9,7 @@ rust-version = "1.65" workspace = true [dependencies] +libp2p.workspace = true async-trait.workspace = true bincode.workspace = true clap.workspace = true @@ -44,6 +45,7 @@ topos-telemetry = { path = "../topos-telemetry" } axum = "0.7.4" axum-prometheus = "0.6" + [dev-dependencies] topos-test-sdk = { path = "../topos-test-sdk/" } async-stream.workspace = true diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index 32d8a078a..61732cbdd 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -22,155 +22,162 @@ impl AppContext { &evt ); - let NetEvent::Gossip { data, from } = evt; - if let Ok(DoubleEchoRequest { - request: Some(double_echo_request), - }) = DoubleEchoRequest::decode(&data[..]) - { - match double_echo_request { - double_echo_request::Request::Gossip(Gossip { - certificate: Some(certificate), - }) => match uci::Certificate::try_from(certificate) { - Ok(cert) => { - if let hash_map::Entry::Vacant(entry) = self.delivery_latency.entry(cert.id) - { - entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); - } - info!( - "Received certificate {} from GossipSub from {}", - cert.id, from - ); - - match self.validator_store.insert_pending_certificate(&cert) { - Ok(Some(_)) => { - debug!( - "Certificate {} has been inserted into pending pool", - cert.id - ); - } - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into \ - precedence pool waiting for {}", - cert.id, cert.source_subnet_id, cert.prev_id - ); + if let NetEvent::Gossip { data, from } = evt { + if let Ok(DoubleEchoRequest { + request: Some(double_echo_request), + }) = DoubleEchoRequest::decode(&data[..]) + { + match double_echo_request { + double_echo_request::Request::Gossip(Gossip { + certificate: Some(certificate), + }) => match uci::Certificate::try_from(certificate) { + Ok(cert) => { + if let hash_map::Entry::Vacant(entry) = + self.delivery_latency.entry(cert.id) + { + entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has been already added to the pending pool, \ - skipping", - cert.id - ); + info!( + "Received certificate {} from GossipSub from {}", + cert.id, from + ); + + match self.validator_store.insert_pending_certificate(&cert) { + Ok(Some(_)) => { + debug!( + "Certificate {} has been inserted into pending pool", + cert.id + ); + } + Ok(None) => { + debug!( + "Certificate {} from subnet {} has been inserted into \ + precedence pool waiting for {}", + cert.id, cert.source_subnet_id, cert.prev_id + ); + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has been 
already added to the pending \ + pool, skipping", + cert.id + ); + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyExists, + )) => { + debug!( + "Certificate {} has been already delivered, skipping", + cert.id + ); + } + Err(error) => { + error!( + "Unable to insert pending certificate {}: {}", + cert.id, error + ); + } } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has been already delivered, skipping", - cert.id + } + Err(e) => { + error!("Failed to parse the received Certificate: {e}"); + } + }, + double_echo_request::Request::Echo(Echo { + certificate_id: Some(certificate_id), + signature: Some(signature), + validator_id: Some(validator_id), + }) => { + let channel = self.tce_cli.get_double_echo_channel(); + spawn(async move { + let certificate_id = certificate_id.clone().try_into().map_err(|e| { + error!( + "Failed to parse the CertificateId {certificate_id} from \ + Echo: {e}" ); - } - Err(error) => { + e + }); + let validator_id = validator_id.clone().try_into().map_err(|e| { error!( - "Unable to insert pending certificate {}: {}", - cert.id, error + "Failed to parse the ValidatorId {validator_id} from Echo: {e}" ); - } - } - } - Err(e) => { - error!("Failed to parse the received Certificate: {e}"); - } - }, - double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id), - signature: Some(signature), - validator_id: Some(validator_id), - }) => { - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - let certificate_id = certificate_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the CertificateId {certificate_id} from Echo: {e}" - ); - e - }); - let validator_id = validator_id.clone().try_into().map_err(|e| { - error!("Failed to parse the ValidatorId {validator_id} from Echo: {e}"); - e - }); + e + }); - if let (Ok(certificate_id), Ok(validator_id)) = - (certificate_id, validator_id) - { - trace!( - "Received Echo message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id - ); - if let Err(e) = channel - .send(DoubleEchoCommand::Echo { - signature: signature.into(), - certificate_id, - validator_id, - }) - .await + if let (Ok(certificate_id), Ok(validator_id)) = + (certificate_id, validator_id) { - error!("Unable to pass received Echo message: {:?}", e); + trace!( + "Received Echo message, certificate_id: {certificate_id}, \ + validator_id: {validator_id} from: {from}", + certificate_id = certificate_id, + validator_id = validator_id + ); + + if let Err(e) = channel + .send(DoubleEchoCommand::Echo { + signature: signature.into(), + certificate_id, + validator_id, + }) + .await + { + error!("Unable to pass received Echo message: {:?}", e); + } + } else { + error!("Unable to process Echo message due to invalid data"); } - } else { - error!("Unable to process Echo message due to invalid data"); - } - }); - } - double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id), - signature: Some(signature), - validator_id: Some(validator_id), - }) => { - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - let certificate_id = certificate_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the CertificateId {certificate_id} from Ready: \ - {e}" - ); - e }); - let validator_id = validator_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the ValidatorId 
{validator_id} from Ready: {e}" - ); - e - }); - if let (Ok(certificate_id), Ok(validator_id)) = - (certificate_id, validator_id) - { - trace!( - "Received Ready message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id - ); - if let Err(e) = channel - .send(DoubleEchoCommand::Ready { - signature: signature.into(), - certificate_id, - validator_id, - }) - .await + } + double_echo_request::Request::Ready(Ready { + certificate_id: Some(certificate_id), + signature: Some(signature), + validator_id: Some(validator_id), + }) => { + let channel = self.tce_cli.get_double_echo_channel(); + spawn(async move { + let certificate_id = certificate_id.clone().try_into().map_err(|e| { + error!( + "Failed to parse the CertificateId {certificate_id} from \ + Ready: {e}" + ); + e + }); + let validator_id = validator_id.clone().try_into().map_err(|e| { + error!( + "Failed to parse the ValidatorId {validator_id} from Ready: \ + {e}" + ); + e + }); + if let (Ok(certificate_id), Ok(validator_id)) = + (certificate_id, validator_id) { - error!("Unable to pass received Ready message, {:?}", e); + trace!( + "Received Ready message, certificate_id: {certificate_id}, \ + validator_id: {validator_id} from: {from}", + certificate_id = certificate_id, + validator_id = validator_id + ); + if let Err(e) = channel + .send(DoubleEchoCommand::Ready { + signature: signature.into(), + certificate_id, + validator_id, + }) + .await + { + error!("Unable to pass received Ready message: {:?}", e); + } + } else { + error!("Unable to process Ready message due to invalid data"); } - } else { - error!("Unable to process Ready message due to invalid data"); - } - }); + }); + } + _ => {} } - _ => {} } } } diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 34fe92d62..3062e983d 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -61,6 +61,7 @@ pub async fn run( config: &TceConfig, shutdown: (CancellationToken, mpsc::Sender<()>), ) -> Result, Box> { + // Preboot phase - start topos_metrics::init_metrics(); let key = match config.auth_key.as_ref() { @@ -91,6 +92,8 @@ pub async fn run( boot_peers.retain(|(p, _)| *p != peer_id); let is_validator = config.validators.contains(&validator_id); + // Preboot phase - stop + // Healthiness phase - start debug!("Starting the Storage"); let path = if let StorageConfiguration::RocksDB(Some(ref path)) = config.storage { path @@ -106,6 +109,8 @@ pub async fn run( let fullnode_store = validator_store.get_fullnode_store(); + let storage_client = StorageClient::new(validator_store.clone()); + let certificates_synced = fullnode_store .count_certificates_delivered() .map_err(|error| format!("Unable to count certificates delivered: {error}"))?; @@ -132,7 +137,7 @@ pub async fn run( ), ); - let (network_client, event_stream, mut network_runtime) = topos_p2p::network::builder() + let (network_client, mut event_stream, network_runtime) = topos_p2p::network::builder() .peer_key(key) .listen_addresses(config.p2p.listen_addresses.clone()) .minimum_cluster_size(config.minimum_tce_cluster_size) @@ -143,9 +148,37 @@ pub async fn run( .await?; debug!("Starting the p2p network"); - network_runtime.bootstrap().await?; - let _network_handler = spawn(network_runtime.run()); - debug!("p2p network started"); + let _network_handle = network_runtime.bootstrap(&mut event_stream).await?; + debug!("P2P layer bootstrapped"); + + debug!("Creating the Synchronizer"); + + let 
(synchronizer_runtime, synchronizer_stream) = + topos_tce_synchronizer::Synchronizer::builder() + .with_config(config.synchronization.clone()) + .with_shutdown(shutdown.0.child_token()) + .with_store(validator_store.clone()) + .with_network_client(network_client.clone()) + .build()?; + + debug!("Synchronizer created"); + + debug!("Starting gRPC api"); + let (broadcast_sender, broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); + + let (api_client, api_stream, ctx) = topos_tce_api::Runtime::builder() + .with_peer_id(peer_id.to_string()) + .with_broadcast_stream(broadcast_receiver.resubscribe()) + .serve_grpc_addr(config.grpc_api_addr) + .serve_graphql_addr(config.graphql_api_addr) + .serve_metrics_addr(config.metrics_api_addr) + .store(validator_store.clone()) + .storage(storage_client.clone()) + .build_and_launch() + .await; + debug!("gRPC api started"); + + // Healthiness phase - stop debug!("Starting the gatekeeper"); let (gatekeeper_client, gatekeeper_runtime) = @@ -154,10 +187,6 @@ pub async fn run( spawn(gatekeeper_runtime.into_future()); debug!("Gatekeeper started"); - let (broadcast_sender, broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); - - let storage_client = StorageClient::new(validator_store.clone()); - debug!("Starting reliable broadcast"); let (tce_cli, tce_stream) = ReliableBroadcastClient::new( @@ -174,32 +203,7 @@ pub async fn run( debug!("Reliable broadcast started"); - debug!("Starting the Synchronizer"); - - let (synchronizer_runtime, synchronizer_stream) = - topos_tce_synchronizer::Synchronizer::builder() - .with_config(config.synchronization.clone()) - .with_shutdown(shutdown.0.child_token()) - .with_store(validator_store.clone()) - .with_network_client(network_client.clone()) - .build()?; - spawn(synchronizer_runtime.into_future()); - debug!("Synchronizer started"); - - debug!("Starting gRPC api"); - let (api_client, api_stream, ctx) = topos_tce_api::Runtime::builder() - .with_peer_id(peer_id.to_string()) - .with_broadcast_stream(broadcast_receiver.resubscribe()) - .serve_grpc_addr(config.grpc_api_addr) - .serve_graphql_addr(config.graphql_api_addr) - .serve_metrics_addr(config.metrics_api_addr) - .store(validator_store.clone()) - .storage(storage_client.clone()) - .build_and_launch() - .await; - debug!("gRPC api started"); - // setup transport-tce-storage-api connector let (app_context, _tce_stream) = AppContext::new( is_validator, diff --git a/crates/topos-test-sdk/src/storage/mod.rs b/crates/topos-test-sdk/src/storage/mod.rs index f8416809e..2f634a45c 100644 --- a/crates/topos-test-sdk/src/storage/mod.rs +++ b/crates/topos-test-sdk/src/storage/mod.rs @@ -11,8 +11,8 @@ use topos_tce_storage::{ use crate::folder_name; -#[fixture(certificates = Vec::new())] -pub async fn storage_client(certificates: Vec) -> StorageClient { +#[fixture(certificates = &[])] +pub async fn storage_client(certificates: &[CertificateDelivered]) -> StorageClient { let store = create_validator_store::partial_1(certificates).await; StorageClient::new(store) @@ -27,9 +27,9 @@ pub fn create_folder(folder_name: &str) -> PathBuf { path } -#[fixture(certificates = Vec::new())] +#[fixture(certificates = &[])] pub async fn create_validator_store( - certificates: Vec, + certificates: &[CertificateDelivered], #[future] create_fullnode_store: Arc, ) -> Arc { let temp_dir = create_folder::default(); @@ -39,7 +39,7 @@ pub async fn create_validator_store( ValidatorStore::open(&temp_dir, fullnode_store).expect("Unable to create validator store"); store - 
.insert_certificates_delivered(&certificates) + .insert_certificates_delivered(certificates) .await .expect("Unable to insert predefined certificates"); @@ -53,8 +53,8 @@ pub async fn create_validator_store_with_fullnode( .expect("Unable to create validator store") } -#[fixture(certificates = Vec::new())] -pub async fn create_fullnode_store(certificates: Vec) -> Arc { +#[fixture(certificates = &[])] +pub async fn create_fullnode_store(certificates: &[CertificateDelivered]) -> Arc { let temp_dir = create_folder::default(); let perpetual_tables = Arc::new(ValidatorPerpetualTables::open(&temp_dir)); @@ -75,7 +75,7 @@ pub async fn create_fullnode_store(certificates: Vec) -> A .expect("Unable to create full node store"); store - .insert_certificates_delivered(&certificates[..]) + .insert_certificates_delivered(certificates) .await .unwrap(); diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index 80e128a18..e683d0668 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -16,6 +16,7 @@ use tonic::transport::Channel; use tonic::Request; use tonic::Response; use tonic::Status; +use tracing::Instrument; use tonic::transport::server::Router; use tonic::transport::Server; @@ -61,7 +62,7 @@ pub struct TceContext { pub api_entrypoint: String, pub api_grpc_client: ApiServiceClient, // GRPC Client for this peer (tce node) pub console_grpc_client: ConsoleServiceClient, // Console TCE GRPC Client for this peer (tce node) - pub runtime_join_handle: JoinHandle>, + pub runtime_join_handle: JoinHandle>, pub app_join_handle: JoinHandle<()>, pub gatekeeper_join_handle: JoinHandle>, pub synchronizer_join_handle: JoinHandle>, @@ -98,6 +99,7 @@ pub struct NodeConfig { pub keypair: Keypair, pub addr: Multiaddr, pub minimum_cluster_size: usize, + pub dummy: bool, } impl Default for NodeConfig { @@ -107,6 +109,13 @@ impl Default for NodeConfig { } impl NodeConfig { + pub fn standalone() -> Self { + Self { + dummy: true, + ..Default::default() + } + } + pub fn from_seed(seed: u8) -> Self { let (keypair, port, addr) = local_peer(seed); @@ -116,6 +125,7 @@ impl NodeConfig { keypair, addr, minimum_cluster_size: 0, + dummy: false, } } @@ -131,7 +141,7 @@ impl NodeConfig { ( NetworkClient, impl Stream + Unpin + Send, - JoinHandle>, + JoinHandle>, ), Box, > { @@ -142,6 +152,7 @@ impl NodeConfig { peers, self.minimum_cluster_size, router, + self.dummy, ) .await } @@ -158,6 +169,7 @@ impl NodeConfig { peers, self.minimum_cluster_size, router, + self.dummy, ) .await } @@ -193,13 +205,14 @@ pub fn create_dummy_router() -> Router { #[fixture( config = NodeConfig::default(), - peers = &[], certificates = Vec::new(), + peers = &[], + certificates = &[], validator_id = ValidatorId::default(), validators = HashSet::default(), message_signer = default_message_signer()) ] pub async fn start_node( - certificates: Vec, + certificates: &[CertificateDelivered], config: NodeConfig, peers: &[NodeConfig], validator_id: ValidatorId, @@ -208,9 +221,11 @@ pub async fn start_node( ) -> TceContext { let is_validator = validators.contains(&validator_id); let peer_id = config.keypair.public().to_peer_id(); - let fullnode_store = create_fullnode_store(vec![]).await; + let fullnode_store = create_fullnode_store(&[]).in_current_span().await; let validator_store = - create_validator_store(certificates, futures::future::ready(fullnode_store.clone())).await; + create_validator_store(certificates, futures::future::ready(fullnode_store.clone())) + .in_current_span() + .await; 
let router = GrpcRouter::new(tonic::transport::Server::builder()).add_service( SynchronizerServiceServer::new(SynchronizerService { @@ -225,7 +240,9 @@ pub async fn start_node( peers, config.minimum_cluster_size, Some(router), + config.dummy, ) + .in_current_span() .await .expect("Unable to bootstrap tce network"); @@ -239,6 +256,7 @@ pub async fn start_node( validator_store.clone(), sender, ) + .in_current_span() .await; let api_storage_client = storage_client.clone(); @@ -248,6 +266,7 @@ pub async fn start_node( receiver.resubscribe(), futures::future::ready(validator_store.clone()), ) + .in_current_span() .await; let (gatekeeper_client, gatekeeper_join_handle) = create_gatekeeper().await.unwrap(); @@ -257,6 +276,7 @@ pub async fn start_node( network_client.clone(), validator_store.clone(), ) + .in_current_span() .await; let (app, event_stream) = AppContext::new( @@ -275,14 +295,17 @@ pub async fn start_node( let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - let app_join_handle = spawn(app.run( - network_stream, - tce_stream, - api_stream, - synchronizer_stream, - BroadcastStream::new(receiver).filter_map(|v| futures::future::ready(v.ok())), - (shutdown_token, shutdown_sender), - )); + let app_join_handle = spawn( + app.run( + network_stream, + tce_stream, + api_stream, + synchronizer_stream, + BroadcastStream::new(receiver).filter_map(|v| futures::future::ready(v.ok())), + (shutdown_token, shutdown_sender), + ) + .in_current_span(), + ); TceContext { node_config: config, @@ -312,11 +335,10 @@ fn build_peer_config_pool(peer_number: u8) -> Vec { pub async fn start_pool( peer_number: u8, - certificates: Vec, + certificates: &[CertificateDelivered], ) -> HashMap { let mut clients = HashMap::new(); let peers = build_peer_config_pool(peer_number); - println!("Peer configs: {:?}", peers); let mut validators = Vec::new(); let mut message_signers = Vec::new(); @@ -335,13 +357,16 @@ pub async fn start_pool( let validator_id = validators[i]; let signer = message_signers[i].clone(); let config_cloned = config.clone(); - let certificates_cloned = certificates.clone(); let peers_cloned = peers.clone(); let validators_cloned = validators.clone(); + let context = tracing::info_span!( + "start_node", + "peer_id" = config_cloned.peer_id().to_string() + ); let fut = async move { let client = start_node( - certificates_cloned, + certificates, config_cloned, &peers_cloned, validator_id, @@ -350,6 +375,7 @@ pub async fn start_pool( .collect::>(), signer, ) + .instrument(context) .await; (client.peer_id, client) @@ -364,9 +390,13 @@ pub async fn start_pool( clients } +#[fixture( + peer_number = 2, + certificates = &[] +)] pub async fn create_network( peer_number: usize, - certificates: Vec, + certificates: &[CertificateDelivered], ) -> HashMap { // List of peers (tce nodes) with their context let mut peers_context = start_pool(peer_number as u8, certificates).await; diff --git a/crates/topos-test-sdk/src/tce/p2p.rs b/crates/topos-test-sdk/src/tce/p2p.rs index d7921778d..a9369a889 100644 --- a/crates/topos-test-sdk/src/tce/p2p.rs +++ b/crates/topos-test-sdk/src/tce/p2p.rs @@ -3,6 +3,7 @@ use std::error::Error; use futures::Stream; use libp2p::Multiaddr; use tokio::{spawn, task::JoinHandle}; +use tracing::Instrument; use crate::p2p::keypair_from_seed; use topos_p2p::{error::P2PError, Event, GrpcContext, GrpcRouter, NetworkClient, Runtime}; @@ -16,6 +17,7 @@ pub async fn create_network_worker( peers: &[NodeConfig], minimum_cluster_size: usize, router: Option, + dummy: bool, ) -> Result< ( 
NetworkClient, @@ -24,11 +26,26 @@ pub async fn create_network_worker( ), P2PError, > { + if !dummy && peers.len() < 2 { + println!("peers {:?}", peers); + return Err(P2PError::UnableToReachBootnode); + } + let key = keypair_from_seed(seed); let _peer_id = key.public().to_peer_id(); - let known_peers = if seed == 1 { + let known_peers = if dummy { vec![] + } else if seed == 1 { + vec![( + peers[1].keypair.public().to_peer_id(), + peers[1].addr.clone(), + )] + } else if seed == 2 { + vec![( + peers[0].keypair.public().to_peer_id(), + peers[0].addr.clone(), + )] } else { peers .iter() @@ -55,6 +72,7 @@ pub async fn create_network_worker( .minimum_cluster_size(minimum_cluster_size) .grpc_context(grpc_context) .build() + .in_current_span() .await } @@ -65,21 +83,37 @@ pub async fn bootstrap_network( peers: &[NodeConfig], minimum_cluster_size: usize, router: Option, + dummy: bool, ) -> Result< ( NetworkClient, impl Stream + Unpin + Send, - JoinHandle>, + JoinHandle>, ), Box, > { - let (network_client, network_stream, mut runtime) = - create_network_worker(seed, port, vec![addr], peers, minimum_cluster_size, router).await?; + let (network_client, mut network_stream, runtime) = create_network_worker( + seed, + port, + vec![addr], + peers, + minimum_cluster_size, + router, + dummy, + ) + .in_current_span() + .await?; - runtime.bootstrap().await?; + let runtime_join_handle = if dummy { + spawn(runtime.run().in_current_span()) + } else { + runtime + .bootstrap(&mut network_stream) + .in_current_span() + .await? + }; println!("Network bootstrap done."); - let runtime_join_handle = spawn(runtime.run()); Ok((network_client, network_stream, runtime_join_handle)) } diff --git a/crates/topos-test-sdk/src/tce/protocol.rs b/crates/topos-test-sdk/src/tce/protocol.rs index 8efb8b3b7..cce340edf 100644 --- a/crates/topos-test-sdk/src/tce/protocol.rs +++ b/crates/topos-test-sdk/src/tce/protocol.rs @@ -9,6 +9,7 @@ use topos_tce_broadcast::event::ProtocolEvents; use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig}; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; +use tracing::Instrument; pub async fn create_reliable_broadcast_client( validator_id: ValidatorId, @@ -28,7 +29,9 @@ pub async fn create_reliable_broadcast_client( message_signer, }; - ReliableBroadcastClient::new(config, storage, sender).await + ReliableBroadcastClient::new(config, storage, sender) + .in_current_span() + .await } pub fn create_reliable_broadcast_params(number_of_nodes: usize) -> ReliableBroadcastParams { diff --git a/crates/topos-test-sdk/src/tce/public_api.rs b/crates/topos-test-sdk/src/tce/public_api.rs index 8e87a1f69..6874d8ea5 100644 --- a/crates/topos-test-sdk/src/tce/public_api.rs +++ b/crates/topos-test-sdk/src/tce/public_api.rs @@ -16,6 +16,7 @@ use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; use topos_tce_storage::StorageClient; use tracing::warn; +use tracing::Instrument; use crate::networking::get_available_addr; use crate::storage::create_validator_store; @@ -65,6 +66,7 @@ pub async fn create_public_api( .store(store) .storage(storage_client) .build_and_launch() + .in_current_span() .await; let api_channel = channel::Endpoint::from_str(&api_endpoint) diff --git a/crates/topos-test-sdk/src/tce/synchronizer.rs b/crates/topos-test-sdk/src/tce/synchronizer.rs index 13a41f448..a6cd2f8df 100644 --- a/crates/topos-test-sdk/src/tce/synchronizer.rs +++ 
b/crates/topos-test-sdk/src/tce/synchronizer.rs @@ -3,6 +3,7 @@ use std::future::IntoFuture; use std::sync::Arc; use tokio::{spawn, task::JoinHandle}; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use topos_p2p::NetworkClient; use topos_tce_gatekeeper::GatekeeperClient; @@ -27,7 +28,7 @@ pub async fn create_synchronizer( .build() .expect("Can't create the Synchronizer"); - let synchronizer_join_handle = spawn(synchronizer_runtime.into_future()); + let synchronizer_join_handle = spawn(synchronizer_runtime.into_future().in_current_span()); (synchronizer_stream, synchronizer_join_handle) } diff --git a/crates/topos/tests/cert_delivery.rs b/crates/topos/tests/cert_delivery.rs index 0a61a0331..61bdfce93 100644 --- a/crates/topos/tests/cert_delivery.rs +++ b/crates/topos/tests/cert_delivery.rs @@ -42,7 +42,7 @@ fn get_subset_of_subnets(subnets: &[SubnetId], subset_size: usize) -> Vec = Vec::new(); @@ -107,7 +107,7 @@ async fn cert_delivery() { warn!("Starting the cluster..."); // List of peers (tce nodes) with their context - let mut peers_context = create_network(peer_number, vec![]).await; + let mut peers_context = create_network(peer_number, &[]).await; warn!("Cluster started, starting clients..."); // Connected tce clients are passing received certificates to this mpsc::Receiver, collect all of them @@ -438,7 +438,7 @@ async fn run_assert_certificate_full_delivery( number_of_nodes: usize, timeout_broadcast: Duration, ) -> Result<(), Box> { - let mut peers_context = create_network(number_of_nodes, vec![]).await; + let mut peers_context = create_network(number_of_nodes, &[]).await; for (_peer_id, client) in peers_context.iter_mut() { let response = client diff --git a/crates/topos/tests/config.rs b/crates/topos/tests/config.rs index 3ac3c53c8..34dd9abf8 100644 --- a/crates/topos/tests/config.rs +++ b/crates/topos/tests/config.rs @@ -261,7 +261,7 @@ mod serial_integration { let node_edge_path_env = setup_polygon_edge(node_up_home_env).await; let node_up_name_env = "TEST_NODE_UP"; let node_up_role_env = "full-node"; - let node_up_subnet_env = "topos-up-env-subnet"; + let node_up_subnet_env = "topos"; let mut cmd = Command::cargo_bin("topos")?; cmd.arg("node") @@ -302,32 +302,32 @@ mod serial_integration { .env("TOPOS_POLYGON_EDGE_BIN_PATH", &node_edge_path_env) .env("TOPOS_HOME", node_up_home_env) .env("TOPOS_NODE_NAME", node_up_name_env) - .arg("up"); - let mut cmd = tokio::process::Command::from(cmd).spawn().unwrap(); - let output = tokio::time::timeout(std::time::Duration::from_secs(10), cmd.wait()).await; - - // Check if node up was successful - match output { - Ok(Ok(exit_status)) => { - if !exit_status.success() { - println!("Exited with error output {:?}", exit_status.code()); - cmd.kill().await?; - panic!("Node up failed"); - } - } - Ok(Err(e)) => { - println!("Node exited with error: {e}"); - // Kill the subprocess - cmd.kill().await?; - panic!("Node up failed"); - } - Err(_) => { - println!("Node up is running correctly, time-outed"); - // Kill the subprocess - cmd.kill().await?; + .env("RUST_LOG", "topos=debug") + .arg("up") + .stdout(Stdio::piped()); + + let cmd = tokio::process::Command::from(cmd).spawn().unwrap(); + let pid = cmd.id().unwrap(); + let _ = tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + let s = System::new_all(); + if let Some(process) = s.process(Pid::from_u32(pid)) { + if process.kill_with(Signal::Term).is_none() { + eprintln!("This signal isn't supported on this platform"); } } + if let Ok(output) = 
cmd.wait_with_output().await { + assert!(output.status.success()); + let stdout = output.stdout; + let stdout = String::from_utf8_lossy(&stdout); + + let reg = + Regex::new(r#"Local node is listening on "\/ip4\/.*\/tcp\/9090\/p2p\/"#).unwrap(); + assert!(reg.is_match(&stdout)); + } else { + panic!("Failed to shutdown gracefully"); + } // Cleanup std::fs::remove_dir_all(node_up_home_env)?; From 58f2b3f69f5eaea720cfac35fc05ca1b3135486f Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Wed, 6 Mar 2024 19:24:04 +0100 Subject: [PATCH 2/5] chore: update e2e-tests refs Signed-off-by: Simon Paitrault --- .github/workflows/docker_build_push.yml | 4 +-- crates/topos-p2p/src/behaviour/discovery.rs | 12 +++++-- crates/topos-p2p/src/behaviour/gossip.rs | 4 ++- crates/topos-p2p/src/network.rs | 14 +++++++- crates/topos-p2p/src/runtime/handle_event.rs | 1 + crates/topos-tce-proxy/tests/tce_tests.rs | 27 +++++---------- .../tests/integration.rs | 2 +- crates/topos-test-sdk/src/tce/mod.rs | 1 - crates/topos-test-sdk/src/tce/p2p.rs | 33 +++---------------- 9 files changed, 44 insertions(+), 54 deletions(-) diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push.yml index 09248d1c2..578d0f618 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push.yml @@ -37,7 +37,7 @@ jobs: workflow_file_name: topos:integration-tests.yml ref: main wait_interval: 60 - client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }' + client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}", "local-erc20-messaging-infra-ref": "feature/tec-23" }' frontend-erc20-e2e: runs-on: ubuntu-latest @@ -59,4 +59,4 @@ jobs: workflow_file_name: frontend:erc20-messaging.yml ref: main wait_interval: 60 - client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }' + client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}", "local-erc20-messaging-infra-ref": "feature/tec-23" }' diff --git a/crates/topos-p2p/src/behaviour/discovery.rs b/crates/topos-p2p/src/behaviour/discovery.rs index 5a6c51d22..5a08fcc7a 100644 --- a/crates/topos-p2p/src/behaviour/discovery.rs +++ b/crates/topos-p2p/src/behaviour/discovery.rs @@ -74,8 +74,16 @@ impl DiscoveryBehaviour { Self { inner: kademlia, current_bootstrap_query_id: None, - next_bootstrap_query: Some(Box::pin(tokio::time::interval(config.bootstrap_interval))), - health_status: Default::default(), + next_bootstrap_query: if known_peers.is_empty() { + None + } else { + Some(Box::pin(tokio::time::interval(config.bootstrap_interval))) + }, + health_status: if known_peers.is_empty() { + HealthStatus::Healthy + } else { + HealthStatus::default() + }, } } diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index 2f1d0621e..7ffa5c8dc 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -256,7 +256,9 @@ impl NetworkBehaviour for Behaviour { }, gossipsub::Event::Subscribed { peer_id, topic } => { debug!("Subscribed to {:?} with {peer_id}", topic); - self.health_status = HealthStatus::Healthy; + if self.health_status != HealthStatus::Healthy { + self.health_status = HealthStatus::Healthy; + } } gossipsub::Event::Unsubscribed { peer_id, topic } => { debug!("Unsubscribed from {:?} with {peer_id}", topic); diff --git a/crates/topos-p2p/src/network.rs b/crates/topos-p2p/src/network.rs index 12c7fd0db..827039679 100644 --- a/crates/topos-p2p/src/network.rs +++ b/crates/topos-p2p/src/network.rs @@ -14,7 +14,13 @@ use crate::{ }; use 
futures::Stream; use libp2p::{ - core::upgrade, dns, identity::Keypair, kad::store::MemoryStore, noise, swarm, tcp::Config, + core::upgrade, + dns, + identity::Keypair, + kad::store::MemoryStore, + noise, + swarm::{self, ConnectionId}, + tcp::Config, Multiaddr, PeerId, Swarm, Transport, }; use std::{ @@ -204,6 +210,12 @@ impl<'a> NetworkBuilder<'a> { shutdown, state_machine: crate::runtime::StateMachine { connected_to_bootpeer_retry_count: 3, + successfully_connect_to_bootpeer: if self.known_peers.is_empty() { + // Node seems to be a boot node + Some(ConnectionId::new_unchecked(0)) + } else { + None + }, ..Default::default() }, health_status: HealthStatus::Initializing, diff --git a/crates/topos-p2p/src/runtime/handle_event.rs b/crates/topos-p2p/src/runtime/handle_event.rs index 27fc4486d..fd3fab0e5 100644 --- a/crates/topos-p2p/src/runtime/handle_event.rs +++ b/crates/topos-p2p/src/runtime/handle_event.rs @@ -138,6 +138,7 @@ impl EventHandler> for Runtime { {connection_id} | send_back_addr: {send_back_addr}" ) } + SwarmEvent::ListenerClosed { listener_id, addresses, diff --git a/crates/topos-tce-proxy/tests/tce_tests.rs b/crates/topos-tce-proxy/tests/tce_tests.rs index 64f5e4027..21598bf14 100644 --- a/crates/topos-tce-proxy/tests/tce_tests.rs +++ b/crates/topos-tce-proxy/tests/tce_tests.rs @@ -26,19 +26,14 @@ use topos_tce_proxy::{TceProxyCommand, TceProxyConfig, TceProxyEvent}; use topos_test_sdk::tce::{start_node, NodeConfig}; use tracing::{debug, error, info, warn}; -use topos_test_sdk::{ - certificates::create_certificate_chain, - constants::*, - tce::{create_network, TceContext}, -}; +use topos_test_sdk::{certificates::create_certificate_chain, constants::*, tce::TceContext}; pub const SOURCE_SUBNET_ID_1_NUMBER_OF_PREFILLED_CERTIFICATES: usize = 15; pub const SOURCE_SUBNET_ID_2_NUMBER_OF_PREFILLED_CERTIFICATES: usize = 10; #[test(tokio::test)] async fn test_tce_submit_certificate() -> Result<(), Box> { - let mut network = create_network(3, &[]).await; - let context = network.values_mut().last().unwrap(); + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let source_subnet_id: SubnetId = SOURCE_SUBNET_ID_1.into(); let prev_certificate_id: CertificateId = CERTIFICATE_ID_1.into(); @@ -78,8 +73,7 @@ async fn test_tce_submit_certificate() -> Result<(), Box> #[test(tokio::test)] async fn test_tce_watch_certificates() -> Result<(), Box> { - let mut network = create_network(3, &[]).await; - let context = network.values_mut().last().unwrap(); + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let source_subnet_id: SubnetId = SubnetId { value: [1u8; SUBNET_ID_LENGTH].to_vec(), @@ -142,8 +136,7 @@ async fn test_tce_watch_certificates() -> Result<(), Box> #[test(tokio::test)] async fn test_tce_get_source_head_certificate() -> Result<(), Box> { - let mut network = create_network(3, &[]).await; - let context = network.values_mut().last().unwrap(); + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let source_subnet_id: SubnetId = SOURCE_SUBNET_ID_1.into(); let default_cert_id: CertificateId = PREV_CERTIFICATE_ID.into(); @@ -243,8 +236,7 @@ async fn test_tce_get_source_head_certificate() -> Result<(), Box Result<(), Box> { - let mut network = create_network(3, &[]).await; - let context = network.values_mut().last().unwrap(); + let mut context = start_node::partial_2(&[], NodeConfig::standalone()).await; let source_subnet_id: SubnetId = SOURCE_SUBNET_ID_1.into(); let certificates = 
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 10); @@ -339,9 +331,8 @@ async fn test_tce_get_last_pending_certificates() -> Result<(), Box, ) -> Result<(), Box> { - let mut network = create_network(3, &input_certificates[..]).await; - - let context = network.values_mut().last().unwrap(); + let mut context = + start_node::partial_2(&input_certificates[..], NodeConfig::standalone()).await; let source_subnet_id_1: SubnetId = SubnetId { value: SOURCE_SUBNET_ID_1.into(), @@ -739,8 +730,8 @@ async fn test_tce_client_get_empty_history_source_head() -> Result<(), Box, ) -> Result<(), Box> { - let mut network = create_network(3, &input_certificates[..]).await; - let context = network.values_mut().last().unwrap(); + let mut context = + start_node::partial_2(&input_certificates[..], NodeConfig::standalone()).await; // Tce is prefilled with delivered certificates let source_subnet_id_1_prefilled_certificates = diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs index 8ac29619a..c6ebb71cb 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests/integration.rs @@ -20,7 +20,7 @@ use crate::SynchronizerService; #[rstest] #[test(tokio::test)] -#[timeout(Duration::from_secs(10))] +#[timeout(Duration::from_secs(5))] async fn network_test() { let subnet = topos_test_sdk::constants::SOURCE_SUBNET_ID_1; let certificates: Vec = diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index e683d0668..6f2425f0d 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -169,7 +169,6 @@ impl NodeConfig { peers, self.minimum_cluster_size, router, - self.dummy, ) .await } diff --git a/crates/topos-test-sdk/src/tce/p2p.rs b/crates/topos-test-sdk/src/tce/p2p.rs index a9369a889..882928068 100644 --- a/crates/topos-test-sdk/src/tce/p2p.rs +++ b/crates/topos-test-sdk/src/tce/p2p.rs @@ -17,7 +17,6 @@ pub async fn create_network_worker( peers: &[NodeConfig], minimum_cluster_size: usize, router: Option, - dummy: bool, ) -> Result< ( NetworkClient, @@ -26,26 +25,11 @@ pub async fn create_network_worker( ), P2PError, > { - if !dummy && peers.len() < 2 { - println!("peers {:?}", peers); - return Err(P2PError::UnableToReachBootnode); - } - let key = keypair_from_seed(seed); let _peer_id = key.public().to_peer_id(); - let known_peers = if dummy { + let known_peers = if seed == 1 { vec![] - } else if seed == 1 { - vec![( - peers[1].keypair.public().to_peer_id(), - peers[1].addr.clone(), - )] - } else if seed == 2 { - vec![( - peers[0].keypair.public().to_peer_id(), - peers[0].addr.clone(), - )] } else { peers .iter() @@ -92,17 +76,10 @@ pub async fn bootstrap_network( ), Box, > { - let (network_client, mut network_stream, runtime) = create_network_worker( - seed, - port, - vec![addr], - peers, - minimum_cluster_size, - router, - dummy, - ) - .in_current_span() - .await?; + let (network_client, mut network_stream, runtime) = + create_network_worker(seed, port, vec![addr], peers, minimum_cluster_size, router) + .in_current_span() + .await?; let runtime_join_handle = if dummy { spawn(runtime.run().in_current_span()) From f3547a3bd5ef844f1ededac8ab587dfd52eda770 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Thu, 7 Mar 2024 20:48:25 +0100 Subject: [PATCH 3/5] fix: fixing review comments Signed-off-by: Simon Paitrault --- 
crates/topos-p2p/src/behaviour/discovery.rs | 2 +- crates/topos-p2p/src/behaviour/gossip.rs | 2 +- crates/topos-p2p/src/config.rs | 4 ++ crates/topos-p2p/src/network.rs | 6 +-- crates/topos-p2p/src/runtime/handle_event.rs | 26 ++++++------ .../src/runtime/handle_event/discovery.rs | 41 +++++++++---------- crates/topos-p2p/src/runtime/mod.rs | 24 +++++------ crates/topos-tce-api/src/runtime/sync_task.rs | 4 +- 8 files changed, 56 insertions(+), 53 deletions(-) diff --git a/crates/topos-p2p/src/behaviour/discovery.rs b/crates/topos-p2p/src/behaviour/discovery.rs index 5a08fcc7a..67fe0766f 100644 --- a/crates/topos-p2p/src/behaviour/discovery.rs +++ b/crates/topos-p2p/src/behaviour/discovery.rs @@ -82,7 +82,7 @@ impl DiscoveryBehaviour { health_status: if known_peers.is_empty() { HealthStatus::Healthy } else { - HealthStatus::default() + HealthStatus::Initializing }, } } diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index 7ffa5c8dc..aad402f5c 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -32,7 +32,7 @@ pub struct Behaviour { gossipsub: gossipsub::Behaviour, pending: HashMap<&'static str, VecDeque>>, tick: tokio::time::Interval, - /// List of connected peers per topics + /// List of connected peers per topic. connected_peer: HashMap<&'static str, HashSet>, /// The health status of the gossip behaviour pub(crate) health_status: HealthStatus, diff --git a/crates/topos-p2p/src/config.rs b/crates/topos-p2p/src/config.rs index 7603037a2..3fb46b733 100644 --- a/crates/topos-p2p/src/config.rs +++ b/crates/topos-p2p/src/config.rs @@ -31,9 +31,13 @@ pub struct DiscoveryConfig { pub publication_interval: Option, pub provider_publication_interval: Option, /// Interval at which the node will send bootstrap query to the network + /// + /// Defaults to [DiscoveryConfig::BOOTSTRAP_INTERVAL] pub bootstrap_interval: Duration, /// Interval at which the node will send fast bootstrap query to the network /// Mostly used when the node is bootstrapping and failed to connect to boot peers + /// + /// Defaults to [DiscoveryConfig::FAST_BOOTSTRAP_INTERVAL] pub fast_bootstrap_interval: Duration, } diff --git a/crates/topos-p2p/src/network.rs b/crates/topos-p2p/src/network.rs index 827039679..cf0c0a5c0 100644 --- a/crates/topos-p2p/src/network.rs +++ b/crates/topos-p2p/src/network.rs @@ -208,9 +208,9 @@ impl<'a> NetworkBuilder<'a> { active_listeners: HashSet::new(), pending_record_requests: HashMap::new(), shutdown, - state_machine: crate::runtime::StateMachine { - connected_to_bootpeer_retry_count: 3, - successfully_connect_to_bootpeer: if self.known_peers.is_empty() { + health_state: crate::runtime::HealthState { + bootpeer_connection_retries: 3, + successfully_connected_to_bootpeer: if self.known_peers.is_empty() { // Node seems to be a boot node Some(ConnectionId::new_unchecked(0)) } else { diff --git a/crates/topos-p2p/src/runtime/handle_event.rs b/crates/topos-p2p/src/runtime/handle_event.rs index fd3fab0e5..5f3607747 100644 --- a/crates/topos-p2p/src/runtime/handle_event.rs +++ b/crates/topos-p2p/src/runtime/handle_event.rs @@ -61,14 +61,14 @@ impl EventHandler> for Runtime { peer_id: Some(peer_id), error, } if self - .state_machine - .successfully_connect_to_bootpeer + .health_state + .successfully_connected_to_bootpeer .is_none() - && self.state_machine.dialed_bootpeer.contains(&connection_id) => + && self.health_state.dialed_bootpeer.contains(&connection_id) => { warn!("Unable to connect to bootpeer 
{peer_id}: {error:?}"); - self.state_machine.dialed_bootpeer.remove(&connection_id); - if self.state_machine.dialed_bootpeer.is_empty() { + self.health_state.dialed_bootpeer.remove(&connection_id); + if self.health_state.dialed_bootpeer.is_empty() { // We tried to connect to all bootnode without success error!("Unable to connect to any bootnode"); } @@ -96,15 +96,15 @@ impl EventHandler> for Runtime { num_established, concurrent_dial_errors, established_in, - } if self.state_machine.dialed_bootpeer.contains(&connection_id) => { - info!("Successfully connect to bootpeer {peer_id}"); + } if self.health_state.dialed_bootpeer.contains(&connection_id) => { + info!("Successfully connected to bootpeer {peer_id}"); if self - .state_machine - .successfully_connect_to_bootpeer + .health_state + .successfully_connected_to_bootpeer .is_none() { - self.state_machine.successfully_connect_to_bootpeer = Some(connection_id); - _ = self.state_machine.dialed_bootpeer.remove(&connection_id); + self.health_state.successfully_connected_to_bootpeer = Some(connection_id); + _ = self.health_state.dialed_bootpeer.remove(&connection_id); } } @@ -160,7 +160,7 @@ impl EventHandler> for Runtime { connection_id, } if self.boot_peers.contains(peer_id) => { info!("Dialing bootpeer {peer_id} on connection: {connection_id}"); - self.state_machine.dialed_bootpeer.insert(connection_id); + self.health_state.dialed_bootpeer.insert(connection_id); } SwarmEvent::Dialing { @@ -188,7 +188,7 @@ impl EventHandler> for Runtime { let behaviour = self.swarm.behaviour(); if let Some(event) = self.healthy_status_changed() { - _ = self.event_sender.send(Event::Healthy).await; + _ = self.event_sender.send(event).await; } Ok(()) diff --git a/crates/topos-p2p/src/runtime/handle_event/discovery.rs b/crates/topos-p2p/src/runtime/handle_event/discovery.rs index 7ecf0aac1..c08195829 100644 --- a/crates/topos-p2p/src/runtime/handle_event/discovery.rs +++ b/crates/topos-p2p/src/runtime/handle_event/discovery.rs @@ -1,5 +1,5 @@ use libp2p::kad::{BootstrapOk, BootstrapResult, Event, QueryResult}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::{behaviour::HealthStatus, error::P2PError, Runtime}; @@ -44,8 +44,8 @@ impl EventHandler> for Runtime { && self.swarm.behaviour().discovery.health_status == HealthStatus::Initializing => { warn!( - "Bootstrap query finished but unable to connect to bootnode {:?} {:?}", - stats, step + "Bootstrap query finished but unable to connect to bootnode during \ + initialization, switching to unhealthy and fast bootstrap mode", ); let behaviour = self.swarm.behaviour_mut(); @@ -67,27 +67,26 @@ impl EventHandler> for Runtime { step, } if num_remaining == 0 && self - .state_machine - .successfully_connect_to_bootpeer + .health_state + .successfully_connected_to_bootpeer .is_none() && self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy => { - warn!( - "Bootstrap query finished but unable to connect to bootnode {:?} {:?}", - stats, step - ); - match self - .state_machine - .connected_to_bootpeer_retry_count - .checked_sub(1) - { + match self.health_state.bootpeer_connection_retries.checked_sub(1) { None => { - error!("Unable to connect to bootnode, stopping"); + error!( + "Bootstrap query finished but unable to connect to bootnode, stopping" + ); return Err(P2PError::UnableToReachBootnode); } Some(new) => { - self.state_machine.connected_to_bootpeer_retry_count = new; + warn!( + "Bootstrap query finished but unable to connect to bootnode, retrying \ + {} more 
times", + new + ); + self.health_state.bootpeer_connection_retries = new; } } } @@ -102,14 +101,14 @@ impl EventHandler> for Runtime { step, } if num_remaining == 0 && self - .state_machine - .successfully_connect_to_bootpeer + .health_state + .successfully_connected_to_bootpeer .is_some() && self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy => { - warn!( - "Bootstrap query finished with bootnode {:?} {:?}", - stats, step + info!( + "Bootstrap query finished with bootnode, switching to healthy and normal \ + bootstrap mode", ); let behaviour = self.swarm.behaviour_mut(); diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index a42076030..69ed30592 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -41,8 +41,8 @@ pub struct Runtime { /// Shutdown signal receiver from the client pub(crate) shutdown: mpsc::Receiver>, - /// Internal state machine of the p2p layer - pub(crate) state_machine: StateMachine, + /// Internal health state of the p2p layer + pub(crate) health_state: HealthState, pub(crate) health_status: HealthStatus, } @@ -50,22 +50,22 @@ pub struct Runtime { mod handle_command; mod handle_event; -/// Internal state machine of the p2p layer +/// Internal health state of the p2p layer /// /// This struct may change in the future to be more flexible and to handle more /// complex state transitions/representation. #[derive(Default)] -pub(crate) struct StateMachine { +pub(crate) struct HealthState { /// Indicates if the node has external addresses configured pub(crate) has_external_addresses: bool, /// Indicates if the node is listening on any address pub(crate) is_listening: bool, - /// List the boot peers that the node has tried to connect to + /// List the bootnodes that the node has tried to connect to pub(crate) dialed_bootpeer: HashSet, - /// Indicates if the node has successfully connected to a boot peer - pub(crate) successfully_connect_to_bootpeer: Option, - /// Track the number of retries to connect to boot peers - pub(crate) connected_to_bootpeer_retry_count: usize, + /// Indicates if the node has successfully connected to a bootnode + pub(crate) successfully_connected_to_bootpeer: Option, + /// Track the number of remaining retries to connect to any bootnode + pub(crate) bootpeer_connection_retries: usize, } impl Runtime { @@ -80,7 +80,7 @@ impl Runtime { debug!("Added public addresses: {:?}", self.public_addresses); for address in &self.public_addresses { self.swarm.add_external_address(address.clone()); - self.state_machine.has_external_addresses = true; + self.health_state.has_external_addresses = true; } debug!("Starting to listen on {:?}", self.listening_on); @@ -91,12 +91,12 @@ impl Runtime { return Err(P2PError::TransportError(error)); } - self.state_machine.is_listening = true; + self.health_state.is_listening = true; } let mut handle = spawn(self.run().in_current_span()); - // Wait for first healthy + // Await the Event::Healthy coming from freshly started p2p layer loop { tokio::select! 
{ result = &mut handle => { diff --git a/crates/topos-tce-api/src/runtime/sync_task.rs b/crates/topos-tce-api/src/runtime/sync_task.rs index d0c5ccec7..2c5fd2d7f 100644 --- a/crates/topos-tce-api/src/runtime/sync_task.rs +++ b/crates/topos-tce-api/src/runtime/sync_task.rs @@ -72,7 +72,7 @@ impl SyncTask { pub(crate) fn new( stream_id: Uuid, target_subnet_stream_positions: TargetSubnetStreamPositions, - storage: StorageClient, + store: StorageClient, notifier: Sender, cancel_token: CancellationToken, ) -> Self { @@ -80,7 +80,7 @@ impl SyncTask { status: SyncTaskStatus::Running, stream_id, target_subnet_stream_positions, - store: storage, + store, notifier, cancel_token, } From 649071285cf9e02b9c9550612049b6f81ed013dc Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Mon, 11 Mar 2024 18:39:23 +0100 Subject: [PATCH 4/5] fix: fixing review comments Signed-off-by: Simon Paitrault --- crates/topos-p2p/src/behaviour.rs | 1 + crates/topos-p2p/src/behaviour/discovery.rs | 3 +++ crates/topos-p2p/src/behaviour/gossip.rs | 11 +++++++++-- crates/topos-p2p/src/config.rs | 8 ++++---- crates/topos-p2p/src/error.rs | 3 +++ crates/topos-p2p/src/event.rs | 3 +++ crates/topos-p2p/src/runtime/handle_command.rs | 1 - crates/topos-p2p/src/runtime/handle_event.rs | 11 ++++++++--- crates/topos-p2p/src/runtime/mod.rs | 4 +++- 9 files changed, 34 insertions(+), 11 deletions(-) diff --git a/crates/topos-p2p/src/behaviour.rs b/crates/topos-p2p/src/behaviour.rs index 798c43fa8..75b5e6c35 100644 --- a/crates/topos-p2p/src/behaviour.rs +++ b/crates/topos-p2p/src/behaviour.rs @@ -15,6 +15,7 @@ pub(crate) enum HealthStatus { Initializing, Healthy, Unhealthy, + Killing, #[allow(unused)] Recovering, } diff --git a/crates/topos-p2p/src/behaviour/discovery.rs b/crates/topos-p2p/src/behaviour/discovery.rs index 67fe0766f..67a18aee4 100644 --- a/crates/topos-p2p/src/behaviour/discovery.rs +++ b/crates/topos-p2p/src/behaviour/discovery.rs @@ -74,6 +74,9 @@ impl DiscoveryBehaviour { Self { inner: kademlia, current_bootstrap_query_id: None, + // If the `discovery` behaviour is created without known_peers + // The bootstrap query interval is disabled only when the local + // node is a lonely bootnode, other nodes will join it. 
next_bootstrap_query: if known_peers.is_empty() { None } else { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index aad402f5c..35716fe4a 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -18,7 +18,7 @@ use libp2p::{ use prost::Message as ProstMessage; use topos_core::api::grpc::tce::v1::Batch; use topos_metrics::P2P_GOSSIP_BATCH_SIZE; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::error::P2PError; use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY}; @@ -256,7 +256,14 @@ impl NetworkBehaviour for Behaviour { }, gossipsub::Event::Subscribed { peer_id, topic } => { debug!("Subscribed to {:?} with {peer_id}", topic); - if self.health_status != HealthStatus::Healthy { + + // If the behaviour isn't already healthy we check if this event + // triggers a switch to healthy + if self.health_status != HealthStatus::Healthy + && self.gossipsub.topics().all(|topic| { + self.gossipsub.mesh_peers(topic).peekable().peek().is_some() + }) + { self.health_status = HealthStatus::Healthy; } } diff --git a/crates/topos-p2p/src/config.rs b/crates/topos-p2p/src/config.rs index 3fb46b733..0574aa23c 100644 --- a/crates/topos-p2p/src/config.rs +++ b/crates/topos-p2p/src/config.rs @@ -48,17 +48,17 @@ impl Default for DiscoveryConfig { replication_interval: Some(Duration::from_secs(10)), publication_interval: Some(Duration::from_secs(10)), provider_publication_interval: Some(Duration::from_secs(10)), - bootstrap_interval: Duration::from_secs(Self::BOOTSTRAP_INTERVAL), - fast_bootstrap_interval: Duration::from_secs(Self::FAST_BOOTSTRAP_INTERVAL), + bootstrap_interval: Self::BOOTSTRAP_INTERVAL, + fast_bootstrap_interval: Self::FAST_BOOTSTRAP_INTERVAL, } } } impl DiscoveryConfig { /// Default bootstrap interval in seconds - pub const BOOTSTRAP_INTERVAL: u64 = 60; + pub const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(60); /// Default fast bootstrap interval in seconds - pub const FAST_BOOTSTRAP_INTERVAL: u64 = 5; + pub const FAST_BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5); pub fn with_replication_factor(mut self, replication_factor: NonZeroUsize) -> Self { self.replication_factor = replication_factor; diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 09a728bbf..3fe69f0f9 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -46,6 +46,9 @@ pub enum P2PError { #[error("Unable to create gRPC client")] UnableToCreateGrpcClient(#[from] OutboundConnectionError), + + #[error("Gossip topics subscription failed")] + GossipTopicSubscriptionFailure, } #[derive(Error, Debug)] diff --git a/crates/topos-p2p/src/event.rs b/crates/topos-p2p/src/event.rs index b20822263..a9ea4e2b4 100644 --- a/crates/topos-p2p/src/event.rs +++ b/crates/topos-p2p/src/event.rs @@ -55,12 +55,15 @@ pub enum Event { Healthy, /// An event emitted when the p2p layer becomes unhealthy Unhealthy, + /// An event emitted when the p2p layer is shutting down + Killing, } impl From<&HealthStatus> for Event { fn from(value: &HealthStatus) -> Self { match value { HealthStatus::Healthy => Event::Healthy, + HealthStatus::Killing => Event::Killing, _ => Event::Unhealthy, } } diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index 864712473..572a83798 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -2,7 +2,6 
@@ use crate::{ error::{CommandExecutionError, P2PError}, protocol_name, Command, Runtime, }; -use libp2p::{kad::RecordKey, PeerId}; use rand::{thread_rng, Rng}; use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL; diff --git a/crates/topos-p2p/src/runtime/handle_event.rs b/crates/topos-p2p/src/runtime/handle_event.rs index 5f3607747..5ccd60523 100644 --- a/crates/topos-p2p/src/runtime/handle_event.rs +++ b/crates/topos-p2p/src/runtime/handle_event.rs @@ -41,7 +41,7 @@ impl EventHandler for Runtime { #[async_trait::async_trait] impl EventHandler> for Runtime { - async fn handle(&mut self, event: SwarmEvent) { + async fn handle(&mut self, event: SwarmEvent) -> EventResult { match event { SwarmEvent::NewListenAddr { listener_id, @@ -74,7 +74,11 @@ impl EventHandler> for Runtime { } } - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { if let Some(peer_id) = peer_id { error!( "OutgoingConnectionError peer_id: {peer_id} | error: {error:?} | \ @@ -119,7 +123,8 @@ impl EventHandler> for Runtime { if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size { if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() { error!("Unable to subscribe to gossipsub topic: {}", error); - // TODO: Deal with initial subscribe error + + return Err(P2PError::GossipTopicSubscriptionFailure); } } } diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index 69ed30592..8e25dfff5 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -29,7 +29,7 @@ pub struct Runtime { pub(crate) listening_on: Vec, pub(crate) public_addresses: Vec, - /// Boot peers to connect used to bootstrap the p2p layer + /// Well-known or pre-configured bootnodes to connect to in order to bootstrap the p2p layer pub(crate) boot_peers: Vec, /// Contains current listenerId of the swarm @@ -44,6 +44,7 @@ pub struct Runtime { /// Internal health state of the p2p layer pub(crate) health_state: HealthState, + /// Health status of the p2p layer pub(crate) health_status: HealthStatus, } @@ -153,6 +154,7 @@ impl Runtime { let discovery = &behaviours.discovery.health_status; let new_status = match (discovery, gossipsub) { + (HealthStatus::Killing, _) | (_, HealthStatus::Killing) => HealthStatus::Killing, (HealthStatus::Initializing, _) | (_, HealthStatus::Initializing) => { HealthStatus::Initializing } From 69601cff57a04212699a10a9a7506623adc2b6fa Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 12 Mar 2024 16:40:59 +0100 Subject: [PATCH 5/5] chore: remove temporary local-erc20 ref Signed-off-by: Simon Paitrault --- .github/workflows/docker_build_push.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push.yml index 578d0f618..70cfac485 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push.yml @@ -37,7 +37,7 @@ jobs: workflow_file_name: topos:integration-tests.yml ref: main wait_interval: 60 - client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}", "local-erc20-messaging-infra-ref": "feature/tec-23" }' + client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }' frontend-erc20-e2e: runs-on: ubuntu-latest @@ -59,4 +59,4 @@ jobs: workflow_file_name: frontend:erc20-messaging.yml ref: main wait_interval: 60 - client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}", 
"local-erc20-messaging-infra-ref": "feature/tec-23" }' + client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }'