diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 5de1ce7dd84..a4b1c7f7dc0 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -5,8 +5,6 @@ - Removes the control pool and sends control messages on demand. -- Implement publish and forward message dropping. - - Implement backpressure by differentiating between priority and non priority messages. Drop `Publish` and `Forward` messages when the queue becomes full. See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index eb1b02f1aad..635c93bca08 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -342,9 +342,6 @@ pub struct Behaviour { /// Keep track of a set of internal metrics relating to gossipsub. metrics: Option, - /// Connection handler message queue channels. - handler_send_queues: HashMap, - /// Tracks the numbers of failed messages per peer-id. failed_messages: HashMap, } @@ -484,7 +481,6 @@ where config, subscription_filter, data_transform, - handler_send_queues: Default::default(), failed_messages: Default::default(), }) } @@ -551,10 +547,11 @@ where // send subscription request to all peers for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(peer) - .expect("Peerid should exist"); + .expect("Peer must be connected") + .sender; sender.subscribe(topic_hash.clone()); } @@ -582,10 +579,11 @@ where // announce to all peers for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(peer) - .expect("Peerid should exist"); + .expect("Peer should be connected") + .sender; sender.unsubscribe(topic_hash.clone()); } @@ -643,10 +641,10 @@ where let topic_hash = raw_message.topic.clone(); let mut recipient_peers = HashSet::new(); - if let Some(set) = self.topic_peers.get(&topic_hash) { + if let Some(peers_on_topic) = self.topic_peers.get(&topic_hash) { if self.config.flood_publish() { // Forward to all peers above score and all explicit peers - recipient_peers.extend(set.iter().filter(|p| { + recipient_peers.extend(peers_on_topic.iter().filter(|p| { self.explicit_peers.contains(*p) || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 })); @@ -696,9 +694,9 @@ where } } - // Explicit peers + // Explicit peers that are part of the topic for peer in &self.explicit_peers { - if set.contains(peer) { + if peers_on_topic.contains(peer) { recipient_peers.insert(*peer); } } @@ -734,34 +732,34 @@ where } // Send to peers we know are subscribed to the topic. - let mut errors = 0; + let mut publish_failed = true; for peer_id in recipient_peers.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(peer_id) - .expect("Peerid should exist"); - - if sender - .publish( - raw_message.clone(), - self.config.publish_queue_duration(), - self.metrics.as_mut(), - ) - .is_err() - { - self.failed_messages.entry(*peer_id).or_default().priority += 1; - - tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer"); - // Downscore the peer due to failed message. - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + .expect("The peer must be connected") + .sender; + + match sender.publish( + raw_message.clone(), + self.config.publish_queue_duration(), + self.metrics.as_mut(), + ) { + Ok(_) => publish_failed = false, + Err(_) => { + self.failed_messages.entry(*peer_id).or_default().priority += 1; + + tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer"); + // Downscore the peer due to failed message. + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } } - - errors += 1; } } - if errors == recipient_peers.len() { + + if publish_failed { return Err(PublishError::InsufficientPeers); } @@ -1043,10 +1041,11 @@ where if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(&peer_id, topic_hash.clone()); } - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(&peer_id) - .expect("Peerid should exist"); + .expect("Peer must be connected") + .sender; sender.graft(Graft { topic_hash: topic_hash.clone(), @@ -1147,10 +1146,11 @@ where tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer"); let on_unsubscribe = true; let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(&peer) - .expect("Peerid should exist"); + .expect("Peer must be connected") + .sender; sender.prune(prune); @@ -1314,10 +1314,11 @@ where iwant_ids_vec ); - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(peer_id) - .expect("Peerid should exist"); + .expect("Peer must be connected") + .sender; if sender .iwant(IWant { @@ -1371,10 +1372,11 @@ where ); } else { tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(peer_id) - .expect("Peerid should exist"); + .expect("Peer must be connected") + .sender; if sender .forward( @@ -1547,9 +1549,10 @@ where // build the prune messages to send let on_unsubscribe = false; let mut sender = self - .handler_send_queues + .connected_peers .get_mut(peer_id) - .expect("Peerid should exist") + .expect("Peer must be connected") + .sender .clone(); for prune in to_prune_topics @@ -2051,11 +2054,11 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(propagation_source) - .expect("Peerid should exist"); - + .expect("Peer must be connected") + .sender; for topic_hash in topics_to_graft.into_iter() { sender.graft(Graft { topic_hash }); } @@ -2092,7 +2095,7 @@ where // before we add all the gossip from this heartbeat in order to gain a true measure of // steady-state size of the queues. if let Some(m) = &mut self.metrics { - for sender_queue in self.handler_send_queues.values() { + for sender_queue in self.connected_peers.values_mut().map(|v| &v.sender) { m.observe_priority_queue_size(sender_queue.priority_len()); m.observe_non_priority_queue_size(sender_queue.non_priority_len()); } @@ -2562,10 +2565,11 @@ where } // send an IHAVE message - let sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(&peer_id) - .expect("Peerid should exist"); + .expect("Peer must be connected") + .sender; if sender .ihave(IHave { topic_hash: topic_hash.clone(), @@ -2597,20 +2601,20 @@ where no_px: HashSet, ) { // handle the grafts and overlapping prunes per peer - for (peer, topics) in to_graft.into_iter() { + for (peer_id, topics) in to_graft.into_iter() { for topic in &topics { // inform scoring of graft if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.graft(&peer, topic.clone()); + peer_score.graft(&peer_id, topic.clone()); } // inform the handler of the peer being added to the mesh // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( - peer, + peer_id, vec![topic], &self.mesh, - self.peer_topics.get(&peer), + self.peer_topics.get(&peer_id), &mut self.events, &self.connected_peers, ); @@ -2623,21 +2627,22 @@ where // send the control messages let mut sender = self - .handler_send_queues - .get_mut(&peer) - .expect("Peerid should exist") + .connected_peers + .get_mut(&peer_id) + .expect("Peer must be connected") + .sender .clone(); // The following prunes are not due to unsubscribing. let prunes = to_prune - .remove(&peer) + .remove(&peer_id) .into_iter() .flatten() .map(|topic_hash| { self.make_prune( &topic_hash, - &peer, - self.config.do_px() && !no_px.contains(&peer), + &peer_id, + self.config.do_px() && !no_px.contains(&peer_id), false, ) }); @@ -2663,11 +2668,11 @@ where self.config.do_px() && !no_px.contains(peer), false, ); - let mut sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(peer) - .expect("Peerid should exist") - .clone(); + .expect("Peer must be connected") + .sender; sender.prune(prune); @@ -2704,33 +2709,31 @@ where tracing::debug!(message=%msg_id, "Forwarding message"); let mut recipient_peers = HashSet::new(); - { - // Populate the recipient peers mapping - - // Add explicit peers - for peer_id in &self.explicit_peers { - if let Some(topics) = self.peer_topics.get(peer_id) { - if Some(peer_id) != propagation_source - && !originating_peers.contains(peer_id) - && Some(peer_id) != message.source.as_ref() - && topics.contains(&message.topic) - { - recipient_peers.insert(*peer_id); - } + // Populate the recipient peers mapping + + // Add explicit peers + for peer_id in &self.explicit_peers { + if let Some(topics) = self.peer_topics.get(peer_id) { + if Some(peer_id) != propagation_source + && !originating_peers.contains(peer_id) + && Some(peer_id) != message.source.as_ref() + && topics.contains(&message.topic) + { + recipient_peers.insert(*peer_id); } } + } - // add mesh peers - let topic = &message.topic; - // mesh - if let Some(mesh_peers) = self.mesh.get(topic) { - for peer_id in mesh_peers { - if Some(peer_id) != propagation_source - && !originating_peers.contains(peer_id) - && Some(peer_id) != message.source.as_ref() - { - recipient_peers.insert(*peer_id); - } + // add mesh peers + let topic = &message.topic; + // mesh + if let Some(mesh_peers) = self.mesh.get(topic) { + for peer_id in mesh_peers { + if Some(peer_id) != propagation_source + && !originating_peers.contains(peer_id) + && Some(peer_id) != message.source.as_ref() + { + recipient_peers.insert(*peer_id); } } } @@ -2739,27 +2742,26 @@ where if !recipient_peers.is_empty() { for peer_id in recipient_peers.iter() { tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); - let sender = self - .handler_send_queues - .get_mut(peer_id) - .expect("Peerid should exist"); - if sender - .forward( - message.clone(), - self.config.forward_queue_duration(), - self.metrics.as_mut(), - ) - .is_err() - { - // Downscore the peer - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.failed_message_slow_peer(peer_id); + // Peer might still exist on the explicit peers list but it's no longer connected. + if let Some(sender) = self.connected_peers.get_mut(peer_id).map(|p| &mut p.sender) { + if sender + .forward( + message.clone(), + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ) + .is_err() + { + // Downscore the peer + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment the failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; } - // Increment the failed message count - self.failed_messages - .entry(*peer_id) - .or_default() - .non_priority += 1; } } tracing::debug!("Completed forwarding message"); @@ -2865,7 +2867,6 @@ where &mut self, ConnectionEstablished { peer_id, - connection_id, endpoint, other_established, .. @@ -2893,20 +2894,6 @@ where } } - // By default we assume a peer is only a floodsub peer. - // - // The protocol negotiation occurs once a message is sent/received. Once this happens we - // update the type of peer that this is in order to determine which kind of routing should - // occur. - self.connected_peers - .entry(peer_id) - .or_insert(PeerConnections { - kind: PeerKind::Floodsub, - connections: vec![], - }) - .connections - .push(connection_id); - if other_established > 0 { return; // Not our first connection to this peer, hence nothing to do. } @@ -2926,11 +2913,11 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. - let mut sender = self - .handler_send_queues + let sender = &mut self + .connected_peers .get_mut(&peer_id) - .expect("Peerid should exist") - .clone(); + .expect("Peer must be connected") + .sender; for topic_hash in self.mesh.clone().into_keys() { sender.subscribe(topic_hash); @@ -3061,7 +3048,6 @@ where } self.connected_peers.remove(&peer_id); - self.handler_send_queues.remove(&peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.remove_peer(&peer_id); @@ -3120,35 +3106,59 @@ where fn handle_established_inbound_connection( &mut self, - _: ConnectionId, + connection_id: ConnectionId, peer_id: PeerId, _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - let sender = self - .handler_send_queues + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. + let connected_peer = self + .connected_peers .entry(peer_id) - .or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len())); + .or_insert(PeerConnections { + kind: PeerKind::Floodsub, + connections: vec![], + sender: RpcSender::new(self.config.connection_handler_queue_len()), + }); + // Add the new connection + connected_peer.connections.push(connection_id); + Ok(Handler::new( self.config.protocol_config(), - sender.new_receiver(), + connected_peer.sender.new_receiver(), )) } fn handle_established_outbound_connection( &mut self, - _: ConnectionId, + connection_id: ConnectionId, peer_id: PeerId, _: &Multiaddr, _: Endpoint, ) -> Result, ConnectionDenied> { - let sender = self - .handler_send_queues + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. + let connected_peer = self + .connected_peers .entry(peer_id) - .or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len())); + .or_insert(PeerConnections { + kind: PeerKind::Floodsub, + connections: vec![], + sender: RpcSender::new(self.config.connection_handler_queue_len()), + }); + // Add the new connection + connected_peer.connections.push(connection_id); + Ok(Handler::new( self.config.protocol_config(), - sender.new_receiver(), + connected_peer.sender.new_receiver(), )) } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index c511c87677d..29543b91a7b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -232,11 +232,19 @@ where let sender = RpcSender::new(gs.config.connection_handler_queue_len()); let receiver = sender.new_receiver(); - gs.handler_send_queues.insert(peer, sender); + let connection_id = ConnectionId::new_unchecked(0); + gs.connected_peers.insert( + peer, + PeerConnections { + kind: kind.clone().unwrap_or(PeerKind::Floodsub), + connections: vec![connection_id], + sender, + }, + ); gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: peer, - connection_id: ConnectionId::new_unchecked(0), + connection_id, endpoint: &endpoint, failed_addresses: &[], other_established: 0, // first connection @@ -591,12 +599,20 @@ fn test_join() { peers.push(peer); let sender = RpcSender::new(gs.config.connection_handler_queue_len()); let receiver = sender.new_receiver(); - gs.handler_send_queues.insert(random_peer, sender); + let connection_id = ConnectionId::new_unchecked(0); + gs.connected_peers.insert( + random_peer, + PeerConnections { + kind: PeerKind::Floodsub, + connections: vec![connection_id], + sender, + }, + ); receivers.insert(random_peer, receiver); gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, - connection_id: ConnectionId::new_unchecked(0), + connection_id, endpoint: &ConnectedPoint::Dialer { address, role_override: Endpoint::Dialer, @@ -958,6 +974,7 @@ fn test_get_random_peers() { PeerConnections { kind: PeerKind::Gossipsubv1_1, connections: vec![ConnectionId::new_unchecked(0)], + sender: RpcSender::new(gs.config.connection_handler_queue_len()), }, ) }) diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 0c2ee27d72c..1fd78383a51 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -109,12 +109,14 @@ impl std::fmt::Debug for MessageId { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(crate) struct PeerConnections { /// The kind of protocol the peer supports. pub(crate) kind: PeerKind, /// Its current connections. pub(crate) connections: Vec, + /// The rpc sender to the peer. + pub(crate) sender: RpcSender, } /// Describes the types of peers that can exist in the gossipsub context. @@ -633,19 +635,19 @@ impl RpcSender { } /// Send a `RpcOut::IHave` message to the `RpcReceiver` - /// this is low priority and if queue is full the message is dropped. - pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), ()> { + /// this is low priority, if the queue is full an Err is returned. + pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), RpcOut> { self.non_priority .try_send(RpcOut::IHave(ihave)) - .map_err(|_| ()) + .map_err(|err| err.into_inner()) } /// Send a `RpcOut::IHave` message to the `RpcReceiver` - /// this is low priority and if queue is full the message is dropped. - pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), ()> { + /// this is low priority, if the queue is full an Err is returned. + pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), RpcOut> { self.non_priority .try_send(RpcOut::IWant(iwant)) - .map_err(|_| ()) + .map_err(|err| err.into_inner()) } /// Send a `RpcOut::Subscribe` message to the `RpcReceiver`