diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 69be07d42f0..ecbeb9453c5 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -18,11 +18,20 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{Transport, transport::{TransportError, ListenerEvent}}; +use crate::{ + transport::{ListenerEvent, TransportError}, + Transport, +}; use fnv::FnvHashMap; -use futures::{future::{self, Ready}, prelude::*, channel::mpsc, task::Context, task::Poll}; +use futures::{ + channel::mpsc, + future::{self, Ready}, + prelude::*, + task::Context, + task::Poll, +}; use lazy_static::lazy_static; -use multiaddr::{Protocol, Multiaddr}; +use multiaddr::{Multiaddr, Protocol}; use parking_lot::Mutex; use rw_stream_sink::RwStreamSink; use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64, pin::Pin}; @@ -66,7 +75,7 @@ impl Hub { let (tx, rx) = mpsc::channel(2); match hub.entry(port) { Entry::Occupied(_) => return None, - Entry::Vacant(e) => e.insert(tx) + Entry::Vacant(e) => e.insert(tx), }; Some((rx, port)) @@ -103,7 +112,8 @@ impl DialFuture { fn new(port: NonZeroU64) -> Option { let sender = HUB.get(&port)?.clone(); - let (_dial_port_channel, dial_port) = HUB.register_port(0) + let (_dial_port_channel, dial_port) = HUB + .register_port(0) .expect("there to be some random unoccupied port."); let (a_tx, a_rx) = mpsc::channel(4096); @@ -129,14 +139,15 @@ impl Future for DialFuture { type Output = Result>, MemoryTransportError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.sender.poll_ready(cx) { Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(())) => {}, + Poll::Ready(Ok(())) => {} Poll::Ready(Err(_)) => return Poll::Ready(Err(MemoryTransportError::Unreachable)), } - let channel_to_send = self.channel_to_send.take() + let channel_to_send = self + .channel_to_send + .take() .expect("Future should not be polled again once complete"); let dial_port = self.dial_port; match self.sender.start_send((channel_to_send, dial_port)) { @@ -144,8 +155,10 @@ impl Future for DialFuture { Ok(()) => {} } - Poll::Ready(Ok(self.channel_to_return.take() - .expect("Future should not be polled again once complete"))) + Poll::Ready(Ok(self + .channel_to_return + .take() + .expect("Future should not be polled again once complete"))) } } @@ -172,7 +185,7 @@ impl Transport for MemoryTransport { port, addr: Protocol::Memory(port.get()).into(), receiver: rx, - tell_listen_addr: true + tell_listen_addr: true, }; Ok(listener) @@ -222,16 +235,19 @@ pub struct Listener { /// Receives incoming connections. receiver: ChannelReceiver, /// Generate `ListenerEvent::NewAddress` to inform about our listen address. - tell_listen_addr: bool + tell_listen_addr: bool, } impl Stream for Listener { - type Item = Result>, MemoryTransportError>>, MemoryTransportError>, MemoryTransportError>; + type Item = Result< + ListenerEvent>, MemoryTransportError>>, MemoryTransportError>, + MemoryTransportError, + >; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.tell_listen_addr { self.tell_listen_addr = false; - return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone())))) + return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone())))); } let (channel, dial_port) = match Stream::poll_next(Pin::new(&mut self.receiver), cx) { @@ -243,7 +259,7 @@ impl Stream for Listener { let event = ListenerEvent::Upgrade { upgrade: future::ready(Ok(channel)), local_addr: self.addr.clone(), - remote_addr: Protocol::Memory(dial_port.get()).into() + remote_addr: Protocol::Memory(dial_port.get()).into(), }; Poll::Ready(Some(Ok(event))) @@ -295,8 +311,7 @@ pub struct Chan> { dial_port: Option, } -impl Unpin for Chan { -} +impl Unpin for Chan {} impl Stream for Chan { type Item = Result; @@ -314,12 +329,15 @@ impl Sink for Chan { type Error = io::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.outgoing.poll_ready(cx) + self.outgoing + .poll_ready(cx) .map(|v| v.map_err(|_| io::ErrorKind::BrokenPipe.into())) } fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.outgoing.start_send(item).map_err(|_| io::ErrorKind::BrokenPipe.into()) + self.outgoing + .start_send(item) + .map_err(|_| io::ErrorKind::BrokenPipe.into()) } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { @@ -355,30 +373,59 @@ mod tests { assert_eq!(parse_memory_addr(&"/memory/5".parse().unwrap()), Ok(5)); assert_eq!(parse_memory_addr(&"/tcp/150".parse().unwrap()), Err(())); assert_eq!(parse_memory_addr(&"/memory/0".parse().unwrap()), Ok(0)); - assert_eq!(parse_memory_addr(&"/memory/5/tcp/150".parse().unwrap()), Err(())); - assert_eq!(parse_memory_addr(&"/tcp/150/memory/5".parse().unwrap()), Err(())); - assert_eq!(parse_memory_addr(&"/memory/1234567890".parse().unwrap()), Ok(1_234_567_890)); + assert_eq!( + parse_memory_addr(&"/memory/5/tcp/150".parse().unwrap()), + Err(()) + ); + assert_eq!( + parse_memory_addr(&"/tcp/150/memory/5".parse().unwrap()), + Err(()) + ); + assert_eq!( + parse_memory_addr(&"/memory/1234567890".parse().unwrap()), + Ok(1_234_567_890) + ); } #[test] fn listening_twice() { let transport = MemoryTransport::default(); - assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok()); - assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok()); - let _listener = transport.listen_on("/memory/1639174018481".parse().unwrap()).unwrap(); - assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_err()); - assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_err()); + assert!(transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .is_ok()); + assert!(transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .is_ok()); + let _listener = transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .unwrap(); + assert!(transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .is_err()); + assert!(transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .is_err()); drop(_listener); - assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok()); - assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok()); + assert!(transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .is_ok()); + assert!(transport + .listen_on("/memory/1639174018481".parse().unwrap()) + .is_ok()); } #[test] fn port_not_in_use() { let transport = MemoryTransport::default(); - assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_err()); - let _listener = transport.listen_on("/memory/810172461024613".parse().unwrap()).unwrap(); - assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_ok()); + assert!(transport + .dial("/memory/810172461024613".parse().unwrap()) + .is_err()); + let _listener = transport + .listen_on("/memory/810172461024613".parse().unwrap()) + .unwrap(); + assert!(transport + .dial("/memory/810172461024613".parse().unwrap()) + .is_ok()); } #[test] @@ -396,9 +443,11 @@ mod tests { let listener = async move { let listener = t1.listen_on(t1_addr.clone()).unwrap(); - let upgrade = listener.filter_map(|ev| futures::future::ready( - ListenerEvent::into_upgrade(ev.unwrap()) - )).next().await.unwrap(); + let upgrade = listener + .filter_map(|ev| futures::future::ready(ListenerEvent::into_upgrade(ev.unwrap()))) + .next() + .await + .unwrap(); let mut socket = upgrade.0.await.unwrap(); @@ -423,16 +472,14 @@ mod tests { #[test] fn dialer_address_unequal_to_listener_address() { - let listener_addr: Multiaddr = Protocol::Memory( - rand::random::().saturating_add(1), - ).into(); + let listener_addr: Multiaddr = + Protocol::Memory(rand::random::().saturating_add(1)).into(); let listener_addr_cloned = listener_addr.clone(); let listener_transport = MemoryTransport::default(); let listener = async move { - let mut listener = listener_transport.listen_on(listener_addr.clone()) - .unwrap(); + let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap(); while let Some(ev) = listener.next().await { if let ListenerEvent::Upgrade { remote_addr, .. } = ev.unwrap() { assert!( @@ -445,7 +492,8 @@ mod tests { }; let dialer = async move { - MemoryTransport::default().dial(listener_addr_cloned) + MemoryTransport::default() + .dial(listener_addr_cloned) .unwrap() .await .unwrap(); @@ -459,21 +507,18 @@ mod tests { let (terminate, should_terminate) = futures::channel::oneshot::channel(); let (terminated, is_terminated) = futures::channel::oneshot::channel(); - let listener_addr: Multiaddr = Protocol::Memory( - rand::random::().saturating_add(1), - ).into(); + let listener_addr: Multiaddr = + Protocol::Memory(rand::random::().saturating_add(1)).into(); let listener_addr_cloned = listener_addr.clone(); let listener_transport = MemoryTransport::default(); let listener = async move { - let mut listener = listener_transport.listen_on(listener_addr.clone()) - .unwrap(); + let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap(); while let Some(ev) = listener.next().await { if let ListenerEvent::Upgrade { remote_addr, .. } = ev.unwrap() { - let dialer_port = NonZeroU64::new( - parse_memory_addr(&remote_addr).unwrap(), - ).unwrap(); + let dialer_port = + NonZeroU64::new(parse_memory_addr(&remote_addr).unwrap()).unwrap(); assert!( HUB.get(&dialer_port).is_some(), @@ -494,7 +539,8 @@ mod tests { }; let dialer = async move { - let _chan = MemoryTransport::default().dial(listener_addr_cloned) + let _chan = MemoryTransport::default() + .dial(listener_addr_cloned) .unwrap() .await .unwrap(); diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 0d28dd3c1de..1bfa386a8d6 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -86,7 +86,7 @@ fn main() -> Result<(), Box> { }; // Set a custom gossipsub - let gossipsub_config = gossipsub::GossipsubConfigBuilder::new() + let gossipsub_config = gossipsub::GossipsubConfigBuilder::default() .heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space .validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing) .message_id_fn(message_id_fn) // content-address messages. No two messages of the diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 6c1bb96b559..7afd9a5cb68 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -232,7 +232,7 @@ fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let gossipsub_config = GossipsubConfigBuilder::new() + let gossipsub_config = GossipsubConfigBuilder::default() .max_transmit_size(262144) .build() .expect("valid config"); diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 3dd9167d517..dfc8a0d245d 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -37,6 +37,8 @@ async-std = "1.6.3" env_logger = "0.8.1" libp2p-plaintext = { path = "../plaintext" } libp2p-yamux = { path = "../../muxers/yamux" } +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-noise = { path = "../../protocols/noise" } quickcheck = "0.9.2" hex = "0.4.2" derive_builder = "0.9.0" diff --git a/protocols/gossipsub/src/backoff.rs b/protocols/gossipsub/src/backoff.rs index 44d896b4e9d..2e6e7614f94 100644 --- a/protocols/gossipsub/src/backoff.rs +++ b/protocols/gossipsub/src/backoff.rs @@ -69,7 +69,7 @@ impl BackoffStorage { } } - /// Updates the backoff for a peer (if there is already a more restrictive backoff than this call + /// Updates the backoff for a peer (if there is already a more restrictive backoff then this call /// doesn't change anything). pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) { let instant = Instant::now() + time; diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c94347c45e6..39286e62f19 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -47,7 +47,7 @@ use libp2p_swarm::{ }; use crate::backoff::BackoffStorage; -use crate::config::{GenericGossipsubConfig, ValidationMode}; +use crate::config::{Config, ValidationMode}; use crate::error::{PublishError, SubscriptionError}; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, HandlerEvent}; @@ -75,7 +75,7 @@ mod tests; /// Without signing, a number of privacy preserving modes can be selected. /// /// NOTE: The default validation settings are to require signatures. The [`ValidationMode`] -/// should be updated in the [`GenericGossipsubConfig`] to allow for unsigned messages. +/// should be updated in the [`Config`] to allow for unsigned messages. #[derive(Clone)] pub enum MessageAuthenticity { /// Message signing is enabled. The author will be the owner of the key and the sequence number @@ -96,7 +96,7 @@ pub enum MessageAuthenticity { /// The author of the message and the sequence numbers are excluded from the message. /// /// NOTE: Excluding these fields may make these messages invalid by other nodes who - /// enforce validation of these fields. See [`ValidationMode`] in the `GenericGossipsubConfig` + /// enforce validation of these fields. See [`ValidationMode`] in the `Config` /// for how to customise this for rust-libp2p gossipsub. A custom `message_id` /// function will need to be set to prevent all messages from a peer being filtered /// as duplicates. @@ -116,8 +116,8 @@ impl MessageAuthenticity { /// Event that can be emitted by the gossipsub behaviour. #[derive(Debug)] -pub enum GenericGossipsubEvent> { - /// A message has been received. +pub enum Event> { + /// A message has been received. Message { /// The peer that forwarded us this message. propagation_source: PeerId, @@ -142,8 +142,8 @@ pub enum GenericGossipsubEvent> { topic: TopicHash, }, } -//for backwards compatibility -pub type GossipsubEvent = GenericGossipsubEvent>; +// For general use cases +pub type GossipsubEvent = Event>; /// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] /// for further details. @@ -196,17 +196,16 @@ impl From for PublishConfig { } } -type GossipsubNetworkBehaviourAction = - NetworkBehaviourAction, GenericGossipsubEvent>; +type GossipsubNetworkBehaviourAction = NetworkBehaviourAction, Event>; /// Network behaviour that handles the gossipsub protocol. /// -/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GenericGossipsubConfig`] instance. If message signing is +/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If message signing is /// disabled, the [`ValidationMode`] in the config should be adjusted to an appropriate level to /// accept unsigned messages. pub struct GenericGossipsub, Filter: TopicSubscriptionFilter> { /// Configuration providing gossipsub performance parameters. - config: GenericGossipsubConfig, + config: Config, /// Events that need to be yielded to the outside when polling. events: VecDeque>, @@ -231,7 +230,8 @@ pub struct GenericGossipsub, Filter: TopicSubscriptionFilter> { /// A map of all connected peers to their subscribed topics. peer_topics: HashMap>, - /// A set of all explicit peers. + /// A set of all explicit peers. These are peers that remain connected and we unconditionally + /// forward messages to, outside of the scoring system. explicit_peers: HashSet, /// A list of peers that have been blacklisted by the user. @@ -275,21 +275,23 @@ pub struct GenericGossipsub, Filter: TopicSubscriptionFilter> { peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>, /// Counts the number of `IHAVE` received from each peer since the last heartbeat. - count_peer_have: HashMap, + count_received_ihave: HashMap, /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. - count_iasked: HashMap, + count_sent_iwant: HashMap, - /// short term cache for published messsage ids + /// Short term cache for published messsage ids. This is used for penalizing peers sending + /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, - /// short term cache for fast message ids mapping them to the real message ids + /// Short term cache for fast message ids mapping them to the real message ids fast_messsage_id_cache: TimeCache, + /// The filter used to handle message subscriptions. subscription_filter: Filter, } -// for backwards compatibility +// For general use and convenience. pub type Gossipsub = GenericGossipsub, AllowAllSubscriptionFilter>; impl GenericGossipsub @@ -298,10 +300,7 @@ where F: TopicSubscriptionFilter + Default, { /// Creates a `GenericGossipsub` struct given a set of parameters specified via a `GenericGossipsubConfig`. - pub fn new( - privacy: MessageAuthenticity, - config: GenericGossipsubConfig, - ) -> Result { + pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { Self::new_with_subscription_filter(privacy, config, F::default()) } } @@ -311,10 +310,10 @@ where T: Clone + Into> + From> + AsRef<[u8]>, F: TopicSubscriptionFilter, { - /// Creates a `GenericGossipsub` struct given a set of parameters specified via a `GenericGossipsubConfig`. + /// Creates a `GenericGossipsub` struct given a set of parameters specified via a `Config`. pub fn new_with_subscription_filter( privacy: MessageAuthenticity, - config: GenericGossipsubConfig, + config: Config, subscription_filter: F, ) -> Result { // Set up the router given the configuration settings. @@ -352,8 +351,8 @@ where px_peers: HashSet::new(), outbound_peers: HashSet::new(), peer_score: None, - count_peer_have: HashMap::new(), - count_iasked: HashMap::new(), + count_received_ihave: HashMap::new(), + count_sent_iwant: HashMap::new(), peer_protocols: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, @@ -405,7 +404,8 @@ where /// Subscribe to a topic. /// - /// Returns true if the subscription worked. Returns false if we were already subscribed. + /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already + /// subscribed. pub fn subscribe(&mut self, topic: &Topic) -> Result { debug!("Subscribing to topic: {}", topic); let topic_hash = topic.hash(); @@ -449,7 +449,7 @@ where /// Unsubscribes from a topic. /// - /// Returns true if we were subscribed to this topic. + /// Returns [`Ok(true)`] if we were subscribed to this topic. pub fn unsubscribe(&mut self, topic: &Topic) -> Result { debug!("Unsubscribing from topic: {}", topic); let topic_hash = topic.hash(); @@ -631,21 +631,20 @@ where Ok(msg_id) } - /// This function should be called when `config.validate_messages()` is `true` after the - /// message got validated by the caller. Messages are stored in the - /// ['Memcache'] and validation is expected to be fast enough that the messages should still - /// exist in the cache. There are three possible validation outcomes and the outcome is given - /// in acceptance. - /// - /// If acceptance = Accept the message will get propagated to the network. The - /// `propagation_source` parameter indicates who the message was received by and will not - /// be forwarded back to that peer. + /// This function should be called when `config.validate_messages()` is `true` after the message + /// got validated by the caller. Messages are stored in the ['Memcache'] and validation is + /// expected to be fast enough that the messages should still exist in the cache. There are + /// three possible validation outcomes and the outcome is given in acceptance. /// - /// If acceptance = Reject the message will be deleted from the memcache and the P₄ penalty - /// will be applied to the `propagation_source`. + /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the + /// network. The `propagation_source` parameter indicates who the message was received by and + /// will not be forwarded back to that peer. /// - /// If acceptance = Ignore the message will be deleted from the memcache but no P₄ penalty - /// will be applied. + /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache + /// and the P₄ penalty will be applied to the `propagation_source`. + // + /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache + /// but no P₄ penalty will be applied. /// /// This function will return true if the message was found in the cache and false if was not /// in the cache anymore. @@ -712,7 +711,7 @@ where } } - /// Removes a blacklisted peer if it has previously been blacklisted. + /// Removes a peer from the blacklist if it has previously been blacklisted. pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) { if self.blacklisted_peers.remove(peer_id) { debug!("Peer has been removed from the blacklist: {}", peer_id); @@ -991,7 +990,10 @@ where } // IHAVE flood protection - let peer_have = self.count_peer_have.entry(peer_id.clone()).or_insert(0); + let peer_have = self + .count_received_ihave + .entry(peer_id.clone()) + .or_insert(0); *peer_have += 1; if *peer_have > self.config.max_ihave_messages() { debug!( @@ -1002,7 +1004,7 @@ where return; } - if let Some(iasked) = self.count_iasked.get(peer_id) { + if let Some(iasked) = self.count_sent_iwant.get(peer_id) { if *iasked >= self.config.max_ihave_length() { debug!( "IHAVE: peer {} has already advertised too many messages ({}); ignoring", @@ -1036,7 +1038,7 @@ where } if !iwant_ids.is_empty() { - let iasked = self.count_iasked.entry(peer_id.clone()).or_insert(0); + let iasked = self.count_sent_iwant.entry(peer_id.clone()).or_insert(0); let mut iask = iwant_ids.len(); if *iasked + iask > self.config.max_ihave_length() { iask = self.config.max_ihave_length().saturating_sub(*iasked); @@ -1327,7 +1329,7 @@ where if below_threshold { debug!( "PRUNE: ignoring PX from peer {:?} with insufficient score \ - [score ={} topic = {}]", + [score ={} topic = {}]", peer_id, score, topic_hash ); continue; @@ -1351,7 +1353,9 @@ where fn px_connect(&mut self, mut px: Vec) { let n = self.config.prune_peers(); // Ignore peerInfo with no ID - //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a signed peer record? + // + //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a + // signed peer record? px = px.into_iter().filter(|p| p.peer_id.is_some()).collect(); if px.len() > n { // only use at most prune_peers many random peers @@ -1361,8 +1365,8 @@ where } for p in px { - // TODO: Once signed records are spec'd: extract signed peer record if given and handle it, see - // https://github.com/libp2p/specs/pull/217 + // TODO: Once signed records are spec'd: extract signed peer record if given and handle + // it, see https://github.com/libp2p/specs/pull/217 if let Some(peer_id) = p.peer_id { // mark as px peer self.px_peers.insert(peer_id.clone()); @@ -1453,7 +1457,8 @@ where true } - /// Handles a newly received GossipsubMessage. + /// Handles a newly received [`RawGossipsubMessage`]. + /// /// Forwards the message to all peers in the mesh. fn handle_received_message(&mut self, msg: RawGossipsubMessage, propagation_source: &PeerId) { let fast_message_id = self.config.fast_message_id(&msg); @@ -1477,7 +1482,7 @@ where // Add the message to the duplicate caches and memcache. if let Some(fast_message_id) = fast_message_id { - //add id to cache + // add id to cache self.fast_messsage_id_cache .entry(fast_message_id) .or_insert_with(|| msg.message_id().clone()); @@ -1497,8 +1502,8 @@ where msg.message_id() ); - // Tells score that message arrived (but is maybe not fully validated yet) - // Consider message as delivered for gossip promises + // Tells score that message arrived (but is maybe not fully validated yet). + // Consider message as delivered for gossip promises. if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.validate_message(propagation_source, &msg); gossip_promises.message_delivered(msg.message_id()); @@ -1510,13 +1515,12 @@ where // Dispatch the message to the user if we are subscribed to any of the topics if self.mesh.contains_key(&msg.topic) { debug!("Sending received message to user"); - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - GenericGossipsubEvent::Message { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { propagation_source: propagation_source.clone(), message_id: msg.message_id().clone(), message: msg.clone(), - }, - )); + })); } else { debug!( "Received message on a topic we are not subscribed to: {:?}", @@ -1645,7 +1649,7 @@ where } // generates a subscription event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( - GenericGossipsubEvent::Subscribed { + Event::Subscribed { peer_id: propagation_source.clone(), topic: subscription.topic_hash.clone(), }, @@ -1665,7 +1669,7 @@ where .push((propagation_source.clone(), subscription.topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( - GenericGossipsubEvent::Unsubscribed { + Event::Unsubscribed { peer_id: propagation_source.clone(), topic: subscription.topic_hash.clone(), }, @@ -1731,8 +1735,8 @@ where self.backoffs.heartbeat(); // clean up ihave counters - self.count_iasked.clear(); - self.count_peer_have.clear(); + self.count_sent_iwant.clear(); + self.count_received_ihave.clear(); // apply iwant penalties self.apply_iwant_penalties(); @@ -1770,7 +1774,7 @@ where if score(p) < 0.0 { debug!( "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ - {}]", + {}]", p, score(p), topic_hash @@ -2092,7 +2096,7 @@ where fn emit_gossip(&mut self) { let mut rng = thread_rng(); for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) { - let mut message_ids = self.mcache.get_gossip_ids(&topic_hash); + let mut message_ids = self.mcache.get_gossip_message_ids(&topic_hash); if message_ids.is_empty() { return; } @@ -2240,6 +2244,7 @@ where } /// Helper function which forwards a message to mesh\[topic\] peers. + /// /// Returns true if at least one peer was messaged. fn forward_msg( &mut self, @@ -2303,7 +2308,7 @@ where } } - /// Constructs a `GenericGossipsubMessage` performing message signing if required. + /// Constructs a [`GenericGossipsubMessage`] performing message signing if required. pub(crate) fn build_message( &self, topic: TopicHash, @@ -2629,13 +2634,6 @@ where } } -fn get_remote_addr(endpoint: &ConnectedPoint) -> &Multiaddr { - match endpoint { - ConnectedPoint::Dialer { address } => address, - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, - } -} - fn get_ip_addr(addr: &Multiaddr) -> Option { addr.iter().find_map(|p| match p { Ip4(addr) => Some(IpAddr::V4(addr)), @@ -2650,7 +2648,7 @@ where F: Send + 'static + TopicSubscriptionFilter, { type ProtocolsHandler = GossipsubHandler; - type OutEvent = GenericGossipsubEvent; + type OutEvent = Event; fn new_handler(&mut self) -> Self::ProtocolsHandler { GossipsubHandler::new( @@ -2804,7 +2802,7 @@ where // Add the IP to the peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { - if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { + if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) { peer_score.add_ip(&peer_id, ip); } else { trace!( @@ -2824,7 +2822,7 @@ where ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { - if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { + if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) { peer_score.remove_ip(peer, &ip); } else { trace!( @@ -2845,7 +2843,7 @@ where ) { // Exchange IP in peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { - if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_old)) { + if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) { peer_score.remove_ip(peer, &ip); } else { trace!( @@ -2854,7 +2852,7 @@ where endpoint_old ) } - if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_new)) { + if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) { peer_score.add_ip(&peer, ip); } else { trace!( @@ -3170,7 +3168,7 @@ mod local_test { /// Tests RPC message fragmentation fn test_message_fragmentation_deterministic() { let max_transmit_size = 500; - let config = crate::GenericGossipsubConfigBuilder::new() + let config = crate::ConfigBuilder::default() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() @@ -3218,7 +3216,7 @@ mod local_test { fn test_message_fragmentation() { fn prop(rpc: GossipsubRpc) { let max_transmit_size = 500; - let config = crate::GenericGossipsubConfigBuilder::new() + let config = crate::ConfigBuilder::default() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 2bee5c3f0f1..ed367795a72 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -// collection of tests for the gossipsub network behaviour +// Collection of tests for the gossipsub network behaviour mod tests { use byteorder::{BigEndian, ByteOrder}; @@ -29,7 +29,7 @@ mod tests { use rand::Rng; use crate::{ - GenericGossipsubConfigBuilder, GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, + ConfigBuilder, GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, IdentTopic as Topic, TopicScoreParams, }; @@ -52,7 +52,7 @@ mod tests { peer_no: usize, topics: Vec, to_subscribe: bool, - gs_config: GenericGossipsubConfig, + gs_config: Config, explicit: usize, outbound: usize, scoring: Option<(PeerScoreParams, PeerScoreThresholds)>, @@ -559,7 +559,7 @@ mod tests { // - Insert message into gs.mcache and gs.received //turn off flood publish to test old behaviour - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -629,7 +629,7 @@ mod tests { // - Insert message into gs.mcache and gs.received //turn off flood publish to test fanout behaviour - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -842,7 +842,7 @@ mod tests { /// Test Gossipsub.get_random_peers() function fn test_get_random_peers() { // generate a default GossipsubConfig - let gs_config = GossipsubConfigBuilder::new() + let gs_config = GossipsubConfigBuilder::default() .validation_mode(ValidationMode::Anonymous) .build() .unwrap(); @@ -1289,7 +1289,7 @@ mod tests { #[test] fn test_explicit_peer_reconnects() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .check_explicit_peers_ticks(2) .build() .unwrap(); @@ -1648,8 +1648,8 @@ mod tests { ); } - #[test] // Tests the mesh maintenance addition + #[test] fn test_mesh_addition() { let config = GossipsubConfig::default(); @@ -1682,8 +1682,8 @@ mod tests { assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n()); } - #[test] // Tests the mesh maintenance subtraction + #[test] fn test_mesh_subtraction() { let config = GossipsubConfig::default(); @@ -1855,7 +1855,7 @@ mod tests { #[test] fn test_do_not_graft_within_backoff_period() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) .build() @@ -1911,7 +1911,7 @@ mod tests { #[test] fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without_backoff() { //set default backoff period to 1 second - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .prune_backoff(Duration::from_millis(90)) .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) @@ -2136,7 +2136,7 @@ mod tests { //use an extreme case to catch errors with high probability let m = 50; let n = 2 * m; - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .mesh_n_high(n) .mesh_n(n) .mesh_n_low(n) @@ -2350,7 +2350,7 @@ mod tests { #[test] fn test_only_send_nonnegative_scoring_peers_in_px() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .prune_peers(16) .do_px() .build() @@ -2615,7 +2615,7 @@ mod tests { #[test] fn test_do_not_publish_to_peer_below_publish_threshold() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -2857,7 +2857,7 @@ mod tests { #[test] fn test_ignore_px_from_peers_below_accept_px_threshold() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .prune_peers(16) .build() .unwrap(); @@ -2933,7 +2933,7 @@ mod tests { #[test] fn test_keep_best_scoring_peers_on_oversubscription() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .mesh_n_low(15) .mesh_n(30) .mesh_n_high(60) @@ -3259,7 +3259,7 @@ mod tests { #[test] fn test_scoring_p3b() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .prune_backoff(Duration::from_millis(100)) .build() .unwrap(); @@ -3356,7 +3356,7 @@ mod tests { #[test] fn test_scoring_p4_valid_message() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3412,7 +3412,7 @@ mod tests { #[test] fn test_scoring_p4_invalid_signature() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3470,7 +3470,7 @@ mod tests { #[test] fn test_scoring_p4_message_from_self() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3520,7 +3520,7 @@ mod tests { #[test] fn test_scoring_p4_ignored_message() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3576,7 +3576,7 @@ mod tests { #[test] fn test_scoring_p4_application_invalidated_message() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3635,7 +3635,7 @@ mod tests { #[test] fn test_scoring_p4_application_invalid_message_from_two_peers() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3702,7 +3702,7 @@ mod tests { #[test] fn test_scoring_p4_three_application_invalid_messages() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3778,7 +3778,7 @@ mod tests { #[test] fn test_scoring_p4_decay() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3984,7 +3984,7 @@ mod tests { #[test] fn test_scoring_p7_grafts_before_backoff() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .prune_backoff(Duration::from_millis(200)) .graft_flood_threshold(Duration::from_millis(100)) .build() @@ -4054,7 +4054,7 @@ mod tests { #[test] fn test_opportunistic_grafting() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .mesh_n_low(3) .mesh_n(5) .mesh_n_high(7) @@ -4213,7 +4213,7 @@ mod tests { #[test] fn test_ignore_too_many_ihaves() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .max_ihave_messages(10) .build() .unwrap(); @@ -4299,7 +4299,7 @@ mod tests { #[test] fn test_ignore_too_many_messages_in_ihave() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .max_ihave_messages(10) .max_ihave_length(10) .build() @@ -4392,7 +4392,7 @@ mod tests { #[test] fn test_limit_number_of_message_ids_inside_ihave() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .max_ihave_messages(10) .max_ihave_length(100) .build() @@ -4473,7 +4473,7 @@ mod tests { #[test] fn test_iwant_penalties() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .iwant_followup_time(Duration::from_secs(4)) .build() .unwrap(); @@ -4582,7 +4582,7 @@ mod tests { #[test] fn test_publish_to_floodsub_peers_without_flood_publish() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -4638,7 +4638,7 @@ mod tests { #[test] fn test_do_not_use_floodsub_in_fanout() { - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -4890,7 +4890,6 @@ mod tests { address as *mut Pointers }}; } - #[derive(Clone, Default)] struct MessageData(pub Vec); @@ -4941,7 +4940,7 @@ mod tests { } id }; - let config = GenericGossipsubConfigBuilder::new() + let config = ConfigBuilder::default() .message_id_fn(message_id_fn) .fast_message_id_fn(fast_message_id_fn) .build() diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 6604304b9b5..8ff0453e1d0 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -48,9 +48,12 @@ pub enum ValidationMode { None, } +// For general use cases. +pub type GossipsubConfig = Config>; + /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] -pub struct GenericGossipsubConfig { +pub struct Config { /// The protocol id prefix to negotiate this protocol. The protocol id is of the form /// `//`. As gossipsub supports version 1.0 and 1.1, there are two /// protocol id's supported. @@ -76,8 +79,9 @@ pub struct GenericGossipsubConfig { mesh_n_high: usize, /// Affects how peers are selected when pruning a mesh due to over subscription. - // At least `retain_scores` of the retained peers will be high-scoring, while the remainder are - // chosen randomly (D_score in the spec, default is 4). + /// + /// At least `retain_scores` of the retained peers will be high-scoring, while the remainder are + /// chosen randomly (D_score in the spec, default is 4). retain_scores: usize, /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec, @@ -142,6 +146,10 @@ pub struct GenericGossipsubConfig { /// have different fast message ids, but it is allowed that two semantically identical messages /// have different fast message ids as long as the message_id_fn produces the same id for them. /// + /// On high intensive networks with lots of messages, where the message_id is based on the result of + /// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and + /// filter duplicates quickly without performing the overhead of decompression. + /// /// The function takes a `RawGossipsubMessage` as input and outputs a String to be /// interpreted as the fast message id. Default is None. fast_message_id_fn: Option FastMessageId>, @@ -183,9 +191,9 @@ pub struct GenericGossipsubConfig { /// good enough score. The default is true. flood_publish: bool, - // If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, - // then there is an extra score penalty applied to the peer through P7. The default is 10 - // seconds. + /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, + /// then there is an extra score penalty applied to the peer through P7. The default is 10 + /// seconds. graft_flood_threshold: Duration, /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec). @@ -193,11 +201,11 @@ pub struct GenericGossipsubConfig { /// The default is 2. mesh_outbound_min: usize, - // Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is - // applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh - // peers to replace lower-scoring ones, if the median score of our mesh peers falls below a - // threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). - // The default is 60. + /// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh + /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a + /// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). + /// The default is 60. opportunistic_graft_ticks: u64, /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2. @@ -231,10 +239,7 @@ pub struct GenericGossipsubConfig { published_message_ids_cache_time: Duration, } -// For backwards compatibility -pub type GossipsubConfig = GenericGossipsubConfig>; - -impl GenericGossipsubConfig { +impl Config { // All the getters /// The protocol id prefix to negotiate this protocol. The protocol id is of the form @@ -274,6 +279,7 @@ impl GenericGossipsubConfig { } /// Affects how peers are selected when pruning a mesh due to over subscription. + // // At least `retain_scores` of the retained peers will be high-scoring, while the remainder are // chosen randomly (D_score in the spec, default is 4). pub fn retain_scores(&self) -> usize { @@ -287,6 +293,7 @@ impl GenericGossipsubConfig { } /// Affects how many peers we will emit gossip to at each heartbeat. + /// /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or /// `gossip_lazy`, whichever is greater. The default is 0.25. pub fn gossip_factor(&self) -> f64 { @@ -423,8 +430,8 @@ impl GenericGossipsubConfig { self.flood_publish } - // If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, - // then there is an extra score penalty applied to the peer through P7. + /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, + /// then there is an extra score penalty applied to the peer through P7. pub fn graft_flood_threshold(&self) -> Duration { self.graft_flood_threshold } @@ -436,18 +443,18 @@ impl GenericGossipsubConfig { self.mesh_outbound_min } - // Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is - // applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh - // peers to replace lower-scoring ones, if the median score of our mesh peers falls below a - // threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). - // The default is 60. + /// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh + /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a + /// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). + /// The default is 60. pub fn opportunistic_graft_ticks(&self) -> u64 { self.opportunistic_graft_ticks } - // Controls how many times we will allow a peer to request the same message id through IWANT - // gossip before we start ignoring them. This is designed to prevent peers from spamming us - // with requests and wasting our resources. The default is 3. + /// Controls how many times we will allow a peer to request the same message id through IWANT + /// gossip before we start ignoring them. This is designed to prevent peers from spamming us + /// with requests and wasting our resources. The default is 3. pub fn gossip_retransimission(&self) -> u32 { self.gossip_retransimission } @@ -490,42 +497,27 @@ impl GenericGossipsubConfig { } } -impl Default for GenericGossipsubConfig { +impl Default for Config { fn default() -> Self { - //use GossipsubConfigBuilder to also validate defaults - GenericGossipsubConfigBuilder::new() + // use ConfigBuilder to also validate defaults + ConfigBuilder::default() .build() .expect("Default config parameters should be valid parameters") } } /// The builder struct for constructing a gossipsub configuration. -pub struct GenericGossipsubConfigBuilder { - config: GenericGossipsubConfig, +pub struct ConfigBuilder { + config: Config, } -//for backwards compatibility -pub type GossipsubConfigBuilder = GenericGossipsubConfigBuilder>; +// For general use cases. +pub type GossipsubConfigBuilder = ConfigBuilder>; -impl Default for GenericGossipsubConfigBuilder { +impl Default for ConfigBuilder { fn default() -> Self { - GenericGossipsubConfigBuilder { - config: GenericGossipsubConfig::default(), - } - } -} - -impl From> for GenericGossipsubConfigBuilder { - fn from(config: GenericGossipsubConfig) -> Self { - GenericGossipsubConfigBuilder { config } - } -} - -impl GenericGossipsubConfigBuilder { - // set default values - pub fn new() -> Self { - GenericGossipsubConfigBuilder { - config: GenericGossipsubConfig { + ConfigBuilder { + config: Config { protocol_id_prefix: Cow::Borrowed("meshsub"), history_length: 5, history_gossip: 3, @@ -577,7 +569,15 @@ impl GenericGossipsubConfigBuilder { }, } } +} + +impl From> for ConfigBuilder { + fn from(config: Config) -> Self { + ConfigBuilder { config } + } +} +impl ConfigBuilder { /// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`). pub fn protocol_id_prefix(&mut self, protocol_id: impl Into>) -> &mut Self { self.config.protocol_id_prefix = protocol_id.into(); @@ -616,8 +616,9 @@ impl GenericGossipsubConfigBuilder { } /// Affects how peers are selected when pruning a mesh due to over subscription. - // At least `retain_scores` of the retained peers will be high-scoring, while the remainder are - // chosen randomly (D_score in the spec, default is 4). + /// + /// At least `retain_scores` of the retained peers will be high-scoring, while the remainder are + /// chosen randomly (D_score in the spec, default is 4). pub fn retain_scores(&mut self, retain_scores: usize) -> &mut Self { self.config.retain_scores = retain_scores; self @@ -631,6 +632,7 @@ impl GenericGossipsubConfigBuilder { } /// Affects how many peers we will emit gossip to at each heartbeat. + /// /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or /// `gossip_lazy`, whichever is greater. The default is 0.25. pub fn gossip_factor(&mut self, gossip_factor: f64) -> &mut Self { @@ -700,8 +702,8 @@ impl GenericGossipsubConfigBuilder { /// addressing, where this function may be set to `hash(message)`. This would prevent messages /// of the same content from being duplicated. /// - /// The function takes a `GenericGossipsubMessage` as input and outputs a String to be interpreted as - /// the message id. + /// The function takes a [`GenericGossipsubMessage`] as input and outputs a String to be + /// interpreted as the message id. pub fn message_id_fn( &mut self, id_fn: fn(&GenericGossipsubMessage) -> MessageId, @@ -712,12 +714,12 @@ impl GenericGossipsubConfigBuilder { /// A user-defined optional function that computes fast ids from raw messages. This can be used /// to avoid possibly expensive transformations from `RawGossipsubMessage` to - /// `GenericGossipsubMessage` for duplicates. Two semantically different messages must always + /// [`GenericGossipsubMessage`] for duplicates. Two semantically different messages must always /// have different fast message ids, but it is allowed that two semantically identical messages /// have different fast message ids as long as the message_id_fn produces the same id for them. /// - /// The function takes a `RawGossipsubMessage` as input and outputs a String to be - /// interpreted as the fast message id. Default is None. + /// The function takes a [`RawGossipsubMessage`] as input and outputs a String to be interpreted + /// as the fast message id. Default is None. pub fn fast_message_id_fn( &mut self, fast_id_fn: fn(&RawGossipsubMessage) -> FastMessageId, @@ -734,6 +736,7 @@ impl GenericGossipsubConfigBuilder { } /// Controls the number of peers to include in prune Peer eXchange. + /// /// When we prune a peer that's eligible for PX (has a good score, etc), we will try to /// send them signed peer records for up to `prune_peers` other peers that we /// know of. It is recommended that this value is larger than `mesh_n_high` so that the pruned @@ -773,8 +776,8 @@ impl GenericGossipsubConfigBuilder { self } - // If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, - // then there is an extra score penalty applied to the peer through P7. + /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, + /// then there is an extra score penalty applied to the peer through P7. pub fn graft_flood_threshold(&mut self, graft_flood_threshold: Duration) -> &mut Self { self.config.graft_flood_threshold = graft_flood_threshold; self @@ -788,19 +791,19 @@ impl GenericGossipsubConfigBuilder { self } - // Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is - // applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh - // peers to replace lower-scoring ones, if the median score of our mesh peers falls below a - // threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). - // The default is 60. + /// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh + /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a + /// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). + /// The default is 60. pub fn opportunistic_graft_ticks(&mut self, opportunistic_graft_ticks: u64) -> &mut Self { self.config.opportunistic_graft_ticks = opportunistic_graft_ticks; self } - // Controls how many times we will allow a peer to request the same message id through IWANT - // gossip before we start ignoring them. This is designed to prevent peers from spamming us - // with requests and wasting our resources. + /// Controls how many times we will allow a peer to request the same message id through IWANT + /// gossip before we start ignoring them. This is designed to prevent peers from spamming us + /// with requests and wasting our resources. pub fn gossip_retransimission(&mut self, gossip_retransimission: u32) -> &mut Self { self.config.gossip_retransimission = gossip_retransimission; self @@ -837,6 +840,9 @@ impl GenericGossipsubConfigBuilder { self } + /// Time to wait for a message requested through IWANT following an IHAVE advertisement. + /// If the message is not received within this window, a broken promise is declared and + /// the router may apply behavioural penalties. The default is 3 seconds. pub fn iwant_followup_time(&mut self, iwant_followup_time: Duration) -> &mut Self { self.config.iwant_followup_time = iwant_followup_time; self @@ -848,6 +854,7 @@ impl GenericGossipsubConfigBuilder { self } + /// Published message ids time cache duration. The default is 10 seconds. pub fn published_message_ids_cache_time( &mut self, published_message_ids_cache_time: Duration, @@ -857,7 +864,7 @@ impl GenericGossipsubConfigBuilder { } /// Constructs a `GenericGossipsubConfig` from the given configuration and validates the settings. - pub fn build(&self) -> Result, &str> { + pub fn build(&self) -> Result, &str> { // check all constraints on config if self.config.max_transmit_size < 100 { @@ -888,7 +895,7 @@ impl GenericGossipsubConfigBuilder { } } -impl std::fmt::Debug for GenericGossipsubConfig { +impl std::fmt::Debug for Config { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix); @@ -935,7 +942,7 @@ mod test { #[test] fn create_thing() { - let builder = GossipsubConfigBuilder::new() + let builder = GossipsubConfigBuilder::default() .protocol_id_prefix("purple") .build() .unwrap(); diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index 1f0e4b8dfff..4dfadbe54ec 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -32,14 +32,15 @@ use wasm_timer::Instant; /// for each `IWANT` message we track one random requested message id. #[derive(Default)] pub(crate) struct GossipPromises { - // Stores for each tracked message id and peer the instant when this promise expires. - // If the peer didn't respond until then we consider the promise as broken and penalize the - // peer. + /// Stores for each tracked message id and peer the instant when this promise expires. + /// + /// If the peer didn't respond until then we consider the promise as broken and penalize the + /// peer. promises: HashMap>, } impl GossipPromises { - /// Track a promise to deliver a message from a list of msgIDs we are requesting. + /// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting. pub fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) { // Randomly select a message id let mut rng = thread_rng(); diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index b2988f384cf..8026143b2ee 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -37,7 +37,12 @@ use std::{ io, pin::Pin, task::{Context, Poll}, + time::Duration, }; +use wasm_timer::Instant; + +/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur. +const INITIAL_KEEP_ALIVE: u64 = 30; /// The event emitted by the Handler. This informs the behaviour of various events created /// by the handler. @@ -57,11 +62,11 @@ pub enum HandlerEvent { PeerKind(PeerKind), } -// The maximum number of substreams we accept or create before disconnecting from the peer. -// -// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we -// attempt to recreate these. This imposes an upper bound of new substreams before we consider the -// connection faulty and disconnect. This also prevents against potential substream creation loops. +/// The maximum number of substreams we accept or create before disconnecting from the peer. +/// +/// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we +/// attempt to recreate these. This imposes an upper bound of new substreams before we consider the +/// connection faulty and disconnect. This also prevents against potential substream creation loops. const MAX_SUBSTREAM_CREATION: usize = 5; /// Protocol Handler that manages a single long-lived substream with a peer. @@ -92,12 +97,14 @@ pub struct GossipsubHandler { peer_kind: Option, /// Keeps track on whether we have sent the peer kind to the behaviour. + // // NOTE: Use this flag rather than checking the substream count each poll. peer_kind_sent: bool, /// If the peer doesn't support the gossipsub protocol we do not immediately disconnect. /// Rather, we disable the handler and prevent any incoming or outgoing substreams from being /// established. + /// /// This value is set to true to indicate the peer doesn't support gossipsub. protocol_unsupported: bool, @@ -136,7 +143,7 @@ enum OutboundSubstreamState { } impl GossipsubHandler { - /// Builds a new `GossipsubHandler`. + /// Builds a new [`GossipsubHandler`]. pub fn new( protocol_id_prefix: std::borrow::Cow<'static, str>, max_transmit_size: usize, @@ -163,7 +170,7 @@ impl GossipsubHandler { peer_kind_sent: false, protocol_unsupported: false, upgrade_errors: VecDeque::new(), - keep_alive: KeepAlive::Yes, + keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), } } } @@ -174,8 +181,8 @@ impl ProtocolsHandler for GossipsubHandler { type Error = GossipsubHandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; - type OutboundProtocol = ProtocolConfig; type OutboundOpenInfo = Self::InEvent; + type OutboundProtocol = ProtocolConfig; fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 42ba0c1a439..8c885547d34 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -41,7 +41,7 @@ //! implementations, due to undefined elements in the current specification. //! //! - **Topics** - In gossipsub, topics configurable by the `hash_topics` configuration parameter. -//! Topics are of type `TopicHash`. The current go implementation uses raw utf-8 strings, and this +//! Topics are of type [`TopicHash`]. The current go implementation uses raw utf-8 strings, and this //! is default configuration in rust-libp2p. Topics can be hashed (SHA256 hashed then base64 //! encoded) by setting the `hash_topics` configuration parameter to true. //! @@ -55,83 +55,70 @@ //! //! ## GossipsubConfig //! -//! The [`GenericGossipsubConfig`] struct specifies various network performance/tuning configuration +//! The [`Config`] struct specifies various network performance/tuning configuration //! parameters. Specifically it specifies: //! -//! [`GenericGossipsubConfig`]: struct.GenericGossipsubConfig.html -//! -//! - `protocol_id` - The protocol id that this implementation will accept connections on. -//! - `history_length` - The number of heartbeats which past messages are kept in cache (default: 5). -//! - `history_gossip` - The number of past heartbeats that the node will send gossip metadata -//! about (default: 3). -//! - `mesh_n` - The target number of peers store in the local mesh network. -//! (default: 6). -//! - `mesh_n_low` - The minimum number of peers in the local mesh network before. -//! trying to add more peers to the mesh from the connected peer pool (default: 4). -//! - `mesh_n_high` - The maximum number of peers in the local mesh network before removing peers to -//! reach `mesh_n` peers (default: 12). -//! - `gossip_lazy` - The number of peers that the local node will gossip to during a heartbeat (default: `mesh_n` = 6). -//! - `heartbeat_initial_delay - The initial time delay before starting the first heartbeat (default: 5 seconds). -//! - `heartbeat_interval` - The time between each heartbeat (default: 1 second). -//! - `fanout_ttl` - The fanout time to live time period. The timeout required before removing peers from the fanout -//! for a given topic (default: 1 minute). -//! - `max_transmit_size` - This sets the maximum transmission size for total gossipsub messages on the network. -//! - `hash_topics` - Whether to hash the topics using base64(SHA256(topic)) or to leave as plain utf-8 strings. -//! - `manual_propagation` - Whether gossipsub should immediately forward received messages on the -//! network. For applications requiring message validation, this should be set to false, then the -//! application should call `propagate_message(message_id, propagation_source)` once validated, to -//! propagate the message to peers. +//! [`Config`]: struct.Config.html //! //! This struct implements the `Default` trait and can be initialised via -//! `GenericGossipsubConfig::default()`. +//! `Config::default()`. //! //! //! ## Gossipsub //! //! The [`GenericGossipsub`] struct implements the `NetworkBehaviour` trait allowing it to act as the //! routing behaviour in a `Swarm`. This struct requires an instance of `PeerId` and -//! [`GenericGossipsubConfig`]. +//! [`Config`]. //! -//! [`GenericGossipsub`]: struct.Gossipsub.html +//! [`GenericGossipsub`]: struct.GenericGossipsub.html //! ## Example //! //! An example of initialising a gossipsub compatible swarm: //! -//! ```ignore -//! #extern crate libp2p; -//! #extern crate futures; -//! #extern crate tokio; -//! #use libp2p::gossipsub::GossipsubEvent; -//! #use libp2p::{identity, gossipsub, -//! # tokio_codec::{FramedRead, LinesCodec}, -//! #}; -//! let local_key = identity::Keypair::generate_ed25519(); -//! let local_pub_key = local_key.public(); -//! -//! // Set up an encrypted TCP Transport over the Mplex and Yamux protocols -//! let transport = libp2p::build_development_transport(local_key); -//! -//! // Create a Floodsub/Gossipsub topic -//! let topic = libp2p::floodsub::TopicBuilder::new("example").build(); +//! ``` +//! use libp2p_gossipsub::GossipsubEvent; +//! use libp2p_core::{identity::Keypair,transport::{Transport, MemoryTransport}, Multiaddr}; +//! use libp2p_gossipsub::MessageAuthenticity; +//! let local_key = Keypair::generate_ed25519(); +//! let local_peer_id = libp2p_core::PeerId::from(local_key.public()); +//! +//! // Set up an encrypted TCP Transport over the Mplex +//! // This is test transport (memory). +//! let noise_keys = libp2p_noise::Keypair::::new().into_authentic(&local_key).unwrap(); +//! let transport = MemoryTransport::default() +//! .upgrade(libp2p_core::upgrade::Version::V1) +//! .authenticate(libp2p_noise::NoiseConfig::xx(noise_keys).into_authenticated()) +//! .multiplex(libp2p_mplex::MplexConfig::new()) +//! .boxed(); +//! +//! // Create a Gossipsub topic +//! let topic = libp2p_gossipsub::IdentTopic::new("example"); +//! +//! // Set the message authenticity - How we expect to publish messages +//! // Here we expect the publisher to sign the message with their key. +//! let message_authenticity = MessageAuthenticity::Signed(local_key); //! //! // Create a Swarm to manage peers and events //! let mut swarm = { //! // set default parameters for gossipsub -//! let gossipsub_config = gossipsub::GossipsubConfig::default(); +//! let gossipsub_config = libp2p_gossipsub::GossipsubConfig::default(); //! // build a gossipsub network behaviour //! let mut gossipsub = -//! gossipsub::Gossipsub::new(local_pub_key.clone().into_peer_id(), gossipsub_config); -//! gossipsub.subscribe(topic.clone()); -//! libp2p::Swarm::new( +//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config).unwrap(); +//! // subscribe to the topic +//! gossipsub.subscribe(&topic); +//! // create the swarm +//! libp2p_swarm::Swarm::new( //! transport, //! gossipsub, -//! libp2p::core::topology::MemoryTopology::empty(local_pub_key), +//! local_peer_id, //! ) //! }; //! -//! // Listen on all interfaces and whatever port the OS assigns -//! let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); +//! // Listen on a memory transport. +//! let memory: Multiaddr = libp2p_core::multiaddr::Protocol::Memory(10).into(); +//! let addr = libp2p_swarm::Swarm::listen_on(&mut swarm, memory).unwrap(); //! println!("Listening on {:?}", addr); //! ``` @@ -157,11 +144,10 @@ extern crate derive_builder; mod rpc_proto; pub use self::behaviour::{ - GenericGossipsub, GenericGossipsubEvent, Gossipsub, GossipsubEvent, MessageAuthenticity, + Event, GenericGossipsub, Gossipsub, GossipsubEvent, MessageAuthenticity, }; pub use self::config::{ - GenericGossipsubConfig, GenericGossipsubConfigBuilder, GossipsubConfig, GossipsubConfigBuilder, - ValidationMode, + Config, ConfigBuilder, GossipsubConfig, GossipsubConfigBuilder, ValidationMode, }; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index 6db0c8eda69..b2af29f02b2 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -39,6 +39,9 @@ pub struct MessageCache { /// For every message and peer the number of times this peer asked for the message iwant_counts: HashMap>, history: Vec>, + /// The number of indices in the cache history used for gossipping. That means that a message + /// won't get gossipped anymore when shift got called `gossip` many times after inserting the + /// message in the cache. gossip: usize, } @@ -83,12 +86,12 @@ impl MessageCache { } /// Get a message with `message_id` - #[allow(dead_code)] + #[cfg(test)] pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessageWithId> { self.msgs.get(message_id) } - ///increases the iwant count for the given message by one and returns the message together + /// Increases the iwant count for the given message by one and returns the message together /// with the iwant if the message exists. pub fn get_with_iwant_counts( &mut self, @@ -109,7 +112,7 @@ impl MessageCache { }) } - /// Gets and validates a message with `message_id`. + /// Gets a message with [`MessageId`] and tags it as validated. pub fn validate(&mut self, message_id: &MessageId) -> Option<&GossipsubMessageWithId> { self.msgs.get_mut(message_id).map(|message| { message.validated = true; @@ -117,8 +120,8 @@ impl MessageCache { }) } - /// Get a list of GossipIds for a given topic - pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec { + /// Get a list of `MessageIds` for a given topic. + pub fn get_gossip_message_ids(&self, topic: &TopicHash) -> Vec { self.history[..self.gossip] .iter() .fold(vec![], |mut current_entries, entries| { @@ -147,7 +150,7 @@ impl MessageCache { } /// Shift the history array down one and delete messages associated with the - /// last entry + /// last entry. pub fn shift(&mut self) { for entry in self.history.pop().expect("history is always > 1") { if let Some(msg) = self.msgs.remove(&entry.mid) { @@ -155,8 +158,9 @@ impl MessageCache { // If GossipsubConfig::validate_messages is true, the implementing // application has to ensure that Gossipsub::validate_message gets called for // each received message within the cache timeout time." - debug!("The message with id {} got removed from the cache without being validated.", - &entry.mid + debug!( + "The message with id {} got removed from the cache without being validated.", + &entry.mid ); } } @@ -202,7 +206,7 @@ mod tests { source, data, sequence_number, - topic: topic, + topic, signature: None, key: None, validated: false, diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score.rs similarity index 100% rename from protocols/gossipsub/src/peer_score/mod.rs rename to protocols/gossipsub/src/peer_score.rs diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index ef3409b1732..7b8bae3c243 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -43,7 +43,7 @@ use unsigned_varint::codec; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; -/// Implementation of the `ConnectionUpgrade` for the Gossipsub protocol. +/// Implementation of [`InboundUpgrade`] and [`OutboundUpgrade`] for the Gossipsub protocol. #[derive(Clone)] pub struct ProtocolConfig { /// The Gossipsub protocol id to listen on. @@ -55,7 +55,8 @@ pub struct ProtocolConfig { } impl ProtocolConfig { - /// Builds a new `ProtocolConfig`. + /// Builds a new [`ProtocolConfig`]. + /// /// Sets the maximum gossip transmission size. pub fn new( id_prefix: Cow<'static, str>, diff --git a/protocols/gossipsub/src/rpc_proto.rs b/protocols/gossipsub/src/rpc_proto.rs index 3c0f6b3df4d..027a9731ee1 100644 --- a/protocols/gossipsub/src/rpc_proto.rs +++ b/protocols/gossipsub/src/rpc_proto.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); #[cfg(test)] diff --git a/protocols/gossipsub/src/subscription_filter.rs b/protocols/gossipsub/src/subscription_filter.rs index c1048d81361..0f38157623a 100644 --- a/protocols/gossipsub/src/subscription_filter.rs +++ b/protocols/gossipsub/src/subscription_filter.rs @@ -24,7 +24,7 @@ use log::info; use std::collections::{BTreeSet, HashMap, HashSet}; pub trait TopicSubscriptionFilter { - /// returns true iff the topic is of interest and we can subscribe to it + /// Returns true iff the topic is of interest and we can subscribe to it. fn can_subscribe(&mut self, topic_hash: &TopicHash) -> bool; /// Filters a list of incoming subscriptions and returns a filtered set diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index af2df1e05e5..90300e77ea3 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -18,7 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -///! This implements a time-based LRU cache for checking gossipsub message duplicates. +//! This implements a time-based LRU cache for checking gossipsub message duplicates. + use fnv::FnvHashMap; use std::collections::hash_map::{ self, diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 2d995e06019..e53b9885cf6 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -73,10 +73,15 @@ macro_rules! declare_message_id_type { declare_message_id_type!(MessageId, "MessageId"); // A type for gossipsub fast messsage ids, not to confuse with "real" message ids. +// +// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On +// high intensive networks with lots of messages, where the message_id is based on the result of +// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and +// filter duplicates quickly without performing the overhead of decompression. declare_message_id_type!(FastMessageId, "FastMessageId"); -#[derive(Debug, Clone, PartialEq)] /// Describes the types of peers that can exist in the gossipsub context. +#[derive(Debug, Clone, PartialEq)] pub enum PeerKind { /// A gossipsub 1.1 peer. Gossipsubv1_1, diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index b6d2e416a86..1ef92ba658c 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -30,8 +30,7 @@ use std::{ use futures::StreamExt; use libp2p_core::{ - identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, - Multiaddr, Transport, + identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport, }; use libp2p_gossipsub::{ Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, MessageAuthenticity, @@ -158,7 +157,7 @@ fn build_node() -> (Multiaddr, Swarm) { // reduce the default values of the heartbeat, so that all nodes will receive gossip in a // timely fashion. - let config = GossipsubConfigBuilder::new() + let config = GossipsubConfigBuilder::default() .heartbeat_initial_delay(Duration::from_millis(100)) .heartbeat_interval(Duration::from_millis(200)) .history_length(10)