diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index f7d23b90935..0dad306aa7f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -51,11 +51,11 @@ use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFi use crate::time_cache::DuplicateCache; use crate::topic::{Hasher, Topic, TopicHash}; use crate::transform::{DataTransform, IdentityTransform}; +use crate::types::PeerKind; use crate::types::{ ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction, }; -use crate::types::{PeerConnections, PeerKind}; use crate::{backoff::BackoffStorage, types::RpcSender}; use crate::{ config::{Config, ValidationMode}, @@ -77,6 +77,10 @@ use instant::SystemTime; use quick_protobuf::{MessageWrite, Writer}; use std::{cmp::Ordering::Equal, fmt::Debug}; +mod connected_peers; + +use connected_peers::ConnectedPeers; + #[cfg(test)] mod tests; @@ -267,15 +271,8 @@ pub struct Behaviour { /// duplicates from being propagated to the application and on the network. duplicate_cache: DuplicateCache, - /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and - /// the set of [`ConnectionId`]s. - connected_peers: HashMap, - - /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. - topic_peers: HashMap>, - - /// A map of all connected peers to their subscribed topics. - peer_topics: HashMap>, + /// A data structure managing the state of all connected peers. + connected_peers: ConnectedPeers, /// A set of all explicit peers. These are peers that remain connected and we unconditionally /// forward messages to, outside of the scoring system. @@ -285,12 +282,6 @@ pub struct Behaviour { /// Messages are not sent to and are rejected from these peers. blacklisted_peers: HashSet, - /// Overlay network of connected peers - Maps topics to connected gossipsub peers. - mesh: HashMap>, - - /// Map of topics to list of peers that we publish to, but don't subscribe to. - fanout: HashMap>, - /// The last publish time for fanout topics. fanout_last_pub: HashMap, @@ -313,10 +304,6 @@ pub struct Behaviour { /// be removed from this list which may result in a true outbound rediscovery. px_peers: HashSet, - /// Set of connected outbound peers (we only consider true outbound peers found through - /// discovery and not by PX). - outbound_peers: HashSet, - /// Stores optional peer score data together with thresholds, decay interval and gossip /// promises. peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>, @@ -453,12 +440,8 @@ where events: VecDeque::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - topic_peers: HashMap::new(), - peer_topics: HashMap::new(), explicit_peers: HashSet::new(), blacklisted_peers: HashSet::new(), - mesh: HashMap::new(), - fanout: HashMap::new(), fanout_last_pub: HashMap::new(), backoffs: BackoffStorage::new( &config.prune_backoff(), @@ -472,11 +455,10 @@ where ), heartbeat_ticks: 0, px_peers: HashSet::new(), - outbound_peers: HashSet::new(), peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), - connected_peers: HashMap::new(), + connected_peers: ConnectedPeers::default(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, subscription_filter, @@ -493,17 +475,21 @@ where { /// Lists the hashes of the topics we are currently subscribed to. pub fn topics(&self) -> impl Iterator { - self.mesh.keys() + self.connected_peers.mesh().keys() } /// Lists all mesh peers for a certain topic hash. pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator { - self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter()) + self.connected_peers + .mesh() + .get(topic_hash) + .into_iter() + .flat_map(|x| x.iter()) } pub fn all_mesh_peers(&self) -> impl Iterator { let mut res = BTreeSet::new(); - for peers in self.mesh.values() { + for peers in self.connected_peers.mesh().values() { res.extend(peers); } res.into_iter() @@ -511,14 +497,12 @@ where /// Lists all known peers and their associated subscribed topics. pub fn all_peers(&self) -> impl Iterator)> { - self.peer_topics - .iter() - .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect())) + self.connected_peers.all_peers_topics() } /// Lists all known peers and their associated protocol. pub fn peer_protocol(&self) -> impl Iterator { - self.connected_peers.iter().map(|(k, v)| (k, &v.kind)) + self.connected_peers.peer_protocol_list() } /// Returns the gossipsub score for a given peer, if one exists. @@ -539,20 +523,14 @@ where return Err(SubscriptionError::NotAllowed); } - if self.mesh.get(&topic_hash).is_some() { + if self.connected_peers.mesh().get(&topic_hash).is_some() { tracing::debug!(%topic, "Topic is already in the mesh"); return Ok(false); } // send subscription request to all peers - for peer_id in self.peer_topics.keys() { + for (peer_id, sender) in self.connected_peers.all_handler_senders() { tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer"); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; - sender.subscribe(topic_hash.clone()); } @@ -570,21 +548,15 @@ where tracing::debug!(%topic, "Unsubscribing from topic"); let topic_hash = topic.hash(); - if self.mesh.get(&topic_hash).is_none() { + if self.connected_peers.mesh().get(&topic_hash).is_none() { tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic"); // we are not subscribed return Ok(false); } // announce to all peers - for peer_id in self.peer_topics.keys() { + for (peer_id, sender) in self.connected_peers.all_handler_senders() { tracing::debug!(%peer_id, "Sending UNSUBSCRIBE to peer"); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer should be connected") - .sender; - sender.unsubscribe(topic_hash.clone()); } @@ -603,14 +575,14 @@ where data: impl Into>, ) -> Result { let data = data.into(); - let topic = topic.into(); + let topic_hash = topic.into(); // Transform the data before building a raw_message. let transformed_data = self .data_transform - .outbound_transform(&topic, data.clone())?; + .outbound_transform(&topic_hash, data.clone())?; - let raw_message = self.build_raw_message(topic, transformed_data)?; + let raw_message = self.build_raw_message(topic_hash, transformed_data)?; // calculate the message id from the un-transformed data let msg_id = self.config.message_id(&Message { @@ -641,7 +613,12 @@ where let topic_hash = raw_message.topic.clone(); let mut recipient_peers = HashSet::new(); - if let Some(peers_on_topic) = self.topic_peers.get(&topic_hash) { + + if let Some(peers_on_topic) = self + .connected_peers + .get_peers_on_topic(&topic_hash) + .cloned() + { if self.config.flood_publish() { // Forward to all peers above score and all explicit peers recipient_peers.extend(peers_on_topic.iter().filter(|p| { @@ -649,7 +626,7 @@ where || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 })); } else { - match self.mesh.get(&raw_message.topic) { + match self.connected_peers.mesh_peers(&topic_hash) { // Mesh peers Some(mesh_peers) => { recipient_peers.extend(mesh_peers); @@ -658,19 +635,15 @@ where None => { tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); // If we have fanout peers add them to the map. - if self.fanout.contains_key(&topic_hash) { - for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + if let Some(fanout_peers) = self.connected_peers.fanout_peers(&topic_hash) { + for peer in fanout_peers { recipient_peers.insert(*peer); } } else { // We have no fanout peers, select mesh_n of them and add them to the fanout let mesh_n = self.config.mesh_n(); - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - &topic_hash, - mesh_n, - { + let new_peers = + get_random_peers(&self.connected_peers, &topic_hash, mesh_n, { |p| { !self.explicit_peers.contains(p) && !self @@ -679,12 +652,11 @@ where }) .0 } - }, - ); + }); // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); + self.connected_peers + .add_to_fanout(topic_hash.clone(), new_peers.iter()); for peer in new_peers { - tracing::debug!(%peer, "Peer added to fanout"); recipient_peers.insert(peer); } } @@ -702,11 +674,14 @@ where } // Floodsub peers - for (peer, connections) in &self.connected_peers { - if connections.kind == PeerKind::Floodsub - && !self - .score_below_threshold(peer, |ts| ts.publish_threshold) - .0 + for (peer, _connections) in self + .connected_peers + .peer_protocol_list() + .filter(|(_, v)| matches!(v, PeerKind::Floodsub)) + { + if !self + .score_below_threshold(peer, |ts| ts.publish_threshold) + .0 { recipient_peers.insert(*peer); } @@ -737,9 +712,8 @@ where tracing::trace!(peer=%peer_id, "Sending message to peer"); let sender = &mut self .connected_peers - .get_mut(peer_id) - .expect("The peer must be connected") - .sender; + .get_sender(peer_id) + .expect("The peer must be connected"); match sender.publish( raw_message.clone(), @@ -955,7 +929,7 @@ where tracing::debug!(topic=%topic_hash, "Running JOIN for topic"); // if we are already in the mesh, return - if self.mesh.contains_key(topic_hash) { + if self.connected_peers.are_we_subscribed_to_topic(topic_hash) { tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN"); return; } @@ -968,7 +942,7 @@ where // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do, // removing the fanout entry. - if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) { + if let Some(mut peers) = self.connected_peers.remove_all_fanout_peers(topic_hash) { tracing::debug!( topic=%topic_hash, "JOIN: Removing peers from the fanout for topic" @@ -983,33 +957,34 @@ where // Add up to mesh_n of them them to the mesh // NOTE: These aren't randomly added, currently FIFO - let add_peers = std::cmp::min(peers.len(), self.config.mesh_n()); + let number_of_peers_to_add = std::cmp::min(peers.len(), self.config.mesh_n()); tracing::debug!( topic=%topic_hash, "JOIN: Adding {:?} peers from the fanout for topic", - add_peers + number_of_peers_to_add ); - added_peers.extend(peers.iter().take(add_peers)); + added_peers.extend(peers.iter().take(number_of_peers_to_add)); - self.mesh.insert( + // Add them to the mesh + self.connected_peers.add_to_mesh( topic_hash.clone(), - peers.into_iter().take(add_peers).collect(), + peers.iter().take(number_of_peers_to_add), ); - // remove the last published time + // Remove the last fanout published time self.fanout_last_pub.remove(topic_hash); - } - let fanaout_added = added_peers.len(); - if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len()) + } } + let fanout_added = added_peers.len(); + // check if we need to get more peers, which we randomly select if added_peers.len() < self.config.mesh_n() { // get the peers let new_peers = get_random_peers( - &self.topic_peers, &self.connected_peers, topic_hash, self.config.mesh_n() - added_peers.len(), @@ -1026,15 +1001,18 @@ where "JOIN: Inserting {:?} random peers into the mesh", new_peers.len() ); - let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default(); - mesh_peers.extend(new_peers); + + // Add the new peers to the mesh + self.connected_peers + .add_to_mesh(topic_hash.clone(), new_peers.iter()); } - let random_added = added_peers.len() - fanaout_added; + let random_added = added_peers.len() - fanout_added; if let Some(m) = self.metrics.as_mut() { m.peers_included(topic_hash, Inclusion::Random, random_added) } + // Send GRAFT messages to all peers we have added to our mesh. for peer_id in added_peers { // Send a GRAFT control message tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer"); @@ -1043,9 +1021,8 @@ where } let sender = &mut self .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; + .get_sender(&peer_id) + .expect("Peer must be connected"); sender.graft(Graft { topic_hash: topic_hash.clone(), @@ -1055,10 +1032,8 @@ where peer_added_to_mesh( peer_id, vec![topic_hash], - &self.mesh, - self.peer_topics.get(&peer_id), - &mut self.events, &self.connected_peers, + &mut self.events, ); } @@ -1074,15 +1049,15 @@ where fn make_prune( &mut self, topic_hash: &TopicHash, - peer: &PeerId, + peer_id: &PeerId, do_px: bool, on_unsubscribe: bool, ) -> Prune { if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.prune(peer, topic_hash.clone()); + peer_score.prune(peer_id, topic_hash.clone()); } - match self.connected_peers.get(peer).map(|v| &v.kind) { + match self.connected_peers.peer_protocol(peer_id) { Some(PeerKind::Floodsub) => { tracing::error!("Attempted to prune a Floodsub peer"); } @@ -1103,11 +1078,10 @@ where // Select peers for peer exchange let peers = if do_px { get_random_peers( - &self.topic_peers, &self.connected_peers, topic_hash, self.config.prune_peers(), - |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0, + |p| p != peer_id && !self.score_below_threshold(p, |_| 0.0).0, ) .into_iter() .map(|p| PeerInfo { peer_id: Some(p) }) @@ -1123,7 +1097,7 @@ where }; // update backoff - self.backoffs.update_backoff(topic_hash, peer, backoff); + self.backoffs.update_backoff(topic_hash, peer_id, backoff); Prune { topic_hash: topic_hash.clone(), @@ -1137,7 +1111,7 @@ where tracing::debug!(topic=%topic_hash, "Running LEAVE for topic"); // If our mesh contains the topic, send prune to peers and delete it from the mesh - if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { + if let Some(peers) = self.connected_peers.remove_all_mesh_peers(topic_hash) { if let Some(m) = self.metrics.as_mut() { m.left(topic_hash) } @@ -1149,9 +1123,8 @@ where self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe); let sender = &mut self .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; + .get_sender(&peer_id) + .expect("Peer must be connected"); sender.prune(prune); @@ -1159,10 +1132,8 @@ where peer_removed_from_mesh( peer_id, topic_hash, - &self.mesh, - self.peer_topics.get(&peer_id), - &mut self.events, &self.connected_peers, + &mut self.events, ); } } @@ -1171,7 +1142,7 @@ where /// Checks if the given peer is still connected and if not dials the peer again. fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) { - if !self.peer_topics.contains_key(peer_id) { + if !self.connected_peers.is_peer_connected(peer_id) { // Connect to peer tracing::debug!(peer=%peer_id, "Connecting to explicit peer"); self.events.push_back(ToSwarm::Dial { @@ -1259,8 +1230,8 @@ where }; for (topic, ids) in ihave_msgs { - // only process the message if we are subscribed - if !self.mesh.contains_key(&topic) { + // Only process the message if we are subscribed + if !self.connected_peers.are_we_subscribed_to_topic(&topic) { tracing::debug!( %topic, "IHAVE: Ignoring IHAVE - Not subscribed to topic" @@ -1317,9 +1288,8 @@ where let sender = &mut self .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; + .get_sender(peer_id) + .expect("Peer must be connected"); if sender .iwant(IWant { @@ -1356,10 +1326,15 @@ where } tracing::debug!(peer=%peer_id, "Handling IWANT for peer"); + let Some(sender) = self.connected_peers.get_sender(peer_id) else { + tracing::error!(peer=%peer_id, "Send handler non-existent from IWANT peer"); + return; + }; for id in iwant_msgs { // If we have it and the IHAVE count is not above the threshold, - // foward the message. + // forward the message. + // if let Some((msg, count)) = self .mcache .get_with_iwant_counts(&id, peer_id) @@ -1373,12 +1348,7 @@ where ); } else { tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - let sender = &mut self - .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; - + // Forward the message and handle queue overflows. if sender .forward( msg, @@ -1415,17 +1385,17 @@ where // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { - self.peer_topics - .entry(*peer_id) - .or_default() - .insert(topic.clone()); - self.topic_peers - .entry(topic.clone()) - .or_default() - .insert(*peer_id); + if self + .connected_peers + .add_peer_to_a_topic(peer_id, topic.clone()) + .is_err() + { + tracing::error!(peer_id=%peer_id, "GRAFT message from a peer that is not connected"); + return; + } } - // we don't GRAFT to/from explicit peers; complain loudly if this happens + // We don't GRAFT to/from explicit peers; complain loudly if this happens if self.explicit_peers.contains(peer_id) { tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer"); // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics @@ -1436,7 +1406,7 @@ where let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0); let now = Instant::now(); for topic_hash in topics { - if let Some(peers) = self.mesh.get_mut(&topic_hash) { + if let Some(peers) = self.connected_peers.mesh_peers(&topic_hash) { // if the peer is already in the mesh ignore the graft if peers.contains(peer_id) { tracing::debug!( @@ -1447,7 +1417,7 @@ where continue; } - // make sure we are not backing off that peer + // Make sure we are not backing off that peer if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id) { if backoff_time > now { @@ -1500,33 +1470,31 @@ where // check mesh upper bound and only allow graft if the upper bound is not reached or // if it is an outbound peer if peers.len() >= self.config.mesh_n_high() - && !self.outbound_peers.contains(peer_id) + && !self.connected_peers.outbound_peers().contains(peer_id) { to_prune_topics.insert(topic_hash.clone()); continue; } - // add peer to the mesh + // Add peer to the mesh tracing::debug!( peer=%peer_id, topic=%topic_hash, "GRAFT: Mesh link added for peer in topic" ); - if peers.insert(*peer_id) { - if let Some(m) = self.metrics.as_mut() { - m.peers_included(&topic_hash, Inclusion::Subscribed, 1) - } + self.connected_peers + .add_to_mesh(topic_hash.clone(), std::iter::once(peer_id)); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(&topic_hash, Inclusion::Subscribed, 1) } // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( *peer_id, vec![&topic_hash], - &self.mesh, - self.peer_topics.get(peer_id), - &mut self.events, &self.connected_peers, + &mut self.events, ); if let Some((peer_score, ..)) = &mut self.peer_score { @@ -1551,9 +1519,8 @@ where let on_unsubscribe = false; let mut sender = self .connected_peers - .get_mut(peer_id) + .get_sender(peer_id) .expect("Peer must be connected") - .sender .clone(); for prune in to_prune_topics @@ -1580,35 +1547,35 @@ where reason: Churn, ) { let mut update_backoff = always_update_backoff; - if let Some(peers) = self.mesh.get_mut(topic_hash) { - // remove the peer if it exists in the mesh - if peers.remove(peer_id) { - tracing::debug!( - peer=%peer_id, - topic=%topic_hash, - "PRUNE: Removing peer from the mesh for topic" - ); - if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic_hash, reason, 1) - } + // Remove the peer if it exists in the mesh + if self + .connected_peers + .remove_peer_from_mesh(peer_id, topic_hash) + { + tracing::debug!( + peer=%peer_id, + topic=%topic_hash, + "PRUNE: Removing peer from the mesh for topic" + ); + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic_hash, reason, 1) + } - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.prune(peer_id, topic_hash.clone()); - } + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.prune(peer_id, topic_hash.clone()); + } - update_backoff = true; + update_backoff = true; - // inform the handler - peer_removed_from_mesh( - *peer_id, - topic_hash, - &self.mesh, - self.peer_topics.get(peer_id), - &mut self.events, - &self.connected_peers, - ); - } + // inform the handler + peer_removed_from_mesh( + *peer_id, + topic_hash, + &self.connected_peers, + &mut self.events, + ); } + if update_backoff { let time = if let Some(backoff) = backoff { Duration::from_secs(backoff) @@ -1632,7 +1599,7 @@ where for (topic_hash, px, backoff) in prune_data { self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune); - if self.mesh.contains_key(&topic_hash) { + if self.connected_peers.are_we_subscribed_to_topic(&topic_hash) { //connect to px peers if !px.is_empty() { // we ignore PX from peers with insufficient score @@ -1835,7 +1802,10 @@ where self.mcache.put(&msg_id, raw_message.clone()); // Dispatch the message to the user if we are subscribed to any of the topics - if self.mesh.contains_key(&message.topic) { + if self + .connected_peers + .are_we_subscribed_to_topic(&message.topic) + { tracing::debug!("Sending received message to user"); self.events .push_back(ToSwarm::GenerateEvent(Event::Message { @@ -1914,7 +1884,9 @@ where let mut unsubscribed_peers = Vec::new(); - let Some(subscribed_topics) = self.peer_topics.get_mut(propagation_source) else { + // When a peer connects it will have an empty set of subscribed topics at a minimum. + let Some(subscribed_topics) = self.connected_peers.get_topics_for_peer(propagation_source) + else { tracing::error!( peer=%propagation_source, "Subscription by unknown peer" @@ -1944,65 +1916,78 @@ where }; for subscription in filtered_topics { - // get the peers from the mapping, or insert empty lists if the topic doesn't exist + // Get the peers from the mapping, or insert empty lists if the topic doesn't exist let topic_hash = &subscription.topic_hash; - let peer_list = self.topic_peers.entry(topic_hash.clone()).or_default(); match subscription.action { SubscriptionAction::Subscribe => { - if peer_list.insert(*propagation_source) { - tracing::debug!( + match self + .connected_peers + .add_peer_to_a_topic(propagation_source, topic_hash.clone()) + { + Ok(true) => { + tracing::debug!( peer=%propagation_source, topic=%topic_hash, "SUBSCRIPTION: Adding gossip peer to topic" - ); + ); + } + Ok(false) => { + tracing::error!(peer_id=%propagation_source, "Received a subscription for a peer that is already subscribed"); + } + Err(()) => { + tracing::error!(peer_id=%propagation_source, "Received a subscription for a peer that is not connected"); + } } - // add to the peer_topics mapping - subscribed_topics.insert(topic_hash.clone()); - - // if the mesh needs peers add the peer to the mesh + // If the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) && matches!( - self.connected_peers - .get(propagation_source) - .map(|v| &v.kind), + self.connected_peers.peer_protocol(propagation_source), Some(PeerKind::Gossipsubv1_1) | Some(PeerKind::Gossipsub) ) + // It has the correct score && !Self::score_below_threshold_from_scores( &self.peer_score, propagation_source, |_| 0.0, ) .0 + // It satisfies the backoff constraints && !self .backoffs .is_backoff_with_slack(topic_hash, propagation_source) + // And it's not already in the mesh + && !self + .connected_peers + .mesh_peers(topic_hash).map(|v| v.contains(propagation_source)).unwrap_or(false) + && + self + .connected_peers + .mesh_peers(topic_hash) + .map(|p| p.len() < self.config.mesh_n_low()) + .unwrap_or(false) { - if let Some(peers) = self.mesh.get_mut(topic_hash) { - if peers.len() < self.config.mesh_n_low() - && peers.insert(*propagation_source) - { - tracing::debug!( - peer=%propagation_source, - topic=%topic_hash, - "SUBSCRIPTION: Adding peer to the mesh for topic" - ); - if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Subscribed, 1) - } - // send graft to the peer - tracing::debug!( - peer=%propagation_source, - topic=%topic_hash, - "Sending GRAFT to peer for topic" - ); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.graft(propagation_source, topic_hash.clone()); - } - topics_to_graft.push(topic_hash.clone()); - } + self.connected_peers + .add_to_mesh(topic_hash.clone(), std::iter::once(propagation_source)); + tracing::debug!( + peer=%propagation_source, + topic=%topic_hash, + "SUBSCRIPTION: Adding peer to the mesh for topic" + ); + if let Some(m) = self.metrics.as_mut() { + m.peers_included(topic_hash, Inclusion::Subscribed, 1) + } + // send graft to the peer + tracing::debug!( + peer=%propagation_source, + topic=%topic_hash, + "Sending GRAFT to peer for topic" + ); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.graft(propagation_source, topic_hash.clone()); } + topics_to_graft.push(topic_hash.clone()); } // generates a subscription event to be polled application_event.push(ToSwarm::GenerateEvent(Event::Subscribed { @@ -2011,27 +1996,41 @@ where })); } SubscriptionAction::Unsubscribe => { - if peer_list.remove(propagation_source) { - tracing::debug!( - peer=%propagation_source, - topic=%topic_hash, - "SUBSCRIPTION: Removing gossip peer from topic" - ); + match self + .connected_peers + .remove_peer_from_a_topic(propagation_source, topic_hash) + { + Ok(true) => { + tracing::debug!( + peer_id=%propagation_source, + topic=%topic_hash, + "SUBSCRIPTION: Removing gossip peer from topic" + ); + unsubscribed_peers.push((*propagation_source, topic_hash.clone())); + // generate an unsubscribe event to be polled + application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed { + peer_id: *propagation_source, + topic: topic_hash.clone(), + })); + } + Ok(false) => { + tracing::error!(peer_id= %propagation_source, topic=%topic_hash, "Peer was not subscribed to a topic but we received an UNSUBSCRIBE event"); + } + Err(()) => { + tracing::error!(peer_id= %propagation_source, topic=%topic_hash, "Peer was not connected but we received an UNSUBSCRIBE event"); + } } - - // remove topic from the peer_topics mapping - subscribed_topics.remove(topic_hash); - unsubscribed_peers.push((*propagation_source, topic_hash.clone())); - // generate an unsubscribe event to be polled - application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed { - peer_id: *propagation_source, - topic: topic_hash.clone(), - })); } } if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic_hash, peer_list.len()); + m.set_topic_peers( + topic_hash, + self.connected_peers + .get_peers_on_topic(topic_hash) + .map(|v| v.len()) + .unwrap_or(0), + ); } } @@ -2046,10 +2045,8 @@ where peer_added_to_mesh( *propagation_source, topics_joined, - &self.mesh, - self.peer_topics.get(propagation_source), - &mut self.events, &self.connected_peers, + &mut self.events, ); } @@ -2057,9 +2054,9 @@ where // heartbeat. let sender = &mut self .connected_peers - .get_mut(propagation_source) - .expect("Peer must be connected") - .sender; + .get_sender(propagation_source) + .expect("Peer must be connected"); + for topic_hash in topics_to_graft.into_iter() { sender.graft(Graft { topic_hash }); } @@ -2096,7 +2093,7 @@ where // before we add all the gossip from this heartbeat in order to gain a true measure of // steady-state size of the queues. if let Some(m) = &mut self.metrics { - for sender_queue in self.connected_peers.values_mut().map(|v| &v.sender) { + for (_, sender_queue) in self.connected_peers.all_handler_senders() { m.observe_priority_queue_size(sender_queue.priority_len()); m.observe_non_priority_queue_size(sender_queue.non_priority_len()); } @@ -2128,30 +2125,29 @@ where // Cache the scores of all connected peers, and record metrics for current penalties. let mut scores = HashMap::with_capacity(self.connected_peers.len()); if let Some((peer_score, ..)) = &self.peer_score { - for peer_id in self.connected_peers.keys() { + for peer_id in self.connected_peers.all_peers().cloned() { scores .entry(peer_id) - .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut())); + .or_insert_with(|| peer_score.metric_score(&peer_id, self.metrics.as_mut())); } } // maintain the mesh for each topic - for (topic_hash, peers) in self.mesh.iter_mut() { + for (topic_hash, mesh_peers) in self.connected_peers.mesh().clone() { let explicit_peers = &self.explicit_peers; let backoffs = &self.backoffs; - let topic_peers = &self.topic_peers; - let outbound_peers = &self.outbound_peers; + let outbound_peers = self.connected_peers.outbound_peers().clone(); // drop all peers with negative score, without PX // if there is at some point a stable retain method for BTreeSet the following can be // written more efficiently with retain. let mut to_remove_peers = Vec::new(); - for peer_id in peers.iter() { + for peer_id in mesh_peers.iter() { let peer_score = *scores.get(peer_id).unwrap_or(&0.0); // Record the score per mesh if let Some(metrics) = self.metrics.as_mut() { - metrics.observe_mesh_peers_score(topic_hash, peer_score); + metrics.observe_mesh_peers_score(&topic_hash, peer_score); } if peer_score < 0.0 { @@ -2170,60 +2166,57 @@ where } if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len()) + m.peers_removed(&topic_hash, Churn::BadScore, to_remove_peers.len()) } for peer_id in to_remove_peers { - peers.remove(&peer_id); + self.connected_peers + .remove_peer_from_mesh(&peer_id, &topic_hash); } // too little peers - add some - if peers.len() < self.config.mesh_n_low() { + if mesh_peers.len() < self.config.mesh_n_low() { tracing::debug!( topic=%topic_hash, "HEARTBEAT: Mesh low. Topic contains: {} needs: {}", - peers.len(), + mesh_peers.len(), self.config.mesh_n_low() ); // not enough peers - get mesh_n - current_length more - let desired_peers = self.config.mesh_n() - peers.len(); - let peer_list = get_random_peers( - topic_peers, - &self.connected_peers, - topic_hash, - desired_peers, - |peer| { - !peers.contains(peer) + let desired_peers = self.config.mesh_n() - mesh_peers.len(); + let additional_peers = + get_random_peers(&self.connected_peers, &topic_hash, desired_peers, |peer| { + !mesh_peers.contains(peer) && !explicit_peers.contains(peer) - && !backoffs.is_backoff_with_slack(topic_hash, peer) + && !backoffs.is_backoff_with_slack(&topic_hash, peer) && *scores.get(peer).unwrap_or(&0.0) >= 0.0 - }, - ); - for peer in &peer_list { + }); + for peer in &additional_peers { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh - tracing::debug!("Updating mesh, new mesh: {:?}", peer_list); + tracing::debug!("Updating mesh, new mesh: {:?}", additional_peers); if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) + m.peers_included(&topic_hash, Inclusion::Random, additional_peers.len()) } - peers.extend(peer_list); + self.connected_peers + .add_to_mesh(topic_hash.clone(), additional_peers.iter()); } // too many peers - remove some - if peers.len() > self.config.mesh_n_high() { + if mesh_peers.len() > self.config.mesh_n_high() { tracing::debug!( topic=%topic_hash, "HEARTBEAT: Mesh high. Topic contains: {} needs: {}", - peers.len(), + mesh_peers.len(), self.config.mesh_n_high() ); - let excess_peer_no = peers.len() - self.config.mesh_n(); + let excess_peer_no = mesh_peers.len() - self.config.mesh_n(); // shuffle the peers and then sort by score ascending beginning with the worst let mut rng = thread_rng(); - let mut shuffled = peers.iter().copied().collect::>(); + let mut shuffled = mesh_peers.iter().copied().collect::>(); shuffled.shuffle(&mut rng); shuffled.sort_by(|p1, p2| { let score_p1 = *scores.get(p1).unwrap_or(&0.0); @@ -2232,11 +2225,11 @@ where score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal) }); // shuffle everything except the last retain_scores many peers (the best ones) - shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); + shuffled[..mesh_peers.len() - self.config.retain_scores()].shuffle(&mut rng); // count total number of outbound peers let mut outbound = { - let outbound_peers = &self.outbound_peers; + let outbound_peers = self.connected_peers.outbound_peers(); shuffled .iter() .filter(|p| outbound_peers.contains(*p)) @@ -2246,11 +2239,11 @@ where // remove the first excess_peer_no allowed (by outbound restrictions) peers adding // them to to_prune let mut removed = 0; - for peer in shuffled { + for peer_id in shuffled { if removed == excess_peer_no { break; } - if self.outbound_peers.contains(&peer) { + if self.connected_peers.outbound_peers().contains(&peer_id) { if outbound <= self.config.mesh_outbound_min() { // do not remove anymore outbound peers continue; @@ -2261,54 +2254,57 @@ where } // remove the peer - peers.remove(&peer); - let current_topic = to_prune.entry(peer).or_insert_with(Vec::new); + self.connected_peers + .remove_peer_from_mesh(&peer_id, &topic_hash); + let current_topic = to_prune.entry(peer_id).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); removed += 1; } if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic_hash, Churn::Excess, removed) + m.peers_removed(&topic_hash, Churn::Excess, removed) } } // do we have enough outbound peers? - if peers.len() >= self.config.mesh_n_low() { + if mesh_peers.len() >= self.config.mesh_n_low() { // count number of outbound peers we have - let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() }; + let outbound = { + mesh_peers + .iter() + .filter(|p| outbound_peers.contains(*p)) + .count() + }; // if we have not enough outbound peers, graft to some new outbound peers if outbound < self.config.mesh_outbound_min() { let needed = self.config.mesh_outbound_min() - outbound; - let peer_list = get_random_peers( - topic_peers, - &self.connected_peers, - topic_hash, - needed, - |peer| { - !peers.contains(peer) + let additional_peers = + get_random_peers(&self.connected_peers, &topic_hash, needed, |peer| { + !mesh_peers.contains(peer) && !explicit_peers.contains(peer) - && !backoffs.is_backoff_with_slack(topic_hash, peer) + && !backoffs.is_backoff_with_slack(&topic_hash, peer) && *scores.get(peer).unwrap_or(&0.0) >= 0.0 && outbound_peers.contains(peer) - }, - ); - for peer in &peer_list { - let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); + }); + for peer_id in &additional_peers { + let current_topic = to_graft.entry(*peer_id).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh - tracing::debug!("Updating mesh, new mesh: {:?}", peer_list); + tracing::debug!("Updating mesh, new mesh: {:?}", additional_peers); if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len()) + m.peers_included(&topic_hash, Inclusion::Outbound, additional_peers.len()) } - peers.extend(peer_list); + + self.connected_peers + .add_to_mesh(topic_hash.clone(), additional_peers.iter()); } } // should we try to improve the mesh with opportunistic grafting? if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0 - && peers.len() > 1 + && mesh_peers.len() > 1 && self.peer_score.is_some() { if let Some((_, thresholds, _, _)) = &self.peer_score { @@ -2321,7 +2317,7 @@ where // recover from churn of good peers. // now compute the median peer score in the mesh - let mut peers_by_score: Vec<_> = peers.iter().collect(); + let mut peers_by_score: Vec<_> = mesh_peers.iter().collect(); peers_by_score.sort_by(|p1, p2| { let p1_score = *scores.get(p1).unwrap_or(&0.0); let p2_score = *scores.get(p2).unwrap_or(&0.0); @@ -2348,44 +2344,44 @@ where // if the median score is below the threshold, select a better peer (if any) and // GRAFT if median < thresholds.opportunistic_graft_threshold { - let peer_list = get_random_peers( - topic_peers, + let additional_peers = get_random_peers( &self.connected_peers, - topic_hash, + &topic_hash, self.config.opportunistic_graft_peers(), |peer_id| { - !peers.contains(peer_id) + !mesh_peers.contains(peer_id) && !explicit_peers.contains(peer_id) - && !backoffs.is_backoff_with_slack(topic_hash, peer_id) + && !backoffs.is_backoff_with_slack(&topic_hash, peer_id) && *scores.get(peer_id).unwrap_or(&0.0) > median }, ); - for peer in &peer_list { - let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); + for peer_id in &additional_peers { + let current_topic = to_graft.entry(*peer_id).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh tracing::debug!( topic=%topic_hash, "Opportunistically graft in topic with peers {:?}", - peer_list + additional_peers ); if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Random, peer_list.len()) + m.peers_included(&topic_hash, Inclusion::Random, additional_peers.len()) } - peers.extend(peer_list); + + self.connected_peers + .add_to_mesh(topic_hash.clone(), additional_peers.iter()); } } } // Register the final count of peers in the mesh if let Some(m) = self.metrics.as_mut() { - m.set_mesh_peers(topic_hash, peers.len()) + m.set_mesh_peers(&topic_hash, mesh_peers.len()) } } // remove expired fanout topics { - let fanout = &mut self.fanout; // help the borrow checker let fanout_ttl = self.config.fanout_ttl(); self.fanout_last_pub.retain(|topic_hash, last_pub_time| { if *last_pub_time + fanout_ttl < Instant::now() { @@ -2393,42 +2389,43 @@ where topic=%topic_hash, "HEARTBEAT: Fanout topic removed due to timeout" ); - fanout.remove(topic_hash); + self.connected_peers.remove_all_fanout_peers(topic_hash); return false; } true }); } - // maintain fanout + // Maintain fanout // check if our peers are still a part of the topic - for (topic_hash, peers) in self.fanout.iter_mut() { + for (topic_hash, peers) in self.connected_peers.fanout().clone().iter() { let mut to_remove_peers = Vec::new(); let publish_threshold = match &self.peer_score { Some((_, thresholds, _, _)) => thresholds.publish_threshold, _ => 0.0, }; - for peer in peers.iter() { + for peer_id in peers.iter() { // is the peer still subscribed to the topic? - let peer_score = *scores.get(peer).unwrap_or(&0.0); - match self.peer_topics.get(peer) { + let peer_score = *scores.get(peer_id).unwrap_or(&0.0); + match self.connected_peers.get_topics_for_peer(peer_id) { Some(topics) => { if !topics.contains(topic_hash) || peer_score < publish_threshold { tracing::debug!( topic=%topic_hash, "HEARTBEAT: Peer removed from fanout for topic" ); - to_remove_peers.push(*peer); + to_remove_peers.push(*peer_id); } } None => { // remove if the peer has disconnected - to_remove_peers.push(*peer); + to_remove_peers.push(*peer_id); } } } for to_remove in to_remove_peers { - peers.remove(&to_remove); + self.connected_peers + .remove_peer_from_fanout(&to_remove, topic_hash); } // not enough peers @@ -2440,24 +2437,21 @@ where ); let needed_peers = self.config.mesh_n() - peers.len(); let explicit_peers = &self.explicit_peers; - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - topic_hash, - needed_peers, - |peer_id| { + let new_peers = + get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| { !peers.contains(peer_id) && !explicit_peers.contains(peer_id) && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold - }, - ); - peers.extend(new_peers); + }); + self.connected_peers + .add_to_fanout(topic_hash.clone(), new_peers.iter()); } } if self.peer_score.is_some() { tracing::trace!("Mesh message deliveries: {:?}", { - self.mesh + self.connected_peers + .mesh() .iter() .map(|(t, peers)| { ( @@ -2514,7 +2508,13 @@ where /// and fanout peers fn emit_gossip(&mut self) { let mut rng = thread_rng(); - for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) { + for (topic_hash, peers) in self + .connected_peers + .mesh() + .clone() + .iter() + .chain(self.connected_peers.fanout().clone().iter()) + { let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash); if message_ids.is_empty() { continue; @@ -2540,17 +2540,12 @@ where ) }; // get gossip_lazy random peers - let to_msg_peers = get_random_peers_dynamic( - &self.topic_peers, - &self.connected_peers, - topic_hash, - n_map, - |peer| { + let to_msg_peers = + get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| { !peers.contains(peer) && !self.explicit_peers.contains(peer) && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0 - }, - ); + }); tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len()); @@ -2568,9 +2563,8 @@ where // send an IHAVE message let sender = &mut self .connected_peers - .get_mut(&peer_id) - .expect("Peer must be connected") - .sender; + .get_sender(&peer_id) + .expect("Peer must be connected"); if sender .ihave(IHave { topic_hash: topic_hash.clone(), @@ -2614,10 +2608,8 @@ where peer_added_to_mesh( peer_id, vec![topic], - &self.mesh, - self.peer_topics.get(&peer_id), - &mut self.events, &self.connected_peers, + &mut self.events, ); } @@ -2629,9 +2621,8 @@ where // send the control messages let mut sender = self .connected_peers - .get_mut(&peer_id) + .get_sender(&peer_id) .expect("Peer must be connected") - .sender .clone(); // The following prunes are not due to unsubscribing. @@ -2671,9 +2662,8 @@ where ); let sender = &mut self .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; + .get_sender(peer_id) + .expect("Peer must be connected"); sender.prune(prune); @@ -2681,10 +2671,8 @@ where peer_removed_from_mesh( *peer_id, topic_hash, - &self.mesh, - self.peer_topics.get(peer_id), - &mut self.events, &self.connected_peers, + &mut self.events, ); } } @@ -2714,7 +2702,7 @@ where // Add explicit peers for peer_id in &self.explicit_peers { - if let Some(topics) = self.peer_topics.get(peer_id) { + if let Some(topics) = self.connected_peers.get_topics_for_peer(peer_id) { if Some(peer_id) != propagation_source && !originating_peers.contains(peer_id) && Some(peer_id) != message.source.as_ref() @@ -2728,7 +2716,7 @@ where // add mesh peers let topic = &message.topic; // mesh - if let Some(mesh_peers) = self.mesh.get(topic) { + if let Some(mesh_peers) = self.connected_peers.mesh_peers(topic) { for peer_id in mesh_peers { if Some(peer_id) != propagation_source && !originating_peers.contains(peer_id) @@ -2741,13 +2729,12 @@ where // forward the message to peers if !recipient_peers.is_empty() { - for peer_id in recipient_peers.iter() { + for peer_id in recipient_peers.into_iter() { tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); let sender = &mut self .connected_peers - .get_mut(peer_id) - .expect("Peer must be connected") - .sender; + .get_sender(&peer_id) + .expect("Peer must have a sender"); if sender .forward( message.clone(), @@ -2758,11 +2745,11 @@ where { // Downscore the peer if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + peer_score.failed_message_slow_peer(&peer_id); } // Increment the failed message count self.failed_messages - .entry(*peer_id) + .entry(peer_id) .or_default() .non_priority += 1; } @@ -2881,7 +2868,7 @@ where if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) { // The first connection is outbound and it is not a peer from peer exchange => mark // it as outbound peer - self.outbound_peers.insert(peer_id); + self.connected_peers.add_outbound_peer(peer_id); } // Add the IP to the peer scoring system @@ -2901,9 +2888,6 @@ where return; // Not our first connection to this peer, hence nothing to do. } - // Insert an empty set of the topics of this peer until known. - self.peer_topics.insert(peer_id, Default::default()); - if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_peer(peer_id); } @@ -2918,12 +2902,12 @@ where // We need to send our subscriptions to the newly-connected node. let sender = &mut self .connected_peers - .get_mut(&peer_id) + .get_sender(&peer_id) .expect("Peer must be connected") - .sender; + .clone(); - for topic_hash in self.mesh.clone().into_keys() { - sender.subscribe(topic_hash); + for topic_hash in self.connected_peers.mesh().keys() { + sender.subscribe(topic_hash.clone()); } } @@ -2950,111 +2934,40 @@ where } } - if remaining_established != 0 { - // Remove the connection from the list - if let Some(connections) = self.connected_peers.get_mut(&peer_id) { - let index = connections - .connections - .iter() - .position(|v| v == &connection_id) - .expect("Previously established connection to peer must be present"); - connections.connections.remove(index); - - // If there are more connections and this peer is in a mesh, inform the first connection - // handler. - if !connections.connections.is_empty() { - if let Some(topics) = self.peer_topics.get(&peer_id) { - for topic in topics { - if let Some(mesh_peers) = self.mesh.get(topic) { - if mesh_peers.contains(&peer_id) { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::JoinedMesh, - handler: NotifyHandler::One(connections.connections[0]), - }); - break; - } - } - } - } - } - } - } else { - // remove from mesh, topic_peers, peer_topic and the fanout - tracing::debug!(peer=%peer_id, "Peer disconnected"); - { - let Some(topics) = self.peer_topics.get(&peer_id) else { - debug_assert!( - self.blacklisted_peers.contains(&peer_id), - "Disconnected node not in connected list" - ); - return; - }; - - // remove peer from all mappings - for topic in topics { - // check the mesh for the topic - if let Some(mesh_peers) = self.mesh.get_mut(topic) { - // check if the peer is in the mesh and remove it - if mesh_peers.remove(&peer_id) { - if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic, Churn::Dc, 1); - m.set_mesh_peers(topic, mesh_peers.len()); - } - }; - } - - // remove from topic_peers - if let Some(peer_list) = self.topic_peers.get_mut(topic) { - if !peer_list.remove(&peer_id) { - // debugging purposes - tracing::warn!( - peer=%peer_id, - "Disconnected node: peer not in topic_peers" - ); - } - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic, peer_list.len()) - } - } else { - tracing::warn!( - peer=%peer_id, - topic=%topic, - "Disconnected node: peer with topic not in topic_peers" - ); - } - - // remove from fanout - self.fanout - .get_mut(topic) - .map(|peers| peers.remove(&peer_id)); - } - } + // Inform the connection mapping that a peer has disconnected. + self.connected_peers + .peer_disconnected(peer_id, connection_id, &mut self.metrics); + // If we are disconnecting + if remaining_established == 0 { // Forget px and outbound status for this peer self.px_peers.remove(&peer_id); - self.outbound_peers.remove(&peer_id); - - // Remove peer from peer_topics and connected_peers - // NOTE: It is possible the peer has already been removed from all mappings if it does not - // support the protocol. - self.peer_topics.remove(&peer_id); // If metrics are enabled, register the disconnection of a peer based on its protocol. if let Some(metrics) = self.metrics.as_mut() { let peer_kind = &self .connected_peers - .get(&peer_id) - .expect("Connected peer must be registered") - .kind; + .peer_protocol(&peer_id) + .expect("Connected peer must be registered"); metrics.peer_protocol_disconnected(peer_kind.clone()); } - self.connected_peers.remove(&peer_id); - if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.remove_peer(&peer_id); } + } else { + // We remain connected + // + // If the peer is in a mesh, inform the original handler. + if self.connected_peers.is_peer_in_mesh(&peer_id) { + if let Some(connection_id) = self.connected_peers.connection_id(&peer_id) { + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + event: HandlerIn::JoinedMesh, + handler: NotifyHandler::One(connection_id), + }); + } + } } } @@ -3114,26 +3027,13 @@ where _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - // By default we assume a peer is only a floodsub peer. - // - // The protocol negotiation occurs once a message is sent/received. Once this happens we - // update the type of peer that this is in order to determine which kind of routing should - // occur. - let connected_peer = self - .connected_peers - .entry(peer_id) - .or_insert(PeerConnections { - kind: PeerKind::Floodsub, - connections: vec![], - sender: RpcSender::new(self.config.connection_handler_queue_len()), - }); // Add the new connection - connected_peer.connections.push(connection_id); + let sender = RpcSender::new(self.config.connection_handler_queue_len()); + let reciever = sender.new_receiver(); + self.connected_peers + .peer_connected(peer_id, connection_id, sender); - Ok(Handler::new( - self.config.protocol_config(), - connected_peer.sender.new_receiver(), - )) + Ok(Handler::new(self.config.protocol_config(), reciever)) } fn handle_established_outbound_connection( @@ -3143,26 +3043,13 @@ where _: &Multiaddr, _: Endpoint, ) -> Result, ConnectionDenied> { - // By default we assume a peer is only a floodsub peer. - // - // The protocol negotiation occurs once a message is sent/received. Once this happens we - // update the type of peer that this is in order to determine which kind of routing should - // occur. - let connected_peer = self - .connected_peers - .entry(peer_id) - .or_insert(PeerConnections { - kind: PeerKind::Floodsub, - connections: vec![], - sender: RpcSender::new(self.config.connection_handler_queue_len()), - }); // Add the new connection - connected_peer.connections.push(connection_id); + let sender = RpcSender::new(self.config.connection_handler_queue_len()); + let reciever = sender.new_receiver(); + self.connected_peers + .peer_connected(peer_id, connection_id, sender); - Ok(Handler::new( - self.config.protocol_config(), - connected_peer.sender.new_receiver(), - )) + Ok(Handler::new(self.config.protocol_config(), reciever)) } fn on_connection_handler_event( @@ -3188,18 +3075,9 @@ where .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported { peer_id: propagation_source, })); - } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) { - // Only change the value if the old value is Floodsub (the default set in - // `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished). - // All other PeerKind changes are ignored. - tracing::debug!( - peer=%propagation_source, - peer_type=%kind, - "New peer type found for peer" - ); - if let PeerKind::Floodsub = conn.kind { - conn.kind = kind; - } + } else { + self.connected_peers + .update_connection_kind(propagation_source, kind); } } HandlerEvent::MessageDropped(rpc) => { @@ -3371,25 +3249,18 @@ where fn peer_added_to_mesh( peer_id: PeerId, new_topics: Vec<&TopicHash>, - mesh: &HashMap>, - known_topics: Option<&BTreeSet>, + connected_peers: &ConnectedPeers, events: &mut VecDeque>, - connections: &HashMap, ) { // Ensure there is an active connection - let connection_id = { - let conn = connections.get(&peer_id).expect("To be connected to peer."); - assert!( - !conn.connections.is_empty(), - "Must have at least one connection" - ); - conn.connections[0] - }; + let connection_id = connected_peers + .connection_id(&peer_id) + .expect("Mesh peer must be connected"); - if let Some(topics) = known_topics { + if let Some(topics) = connected_peers.get_topics_for_peer(&peer_id) { for topic in topics { if !new_topics.contains(&topic) { - if let Some(mesh_peers) = mesh.get(topic) { + if let Some(mesh_peers) = connected_peers.mesh_peers(topic) { if mesh_peers.contains(&peer_id) { // the peer is already in a mesh for another topic return; @@ -3412,23 +3283,18 @@ fn peer_added_to_mesh( fn peer_removed_from_mesh( peer_id: PeerId, old_topic: &TopicHash, - mesh: &HashMap>, - known_topics: Option<&BTreeSet>, + connected_peers: &ConnectedPeers, events: &mut VecDeque>, - connections: &HashMap, ) { // Ensure there is an active connection - let connection_id = connections - .get(&peer_id) - .expect("To be connected to peer.") - .connections - .first() - .expect("There should be at least one connection to a peer."); - - if let Some(topics) = known_topics { + let connection_id = connected_peers + .connection_id(&peer_id) + .expect("Mesh peer must be connected"); + + if let Some(topics) = connected_peers.get_topics_for_peer(&peer_id) { for topic in topics { if topic != old_topic { - if let Some(mesh_peers) = mesh.get(topic) { + if let Some(mesh_peers) = connected_peers.mesh_peers(topic) { if mesh_peers.contains(&peer_id) { // the peer exists in another mesh still return; @@ -3441,7 +3307,7 @@ fn peer_removed_from_mesh( events.push_back(ToSwarm::NotifyHandler { peer_id, event: HandlerIn::LeftMesh, - handler: NotifyHandler::One(*connection_id), + handler: NotifyHandler::One(connection_id), }); } @@ -3449,28 +3315,16 @@ fn peer_removed_from_mesh( /// filtered by the function `f`. The number of peers to get equals the output of `n_map` /// that gets as input the number of filtered peers. fn get_random_peers_dynamic( - topic_peers: &HashMap>, - connected_peers: &HashMap, + connected_peers: &ConnectedPeers, topic_hash: &TopicHash, // maps the number of total peers to the number of selected peers n_map: impl Fn(usize) -> usize, mut f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - let mut gossip_peers = match topic_peers.get(topic_hash) { - // if they exist, filter the peers by `f` - Some(peer_list) => peer_list - .iter() - .copied() - .filter(|p| { - f(p) && match connected_peers.get(p) { - Some(connections) if connections.kind == PeerKind::Gossipsub => true, - Some(connections) if connections.kind == PeerKind::Gossipsubv1_1 => true, - _ => false, - } - }) - .collect(), - None => Vec::new(), - }; + let mut gossip_peers: Vec = connected_peers + .get_gossipsub_peers_on_topic(topic_hash) + .map(|peers| peers.filter(|p| f(p)).cloned().collect()) + .unwrap_or_default(); // if we have less than needed, return them let n = n_map(gossip_peers.len()); @@ -3491,13 +3345,12 @@ fn get_random_peers_dynamic( /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` /// filtered by the function `f`. fn get_random_peers( - topic_peers: &HashMap>, - connected_peers: &HashMap, + connected_peers: &ConnectedPeers, topic_hash: &TopicHash, n: usize, f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - get_random_peers_dynamic(topic_peers, connected_peers, topic_hash, |_| n, f) + get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f) } /// Validates the combination of signing, privacy and message validation to ensure the @@ -3536,10 +3389,8 @@ impl fmt::Debug for Behaviour, + /// A map of all connected peers to their subscribed topics. + peer_topics: HashMap>, + /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. + topic_peers: HashMap>, + /// Overlay network of connected peers - Maps topics to connected gossipsub peers. + mesh: HashMap>, + /// Map of topics to list of peers that we publish to, but don't subscribe to. + fanout: HashMap>, + /// Set of connected outbound peers (we only consider true outbound peers found through + /// discovery and not by PX). + outbound_peers: HashSet, +} + +impl ConnectedPeers { + // Non mutable functions // + + /// Returns true if we are subscribed to the topic. + pub(crate) fn are_we_subscribed_to_topic(&self, topic: &TopicHash) -> bool { + self.mesh.contains_key(topic) + } + + /// Gives the current number of connected peers. + pub(crate) fn len(&self) -> usize { + self.peer_connections.len() + } + + /// Returns a reference to the mesh mapping. + pub(crate) fn mesh(&self) -> &HashMap> { + &self.mesh + } + + /// Returns a reference to the fanout mapping. + pub(crate) fn fanout(&self) -> &HashMap> { + &self.fanout + } + + /// List all the connected peers. + pub(crate) fn all_peers(&self) -> impl Iterator { + self.peer_connections.keys() + } + + /// Lists all known peers and their associated subscribed topics. + pub(crate) fn all_peers_topics(&self) -> impl Iterator)> { + self.peer_topics + .iter() + .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect())) + } + + /// Lists all known peers and their associated protocol. + pub(crate) fn peer_protocol_list(&self) -> impl Iterator { + self.peer_connections.iter().map(|(k, v)| (k, &v.kind)) + } + + /// Get a specific peer's protocol. + pub(crate) fn peer_protocol(&self, peer_id: &PeerId) -> Option { + self.peer_connections.get(peer_id).map(|v| v.kind.clone()) + } + + /// Returns if the peer is connected or not. + pub(crate) fn is_peer_connected(&self, peer_id: &PeerId) -> bool { + self.peer_connections.contains_key(peer_id) + } + + /// Returns peers that are subscribed to a specific topic. + pub(crate) fn get_peers_on_topic(&self, topic: &TopicHash) -> Option<&BTreeSet> { + self.topic_peers.get(topic) + } + + /// Gets the list of topics a peer is subscribed to. + pub(crate) fn get_topics_for_peer(&self, peer_id: &PeerId) -> Option<&BTreeSet> { + self.peer_topics.get(peer_id) + } + + /// Returns peers that are subscribed to a specific topic and are not floodsub peers. + pub(crate) fn get_gossipsub_peers_on_topic( + &self, + topic: &TopicHash, + ) -> Option> { + let peers = self.topic_peers.get(topic)?; + Some(peers.iter().filter(|p| { + match self + .peer_connections + .get(p) + .expect("Peer Topics must be in sync with peer connections") + { + connections if connections.kind == PeerKind::Gossipsub => true, + connections if connections.kind == PeerKind::Gossipsubv1_1 => true, + _ => false, + } + })) + } + + /// Returns the mesh peers for a given topic. + pub(crate) fn mesh_peers(&self, topic: &TopicHash) -> Option<&BTreeSet> { + self.mesh.get(topic) + } + + /// Returns the fanout peers for a given topic. + pub(crate) fn fanout_peers(&self, topic: &TopicHash) -> Option<&BTreeSet> { + self.fanout.get(topic) + } + + /// Returns the first connection ID for a peer if it is connected. + pub(crate) fn connection_id(&self, peer_id: &PeerId) -> Option { + self.peer_connections.get(peer_id).map(|v| v.connections[0]) + } + + /// Returns true if the peer is in a mesh. + pub(crate) fn is_peer_in_mesh(&self, peer_id: &PeerId) -> bool { + if let Some(topics) = self.peer_topics.get(peer_id) { + for topic in topics { + if let Some(mesh_peers) = self.mesh.get(topic) { + if mesh_peers.contains(peer_id) { + return true; + } + } + } + } + false + } + + /// Returns a reference to the outbound peer set. + pub(crate) fn outbound_peers(&self) -> &HashSet { + &self.outbound_peers + } + + // Mutable functions + + /// Adds an outbound peer to the mapping. + pub(crate) fn add_outbound_peer(&mut self, peer_id: PeerId) { + debug_assert!(self.peer_connections.contains_key(&peer_id)); + debug_assert!(self.peer_topics.contains_key(&peer_id)); + + self.outbound_peers.insert(peer_id); + } + + /// Adds peers to the fanout for a given topic. + pub(crate) fn add_to_fanout<'a, I: Iterator>( + &mut self, + topic: TopicHash, + peers: I, + ) { + let fanout_peers = self.fanout.entry(topic).or_default(); + + for peer_id in peers { + debug_assert!(self.peer_connections.contains_key(peer_id)); + debug_assert!(self.peer_topics.contains_key(peer_id)); + + tracing::debug!(%peer_id, "Peer added to fanout"); + fanout_peers.insert(*peer_id); + } + } + + /// Removes a single peer from the fanout. Returns true if the peer existed in the mesh. + pub(crate) fn remove_peer_from_fanout(&mut self, peer_id: &PeerId, topic: &TopicHash) -> bool { + debug_assert!(self.peer_connections.contains_key(peer_id)); + debug_assert!(self.peer_topics.contains_key(peer_id)); + + if let Some(peer_set) = self.fanout.get_mut(topic) { + peer_set.remove(peer_id) + } else { + false + } + } + + /// Adds peers to the mesh. + pub(crate) fn add_to_mesh<'a, I: Iterator>( + &mut self, + topic: TopicHash, + peers: I, + ) { + let mesh_peers = self.mesh.entry(topic).or_default(); + + for peer_id in peers { + debug_assert!(self.peer_connections.contains_key(peer_id)); + debug_assert!(self.peer_topics.contains_key(peer_id)); + + tracing::debug!(%peer_id, "Peer added to fanout"); + mesh_peers.insert(*peer_id); + } + } + + /// Removes a single peer from the mesh. Returns true if the peer existed in the mesh. + pub(crate) fn remove_peer_from_mesh(&mut self, peer_id: &PeerId, topic: &TopicHash) -> bool { + debug_assert!(self.peer_connections.contains_key(peer_id)); + debug_assert!(self.peer_topics.contains_key(peer_id)); + + if let Some(peer_set) = self.mesh.get_mut(topic) { + peer_set.remove(peer_id) + } else { + false + } + } + + /// Records a peer being subscribed to a specific topic. This fails if the peer is not in the + /// connected set. This returns Ok(true) if the peer was not previously in the topic. + pub(crate) fn add_peer_to_a_topic( + &mut self, + peer_id: &PeerId, + topic: TopicHash, + ) -> Result { + if !self.is_peer_connected(peer_id) { + return Err(()); + } + + // Add the peer to the topic mappings + self.peer_topics + .entry(*peer_id) + .or_default() + .insert(topic.clone()); + Ok(self.topic_peers.entry(topic).or_default().insert(*peer_id)) + } + + /// Records a peer being unsubscribed from a specific topic. This fails if the peer is not in the + /// connected set. This returns Ok(true) if the peer was in the topic. + pub(crate) fn remove_peer_from_a_topic( + &mut self, + peer_id: &PeerId, + topic: &TopicHash, + ) -> Result { + if !self.is_peer_connected(peer_id) { + return Err(()); + } + + // Remove the peer from the mappings + let Some(topics) = self.peer_topics.get_mut(peer_id) else { + return Err(()); + }; + topics.remove(topic); + + // Remove the peer from the mappings + let Some(peers) = self.topic_peers.get_mut(topic) else { + return Ok(false); // We may not know of this topic + }; + Ok(peers.remove(peer_id)) + } + + /// Removes and returns all fanout peers for a specific topic. + pub(crate) fn remove_all_fanout_peers( + &mut self, + topic: &TopicHash, + ) -> Option> { + self.fanout.remove(topic) + } + + /// Removes and returns all mesh peers for a specific topic. + pub(crate) fn remove_all_mesh_peers(&mut self, topic: &TopicHash) -> Option> { + self.mesh.remove(topic) + } + + // Connection / Disconnection Mutable Functions + + pub(crate) fn peer_connected( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + sender: RpcSender, + ) { + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. + // + let connected_peer = self + .peer_connections + .entry(peer_id) + .or_insert(PeerConnections { + kind: PeerKind::Floodsub, + connections: vec![], + sender, + }); + connected_peer.connections.push(connection_id); + + // Add the peer to the required mappings + self.peer_topics.entry(peer_id).or_default(); + } + + /// A peer has disconnected, so all mappings need to be updated to reflect this. + pub(crate) fn peer_disconnected( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + metrics: &mut Option, + ) { + let Some(peer_connection) = self.peer_connections.get_mut(&peer_id) else { + tracing::error!(peer_id=%peer_id, "Libp2p reported a disconnection for a non-connected peer"); + self.remove_peer_from_all_mappings(peer_id, metrics); + return; + }; + + // Remove the connection from the list + let index = peer_connection + .connections + .iter() + .position(|v| v == &connection_id) + .expect("Previously established connection to peer must be present"); + peer_connection.connections.remove(index); + + // If the peer is no longer connected, remove it from all the mappings + if peer_connection.connections.is_empty() { + self.remove_peer_from_all_mappings(peer_id, metrics); + } + } + + // Helper function to remove the peer from all connected mappings. + fn remove_peer_from_all_mappings(&mut self, peer_id: PeerId, metrics: &mut Option) { + tracing::debug!(peer=%peer_id, "Peer disconnected"); + + // Find all the topics the peer is subsribed to + let subscribed_topics = self + .peer_topics + .get(&peer_id) + .expect("Peer must be be in this mapping on connection"); + + for topic in subscribed_topics { + // Remove from any meshes + if let Some(mesh_peers) = self.mesh.get_mut(topic) { + // check if the peer is in the mesh and remove it + if mesh_peers.remove(&peer_id) { + if let Some(m) = metrics.as_mut() { + m.peers_removed(topic, Churn::Dc, 1); + m.set_mesh_peers(topic, mesh_peers.len()); + } + }; + } + + // Remove from topic_peers + if let Some(peer_list) = self.topic_peers.get_mut(topic) { + if !peer_list.remove(&peer_id) { + tracing::warn!( + peer=%peer_id, + "Disconnected node: peer not in topic_peers" + ); + } + if let Some(m) = metrics.as_mut() { + m.set_topic_peers(topic, peer_list.len()) + } + } else { + tracing::warn!( + peer=%peer_id, + topic=%topic, + "Disconnected node: peer with topic not in topic_peers" + ); + } + + // Remove from Fanout + if let Some(peer_list) = self.fanout.get_mut(topic) { + peer_list.remove(&peer_id); + } + } + + // Remove it from peer_topics + self.peer_topics.remove(&peer_id); + + // Remove it from peer_connections + self.peer_connections.remove(&peer_id); + + // Remove outbound peers + self.outbound_peers.remove(&peer_id); + } + + /// Updates the connection kind + pub(crate) fn update_connection_kind(&mut self, peer_id: PeerId, new_kind: PeerKind) { + if let Some(connection) = self.peer_connections.get_mut(&peer_id) { + if let PeerKind::Floodsub = connection.kind { + // Only change the value if the old value is Floodsub (the default set in + // `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished). + // All other PeerKind changes are ignored. + tracing::debug!( + peer_id=%peer_id, + peer_kind=%new_kind, + "New peer type found for peer" + ); + connection.kind = new_kind; + } + } + } + + // Send Queue Handler functions + + /// Get the send queue handler for all peer connections. + pub(crate) fn all_handler_senders( + &mut self, + ) -> impl Iterator { + self.peer_connections + .iter_mut() + .map(|(k, v)| (k, &mut v.sender)) + } + + /// Get the send queue handler for a peer. + pub(crate) fn get_sender(&mut self, peer_id: &PeerId) -> Option<&mut RpcSender> { + let connection = self.peer_connections.get_mut(peer_id)?; + Some(&mut connection.sender) + } + + // Test helper functions + + #[cfg(test)] + /// Returns the number of active connections for a peer. + pub(crate) fn total_connections(&self, peer_id: &PeerId) -> usize { + self.peer_connections + .get(peer_id) + .map(|connection| connection.connections.len()) + .unwrap_or(0) + } +} diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 704d8078496..00046c75f1e 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -233,14 +233,8 @@ where let sender = RpcSender::new(gs.config.connection_handler_queue_len()); let receiver = sender.new_receiver(); let connection_id = ConnectionId::new_unchecked(0); - gs.connected_peers.insert( - peer, - PeerConnections { - kind: kind.clone().unwrap_or(PeerKind::Floodsub), - connections: vec![connection_id], - sender, - }, - ); + gs.connected_peers + .peer_connected(peer, connection_id, sender); gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: peer, @@ -280,24 +274,20 @@ where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { - if let Some(peer_connections) = gs.connected_peers.get(peer_id) { + while let Some(connection_id) = gs.connected_peers.connection_id(peer_id) { let fake_endpoint = ConnectedPoint::Dialer { address: Multiaddr::empty(), role_override: Endpoint::Dialer, - }; // this is not relevant - // peer_connections.connections should never be empty. + }; - let mut active_connections = peer_connections.connections.len(); - for connection_id in peer_connections.connections.clone() { - active_connections = active_connections.checked_sub(1).unwrap(); + let remaining_established = gs.connected_peers.total_connections(peer_id); - gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id: *peer_id, - connection_id, - endpoint: &fake_endpoint, - remaining_established: active_connections, - })); - } + gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id: *peer_id, + connection_id, + endpoint: &fake_endpoint, + remaining_established, + })); } } @@ -425,7 +415,7 @@ fn test_subscribe() { .create_network(); assert!( - gs.mesh.get(&topic_hashes[0]).is_some(), + gs.connected_peers.mesh().get(&topic_hashes[0]).is_some(), "Subscribe should add a new entry to the mesh[topic] hashmap" ); @@ -468,11 +458,11 @@ fn test_unsubscribe() { for topic_hash in &topic_hashes { assert!( - gs.topic_peers.get(topic_hash).is_some(), + gs.connected_peers.get_peers_on_topic(topic_hash).is_some(), "Topic_peers contain a topic entry" ); assert!( - gs.mesh.get(topic_hash).is_some(), + gs.connected_peers.mesh().get(topic_hash).is_some(), "mesh should contain a topic entry" ); } @@ -505,7 +495,7 @@ fn test_unsubscribe() { // check we clean up internal structures for topic_hash in &topic_hashes { assert!( - gs.mesh.get(topic_hash).is_none(), + gs.connected_peers.mesh().get(topic_hash).is_none(), "All topics should have been removed from the mesh" ); } @@ -556,7 +546,12 @@ fn test_join() { // should have added mesh_n nodes to the mesh assert!( - gs.mesh.get(&topic_hashes[0]).unwrap().len() == 6, + gs.connected_peers + .mesh() + .get(&topic_hashes[0]) + .unwrap() + .len() + == 6, "Should have added 6 nodes to the mesh" ); @@ -579,33 +574,24 @@ fn test_join() { // verify fanout nodes // add 3 random peers to the fanout[topic1] - gs.fanout - .insert(topic_hashes[1].clone(), Default::default()); let mut new_peers: Vec = vec![]; - for _ in 0..3 { let random_peer = PeerId::random(); // inform the behaviour of a new peer let address = "/ip4/127.0.0.1".parse::().unwrap(); - gs.handle_established_inbound_connection( - ConnectionId::new_unchecked(0), - random_peer, - &address, - &address, - ) - .unwrap(); - let sender = RpcSender::new(gs.config.connection_handler_queue_len()); - let receiver = sender.new_receiver(); let connection_id = ConnectionId::new_unchecked(0); - gs.connected_peers.insert( - random_peer, - PeerConnections { - kind: PeerKind::Floodsub, - connections: vec![connection_id], - sender, - }, - ); - receivers.insert(random_peer, receiver); + let handler = gs + .handle_established_inbound_connection( + connection_id.clone(), + random_peer, + &address, + &address, + ) + .unwrap(); + + if let Handler::Enabled(mut new_h) = handler { + receivers.insert(random_peer, new_h.receiver()); + } gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, @@ -619,8 +605,8 @@ fn test_join() { })); // add the new peer to the fanout - let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); - fanout_peers.insert(random_peer); + gs.connected_peers + .add_to_fanout(topic_hashes[1].clone(), std::iter::once(&random_peer)); new_peers.push(random_peer); } @@ -629,10 +615,15 @@ fn test_join() { // the three new peers should have been added, along with 3 more from the pool. assert!( - gs.mesh.get(&topic_hashes[1]).unwrap().len() == 6, + gs.connected_peers + .mesh() + .get(&topic_hashes[1]) + .unwrap() + .len() + == 6, "Should have added 6 nodes to the mesh" ); - let mesh_peers = gs.mesh.get(&topic_hashes[1]).unwrap(); + let mesh_peers = gs.connected_peers.mesh().get(&topic_hashes[1]).unwrap(); for new_peer in new_peers { assert!( mesh_peers.contains(&new_peer), @@ -671,13 +662,15 @@ fn test_publish_without_flood_publishing() { .create_network(); assert!( - gs.mesh.get(&topic_hashes[0]).is_some(), + gs.connected_peers.mesh().get(&topic_hashes[0]).is_some(), "Subscribe should add a new entry to the mesh[topic] hashmap" ); // all peers should be subscribed to the topic assert_eq!( - gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()), + gs.connected_peers + .get_peers_on_topic(&topic_hashes[0]) + .map(|p| p.len()), Some(20), "Peers should be subscribed to the topic" ); @@ -747,7 +740,7 @@ fn test_fanout() { .create_network(); assert!( - gs.mesh.get(&topic_hashes[0]).is_some(), + gs.connected_peers.mesh().get(&topic_hashes[0]).is_some(), "Subscribe should add a new entry to the mesh[topic] hashmap" ); // Unsubscribe from topic @@ -762,7 +755,8 @@ fn test_fanout() { .unwrap(); assert_eq!( - gs.fanout + gs.connected_peers + .fanout() .get(&TopicHash::from_raw(fanout_topic)) .unwrap() .len(), @@ -844,7 +838,7 @@ fn test_inject_connected() { // should add the new peers to `peer_topics` with an empty vec as a gossipsub node for peer in peers { - let known_topics = gs.peer_topics.get(&peer).unwrap(); + let known_topics = gs.connected_peers.get_topics_for_peer(&peer).unwrap(); assert!( known_topics == &topic_hashes.iter().cloned().collect(), "The topics for each node should all topics" @@ -895,24 +889,38 @@ fn test_handle_received_subscriptions() { // verify the result - let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); + let peer_topics = gs + .connected_peers + .get_topics_for_peer(&peers[0]) + .unwrap() + .clone(); assert!( peer_topics == topic_hashes.iter().take(3).cloned().collect(), "First peer should be subscribed to three topics" ); - let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone(); + let peer_topics = gs + .connected_peers + .get_topics_for_peer(&peers[1]) + .unwrap() + .clone(); assert!( peer_topics == topic_hashes.iter().take(3).cloned().collect(), "Second peer should be subscribed to three topics" ); assert!( - gs.peer_topics.get(&unknown_peer).is_none(), + gs.connected_peers + .get_topics_for_peer(&unknown_peer) + .is_none(), "Unknown peer should not have been added" ); for topic_hash in topic_hashes[..3].iter() { - let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone(); + let topic_peers = gs + .connected_peers + .get_peers_on_topic(topic_hash) + .unwrap() + .clone(); assert!( topic_peers == peers[..2].iter().cloned().collect(), "Two peers should be added to the first three topics" @@ -929,13 +937,21 @@ fn test_handle_received_subscriptions() { &peers[0], ); - let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); + let peer_topics = gs + .connected_peers + .get_topics_for_peer(&peers[0]) + .unwrap() + .clone(); assert!( peer_topics == topic_hashes[1..3].iter().cloned().collect(), "Peer should be subscribed to two topics" ); - let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment + let topic_peers = gs + .connected_peers + .get_peers_on_topic(&topic_hashes[0]) + .unwrap() + .clone(); // only gossipsub at the moment assert!( topic_peers == peers[1..2].iter().cloned().collect(), "Only the second peers should be in the first topic" @@ -960,64 +976,39 @@ fn test_get_random_peers() { peers.push(PeerId::random()) } - gs.topic_peers - .insert(topic_hash.clone(), peers.iter().cloned().collect()); - - gs.connected_peers = peers - .iter() - .map(|p| { - ( - *p, - PeerConnections { - kind: PeerKind::Gossipsubv1_1, - connections: vec![ConnectionId::new_unchecked(0)], - sender: RpcSender::new(gs.config.connection_handler_queue_len()), - }, - ) - }) - .collect(); + for peer_id in &peers { + gs.connected_peers.peer_connected( + peer_id.clone(), + ConnectionId::new_unchecked(0), + RpcSender::new(gs.config.connection_handler_queue_len()), + ); + gs.connected_peers + .update_connection_kind(*peer_id, PeerKind::Gossipsub); + let _ = gs + .connected_peers + .add_peer_to_a_topic(peer_id, topic_hash.clone()); + } - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { - true - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true); assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned"); - let random_peers = get_random_peers( - &gs.topic_peers, - &gs.connected_peers, - &topic_hash, - 30, - |_| true, - ); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = get_random_peers( - &gs.topic_peers, - &gs.connected_peers, - &topic_hash, - 20, - |_| true, - ); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| { - true - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); // test the filter - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { - false - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); - let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, { + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, { |peer| peers.contains(peer) }); assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); @@ -1255,7 +1246,11 @@ fn test_handle_graft_is_subscribed() { gs.handle_graft(&peers[7], topic_hashes.clone()); assert!( - gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + gs.connected_peers + .mesh() + .get(&topic_hashes[0]) + .unwrap() + .contains(&peers[7]), "Expected peer to have been added to mesh" ); } @@ -1276,7 +1271,11 @@ fn test_handle_graft_is_not_subscribed() { ); assert!( - !gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + !gs.connected_peers + .mesh() + .get(&topic_hashes[0]) + .unwrap() + .contains(&peers[7]), "Expected peer to have been added to mesh" ); } @@ -1305,13 +1304,17 @@ fn test_handle_graft_multiple_topics() { for hash in topic_hashes.iter().take(2) { assert!( - gs.mesh.get(hash).unwrap().contains(&peers[7]), + gs.connected_peers + .mesh() + .get(hash) + .unwrap() + .contains(&peers[7]), "Expected peer to be in the mesh for the first 2 topics" ); } assert!( - gs.mesh.get(&topic_hashes[2]).is_none(), + gs.connected_peers.mesh().get(&topic_hashes[2]).is_none(), "Expected the second topic to not be in the mesh" ); } @@ -1326,10 +1329,14 @@ fn test_handle_prune_peer_in_mesh() { .create_network(); // insert peer into our mesh for 'topic1' - gs.mesh - .insert(topic_hashes[0].clone(), peers.iter().cloned().collect()); + gs.connected_peers + .add_to_mesh(topic_hashes[0].clone(), peers.iter()); assert!( - gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + gs.connected_peers + .mesh() + .get(&topic_hashes[0]) + .unwrap() + .contains(&peers[7]), "Expected peer to be in mesh" ); @@ -1341,7 +1348,11 @@ fn test_handle_prune_peer_in_mesh() { .collect(), ); assert!( - !gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + !gs.connected_peers + .mesh() + .get(&topic_hashes[0]) + .unwrap() + .contains(&peers[7]), "Expected peer to be removed from mesh" ); } @@ -1481,8 +1492,8 @@ fn test_handle_graft_explicit_peer() { gs.handle_graft(peer, topic_hashes.clone()); //peer got not added to mesh - assert!(gs.mesh[&topic_hashes[0]].is_empty()); - assert!(gs.mesh[&topic_hashes[1]].is_empty()); + assert!(gs.connected_peers.mesh()[&topic_hashes[0]].is_empty()); + assert!(gs.connected_peers.mesh()[&topic_hashes[1]].is_empty()); //check prunes assert!( @@ -1509,7 +1520,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //only peer 1 is in the mesh not peer 0 (which is an explicit peer) assert_eq!( - gs.mesh[&topic_hashes[0]], + gs.connected_peers.mesh()[&topic_hashes[0]], vec![peers[1]].into_iter().collect() ); @@ -1543,7 +1554,7 @@ fn do_not_graft_explicit_peer() { gs.heartbeat(); //mesh stays empty - assert_eq!(gs.mesh[&topic_hashes[0]], BTreeSet::new()); + assert_eq!(gs.connected_peers.mesh()[&topic_hashes[0]], BTreeSet::new()); //assert that no graft gets created to explicit peer assert_eq!( @@ -1617,7 +1628,10 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { gs.subscribe(&topic).unwrap(); //only peer 1 is in the mesh not peer 0 (which is an explicit peer) - assert_eq!(gs.mesh[&topic_hash], vec![peers[1]].into_iter().collect()); + assert_eq!( + gs.connected_peers.mesh()[&topic_hash], + vec![peers[1]].into_iter().collect() + ); //assert that graft gets created to non-explicit peer assert!( @@ -1666,7 +1680,10 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { gs.subscribe(&topic).unwrap(); //only peer 1 is in the mesh not peer 0 (which is an explicit peer) - assert_eq!(gs.mesh[&topic_hash], vec![peers[1]].into_iter().collect()); + assert_eq!( + gs.connected_peers.mesh()[&topic_hash], + vec![peers[1]].into_iter().collect() + ); //assert that graft gets created to non-explicit peer assert!( @@ -1749,7 +1766,7 @@ fn test_mesh_addition() { // Verify the pruned peers are removed from the mesh. assert_eq!( - gs.mesh.get(&topics[0]).unwrap().len(), + gs.connected_peers.mesh().get(&topics[0]).unwrap().len(), config.mesh_n_low() - 1 ); @@ -1757,7 +1774,10 @@ fn test_mesh_addition() { gs.heartbeat(); // Peers should be added to reach mesh_n - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n()); + assert_eq!( + gs.connected_peers.mesh().get(&topics[0]).unwrap().len(), + config.mesh_n() + ); } // Tests the mesh maintenance subtraction @@ -1785,7 +1805,10 @@ fn test_mesh_subtraction() { gs.heartbeat(); // Peers should be removed to reach mesh_n - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n()); + assert_eq!( + gs.connected_peers.mesh().get(&topics[0]).unwrap().len(), + config.mesh_n() + ); } #[test] @@ -1896,7 +1919,8 @@ fn test_prune_backoffed_peer_on_graft() { .create_network(); //remove peer from mesh and send prune to peer => this adds a backoff for this peer - gs.mesh.get_mut(&topics[0]).unwrap().remove(&peers[0]); + gs.connected_peers + .remove_peer_from_mesh(&peers[0], &topics[0]); gs.send_graft_prune( HashMap::new(), vec![(peers[0], vec![topics[0].clone()])] @@ -2253,7 +2277,10 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { } //assert current mesh size - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); + assert_eq!( + gs.connected_peers.mesh()[&topics[0]].len(), + config.mesh_n_high() + ); //create an outbound and an inbound peer let (inbound, _in_reciver) = add_peer(&mut gs, &topics, false, false); @@ -2264,13 +2291,16 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { gs.handle_graft(&outbound, vec![topics[0].clone()]); //assert mesh size - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high() + 1); + assert_eq!( + gs.connected_peers.mesh()[&topics[0]].len(), + config.mesh_n_high() + 1 + ); //inbound is not in mesh - assert!(!gs.mesh[&topics[0]].contains(&inbound)); + assert!(!gs.connected_peers.mesh()[&topics[0]].contains(&inbound)); //outbound is in mesh - assert!(gs.mesh[&topics[0]].contains(&outbound)); + assert!(gs.connected_peers.mesh()[&topics[0]].contains(&outbound)); } #[test] @@ -2308,16 +2338,21 @@ fn test_do_not_remove_too_many_outbound_peers() { } //mesh is overly full - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n + m); + assert_eq!( + gs.connected_peers.mesh().get(&topics[0]).unwrap().len(), + n + m + ); // run a heartbeat gs.heartbeat(); // Peers should be removed to reach n - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n); + assert_eq!(gs.connected_peers.mesh().get(&topics[0]).unwrap().len(), n); //all outbound peers are still in the mesh - assert!(outbound.iter().all(|p| gs.mesh[&topics[0]].contains(p))); + assert!(outbound + .iter() + .all(|p| gs.connected_peers.mesh()[&topics[0]].contains(p))); } #[test] @@ -2343,14 +2378,17 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { } // Nothing changed in the mesh yet - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); + assert_eq!( + gs.connected_peers.mesh()[&topics[0]].len(), + config.mesh_n_high() + ); // run a heartbeat gs.heartbeat(); // The outbound peers got additionally added assert_eq!( - gs.mesh[&topics[0]].len(), + gs.connected_peers.mesh()[&topics[0]].len(), config.mesh_n_high() + config.mesh_outbound_min() ); } @@ -2380,7 +2418,7 @@ fn test_prune_negative_scored_peers() { gs.heartbeat(); //peer should not be in mesh anymore - assert!(gs.mesh[&topics[0]].is_empty()); + assert!(gs.connected_peers.mesh()[&topics[0]].is_empty()); //check prune message assert_eq!( @@ -2432,8 +2470,13 @@ fn test_dont_graft_to_negative_scored_peers() { gs.heartbeat(); //assert that mesh only contains p2 - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), 1); - assert!(gs.mesh.get(&topics[0]).unwrap().contains(&p2)); + assert_eq!(gs.connected_peers.mesh().get(&topics[0]).unwrap().len(), 1); + assert!(gs + .connected_peers + .mesh() + .get(&topics[0]) + .unwrap() + .contains(&p2)); } ///Note that in this test also without a penalty the px would be ignored because of the @@ -3130,15 +3173,15 @@ fn test_keep_best_scoring_peers_on_oversubscription() { gs.set_application_score(peer, index as f64); } - assert_eq!(gs.mesh[&topics[0]].len(), n); + assert_eq!(gs.connected_peers.mesh()[&topics[0]].len(), n); //heartbeat to prune some peers gs.heartbeat(); - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n()); + assert_eq!(gs.connected_peers.mesh()[&topics[0]].len(), config.mesh_n()); //mesh contains retain_scores best peers - assert!(gs.mesh[&topics[0]].is_superset( + assert!(gs.connected_peers.mesh()[&topics[0]].is_superset( &peers[(n - config.retain_scores())..] .iter() .cloned() @@ -4204,7 +4247,7 @@ fn test_scoring_p7_grafts_before_backoff() { //remove peers from mesh and send prune to them => this adds a backoff for the peers for peer in peers.iter().take(2) { - gs.mesh.get_mut(&topics[0]).unwrap().remove(peer); + gs.connected_peers.remove_peer_from_mesh(peer, &topics[0]); gs.send_graft_prune( HashMap::new(), HashMap::from([(*peer, vec![topics[0].clone()])]), @@ -4290,7 +4333,10 @@ fn test_opportunistic_grafting() { .collect(); //currently mesh equals peers - assert_eq!(gs.mesh[&topics[0]], peers.iter().cloned().collect()); + assert_eq!( + gs.connected_peers.mesh()[&topics[0]], + peers.iter().cloned().collect() + ); //give others high scores (but the first two have not high enough scores) for (i, peer) in peers.iter().enumerate().take(5) { @@ -4307,7 +4353,7 @@ fn test_opportunistic_grafting() { gs.heartbeat(); assert_eq!( - gs.mesh[&topics[0]].len(), + gs.connected_peers.mesh()[&topics[0]].len(), 5, "should not apply opportunistic grafting" ); @@ -4319,7 +4365,7 @@ fn test_opportunistic_grafting() { gs.heartbeat(); assert_eq!( - gs.mesh[&topics[0]].len(), + gs.connected_peers.mesh()[&topics[0]].len(), 5, "should not apply opportunistic grafting after first tick" ); @@ -4327,18 +4373,19 @@ fn test_opportunistic_grafting() { gs.heartbeat(); assert_eq!( - gs.mesh[&topics[0]].len(), + gs.connected_peers.mesh()[&topics[0]].len(), 7, "opportunistic grafting should have added 2 peers" ); assert!( - gs.mesh[&topics[0]].is_superset(&peers.iter().cloned().collect()), + gs.connected_peers.mesh()[&topics[0]].is_superset(&peers.iter().cloned().collect()), "old peers are still part of the mesh" ); assert!( - gs.mesh[&topics[0]].is_disjoint(&others.iter().map(|(p, _)| p).cloned().take(2).collect()), + gs.connected_peers.mesh()[&topics[0]] + .is_disjoint(&others.iter().map(|(p, _)| p).cloned().take(2).collect()), "peers below or equal to median should not be added in opportunistic grafting" ); } @@ -4807,7 +4854,10 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { queues.insert(p2, receiver2); //p1 and p2 are not in the mesh - assert!(!gs.mesh[&topics[0]].contains(&p1) && !gs.mesh[&topics[0]].contains(&p2)); + assert!( + !gs.connected_peers.mesh()[&topics[0]].contains(&p1) + && !gs.connected_peers.mesh()[&topics[0]].contains(&p2) + ); //publish a message let publish_data = vec![0; 42]; @@ -4888,7 +4938,8 @@ fn test_do_not_use_floodsub_in_fanout() { ); assert!( - !gs.fanout[&topics[0]].contains(&p1) && !gs.fanout[&topics[0]].contains(&p2), + !gs.connected_peers.fanout()[&topics[0]].contains(&p1) + && !gs.connected_peers.fanout()[&topics[0]].contains(&p2), "Floodsub peers are not allowed in fanout" ); } @@ -4918,7 +4969,7 @@ fn test_dont_add_floodsub_peers_to_mesh_on_join() { gs.join(&topics[0]); assert!( - gs.mesh[&topics[0]].is_empty(), + gs.connected_peers.mesh()[&topics[0]].is_empty(), "Floodsub peers should not get added to mesh" ); } @@ -5019,7 +5070,7 @@ fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { gs.heartbeat(); assert!( - gs.mesh[&topics[0]].is_empty(), + gs.connected_peers.mesh()[&topics[0]].is_empty(), "Floodsub peers should not get added to mesh" ); } @@ -5041,9 +5092,10 @@ fn test_public_api() { ); assert_eq!( - gs.mesh_peers(&TopicHash::from_raw("topic1")) + gs.connected_peers + .mesh_peers(&TopicHash::from_raw("topic1")) .cloned() - .collect::>(), + .unwrap_or_default(), peers, "Expected peers for a registered topic to contain all peers." ); @@ -5152,7 +5204,7 @@ fn test_graft_without_subscribe() { .create_network(); assert!( - gs.mesh.get(&topic_hashes[0]).is_some(), + gs.connected_peers.mesh().get(&topic_hashes[0]).is_some(), "Subscribe should add a new entry to the mesh[topic] hashmap" );