diff --git a/Cargo.lock b/Cargo.lock index 8932f40a04c..34c96c38b9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2838,7 +2838,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.47.0" +version = "0.47.1" dependencies = [ "async-std", "asynchronous-codec", diff --git a/Cargo.toml b/Cargo.toml index 31c3a8e4b9e..f4a4e2e9897 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-core = { version = "0.42.0", path = "core" } libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.42.0", path = "transports/dns" } libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" } -libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" } +libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.45.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.9" } libp2p-kad = { version = "0.46.1", path = "protocols/kad" } diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 8e115052d31..ba86367872c 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.47.1 +- Implement backpressure by differentiating between priority and non priority messages. + Drop `Publish` and `Forward` messages when the queue becomes full. + See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914) + ## 0.47.0 @@ -8,8 +13,6 @@ - Use `web-time` instead of `instant`. See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347). -## 0.46.1 - - Deprecate `Rpc` in preparation for removing it from the public API because it is an internal type. See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833). diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 4cb590bed0c..3d0b8fddc22 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-gossipsub" edition = "2021" rust-version = { workspace = true } description = "Gossipsub protocol for libp2p" -version = "0.47.0" +version = "0.47.1" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -42,7 +42,7 @@ void = "1.0.2" prometheus-client = { workspace = true } [dev-dependencies] -async-std = { version = "1.6.3", features = ["unstable"] } +async-std = { version = "1.6.3", features = ["unstable", "attributes"] } hex = "0.4.2" libp2p-core = { workspace = true } libp2p-yamux = { workspace = true } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 0d1af1ada0c..e43e020fc40 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -29,6 +29,7 @@ use std::{ time::Duration, }; +use futures::channel::mpsc::channel; use futures::StreamExt; use futures_ticker::Ticker; use prometheus_client::registry::Registry; @@ -47,8 +48,6 @@ use libp2p_swarm::{ }; use web_time::{Instant, SystemTime}; -use crate::backoff::BackoffStorage; -use crate::config::{Config, ValidationMode}; use crate::gossip_promises::GossipPromises; use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; @@ -63,7 +62,12 @@ use crate::types::{ ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction, }; -use crate::types::{PeerConnections, PeerKind, RpcOut}; +use crate::types::{PeerConnections, PeerKind}; +use crate::{backoff::BackoffStorage, types::RpcSender}; +use crate::{ + config::{Config, ValidationMode}, + types::RpcReceiver, +}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use quick_protobuf::{MessageWrite, Writer}; @@ -535,10 +539,14 @@ where } // send subscription request to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); - let event = RpcOut::Subscribe(topic_hash.clone()); - self.send_message(peer, event); + let sender = self + .connected_peers + .get_mut(peer) + .expect("Peerid should exist"); + + sender.subscribe(topic_hash.clone()); } // call JOIN(topic) @@ -563,10 +571,14 @@ where } // announce to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); - let event = RpcOut::Unsubscribe(topic_hash.clone()); - self.send_message(peer, event); + let sender = self + .connected_peers + .get_mut(peer) + .expect("Peerid should exist"); + + sender.unsubscribe(topic_hash.clone()); } // call LEAVE(topic) @@ -713,9 +725,20 @@ where } // Send to peers we know are subscribed to the topic. + let mut publish_failed = true; for peer_id in recipient_peers.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); - self.send_message(*peer_id, RpcOut::Publish(raw_message.clone())); + let sender = self + .connected_peers + .get_mut(peer_id) + .expect("Peerid should exist"); + + publish_failed &= sender + .publish(raw_message.clone(), self.metrics.as_mut()) + .is_err(); + } + if publish_failed { + return Err(PublishError::InsufficientPeers); } tracing::debug!(message=%msg_id, "Published message"); @@ -1313,7 +1336,12 @@ where ); } else { tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - self.send_message(*peer_id, RpcOut::Forward(msg)); + let sender = self + .connected_peers + .get_mut(peer_id) + .expect("Peerid should exist"); + + sender.forward(msg, self.metrics.as_mut()); } } } @@ -1466,13 +1494,18 @@ where if !to_prune_topics.is_empty() { // build the prune messages to send let on_unsubscribe = false; + let mut sender = self + .connected_peers + .remove(peer_id) + .expect("Peerid should exist"); + for action in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) - .collect::>() { - self.send_message(*peer_id, RpcOut::Control(action)); + sender.control(action); } + self.connected_peers.insert(*peer_id, sender); // Send the prune messages to the peer tracing::debug!( peer=%peer_id, @@ -1966,12 +1999,16 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. + let sender = self + .connected_peers + .get_mut(propagation_source) + .expect("Peerid should exist"); + for action in topics_to_graft .into_iter() .map(|topic_hash| ControlAction::Graft { topic_hash }) - .collect::>() { - self.send_message(*propagation_source, RpcOut::Control(action)) + sender.control(action); } // Notify the application of the subscriptions @@ -2506,6 +2543,13 @@ where // It therefore must be in at least one mesh and we do not need to inform the handler // of its removal from another. + // send the control messages + let mut sender = self + .connected_peers + .get_mut(&peer) + .expect("Peerid should exist") + .clone(); + // The following prunes are not due to unsubscribing. let prunes = to_prune .remove(&peer) @@ -2520,9 +2564,8 @@ where ) }); - // send the control messages - for msg in control_msgs.chain(prunes).collect::>() { - self.send_message(peer, RpcOut::Control(msg)); + for msg in control_msgs.chain(prunes) { + sender.control(msg); } } @@ -2536,7 +2579,13 @@ where self.config.do_px() && !no_px.contains(peer), false, ); - self.send_message(*peer, RpcOut::Control(prune)); + let mut sender = self + .connected_peers + .get_mut(peer) + .expect("Peerid should exist") + .clone(); + + sender.control(prune); // inform the handler peer_removed_from_mesh( @@ -2605,11 +2654,13 @@ where // forward the message to peers if !recipient_peers.is_empty() { - let event = RpcOut::Forward(message.clone()); - for peer in recipient_peers.iter() { tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); - self.send_message(*peer, event.clone()); + let sender = self + .connected_peers + .get_mut(peer) + .expect("Peerid should exist"); + sender.forward(message.clone(), self.metrics.as_mut()); } tracing::debug!("Completed forwarding message"); Ok(true) @@ -2723,7 +2774,12 @@ where fn flush_control_pool(&mut self) { for (peer, controls) in self.control_pool.drain().collect::>() { for msg in controls { - self.send_message(peer, RpcOut::Control(msg)); + let sender = self + .connected_peers + .get_mut(&peer) + .expect("Peerid should exist"); + + sender.control(msg); } } @@ -2731,28 +2787,11 @@ where self.pending_iwant_msgs.clear(); } - /// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it - /// is not already an arc. - fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) { - if let Some(m) = self.metrics.as_mut() { - if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc { - // register bytes sent on the internal metrics. - m.msg_sent(&message.topic, message.raw_protobuf_len()); - } - } - - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(rpc), - handler: NotifyHandler::Any, - }); - } - fn on_connection_established( &mut self, ConnectionEstablished { peer_id, - connection_id, + connection_id: _, endpoint, other_established, .. @@ -2780,20 +2819,6 @@ where } } - // By default we assume a peer is only a floodsub peer. - // - // The protocol negotiation occurs once a message is sent/received. Once this happens we - // update the type of peer that this is in order to determine which kind of routing should - // occur. - self.connected_peers - .entry(peer_id) - .or_insert(PeerConnections { - kind: PeerKind::Floodsub, - connections: vec![], - }) - .connections - .push(connection_id); - if other_established > 0 { return; // Not our first connection to this peer, hence nothing to do. } @@ -2813,8 +2838,13 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. + let sender = self + .connected_peers + .get_mut(&peer_id) + .expect("Peerid should exist"); + for topic_hash in self.mesh.clone().into_keys() { - self.send_message(peer_id, RpcOut::Subscribe(topic_hash)); + sender.subscribe(topic_hash); } } @@ -2844,16 +2874,11 @@ where if remaining_established != 0 { // Remove the connection from the list if let Some(connections) = self.connected_peers.get_mut(&peer_id) { - let index = connections - .connections - .iter() - .position(|v| v == &connection_id) - .expect("Previously established connection to peer must be present"); - connections.connections.remove(index); + connections.connections.remove(&connection_id); // If there are more connections and this peer is in a mesh, inform the first connection // handler. - if !connections.connections.is_empty() { + if let Some(alternative_connection_id) = connections.connections.keys().next() { if let Some(topics) = self.peer_topics.get(&peer_id) { for topic in topics { if let Some(mesh_peers) = self.mesh.get(topic) { @@ -2861,7 +2886,7 @@ where self.events.push_back(ToSwarm::NotifyHandler { peer_id, event: HandlerIn::JoinedMesh, - handler: NotifyHandler::One(connections.connections[0]), + handler: NotifyHandler::One(*alternative_connection_id), }); break; } @@ -3000,23 +3025,73 @@ where fn handle_established_inbound_connection( &mut self, - _: ConnectionId, - _: PeerId, + connection_id: ConnectionId, + peer_id: PeerId, _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + let (priority_sender, priority_receiver) = + channel(self.config.connection_handler_queue_len()); + let (non_priority_sender, non_priority_receiver) = + channel(self.config.connection_handler_queue_len()); + let sender = RpcSender { + priority: priority_sender, + non_priority: non_priority_sender, + }; + let receiver = RpcReceiver { + priority: priority_receiver.peekable(), + non_priority: non_priority_receiver.peekable(), + }; + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. + let peer_info = + self.connected_peers + .entry(peer_id) + .or_insert_with_key(|_| PeerConnections { + kind: PeerKind::Floodsub, + connections: Default::default(), + }); + peer_info.connections.insert(connection_id, sender); + Ok(Handler::new(self.config.protocol_config(), receiver)) } fn handle_established_outbound_connection( &mut self, - _: ConnectionId, - _: PeerId, + connection_id: ConnectionId, + peer_id: PeerId, _: &Multiaddr, _: Endpoint, _: PortUse, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + let (priority_sender, priority_receiver) = + channel(self.config.connection_handler_queue_len()); + let (non_priority_sender, non_priority_receiver) = + channel(self.config.connection_handler_queue_len()); + let sender = RpcSender { + priority: priority_sender, + non_priority: non_priority_sender, + }; + let receiver = RpcReceiver { + priority: priority_receiver.peekable(), + non_priority: non_priority_receiver.peekable(), + }; + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. + let peer_info = + self.connected_peers + .entry(peer_id) + .or_insert_with_key(|_| PeerConnections { + kind: PeerKind::Floodsub, + connections: Default::default(), + }); + peer_info.connections.insert(connection_id, sender); + Ok(Handler::new(self.config.protocol_config(), receiver)) } fn on_connection_handler_event( @@ -3201,7 +3276,10 @@ fn peer_added_to_mesh( !conn.connections.is_empty(), "Must have at least one connection" ); - conn.connections[0] + conn.connections + .keys() + .next() + .expect("To be connected to peer") }; if let Some(topics) = known_topics { @@ -3220,7 +3298,7 @@ fn peer_added_to_mesh( events.push_back(ToSwarm::NotifyHandler { peer_id, event: HandlerIn::JoinedMesh, - handler: NotifyHandler::One(connection_id), + handler: NotifyHandler::One(*connection_id), }); } @@ -3240,7 +3318,8 @@ fn peer_removed_from_mesh( .get(&peer_id) .expect("To be connected to peer.") .connections - .first() + .keys() + .next() .expect("There should be at least one connection to a peer."); if let Some(topics) = known_topics { @@ -3384,7 +3463,7 @@ impl fmt::Debug for PublishConfig { #[cfg(test)] mod local_test { use super::*; - use crate::IdentTopic; + use crate::{types::RpcOut, IdentTopic}; use quickcheck::*; fn test_message() -> RawMessage { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index c44152ed2f9..7920d9ec69b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -22,11 +22,18 @@ use super::*; use crate::subscription_filter::WhitelistSubscriptionFilter; -use crate::{config::ConfigBuilder, types::Rpc, IdentTopic as Topic}; +use crate::transform::{DataTransform, IdentityTransform}; +use crate::types::{RpcOut, RpcReceiver}; +use crate::ValidationError; +use crate::{ + config::Config, config::ConfigBuilder, types::Rpc, IdentTopic as Topic, TopicScoreParams, +}; use async_std::net::Ipv4Addr; use byteorder::{BigEndian, ByteOrder}; -use libp2p_core::ConnectedPoint; +use futures::stream; +use libp2p_core::{ConnectedPoint, Endpoint}; use rand::Rng; +use std::future::poll_fn; use std::thread::sleep; #[derive(Default, Debug)] @@ -53,7 +60,14 @@ where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { - pub(crate) fn create_network(self) -> (Behaviour, Vec, Vec) { + pub(crate) fn create_network( + self, + ) -> ( + Behaviour, + Vec, + HashMap, + Vec, + ) { let keypair = libp2p_identity::Keypair::generate_ed25519(); // create a gossipsub struct let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( @@ -81,10 +95,11 @@ where // build and connect peer_no random peers let mut peers = vec![]; + let mut receivers = HashMap::new(); let empty = vec![]; for i in 0..self.peer_no { - peers.push(add_peer( + let (peer, receiver) = add_peer( &mut gs, if self.to_subscribe { &topic_hashes @@ -93,10 +108,12 @@ where }, i < self.outbound, i < self.explicit, - )); + ); + peers.push(peer); + receivers.insert(peer, receiver); } - (gs, peers, topic_hashes) + (gs, peers, receivers, topic_hashes) } fn peer_no(mut self, peer_no: usize) -> Self { @@ -160,7 +177,7 @@ fn add_peer( topic_hashes: &[TopicHash], outbound: bool, explicit: bool, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -174,7 +191,7 @@ fn add_peer_with_addr( outbound: bool, explicit: bool, address: Multiaddr, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -196,7 +213,7 @@ fn add_peer_with_addr_and_kind( explicit: bool, address: Multiaddr, kind: Option, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -215,6 +232,27 @@ where } }; + let (priority_sender, priority_receiver) = channel(gs.config.connection_handler_queue_len()); + let (non_priority_sender, non_priority_receiver) = + channel(gs.config.connection_handler_queue_len()); + let sender = RpcSender { + priority: priority_sender, + non_priority: non_priority_sender, + }; + let receiver = RpcReceiver { + priority: priority_receiver.peekable(), + non_priority: non_priority_receiver.peekable(), + }; + + let mut peer_info = PeerConnections { + kind: PeerKind::Floodsub, + connections: Default::default(), + }; + peer_info + .connections + .insert(ConnectionId::new_unchecked(0), sender); + gs.connected_peers.insert(peer, peer_info); + gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: peer, connection_id: ConnectionId::new_unchecked(0), @@ -245,7 +283,7 @@ where &peer, ); } - peer + (peer, receiver) } fn disconnect_peer(gs: &mut Behaviour, peer_id: &PeerId) @@ -262,12 +300,12 @@ where // peer_connections.connections should never be empty. let mut active_connections = peer_connections.connections.len(); - for connection_id in peer_connections.connections.clone() { + for connection_id in peer_connections.connections.clone().keys() { active_connections = active_connections.checked_sub(1).unwrap(); gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id: *peer_id, - connection_id, + connection_id: *connection_id, endpoint: &fake_endpoint, remaining_established: active_connections, cause: None, @@ -378,16 +416,16 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc { } } -#[test] +#[async_std::test] /// Test local node subscribing to a topic -fn test_subscribe() { +async fn test_subscribe() { // The node should: // - Create an empty vector in mesh[topic] // - Send subscription request to all peers // - run JOIN(topic) let subscribe_topic = vec![String::from("test_subscribe")]; - let (gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(subscribe_topic) .to_subscribe(true) @@ -399,27 +437,22 @@ fn test_subscribe() { ); // collect all the subscriptions - let subscriptions = gs - .events - .iter() + close_senders(&mut gs.connected_peers); + let subscriptions = stream::select_all(receivers.into_values()) .filter(|e| { - matches!( - e, - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(_)), - .. - } - ) + let matches = matches!(e, RpcOut::Subscribe(_)); + async move { matches } }) - .count(); + .count() + .await; // we sent a subscribe to all known peers assert_eq!(subscriptions, 20); } -#[test] /// Test unsubscribe. -fn test_unsubscribe() { +#[async_std::test] +async fn test_unsubscribe() { // Unsubscribe should: // - Remove the mesh entry for topic // - Send UNSUBSCRIBE to all known peers @@ -432,7 +465,7 @@ fn test_unsubscribe() { .collect::>(); // subscribe to topic_strings - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -460,16 +493,15 @@ fn test_unsubscribe() { ); // collect all the subscriptions - let subscriptions = gs - .events - .iter() - .fold(0, |collected_subscriptions, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(_)), - .. - } => collected_subscriptions + 1, - _ => collected_subscriptions, - }); + close_senders(&mut gs.connected_peers); + let subscriptions = stream::select_all(receivers.into_values()) + .fold(0, |collected_subscriptions, e| async move { + match e { + RpcOut::Subscribe(_) => collected_subscriptions + 1, + _ => collected_subscriptions, + } + }) + .await; // we sent a unsubscribe to all known peers, for two topics assert_eq!(subscriptions, 40); @@ -501,7 +533,7 @@ fn test_join() { .map(|t| Topic::new(t.clone())) .collect::>(); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, _receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -555,14 +587,27 @@ fn test_join() { gs.fanout .insert(topic_hashes[1].clone(), Default::default()); let mut new_peers: Vec = vec![]; + + let mut peers = vec![]; for _ in 0..3 { let random_peer = PeerId::random(); // inform the behaviour of a new peer + let address = "/ip4/127.0.0.1".parse::().unwrap(); + let peer = gs + .handle_established_inbound_connection( + ConnectionId::new_unchecked(0), + random_peer, + &address, + &address, + ) + .unwrap(); + peers.push(peer); + gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, connection_id: ConnectionId::new_unchecked(0), endpoint: &ConnectedPoint::Dialer { - address: "/ip4/127.0.0.1".parse::().unwrap(), + address, role_override: Endpoint::Dialer, port_use: PortUse::Reuse, }, @@ -602,8 +647,8 @@ fn test_join() { } /// Test local node publish to subscribed topic -#[test] -fn test_publish_without_flood_publishing() { +#[async_std::test] +async fn test_publish_without_flood_publishing() { // node should: // - Send publish message to all peers // - Insert message into gs.mcache and gs.received @@ -615,7 +660,7 @@ fn test_publish_without_flood_publishing() { .unwrap(); let publish_topic = String::from("test_publish"); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![publish_topic.clone()]) .to_subscribe(true) @@ -639,19 +684,18 @@ fn test_publish_without_flood_publishing() { gs.publish(Topic::new(publish_topic), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Publish(message)), - .. - } => { - collected_publish.push(message); - collected_publish + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all(receivers.into_values()) + .fold(vec![], |mut collected_publish, e| async move { + match e { + RpcOut::Publish(message) => { + collected_publish.push(message); + collected_publish + } + _ => collected_publish, } - _ => collected_publish, - }); + }) + .await; // Transform the inbound message let message = &gs @@ -680,8 +724,8 @@ fn test_publish_without_flood_publishing() { } /// Test local node publish to unsubscribed topic -#[test] -fn test_fanout() { +#[async_std::test] +async fn test_fanout() { // node should: // - Populate fanout peers // - Send publish message to fanout peers @@ -694,7 +738,7 @@ fn test_fanout() { .unwrap(); let fanout_topic = String::from("test_fanout"); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![fanout_topic.clone()]) .to_subscribe(true) @@ -726,19 +770,18 @@ fn test_fanout() { ); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Publish(message)), - .. - } => { - collected_publish.push(message); - collected_publish + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all(receivers.into_values()) + .fold(vec![], |mut collected_publish, e| async move { + match e { + RpcOut::Publish(message) => { + collected_publish.push(message); + collected_publish + } + _ => collected_publish, } - _ => collected_publish, - }); + }) + .await; // Transform the inbound message let message = &gs @@ -765,10 +808,10 @@ fn test_fanout() { ); } -#[test] +#[async_std::test] /// Test the gossipsub NetworkBehaviour peer connection logic. -fn test_inject_connected() { - let (gs, peers, topic_hashes) = inject_nodes1() +async fn test_inject_connected() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -776,26 +819,28 @@ fn test_inject_connected() { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let subscriptions = gs - .events - .into_iter() - .filter_map(|e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(topic)), - peer_id, - .. - } => Some((peer_id, topic)), + close_senders(&mut gs.connected_peers); + let subscriptions = stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)), + ) + .filter_map(|(peer_id, e)| async move { + match e { + RpcOut::Subscribe(topic) => Some((peer_id, topic)), _ => None, - }) - .fold( - HashMap::>::new(), - |mut subs, (peer, sub)| { - let mut peer_subs = subs.remove(&peer).unwrap_or_default(); - peer_subs.push(sub.into_string()); - subs.insert(peer, peer_subs); - subs - }, - ); + } + }) + .fold( + HashMap::>::new(), + |mut subs, (peer, sub)| async move { + let mut peer_subs = subs.remove(&peer).unwrap_or_default(); + peer_subs.push(sub.into_string()); + subs.insert(peer, peer_subs); + subs + }, + ) + .await; // check that there are two subscriptions sent to each peer for peer_subs in subscriptions.values() { @@ -830,7 +875,7 @@ fn test_handle_received_subscriptions() { .iter() .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(false) @@ -935,7 +980,7 @@ fn test_get_random_peers() { *p, PeerConnections { kind: PeerKind::Gossipsubv1_1, - connections: vec![ConnectionId::new_unchecked(0)], + connections: Default::default(), }, ) }) @@ -988,9 +1033,9 @@ fn test_get_random_peers() { } /// Tests that the correct message is sent when a peer asks for a message in our cache. -#[test] -fn test_handle_iwant_msg_cached() { - let (mut gs, peers, _) = inject_nodes1() +#[async_std::test] +async fn test_handle_iwant_msg_cached() { + let (mut gs, peers, receivers, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1018,18 +1063,18 @@ fn test_handle_iwant_msg_cached() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs.events.into_iter().fold( - Vec::::new(), - |mut collected_messages, e| match e { - ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(RpcOut::Forward(message)) = event { - collected_messages.push(message); + close_senders(&mut gs.connected_peers); + let sent_messages = stream::select_all(receivers.into_values()) + .fold( + Vec::::new(), + |mut collected_messages, e| async move { + if let RpcOut::Forward(msg) = e { + collected_messages.push(msg) } collected_messages - } - _ => collected_messages, - }, - ); + }, + ) + .await; assert!( sent_messages @@ -1041,14 +1086,15 @@ fn test_handle_iwant_msg_cached() { } /// Tests that messages are sent correctly depending on the shifting of the message cache. -#[test] -fn test_handle_iwant_msg_cached_shifted() { - let (mut gs, peers, _) = inject_nodes1() +#[async_std::test] +async fn test_handle_iwant_msg_cached_shifted() { + let (mut gs, peers, receivers, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) .create_network(); + let mut rpcs = stream::select_all(receivers.into_values()); // perform 10 memshifts and check that it leaves the cache for shift in 1..10 { let raw_message = RawMessage { @@ -1075,29 +1121,31 @@ fn test_handle_iwant_msg_cached_shifted() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); - // is the message is being sent? - let message_exists = gs.events.iter().any(|e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Forward(message)), - .. - } => { - gs.config.message_id( + // default history_length is 5, expect no messages after shift > 5 + if shift < 5 { + // is the message being sent? + let mut message_exists = false; + if let Some(RpcOut::Forward(message)) = rpcs.next().await { + if gs.config.message_id( &gs.data_transform .inbound_transform(message.clone()) .unwrap(), ) == msg_id + { + message_exists = true; + } } - _ => false, - }); - // default history_length is 5, expect no messages after shift > 5 - if shift < 5 { + assert!( message_exists, "Expected the cached message to be sent to an IWANT peer before 5 shifts" ); } else { + // assert all receivers are empty. + close_senders(&mut gs.connected_peers); + let queue_is_empty = rpcs.next().await.is_none(); assert!( - !message_exists, + queue_is_empty, "Expected the cached message to not be sent to an IWANT peer after 5 shifts" ); } @@ -1107,7 +1155,7 @@ fn test_handle_iwant_msg_cached_shifted() { #[test] // tests that an event is not created when a peers asks for a message not in our cache fn test_handle_iwant_msg_not_cached() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1126,7 +1174,7 @@ fn test_handle_iwant_msg_not_cached() { #[test] // tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1158,7 +1206,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // tests that an event is not created when a peer shares that it has a message that // we already have fn test_handle_ihave_subscribed_and_msg_cached() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1180,7 +1228,7 @@ fn test_handle_ihave_subscribed_and_msg_cached() { // test that an event is not created when a peer shares that it has a message in // a topic that we are not subscribed to fn test_handle_ihave_not_subscribed() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) .topics(vec![]) .to_subscribe(true) @@ -1206,7 +1254,7 @@ fn test_handle_ihave_not_subscribed() { // tests that a peer is added to our mesh when we are both subscribed // to the same topic fn test_handle_graft_is_subscribed() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1224,7 +1272,7 @@ fn test_handle_graft_is_subscribed() { // tests that a peer is not added to our mesh when they are subscribed to // a topic that we are not fn test_handle_graft_is_not_subscribed() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1249,7 +1297,7 @@ fn test_handle_graft_multiple_topics() { .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(true) @@ -1279,7 +1327,7 @@ fn test_handle_graft_multiple_topics() { #[test] // tests that a peer is removed from our mesh fn test_handle_prune_peer_in_mesh() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1306,25 +1354,26 @@ fn test_handle_prune_peer_in_mesh() { ); } -fn count_control_msgs( +async fn count_control_msgs( gs: &Behaviour, + receivers: &mut HashMap, mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, ) -> usize { - gs.control_pool + let mut collected_messages = gs + .control_pool .iter() .map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count()) - .sum::() - + gs.events - .iter() - .filter(|e| match e { - ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(RpcOut::Control(action)), - .. - } => filter(peer_id, action), - _ => false, - }) - .count() + .sum::(); + for (peer_id, receiver) in receivers.iter_mut() { + while !poll_fn(|cx| Poll::Ready(receiver.poll_is_empty(cx))).await { + if let Some(RpcOut::Control(action)) = receiver.next().await { + if filter(peer_id, &action) { + collected_messages += 1 + } + } + } + } + collected_messages } fn flush_events(gs: &mut Behaviour) { @@ -1332,10 +1381,27 @@ fn flush_events(gs: &mut Behaviour gs.events.clear(); } +async fn flush_receivers(receivers: &mut HashMap) { + for c in receivers.values_mut() { + while !poll_fn(|cx| Poll::Ready(c.poll_is_empty(cx))).await { + let _ = c.next().await; + } + } +} + +fn close_senders(peers: &mut HashMap) { + for peer in peers.values_mut() { + for sender in peer.connections.values_mut() { + sender.priority.close_channel(); + sender.non_priority.close_channel(); + } + } +} + #[test] // tests that a peer added as explicit peer gets connected to fn test_explicit_peer_gets_connected() { - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(Vec::new()) .to_subscribe(true) @@ -1368,7 +1434,7 @@ fn test_explicit_peer_reconnects() { .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, _) = inject_nodes1() + let (mut gs, others, _, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1416,9 +1482,9 @@ fn test_explicit_peer_reconnects() { ); } -#[test] -fn test_handle_graft_explicit_peer() { - let (mut gs, peers, topic_hashes) = inject_nodes1() +#[async_std::test] +async fn test_handle_graft_explicit_peer() { + let (mut gs, peers, mut receivers, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1435,21 +1501,23 @@ fn test_handle_graft_explicit_peer() { assert!(gs.mesh[&topic_hashes[1]].is_empty()); //check prunes + close_senders(&mut gs.connected_peers); assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == peer + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == peer && match m { ControlAction::Prune { topic_hash, .. } => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], _ => false, }) + .await >= 2, "Not enough prunes sent when grafting from explicit peer" ); } -#[test] -fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { - let (gs, peers, topic_hashes) = inject_nodes1() +#[async_std::test] +async fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { + let (gs, peers, mut receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1465,24 +1533,26 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) + .await >= 1, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[0] + && matches!(m, ControlAction::Graft { .. })) + .await, 0, "A graft message got created to an explicit peer" ); } -#[test] -fn do_not_graft_explicit_peer() { - let (mut gs, others, topic_hashes) = inject_nodes1() +#[async_std::test] +async fn do_not_graft_explicit_peer() { + let (mut gs, others, mut receivers, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic")]) .to_subscribe(true) @@ -1497,16 +1567,17 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &others[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &others[0] + && matches!(m, ControlAction::Graft { .. })) + .await, 0, "A graft message got created to an explicit peer" ); } -#[test] -fn do_forward_messages_to_explicit_peers() { - let (mut gs, peers, topic_hashes) = inject_nodes1() +#[async_std::test] +async fn do_forward_messages_to_explicit_peers() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1526,29 +1597,33 @@ fn do_forward_messages_to_explicit_peers() { validated: true, }; gs.handle_received_message(message.clone(), &local_id); - + close_senders(&mut gs.connected_peers); assert_eq!( - gs.events - .iter() - .filter(|e| match e { - ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(RpcOut::Forward(m)), - .. - } => { - peer_id == &peers[0] && m.data == message.data + stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)) + ) + .filter_map(|(peer_id, e)| { + let peers = peers.clone(); + let message = message.clone(); + async move { + match e { + RpcOut::Forward(m) if peer_id == peers[0] && m.data == message.data => Some(()), + _ => None, } - _ => false, - }) - .count(), + } + }) + .count() + .await, 1, "The message did not get forwarded to the explicit peer" ); } -#[test] -fn explicit_peers_not_added_to_mesh_on_subscribe() { - let (mut gs, peers, _) = inject_nodes1() +#[async_std::test] +async fn explicit_peers_not_added_to_mesh_on_subscribe() { + let (mut gs, peers, mut receivers, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1577,24 +1652,26 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) + .await > 0, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[0] + && matches!(m, ControlAction::Graft { .. })) + .await, 0, "A graft message got created to an explicit peer" ); } -#[test] -fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { - let (mut gs, peers, _) = inject_nodes1() +#[async_std::test] +async fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { + let (mut gs, peers, mut receivers, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1626,16 +1703,18 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) + .await >= 1, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[0] + && matches!(m, ControlAction::Graft { .. })) + .await, 0, "A graft message got created to an explicit peer" ); @@ -1643,7 +1722,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1690,7 +1769,7 @@ fn test_mesh_addition() { let config: Config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(config.mesh_n() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1726,7 +1805,7 @@ fn test_mesh_subtraction() { // Adds mesh_low peers and PRUNE 2 giving us a deficit. let n = config.mesh_n_high() + 10; //make all outbound connections so that we allow grafting to all - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1750,7 +1829,7 @@ fn test_mesh_subtraction() { fn test_connect_to_px_peers_on_handle_prune() { let config: Config = Config::default(); - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1801,12 +1880,12 @@ fn test_connect_to_px_peers_on_handle_prune() { )); } -#[test] -fn test_send_px_and_backoff_in_prune() { +#[async_std::test] +async fn test_send_px_and_backoff_in_prune() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1823,7 +1902,7 @@ fn test_send_px_and_backoff_in_prune() { //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -1837,17 +1916,18 @@ fn test_send_px_and_backoff_in_prune() { config.prune_peers() && backoff.unwrap() == config.prune_backoff().as_secs(), _ => false, - }), + }) + .await, 1 ); } -#[test] -fn test_prune_backoffed_peer_on_graft() { +#[async_std::test] +async fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1864,14 +1944,15 @@ fn test_prune_backoffed_peer_on_graft() { ); //ignore all messages until now - gs.events.clear(); + flush_events(&mut gs); + flush_receivers(&mut receivers).await; //handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -1883,20 +1964,21 @@ fn test_prune_backoffed_peer_on_graft() { peers.is_empty() && backoff.unwrap() == config.prune_backoff().as_secs(), _ => false, - }), + }) + .await, 1 ); } -#[test] -fn test_do_not_graft_within_backoff_period() { +#[async_std::test] +async fn test_do_not_graft_within_backoff_period() { let config = ConfigBuilder::default() .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1908,6 +1990,7 @@ fn test_do_not_graft_within_backoff_period() { //forget all events until now flush_events(&mut gs); + flush_receivers(&mut receivers).await; //call heartbeat gs.heartbeat(); @@ -1921,7 +2004,11 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) + .await, 0, "Graft message created too early within backoff period" ); @@ -1932,13 +2019,18 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &mut receivers, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) + .await + > 0, "No graft message was created after backoff period" ); } -#[test] -fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without_backoff() { +#[async_std::test] +async fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without_backoff() { //set default backoff period to 1 second let config = ConfigBuilder::default() .prune_backoff(Duration::from_millis(90)) @@ -1947,7 +2039,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1959,6 +2051,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //forget all events until now flush_events(&mut gs); + flush_receivers(&mut receivers).await; //call heartbeat gs.heartbeat(); @@ -1970,7 +2063,11 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) + .await, 0, "Graft message created too early within backoff period" ); @@ -1981,13 +2078,18 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &mut receivers, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) + .await + > 0, "No graft message was created after backoff period" ); } -#[test] -fn test_unsubscribe_backoff() { +#[async_std::test] +async fn test_unsubscribe_backoff() { const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100); let config = ConfigBuilder::default() .backoff_slack(1) @@ -2000,7 +2102,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2010,10 +2112,11 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &mut receivers, |_, m| match m { ControlAction::Prune { backoff, .. } => backoff == &Some(1), _ => false, - }), + }) + .await, 1, "Peer should be pruned with `unsubscribe_backoff`." ); @@ -2022,6 +2125,7 @@ fn test_unsubscribe_backoff() { // forget all events until now flush_events(&mut gs); + flush_receivers(&mut receivers).await; // call heartbeat gs.heartbeat(); @@ -2035,7 +2139,11 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &mut receivers, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) + .await, 0, "Graft message created too early within backoff period" ); @@ -2046,18 +2154,23 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &mut receivers, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) + .await + > 0, "No graft message was created after backoff period" ); } -#[test] -fn test_flood_publish() { +#[async_std::test] +async fn test_flood_publish() { let config: Config = Config::default(); let topic = "test"; // Adds more peers than mesh can hold to test flood publishing - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, receivers, _) = inject_nodes1() .peer_no(config.mesh_n_high() + 10) .topics(vec![topic.into()]) .to_subscribe(true) @@ -2068,18 +2181,18 @@ fn test_flood_publish() { gs.publish(Topic::new(topic), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all(receivers.into_values()) + .fold(vec![], |mut collected_publish, e| async move { + match e { + RpcOut::Publish(message) => { collected_publish.push(message); + collected_publish } - collected_publish + _ => collected_publish, } - _ => collected_publish, - }); + }) + .await; // Transform the inbound message let message = &gs @@ -2107,13 +2220,13 @@ fn test_flood_publish() { ); } -#[test] -fn test_gossip_to_at_least_gossip_lazy_peers() { +#[async_std::test] +async fn test_gossip_to_at_least_gossip_lazy_peers() { let config: Config = Config::default(); //add more peers than in mesh to test gossipping //by default only mesh_n_low peers will get added to mesh - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, mut receivers, topic_hashes) = inject_nodes1() .peer_no(config.mesh_n_low() + config.gossip_lazy() + 1) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2141,24 +2254,25 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, |_, action| match action { + count_control_msgs(&gs, &mut receivers, |_, action| match action { ControlAction::IHave { topic_hash, message_ids, } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), _ => false, - }), + }) + .await, config.gossip_lazy() ); } -#[test] -fn test_gossip_to_at_most_gossip_factor_peers() { +#[async_std::test] +async fn test_gossip_to_at_most_gossip_factor_peers() { let config: Config = Config::default(); //add a lot of peers let m = config.mesh_n_low() + config.gossip_lazy() * (2.0 / config.gossip_factor()) as usize; - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, mut receivers, topic_hashes) = inject_nodes1() .peer_no(m) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2185,13 +2299,14 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, |_, action| match action { + count_control_msgs(&gs, &mut receivers, |_, action| match action { ControlAction::IHave { topic_hash, message_ids, } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), _ => false, - }), + }) + .await, ((m - config.mesh_n_low()) as f64 * config.gossip_factor()) as usize ); } @@ -2201,7 +2316,7 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { let config: Config = Config::default(); //enough peers to fill the mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2216,8 +2331,8 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); //create an outbound and an inbound peer - let inbound = add_peer(&mut gs, &topics, false, false); - let outbound = add_peer(&mut gs, &topics, true, false); + let (inbound, _in_reciver) = add_peer(&mut gs, &topics, false, false); + let (outbound, _out_receiver) = add_peer(&mut gs, &topics, true, false); //send grafts gs.handle_graft(&inbound, vec![topics[0].clone()]); @@ -2247,7 +2362,7 @@ fn test_do_not_remove_too_many_outbound_peers() { .unwrap(); //fill the mesh with inbound connections - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2262,7 +2377,7 @@ fn test_do_not_remove_too_many_outbound_peers() { //create m outbound connections and graft (we will accept the graft) let mut outbound = HashSet::new(); for _ in 0..m { - let peer = add_peer(&mut gs, &topics, true, false); + let (peer, _) = add_peer(&mut gs, &topics, true, false); outbound.insert(peer); gs.handle_graft(&peer, topics.clone()); } @@ -2285,7 +2400,7 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { let config: Config = Config::default(); // Fill full mesh with inbound peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2297,8 +2412,9 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { } //create config.mesh_outbound_min() many outbound connections without grafting + let mut peers = vec![]; for _ in 0..config.mesh_outbound_min() { - add_peer(&mut gs, &topics, true, false); + peers.push(add_peer(&mut gs, &topics, true, false)); } // Nothing changed in the mesh yet @@ -2314,12 +2430,12 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { ); } -#[test] -fn test_prune_negative_scored_peers() { +#[async_std::test] +async fn test_prune_negative_scored_peers() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2343,7 +2459,7 @@ fn test_prune_negative_scored_peers() { //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -2355,7 +2471,8 @@ fn test_prune_negative_scored_peers() { peers.is_empty() && backoff.unwrap() == config.prune_backoff().as_secs(), _ => false, - }), + }) + .await, 1 ); } @@ -2364,7 +2481,7 @@ fn test_prune_negative_scored_peers() { fn test_dont_graft_to_negative_scored_peers() { let config = Config::default(); //init full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2376,8 +2493,8 @@ fn test_dont_graft_to_negative_scored_peers() { .create_network(); //add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 to negative gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 1); @@ -2403,7 +2520,7 @@ fn test_ignore_px_from_negative_scored_peer() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2441,8 +2558,8 @@ fn test_ignore_px_from_negative_scored_peer() { ); } -#[test] -fn test_only_send_nonnegative_scoring_peers_in_px() { +#[async_std::test] +async fn test_only_send_nonnegative_scoring_peers_in_px() { let config = ConfigBuilder::default() .prune_peers(16) .do_px() @@ -2450,7 +2567,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { .unwrap(); // Build mesh with three peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(3) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2477,7 +2594,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { // Check that px in prune message only contains third peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &mut receivers, |peer_id, m| peer_id == &peers[1] && match m { ControlAction::Prune { topic_hash, @@ -2488,13 +2605,14 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { && px.len() == 1 && px[0].peer_id.as_ref().unwrap() == &peers[2], _ => false, - }), + }) + .await, 1 ); } -#[test] -fn test_do_not_gossip_to_peers_below_gossip_threshold() { +#[async_std::test] +async fn test_do_not_gossip_to_peers_below_gossip_threshold() { let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { @@ -2503,7 +2621,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2517,8 +2635,8 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2550,7 +2668,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, |peer, action| match action { + count_control_msgs(&gs, &mut receivers, |peer, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2563,13 +2681,14 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } } _ => false, - }), + }) + .await, 1 ); } -#[test] -fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { +#[async_std::test] +async fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { @@ -2578,7 +2697,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { }; // Build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2594,8 +2713,10 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { } // Add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2626,18 +2747,22 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.handle_iwant(&p2, vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs - .events - .into_iter() - .fold(vec![], |mut collected_messages, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Forward(message)) = event { - collected_messages.push((peer_id, message)); - } + close_senders(&mut gs.connected_peers); + let sent_messages = stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)), + ) + .fold(vec![], |mut collected_messages, (peer_id, e)| async move { + match e { + RpcOut::Forward(message) => { + collected_messages.push((peer_id, message)); collected_messages } _ => collected_messages, - }); + } + }) + .await; //the message got sent to p2 assert!(sent_messages @@ -2657,8 +2782,8 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { .all(|(peer_id, msg)| !(peer_id == &p1 && gs.config.message_id(&msg) == msg_id))); } -#[test] -fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { +#[async_std::test] +async fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { @@ -2666,7 +2791,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2682,8 +2807,8 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } //add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 below peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -2714,7 +2839,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( - count_control_msgs(&gs, |peer, c| match c { + count_control_msgs(&gs, &mut receivers, |peer, c| match c { ControlAction::IWant { message_ids } => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); @@ -2723,13 +2848,14 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { false }, _ => false, - }), + }) + .await, 1 ); } -#[test] -fn test_do_not_publish_to_peer_below_publish_threshold() { +#[async_std::test] +async fn test_do_not_publish_to_peer_below_publish_threshold() { let config = ConfigBuilder::default() .flood_publish(false) .build() @@ -2742,7 +2868,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { }; //build mesh with no peers and no subscribed topics - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut receivers, _) = inject_nodes1() .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); @@ -2752,8 +2878,10 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { let topics = vec![topic.hash()]; //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2771,26 +2899,30 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { gs.publish(topic, publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push((peer_id, message)); - } + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)), + ) + .fold(vec![], |mut collected_publish, (peer_id, e)| async move { + match e { + RpcOut::Publish(message) => { + collected_publish.push((peer_id, message)); collected_publish } _ => collected_publish, - }); + } + }) + .await; //assert only published to p2 assert_eq!(publishes.len(), 1); assert_eq!(publishes[0].0, p2); } -#[test] -fn test_do_not_flood_publish_to_peer_below_publish_threshold() { +#[async_std::test] +async fn test_do_not_flood_publish_to_peer_below_publish_threshold() { let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { @@ -2799,15 +2931,17 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { ..PeerScoreThresholds::default() }; //build mesh with no peers - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2825,18 +2959,22 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push((peer_id, message)); - } + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)), + ) + .fold(vec![], |mut collected_publish, (peer_id, e)| async move { + match e { + RpcOut::Publish(message) => { + collected_publish.push((peer_id, message)); collected_publish } _ => collected_publish, - }); + } + }) + .await; //assert only published to p2 assert_eq!(publishes.len(), 1); @@ -2855,15 +2993,15 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { }; //build mesh with no peers - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, _, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config.clone()) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 below peer_score_thresholds.graylist_threshold //note that penalties get squared so two penalties means a score of @@ -2985,7 +3123,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { ..PeerScoreThresholds::default() }; // Build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3056,7 +3194,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { //build mesh with more peers than mesh can hold let n = config.mesh_n_high() + 1; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3116,7 +3254,7 @@ fn test_scoring_p1() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3198,7 +3336,7 @@ fn test_scoring_p2() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3298,7 +3436,7 @@ fn test_scoring_p3() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3399,7 +3537,7 @@ fn test_scoring_p3b() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3491,7 +3629,7 @@ fn test_scoring_p4_valid_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3550,7 +3688,7 @@ fn test_scoring_p4_invalid_signature() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3608,7 +3746,7 @@ fn test_scoring_p4_message_from_self() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3658,7 +3796,7 @@ fn test_scoring_p4_ignored_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3717,7 +3855,7 @@ fn test_scoring_p4_application_invalidated_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3779,7 +3917,7 @@ fn test_scoring_p4_application_invalid_message_from_two_peers() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3849,7 +3987,7 @@ fn test_scoring_p4_three_application_invalid_messages() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3933,7 +4071,7 @@ fn test_scoring_p4_decay() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3987,7 +4125,7 @@ fn test_scoring_p5() { }; //build mesh with one peer - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4013,7 +4151,7 @@ fn test_scoring_p6() { ..Default::default() }; - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(vec![]) .to_subscribe(false) @@ -4026,20 +4164,20 @@ fn test_scoring_p6() { //create 5 peers with the same ip let addr = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 3)); let peers = vec![ - add_peer_with_addr(&mut gs, &[], false, false, addr.clone()), - add_peer_with_addr(&mut gs, &[], false, false, addr.clone()), - add_peer_with_addr(&mut gs, &[], true, false, addr.clone()), - add_peer_with_addr(&mut gs, &[], true, false, addr.clone()), - add_peer_with_addr(&mut gs, &[], true, true, addr.clone()), + add_peer_with_addr(&mut gs, &[], false, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &[], false, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &[], true, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &[], true, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &[], true, true, addr.clone()).0, ]; //create 4 other peers with other ip let addr2 = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 4)); let others = vec![ - add_peer_with_addr(&mut gs, &[], false, false, addr2.clone()), - add_peer_with_addr(&mut gs, &[], false, false, addr2.clone()), - add_peer_with_addr(&mut gs, &[], true, false, addr2.clone()), - add_peer_with_addr(&mut gs, &[], true, false, addr2.clone()), + add_peer_with_addr(&mut gs, &[], false, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &[], false, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &[], true, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &[], true, false, addr2.clone()).0, ]; //no penalties yet @@ -4146,7 +4284,7 @@ fn test_scoring_p7_grafts_before_backoff() { ..Default::default() }; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4223,7 +4361,7 @@ fn test_opportunistic_grafting() { ..Default::default() }; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(5) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4252,7 +4390,7 @@ fn test_opportunistic_grafting() { } //set scores for peers in the mesh - for (i, peer) in others.iter().enumerate().take(5) { + for (i, (peer, _receiver)) in others.iter().enumerate().take(5) { gs.set_application_score(peer, 0.0 + i as f64); } @@ -4292,15 +4430,15 @@ fn test_opportunistic_grafting() { ); assert!( - gs.mesh[&topics[0]].is_disjoint(&others.iter().cloned().take(2).collect()), + gs.mesh[&topics[0]].is_disjoint(&others.iter().map(|(p, _)| p).cloned().take(2).collect()), "peers below or equal to median should not be added in opportunistic grafting" ); } -#[test] -fn test_ignore_graft_from_unknown_topic() { +#[async_std::test] +async fn test_ignore_graft_from_unknown_topic() { //build gossipsub without subscribing to any topics - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut receivers, _) = inject_nodes1() .peer_no(0) .topics(vec![]) .to_subscribe(false) @@ -4311,24 +4449,29 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, |_, a| matches!(a, ControlAction::Prune { .. })), + count_control_msgs(&gs, &mut receivers, |_, a| matches!( + a, + ControlAction::Prune { .. } + )) + .await, 0, "we should not prune after graft in unknown topic" ); } -#[test] -fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { +#[async_std::test] +async fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { let config = Config::default(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + receivers.insert(peer, receiver); //receive a message let mut seq = 0; @@ -4342,7 +4485,8 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); //clear events - gs.events.clear(); + flush_events(&mut gs); + flush_receivers(&mut receivers).await; //the first gossip_retransimission many iwants return the valid message, all others are // ignored. @@ -4350,30 +4494,29 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_iwant(&peer, vec![id.clone()]); } + close_senders(&mut gs.connected_peers); assert_eq!( - gs.events - .iter() - .filter(|e| matches!( - e, - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Forward(_)), - .. + stream::select_all(receivers.into_values()) + .fold(0, |mut fwds, e| async move { + if let RpcOut::Forward(_) = e { + fwds += 1; } - )) - .count(), + fwds + }) + .await, config.gossip_retransimission() as usize, "not more then gossip_retransmission many messages get sent back" ); } -#[test] -fn test_ignore_too_many_ihaves() { +#[async_std::test] +async fn test_ignore_too_many_ihaves() { let config = ConfigBuilder::default() .max_ihave_messages(10) .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4381,7 +4524,8 @@ fn test_ignore_too_many_ihaves() { .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + receivers.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4410,8 +4554,8 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, |p, action| p == &peer - && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), + count_control_msgs(&gs, &mut receivers, |p, action| p == &peer + && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))).await, 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); @@ -4433,22 +4577,23 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( - count_control_msgs(&gs, |p, action| p == &peer - && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)), + count_control_msgs(&gs, &mut receivers, |p, action| p == &peer + && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)) + .await, 20, "all 20 should get sent" ); } -#[test] -fn test_ignore_too_many_messages_in_ihave() { +#[async_std::test] +async fn test_ignore_too_many_messages_in_ihave() { let config = ConfigBuilder::default() .max_ihave_messages(10) .max_ihave_length(10) .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4456,7 +4601,8 @@ fn test_ignore_too_many_messages_in_ihave() { .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + receivers.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4482,7 +4628,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &mut receivers, |p, action| match action { ControlAction::IWant { message_ids } => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); @@ -4490,7 +4636,8 @@ fn test_ignore_too_many_messages_in_ihave() { true }, _ => false, - }), + }) + .await, 2, "the third ihave should get ignored and no iwant sent" ); @@ -4507,28 +4654,29 @@ fn test_ignore_too_many_messages_in_ihave() { //we sent 20 iwant messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &mut receivers, |p, action| match action { ControlAction::IWant { message_ids } => p == &peer && { sum += message_ids.len(); true }, _ => false, - }), + }) + .await, 3 ); assert_eq!(sum, 20, "exactly 20 iwants should get sent"); } -#[test] -fn test_limit_number_of_message_ids_inside_ihave() { +#[async_std::test] +async fn test_limit_number_of_message_ids_inside_ihave() { let config = ConfigBuilder::default() .max_ihave_messages(10) .max_ihave_length(100) .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4541,8 +4689,8 @@ fn test_limit_number_of_message_ids_inside_ihave() { } //add two other peers not in the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _) = add_peer(&mut gs, &topics, false, false); + let (p2, _) = add_peer(&mut gs, &topics, false, false); //receive 200 messages from another peer let mut seq = 0; @@ -4561,7 +4709,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &mut receivers, |p, action| match action { ControlAction::IHave { message_ids, .. } => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); @@ -4574,7 +4722,8 @@ fn test_limit_number_of_message_ids_inside_ihave() { } } _ => false, - }), + }) + .await, 2, "should have emitted one ihave to p1 and one to p2" ); @@ -4618,7 +4767,7 @@ fn test_iwant_penalties() { }; // fill the mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4642,7 +4791,7 @@ fn test_iwant_penalties() { let mut first_messages = Vec::new(); let mut second_messages = Vec::new(); let mut seq = 0; - for peer in &other_peers { + for (peer, _receiver) in &other_peers { let msg1 = random_message(&mut seq, &topics); let msg2 = random_message(&mut seq, &topics); @@ -4665,19 +4814,19 @@ fn test_iwant_penalties() { } // the peers send us all the first message ids in time - for (index, peer) in other_peers.iter().enumerate() { + for (index, (peer, _receiver)) in other_peers.iter().enumerate() { gs.handle_received_message(first_messages[index].clone(), peer); } // now we do a heartbeat no penalization should have been applied yet gs.heartbeat(); - for peer in &other_peers { + for (peer, _receiver) in &other_peers { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 0.0); } // receive the first twenty of the other peers then send their response - for (index, peer) in other_peers.iter().enumerate().take(20) { + for (index, (peer, _receiver)) in other_peers.iter().enumerate().take(20) { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4688,7 +4837,7 @@ fn test_iwant_penalties() { gs.heartbeat(); // now we get the second messages from the last 80 peers. - for (index, peer) in other_peers.iter().enumerate() { + for (index, (peer, _receiver)) in other_peers.iter().enumerate() { if index > 19 { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4702,7 +4851,7 @@ fn test_iwant_penalties() { let mut single_penalized = 0; let mut double_penalized = 0; - for (i, peer) in other_peers.iter().enumerate() { + for (i, (peer, _receiver)) in other_peers.iter().enumerate() { let score = gs.peer_score.as_ref().unwrap().0.score(peer); if score == 0.0 { not_penalized += 1; @@ -4724,13 +4873,13 @@ fn test_iwant_penalties() { assert_eq!(double_penalized, 0); } -#[test] -fn test_publish_to_floodsub_peers_without_flood_publish() { +#[async_std::test] +async fn test_publish_to_floodsub_peers_without_flood_publish() { let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4738,7 +4887,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .create_network(); //add two floodsub peer, one explicit, one implicit - let p1 = add_peer_with_addr_and_kind( + let (p1, receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4746,7 +4895,11 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + receivers.insert(p1, receiver1); + + let (p2, receiver2) = + add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + receivers.insert(p2, receiver2); //p1 and p2 are not in the mesh assert!(!gs.mesh[&topics[0]].contains(&p1) && !gs.mesh[&topics[0]].contains(&p2)); @@ -4756,35 +4909,35 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = gs - .events - .iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push(message); - } - } - collected_publish - } - _ => collected_publish, - }); + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)), + ) + .fold(0, |mut collected_publish, (peer_id, e)| async move { + if matches!(e, + RpcOut::Publish(_) if peer_id == p1 || peer_id == p2) + { + collected_publish += 1; + } + collected_publish + }) + .await; assert_eq!( - publishes.len(), - 2, + publishes, 2, "Should send a publish message to all floodsub peers" ); } -#[test] -fn test_do_not_use_floodsub_in_fanout() { +#[async_std::test] +async fn test_do_not_use_floodsub_in_fanout() { let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut receivers, _) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(Vec::new()) .to_subscribe(false) @@ -4795,7 +4948,7 @@ fn test_do_not_use_floodsub_in_fanout() { let topics = vec![topic.hash()]; //add two floodsub peer, one explicit, one implicit - let p1 = add_peer_with_addr_and_kind( + let (p1, receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4803,31 +4956,35 @@ fn test_do_not_use_floodsub_in_fanout() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + receivers.insert(p1, receiver1); + let (p2, receiver2) = + add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + + receivers.insert(p2, receiver2); //publish a message let publish_data = vec![0; 42]; gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = gs - .events - .iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push(message); - } - } - collected_publish - } - _ => collected_publish, - }); + close_senders(&mut gs.connected_peers); + let publishes = stream::select_all( + receivers + .into_iter() + .map(|(peer_id, receiver)| stream::repeat(peer_id).zip(receiver)), + ) + .fold(0, |mut collected_publish, (peer_id, e)| async move { + if matches!(e, + RpcOut::Publish(_) if peer_id == p1 || peer_id == p2) + { + collected_publish += 1; + } + collected_publish + }) + .await; assert_eq!( - publishes.len(), - 2, + publishes, 2, "Should send a publish message to all floodsub peers" ); @@ -4839,7 +4996,7 @@ fn test_do_not_use_floodsub_in_fanout() { #[test] fn test_dont_add_floodsub_peers_to_mesh_on_join() { - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(Vec::new()) .to_subscribe(false) @@ -4867,16 +5024,16 @@ fn test_dont_add_floodsub_peers_to_mesh_on_join() { ); } -#[test] -fn test_dont_send_px_to_old_gossipsub_peers() { - let (mut gs, _, topics) = inject_nodes1() +#[async_std::test] +async fn test_dont_send_px_to_old_gossipsub_peers() { + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); //add an old gossipsub peer - let p1 = add_peer_with_addr_and_kind( + let (p1, _receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4894,19 +5051,20 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &mut receivers, |_, m| match m { ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, - }), + }) + .await, 0, "Should not send px to floodsub peers" ); } -#[test] -fn test_dont_send_floodsub_peers_in_px() { +#[async_std::test] +async fn test_dont_send_floodsub_peers_in_px() { //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4932,10 +5090,11 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &mut receivers, |_, m| match m { ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, - }), + }) + .await, 0, "Should not include floodsub peers in px" ); @@ -4943,7 +5102,7 @@ fn test_dont_send_floodsub_peers_in_px() { #[test] fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, _, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4971,7 +5130,7 @@ fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { // Some very basic test of public api methods. #[test] fn test_public_api() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(4) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5003,7 +5162,7 @@ fn test_public_api() { fn test_subscribe_to_invalid_topic() { let t1 = Topic::new("t1"); let t2 = Topic::new("t2"); - let (mut gs, _, _) = inject_nodes::() + let (mut gs, _, _, _) = inject_nodes::() .subscription_filter(WhitelistSubscriptionFilter( vec![t1.hash()].into_iter().collect(), )) @@ -5014,10 +5173,10 @@ fn test_subscribe_to_invalid_topic() { assert!(gs.subscribe(&t2).is_err()); } -#[test] -fn test_subscribe_and_graft_with_negative_score() { +#[async_std::test] +async fn test_subscribe_and_graft_with_negative_score() { //simulate a communication between two gossipsub instances - let (mut gs1, _, topic_hashes) = inject_nodes1() + let (mut gs1, _, _, topic_hashes) = inject_nodes1() .topics(vec!["test".into()]) .scoring(Some(( PeerScoreParams::default(), @@ -5025,14 +5184,14 @@ fn test_subscribe_and_graft_with_negative_score() { ))) .create_network(); - let (mut gs2, _, _) = inject_nodes1().create_network(); + let (mut gs2, _, mut receivers, _) = inject_nodes1().create_network(); let connection_id = ConnectionId::new_unchecked(0); let topic = Topic::new("test"); - let p2 = add_peer(&mut gs1, &Vec::new(), true, false); - let p1 = add_peer(&mut gs2, &topic_hashes, false, false); + let (p2, _receiver1) = add_peer(&mut gs1, &Vec::new(), true, false); + let (p1, _receiver2) = add_peer(&mut gs2, &topic_hashes, false, false); //add penalty to peer p2 gs1.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); @@ -5042,43 +5201,43 @@ fn test_subscribe_and_graft_with_negative_score() { //subscribe to topic in gs2 gs2.subscribe(&topic).unwrap(); - let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| { - //collect messages to p1 - let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == p1 { - if let HandlerIn::Message(m) = event { - Some(m) - } else { - None - } - } else { - None - } + //forward the subscribe message + for (peer_id, r) in receivers.iter_mut() { + while !poll_fn(|cx| Poll::Ready(r.poll_is_empty(cx))).await { + if peer_id == &p1 { + let rpc = r.next().await.unwrap(); + gs1.on_connection_handler_event( + p2, + connection_id, + HandlerEvent::Message { + rpc: proto_to_message(&rpc.into_protobuf()), + invalid_messages: vec![], + }, + ); } - _ => None, - }); - for message in messages_to_p1 { - gs1.on_connection_handler_event( - p2, - connection_id, - HandlerEvent::Message { - rpc: proto_to_message(&message.into_protobuf()), - invalid_messages: vec![], - }, - ); } - }; - - //forward the subscribe message - forward_messages_to_p1(&mut gs1, &mut gs2); + } //heartbeats on both gs1.heartbeat(); gs2.heartbeat(); //forward messages again - forward_messages_to_p1(&mut gs1, &mut gs2); + for (peer_id, r) in receivers.iter_mut() { + while !poll_fn(|cx| Poll::Ready(r.poll_is_empty(cx))).await { + if peer_id == &p1 { + let rpc = r.next().await.unwrap(); + gs1.on_connection_handler_event( + p2, + connection_id, + HandlerEvent::Message { + rpc: proto_to_message(&rpc.into_protobuf()), + invalid_messages: vec![], + }, + ); + } + } + } //nobody got penalized assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score); @@ -5095,7 +5254,7 @@ fn test_graft_without_subscribe() { let topic = String::from("test_subscribe"); let subscribe_topic = vec![topic.clone()]; let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()]; - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(1) .topics(subscribe_topic) .to_subscribe(false) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index febe2514a30..bd0df9201d7 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -95,6 +95,7 @@ pub struct Config { max_ihave_messages: usize, iwant_followup_time: Duration, published_message_ids_cache_time: Duration, + connection_handler_queue_len: usize, } impl Config { @@ -350,6 +351,11 @@ impl Config { pub fn published_message_ids_cache_time(&self) -> Duration { self.published_message_ids_cache_time } + + /// The max number of messages a `ConnectionHandler` can buffer. The default is 1000. + pub fn connection_handler_queue_len(&self) -> usize { + self.connection_handler_queue_len + } } impl Default for Config { @@ -417,6 +423,7 @@ impl Default for ConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), published_message_ids_cache_time: Duration::from_secs(10), + connection_handler_queue_len: 1000, }, invalid_protocol: false, } @@ -782,6 +789,11 @@ impl ConfigBuilder { self } + /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000. + pub fn connection_handler_queue_len(&mut self, len: usize) { + self.config.connection_handler_queue_len = len; + } + /// Constructs a [`Config`] from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 88def13a521..812fe4c8f00 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -20,7 +20,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; -use crate::types::{PeerKind, RawMessage, Rpc, RpcOut}; +use crate::types::{PeerKind, RawMessage, Rpc, RpcReceiver}; use crate::ValidationError; use asynchronous_codec::Framed; use futures::future::Either; @@ -32,7 +32,6 @@ use libp2p_swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::Stream; -use smallvec::SmallVec; use std::{ pin::Pin, task::{Context, Poll}, @@ -61,8 +60,6 @@ pub enum HandlerEvent { #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum HandlerIn { - /// A gossipsub message to send. - Message(RpcOut), /// The peer has joined the mesh. JoinedMesh, /// The peer has left the mesh. @@ -94,8 +91,8 @@ pub struct EnabledHandler { /// The single long-lived inbound substream. inbound_substream: Option, - /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[proto::RPC; 16]>, + /// Queue of values that we want to send to the remote + send_queue: RpcReceiver, /// Flag indicating that an outbound substream is being established to prevent duplicate /// requests. @@ -159,7 +156,7 @@ enum OutboundSubstreamState { impl Handler { /// Builds a new [`Handler`]. - pub fn new(protocol_config: ProtocolConfig) -> Self { + pub fn new(protocol_config: ProtocolConfig, message_queue: RpcReceiver) -> Self { Handler::Enabled(EnabledHandler { listen_protocol: protocol_config, inbound_substream: None, @@ -167,11 +164,11 @@ impl Handler { outbound_substream_establishing: false, outbound_substream_attempts: 0, inbound_substream_attempts: 0, - send_queue: SmallVec::new(), peer_kind: None, peer_kind_sent: false, last_io_activity: Instant::now(), in_mesh: false, + send_queue: message_queue, }) } } @@ -232,7 +229,7 @@ impl EnabledHandler { } // determine if we need to create the outbound stream - if !self.send_queue.is_empty() + if !self.send_queue.poll_is_empty(cx) && self.outbound_substream.is_none() && !self.outbound_substream_establishing { @@ -250,10 +247,11 @@ impl EnabledHandler { ) { // outbound idle state Some(OutboundSubstreamState::WaitingOutput(substream)) => { - if let Some(message) = self.send_queue.pop() { - self.send_queue.shrink_to_fit(); - self.outbound_substream = - Some(OutboundSubstreamState::PendingSend(substream, message)); + if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) { + self.outbound_substream = Some(OutboundSubstreamState::PendingSend( + substream, + message.into_protobuf(), + )); continue; } @@ -409,7 +407,6 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, message: HandlerIn) { match self { Handler::Enabled(handler) => match message { - HandlerIn::Message(m) => handler.send_queue.push(m.into_protobuf()), HandlerIn::JoinedMesh => { handler.in_mesh = true; } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d1b92ff0ba8..98c4e12ebfe 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -19,13 +19,19 @@ // DEALINGS IN THE SOFTWARE. //! A collection of types using the Gossipsub system. +use crate::metrics::Metrics; use crate::TopicHash; +use futures::channel::mpsc::{Receiver, Sender}; +use futures::stream::Peekable; +use futures::Stream; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; -use std::fmt; +use std::collections::HashMap; use std::fmt::Debug; +use std::task::{Context, Poll}; +use std::{fmt, pin::Pin}; use crate::rpc_proto::proto; #[cfg(feature = "serde")] @@ -71,12 +77,115 @@ impl std::fmt::Debug for MessageId { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(crate) struct PeerConnections { /// The kind of protocol the peer supports. pub(crate) kind: PeerKind, /// Its current connections. - pub(crate) connections: Vec, + pub(crate) connections: HashMap, +} + +impl PeerConnections { + /// Send a `RpcOut::Control` message to the `RpcReceiver` + /// this is high priority. By cloning `futures::channel::mpsc::Sender` + /// we get one extra slot in the channel's capacity. + pub(crate) fn control(&mut self, control: ControlAction) { + let mut rpc = RpcOut::Control(control); + for sender in self.connections.values_mut() { + match sender.priority.clone().try_send(rpc) { + Ok(_) => { + return; + } + Err(err) => { + rpc = err.into_inner(); + } + } + } + } + + /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` + /// this is high priority. By cloning `futures::channel::mpsc::Sender` + /// we get one extra slot in the channel's capacity. + pub(crate) fn subscribe(&mut self, topic: TopicHash) { + let mut rpc = RpcOut::Subscribe(topic); + for sender in self.connections.values_mut() { + match sender.priority.clone().try_send(rpc) { + Ok(_) => { + return; + } + Err(err) => { + rpc = err.into_inner(); + } + } + } + } + + /// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver` + /// this is high priority. By cloning `futures::channel::mpsc::Sender` + /// we get one extra slot in the channel's capacity. + pub(crate) fn unsubscribe(&mut self, topic: TopicHash) { + let mut rpc = RpcOut::Unsubscribe(topic); + for sender in self.connections.values_mut() { + match sender.priority.clone().try_send(rpc) { + Ok(_) => { + return; + } + Err(err) => { + rpc = err.into_inner(); + } + } + } + } + + /// Send a `RpcOut::Publish` message to the `RpcReceiver` + /// this is high priority. If message sending fails, an `Err` is returned. + pub(crate) fn publish( + &mut self, + message: RawMessage, + metrics: Option<&mut Metrics>, + ) -> Result<(), ()> { + let mut rpc = RpcOut::Publish(message.clone()); + for sender in self.connections.values_mut() { + match sender.priority.try_send(rpc) { + Ok(_) => { + if let Some(m) = metrics { + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + return Ok(()); + } + Err(err) if err.is_full() => return Err(()), + // Channel is closed, try another sender. + Err(err) => { + rpc = err.into_inner(); + } + } + } + unreachable!("At least one peer should be available"); + } + + /// Send a `RpcOut::Forward` message to the `RpcReceiver` + /// this is low priority. If the queue is full the message is discarded. + pub(crate) fn forward(&mut self, message: RawMessage, metrics: Option<&mut Metrics>) { + let mut rpc = RpcOut::Forward(message.clone()); + for sender in self.connections.values_mut() { + match sender.non_priority.try_send(rpc) { + Ok(_) => { + if let Some(m) = metrics { + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + return; + } + Err(err) if err.is_full() => { + tracing::trace!("Queue is full, dropped Forward message"); + return; + } + // Channel is closed, try another sender. + Err(err) => { + rpc = err.into_inner(); + } + } + } + } } /// Describes the types of peers that can exist in the gossipsub context. @@ -512,3 +621,45 @@ impl fmt::Display for PeerKind { f.write_str(self.as_ref()) } } + +/// `RpcOut` sender that is priority aware. +#[derive(Debug, Clone)] +pub(crate) struct RpcSender { + pub(crate) priority: Sender, + pub(crate) non_priority: Sender, +} + +/// `RpcOut` receiver that is priority aware. +#[derive(Debug)] +pub struct RpcReceiver { + pub(crate) priority: Peekable>, + pub(crate) non_priority: Peekable>, +} + +impl RpcReceiver { + pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool { + if let Poll::Ready(Some(_)) = Pin::new(&mut self.priority).poll_peek(cx) { + return false; + } + if let Poll::Ready(Some(_)) = Pin::new(&mut self.non_priority).poll_peek(cx) { + return false; + } + true + } +} + +impl Stream for RpcReceiver { + type Item = RpcOut; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // The priority queue is first polled. + if let Poll::Ready(Some(rpc)) = Pin::new(&mut self.priority).poll_next(cx) { + return Poll::Ready(Some(rpc)); + } + // Then we poll the non priority. + Pin::new(&mut self.non_priority).poll_next(cx) + } +}