From 6aed4d8ca2eaf2ef405e6125014f5eaa13f5e2a8 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 8 Oct 2020 13:24:18 +0200 Subject: [PATCH 1/4] use generic type for message data + cache message ids --- protocols/gossipsub/src/behaviour.rs | 117 +++++++------- protocols/gossipsub/src/behaviour/tests.rs | 167 +++++++++++--------- protocols/gossipsub/src/config.rs | 59 ++++--- protocols/gossipsub/src/handler.rs | 4 +- protocols/gossipsub/src/lib.rs | 13 +- protocols/gossipsub/src/mcache.rs | 67 ++++---- protocols/gossipsub/src/peer_score/mod.rs | 48 +++--- protocols/gossipsub/src/peer_score/tests.rs | 42 ++--- protocols/gossipsub/src/protocol.rs | 21 +-- protocols/gossipsub/src/types.rs | 68 +++++++- 10 files changed, 360 insertions(+), 246 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b1fb8b16191..37b009633ef 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -48,7 +48,7 @@ use libp2p_swarm::{ }; use crate::backoff::BackoffStorage; -use crate::config::{GossipsubConfig, ValidationMode}; +use crate::config::{GenericGossipsubConfig, ValidationMode}; use crate::error::PublishError; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, HandlerEvent}; @@ -58,12 +58,13 @@ use crate::protocol::SIGNING_PREFIX; use crate::time_cache::DuplicateCache; use crate::topic::{Hasher, Topic, TopicHash}; use crate::types::{ - GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, - MessageAcceptance, MessageId, PeerInfo, + GenericGossipsubMessage, GossipsubControlAction, GossipsubMessageWithId, GossipsubSubscription, + GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage, }; use crate::types::{GossipsubRpc, PeerKind}; use crate::{rpc_proto, TopicScoreParams}; use std::cmp::Ordering::Equal; +use std::fmt::Debug; mod tests; @@ -119,7 +120,7 @@ impl MessageAuthenticity { /// Event that can happen on the gossipsub behaviour. #[derive(Debug)] -pub enum GossipsubEvent { +pub enum GenericGossipsubEvent> { /// A message has been received. Message { /// The peer that forwarded us this message. @@ -128,7 +129,7 @@ pub enum GossipsubEvent { /// validating a message (if required). message_id: MessageId, /// The message itself. - message: GossipsubMessage, + message: GossipsubMessageWithId, }, /// A remote subscribed to a topic. Subscribed { @@ -146,6 +147,8 @@ pub enum GossipsubEvent { topic: TopicHash, }, } +//for backwards compatibility +pub type GossipsubEvent = GenericGossipsubEvent>; /// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] /// for further details. @@ -203,12 +206,12 @@ impl From for PublishConfig { /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If message signing is /// disabled, the [`ValidationMode`] in the config should be adjusted to an appropriate level to /// accept unsigned messages. -pub struct Gossipsub { +pub struct GenericGossipsub> { /// Configuration providing gossipsub performance parameters. - config: GossipsubConfig, + config: GenericGossipsubConfig, /// Events that need to be yielded to the outside when polling. - events: VecDeque, GossipsubEvent>>, + events: VecDeque, GenericGossipsubEvent>>, /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, @@ -250,7 +253,7 @@ pub struct Gossipsub { backoffs: BackoffStorage, /// Message cache for the last few heartbeats. - mcache: MessageCache, + mcache: MessageCache, /// Heartbeat interval stream. heartbeat: Interval, @@ -283,11 +286,14 @@ pub struct Gossipsub { published_message_ids: DuplicateCache, } -impl Gossipsub { +// for backwards compatibility +pub type Gossipsub = GenericGossipsub>; + +impl> + From> + AsRef<[u8]>> GenericGossipsub { /// Creates a `Gossipsub` struct given a set of parameters specified via a `GossipsubConfig`. pub fn new( privacy: MessageAuthenticity, - config: GossipsubConfig, + config: GenericGossipsubConfig, ) -> Result { // Set up the router given the configuration settings. @@ -297,7 +303,7 @@ impl Gossipsub { // Set up message publishing parameters. - Ok(Gossipsub { + Ok(GenericGossipsub { events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), @@ -314,11 +320,7 @@ impl Gossipsub { config.heartbeat_interval(), config.backoff_slack(), ), - mcache: MessageCache::new( - config.history_gossip(), - config.history_length(), - config.message_id_fn(), - ), + mcache: MessageCache::new(config.history_gossip(), config.history_length()), heartbeat: Interval::new_at( Instant::now() + config.heartbeat_initial_delay(), config.heartbeat_interval(), @@ -462,7 +464,7 @@ impl Gossipsub { pub fn publish( &mut self, topic: Topic, - data: impl Into>, + data: impl Into, ) -> Result { self.publish_many(iter::once(topic), data) } @@ -471,21 +473,23 @@ impl Gossipsub { pub fn publish_many( &mut self, topics: impl IntoIterator>, - data: impl Into>, + data: impl Into, ) -> Result { let message = self.build_message(topics.into_iter().map(|t| t.hash()).collect(), data.into())?; - let msg_id = (self.config.message_id_fn())(&message); + let msg_id = self.config.message_id(&message); let event = Arc::new( GossipsubRpc { subscriptions: Vec::new(), - messages: vec![message.clone()], + messages: vec![RawGossipsubMessage::from(message.clone())], control_msgs: Vec::new(), } .into_protobuf(), ); + let message = GossipsubMessageWithId::new(message, msg_id.clone()); + // check that the size doesn't exceed the max transmission size if event.encoded_len() > self.config.max_transmit_size() { // NOTE: The size limit can be reached by excessive topics or an excessive message. @@ -723,11 +727,7 @@ impl Gossipsub { } let interval = Interval::new(params.decay_interval); - let peer_score = PeerScore::new_with_message_delivery_time_callback( - params, - self.config.message_id_fn(), - callback, - ); + let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback); self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default())); Ok(()) } @@ -1085,7 +1085,10 @@ impl Gossipsub { if !cached_messages.is_empty() { debug!("IWANT: Sending cached messages to peer: {:?}", peer_id); // Send the messages to the peer - let message_list = cached_messages.into_iter().map(|entry| entry.1).collect(); + let message_list = cached_messages + .into_iter() + .map(|entry| RawGossipsubMessage::from(entry.1)) + .collect(); if let Err(_) = self.send_message( peer_id.clone(), GossipsubRpc { @@ -1326,8 +1329,10 @@ impl Gossipsub { /// Handles a newly received GossipsubMessage. /// Forwards the message to all peers in the mesh. - fn handle_received_message(&mut self, mut msg: GossipsubMessage, propagation_source: &PeerId) { - let msg_id = (self.config.message_id_fn())(&msg); + fn handle_received_message(&mut self, msg: RawGossipsubMessage, propagation_source: &PeerId) { + let msg = GenericGossipsubMessage::from(msg); + let msg_id = self.config.message_id(&msg); + let mut msg = GossipsubMessageWithId::new(msg, msg_id.clone()); debug!( "Handling message: {:?} from peer: {}", msg_id, @@ -1421,9 +1426,9 @@ impl Gossipsub { if self.mesh.keys().any(|t| msg.topics.iter().any(|u| t == u)) { debug!("Sending received message to user"); self.events.push_back(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::Message { + GenericGossipsubEvent::Message { propagation_source: propagation_source.clone(), - message_id: msg_id, + message_id: msg_id.clone(), message: msg.clone(), }, )); @@ -1437,11 +1442,10 @@ impl Gossipsub { // forward the message to mesh peers, if no validation is required if !self.config.validate_messages() { - let message_id = (self.config.message_id_fn())(&msg); if let Err(_) = self.forward_msg(msg, Some(propagation_source)) { error!("Failed to forward message. Too large"); } - debug!("Completed message handling for message: {:?}", message_id); + debug!("Completed message handling for message: {:?}", msg_id); } } @@ -1530,7 +1534,7 @@ impl Gossipsub { } // generates a subscription event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::Subscribed { + GenericGossipsubEvent::Subscribed { peer_id: propagation_source.clone(), topic: subscription.topic_hash.clone(), }, @@ -1554,7 +1558,7 @@ impl Gossipsub { // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::Unsubscribed { + GenericGossipsubEvent::Unsubscribed { peer_id: propagation_source.clone(), topic: subscription.topic_hash.clone(), }, @@ -2119,10 +2123,10 @@ impl Gossipsub { /// Returns true if at least one peer was messaged. fn forward_msg( &mut self, - message: GossipsubMessage, + message: GossipsubMessageWithId, propagation_source: Option<&PeerId>, ) -> Result { - let msg_id = (self.config.message_id_fn())(&message); + let msg_id = message.message_id(); // message is fully validated inform peer_score if let Some((peer_score, ..)) = &mut self.peer_score { @@ -2165,7 +2169,7 @@ impl Gossipsub { let event = Arc::new( GossipsubRpc { subscriptions: Vec::new(), - messages: vec![message.clone()], + messages: vec![RawGossipsubMessage::from(message.clone())], control_msgs: Vec::new(), } .into_protobuf(), @@ -2186,8 +2190,8 @@ impl Gossipsub { pub(crate) fn build_message( &self, topics: Vec, - data: Vec, - ) -> Result { + data: T, + ) -> Result, SigningError> { match &self.publish_config { PublishConfig::Signing { ref keypair, @@ -2200,7 +2204,7 @@ impl Gossipsub { let signature = { let message = rpc_proto::Message { from: Some(author.clone().into_bytes()), - data: Some(data.clone()), + data: Some(data.clone().into()), seqno: Some(sequence_number.to_be_bytes().to_vec()), topic_ids: topics .clone() @@ -2222,7 +2226,7 @@ impl Gossipsub { Some(keypair.sign(&signature_bytes)?) }; - Ok(GossipsubMessage { + Ok(GenericGossipsubMessage { source: Some(author.clone()), data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2235,7 +2239,7 @@ impl Gossipsub { }) } PublishConfig::Author(peer_id) => { - Ok(GossipsubMessage { + Ok(GenericGossipsubMessage { source: Some(peer_id.clone()), data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2248,7 +2252,7 @@ impl Gossipsub { }) } PublishConfig::RandomAuthor => { - Ok(GossipsubMessage { + Ok(GenericGossipsubMessage { source: Some(PeerId::random()), data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2261,7 +2265,7 @@ impl Gossipsub { }) } PublishConfig::Anonymous => { - Ok(GossipsubMessage { + Ok(GenericGossipsubMessage { source: None, data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2524,9 +2528,12 @@ fn get_ip_addr(addr: &Multiaddr) -> Option { }) } -impl NetworkBehaviour for Gossipsub { +impl NetworkBehaviour for GenericGossipsub +where + T: Send + 'static + Clone + Into> + From> + AsRef<[u8]>, +{ type ProtocolsHandler = GossipsubHandler; - type OutEvent = GossipsubEvent; + type OutEvent = GenericGossipsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { GossipsubHandler::new( @@ -2789,11 +2796,13 @@ impl NetworkBehaviour for Gossipsub { // Handle any invalid messages from this peer if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - let id_fn = self.config.message_id_fn(); for (message, validation_error) in invalid_messages { let reason = RejectReason::ValidationError(validation_error); + let message = GenericGossipsubMessage::from(message); + let id = self.config.message_id(&message); + let message = GossipsubMessageWithId::new(message, id); peer_score.reject_message(&propagation_source, &message, reason); - gossip_promises.reject_message(&id_fn(&message), &reason); + gossip_promises.reject_message(&message.message_id(), &reason); } } else { // log the invalid messages @@ -2934,7 +2943,7 @@ fn validate_config( Ok(()) } -impl fmt::Debug for Gossipsub { +impl> fmt::Debug for GenericGossipsub { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Gossipsub") .field("config", &self.config) @@ -2983,8 +2992,8 @@ mod local_test { } } - fn test_message() -> GossipsubMessage { - GossipsubMessage { + fn test_message() -> RawGossipsubMessage { + RawGossipsubMessage { source: Some(PeerId::random()), data: vec![0; 100], sequence_number: None, @@ -3030,7 +3039,7 @@ mod local_test { /// Tests RPC message fragmentation fn test_message_fragmentation_deterministic() { let max_transmit_size = 500; - let config = crate::GossipsubConfigBuilder::new() + let config = crate::GenericGossipsubConfigBuilder::new() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() @@ -3078,7 +3087,7 @@ mod local_test { fn test_message_fragmentation() { fn prop(rpc: GossipsubRpc) { let max_transmit_size = 500; - let config = crate::GossipsubConfigBuilder::new() + let config = crate::GenericGossipsubConfigBuilder::new() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 02a042b5bed..5cd344a9e0d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -29,7 +29,10 @@ mod tests { use async_std::net::Ipv4Addr; use rand::Rng; - use crate::{GossipsubConfigBuilder, IdentTopic as Topic, TopicScoreParams}; + use crate::{ + GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, IdentTopic as Topic, + TopicScoreParams, + }; use super::super::*; use crate::error::ValidationError; @@ -223,7 +226,7 @@ mod tests { let mut messages = Vec::with_capacity(rpc.publish.len()); let rpc = rpc.clone(); for message in rpc.publish.into_iter() { - messages.push(GossipsubMessage { + messages.push(RawGossipsubMessage { source: message.from.map(|x| PeerId::from_bytes(x).unwrap()), data: message.data.unwrap_or_default(), sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application @@ -581,8 +584,9 @@ mod tests { _ => collected_publish, }); - let msg_id = - (gs.config.message_id_fn())(&publishes.first().expect("Should contain > 0 entries")); + let msg_id = gs + .config + .message_id(&publishes.first().expect("Should contain > 0 entries")); let config = GossipsubConfig::default(); assert_eq!( @@ -654,8 +658,9 @@ mod tests { _ => collected_publish, }); - let msg_id = - (gs.config.message_id_fn())(&publishes.first().expect("Should contain > 0 entries")); + let msg_id = gs + .config + .message_id(&publishes.first().expect("Should contain > 0 entries")); assert_eq!( publishes.len(), @@ -893,9 +898,7 @@ mod tests { fn test_handle_iwant_msg_cached() { let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); - let id = gs.config.message_id_fn(); - - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), @@ -904,8 +907,9 @@ mod tests { key: None, validated: true, }; - let msg_id = id(&message); - gs.mcache.put(message.clone()); + let msg_id = gs.config.message_id(&message); + gs.mcache + .put(GossipsubMessage::new(message, msg_id.clone())); gs.handle_iwant(&peers[7], vec![msg_id.clone()]); @@ -925,7 +929,9 @@ mod tests { }); assert!( - sent_messages.iter().any(|msg| id(msg) == msg_id), + sent_messages + .iter() + .any(|msg| gs.config.message_id(msg) == msg_id), "Expected the cached message to be sent to an IWANT peer" ); } @@ -935,10 +941,9 @@ mod tests { fn test_handle_iwant_msg_cached_shifted() { let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); - let id = gs.config.message_id_fn(); // perform 10 memshifts and check that it leaves the cache for shift in 1..10 { - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], sequence_number: Some(shift), @@ -947,8 +952,9 @@ mod tests { key: None, validated: true, }; - let msg_id = id(&message); - gs.mcache.put(message.clone()); + let msg_id = gs.config.message_id(&message); + gs.mcache + .put(GossipsubMessage::new(message, msg_id.clone())); for _ in 0..shift { gs.mcache.shift(); } @@ -959,7 +965,10 @@ mod tests { let message_exists = gs.events.iter().any(|e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { let event = proto_to_message(event); - event.messages.iter().any(|msg| id(msg) == msg_id) + event + .messages + .iter() + .any(|msg| gs.config.message_id(msg) == msg_id) } _ => false, }); @@ -1376,7 +1385,7 @@ mod tests { let local_id = PeerId::random(); - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(peers[1].clone()), data: vec![12], sequence_number: Some(0), @@ -1534,7 +1543,7 @@ mod tests { let local_id = PeerId::random(); - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(peers[1].clone()), data: vec![], sequence_number: Some(0), @@ -1898,8 +1907,9 @@ mod tests { _ => collected_publish, }); - let msg_id = - (gs.config.message_id_fn())(&publishes.first().expect("Should contain > 0 entries")); + let msg_id = gs + .config + .message_id(&publishes.first().expect("Should contain > 0 entries")); let config = GossipsubConfig::default(); assert_eq!( @@ -1927,7 +1937,7 @@ mod tests { ); //receive message - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -1942,7 +1952,7 @@ mod tests { gs.emit_gossip(); //check that exactly config.gossip_lazy() many gossip messages were sent. - let msg_id = (gs.config.message_id_fn())(&message); + let msg_id = gs.config.message_id(&message); assert_eq!( count_control_msgs(&gs, |_, action| match action { GossipsubControlAction::IHave { @@ -1965,7 +1975,7 @@ mod tests { let (mut gs, _, topic_hashes) = build_and_inject_nodes(m, vec!["topic".into()], true); //receive message - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -1980,7 +1990,7 @@ mod tests { gs.emit_gossip(); //check that exactly config.gossip_lazy() many gossip messages were sent. - let msg_id = (gs.config.message_id_fn())(&message); + let msg_id = gs.config.message_id(&message); assert_eq!( count_control_msgs(&gs, |_, action| match action { GossipsubControlAction::IHave { @@ -2322,7 +2332,7 @@ mod tests { gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); // Receive message - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2336,7 +2346,7 @@ mod tests { // Emit gossip gs.emit_gossip(); - let msg_id = (gs.config.message_id_fn())(&message); + let msg_id = gs.config.message_id(&message); // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( count_control_msgs(&gs, |peer, action| match action { @@ -2394,7 +2404,7 @@ mod tests { gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); // Rreceive message - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2405,8 +2415,7 @@ mod tests { }; gs.handle_received_message(message.clone(), &PeerId::random()); - let id = gs.config.message_id_fn(); - let msg_id = id(&message); + let msg_id = gs.config.message_id(&message); gs.handle_iwant(&p1, vec![msg_id.clone()]); gs.handle_iwant(&p2, vec![msg_id.clone()]); @@ -2429,11 +2438,11 @@ mod tests { //the message got sent to p2 assert!(sent_messages .iter() - .any(|(peer_id, msg)| peer_id == &p2 && &id(msg) == &msg_id)); + .any(|(peer_id, msg)| peer_id == &p2 && &gs.config.message_id(msg) == &msg_id)); //the message got not sent to p1 assert!(sent_messages .iter() - .all(|(peer_id, msg)| !(peer_id == &p1 && &id(msg) == &msg_id))); + .all(|(peer_id, msg)| !(peer_id == &p1 && &gs.config.message_id(msg) == &msg_id))); } #[test] @@ -2473,7 +2482,7 @@ mod tests { gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); //message that other peers have - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2483,8 +2492,7 @@ mod tests { validated: true, }; - let id = gs.config.message_id_fn(); - let msg_id = id(&message); + let msg_id = gs.config.message_id(&message); gs.handle_ihave(&p1, vec![(topics[0].clone(), vec![msg_id.clone()])]); gs.handle_ihave(&p2, vec![(topics[0].clone(), vec![msg_id.clone()])]); @@ -2663,9 +2671,7 @@ mod tests { //reduce score of p2 below publish_threshold but not below graylist_threshold gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); - let id = gs.config.message_id_fn(); - - let message1 = GossipsubMessage { + let message1 = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), @@ -2675,7 +2681,7 @@ mod tests { validated: true, }; - let message2 = GossipsubMessage { + let message2 = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5], sequence_number: Some(2u64), @@ -2685,7 +2691,7 @@ mod tests { validated: true, }; - let message3 = GossipsubMessage { + let message3 = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6], sequence_number: Some(3u64), @@ -2695,7 +2701,7 @@ mod tests { validated: true, }; - let message4 = GossipsubMessage { + let message4 = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6, 7], sequence_number: Some(4u64), @@ -2712,7 +2718,7 @@ mod tests { let control_action = GossipsubControlAction::IHave { topic_hash: topics[0].clone(), - message_ids: vec![id(&message2)], + message_ids: vec![config.message_id(&message2)], }; //clear events @@ -2744,7 +2750,7 @@ mod tests { let control_action = GossipsubControlAction::IHave { topic_hash: topics[0].clone(), - message_ids: vec![id(&message4)], + message_ids: vec![config.message_id(&message4)], }; //receive from p2 @@ -2961,10 +2967,10 @@ mod tests { ); } - fn random_message(seq: &mut u64, topics: &Vec) -> GossipsubMessage { + fn random_message(seq: &mut u64, topics: &Vec) -> RawGossipsubMessage { let mut rng = rand::thread_rng(); *seq += 1; - GossipsubMessage { + RawGossipsubMessage { source: Some(PeerId::random()), data: (0..rng.gen_range(10, 30)) .into_iter() @@ -3008,7 +3014,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3109,7 +3115,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3211,7 +3217,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3305,11 +3311,10 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; - let id = config.message_id_fn(); //peer 0 delivers valid message let m1 = random_message(&mut seq, &topics); deliver_message(&mut gs, 0, m1.clone()); @@ -3317,8 +3322,12 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); //message m1 gets validated - gs.report_message_validation_result(&id(&m1), &peers[0], MessageAcceptance::Accept) - .unwrap(); + gs.report_message_validation_result( + &config.message_id(&m1), + &peers[0], + MessageAcceptance::Accept, + ) + .unwrap(); assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); } @@ -3418,7 +3427,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3469,7 +3478,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3481,7 +3490,7 @@ mod tests { //message m1 gets ignored gs.report_message_validation_result( - &(config.message_id_fn())(&m1), + &config.message_id(&m1), &peers[0], MessageAcceptance::Ignore, ) @@ -3526,7 +3535,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3538,7 +3547,7 @@ mod tests { //message m1 gets rejected gs.report_message_validation_result( - &(config.message_id_fn())(&m1), + &config.message_id(&m1), &peers[0], MessageAcceptance::Reject, ) @@ -3586,7 +3595,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3602,7 +3611,7 @@ mod tests { //message m1 gets rejected gs.report_message_validation_result( - &(config.message_id_fn())(&m1), + &config.message_id(&m1), &peers[0], MessageAcceptance::Reject, ) @@ -3654,7 +3663,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3670,19 +3679,19 @@ mod tests { //messages gets rejected gs.report_message_validation_result( - &(config.message_id_fn())(&m1), + &config.message_id(&m1), &peers[0], MessageAcceptance::Reject, ) .unwrap(); gs.report_message_validation_result( - &(config.message_id_fn())(&m2), + &config.message_id(&m2), &peers[0], MessageAcceptance::Reject, ) .unwrap(); gs.report_message_validation_result( - &(config.message_id_fn())(&m3), + &config.message_id(&m3), &peers[0], MessageAcceptance::Reject, ) @@ -3731,7 +3740,7 @@ mod tests { ); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3743,7 +3752,7 @@ mod tests { //message m1 gets rejected gs.report_message_validation_result( - &(config.message_id_fn())(&m1), + &config.message_id(&m1), &peers[0], MessageAcceptance::Reject, ) @@ -4099,7 +4108,7 @@ mod tests { //receive a message let mut seq = 0; let m1 = random_message(&mut seq, &topics); - let id = (config.message_id_fn())(&m1); + let id = config.message_id(&m1); gs.handle_received_message(m1.clone(), &PeerId::random()); @@ -4147,15 +4156,21 @@ mod tests { //peer has 20 messages let mut seq = 0; - let id_fn = config.message_id_fn(); let messages: Vec<_> = (0..20).map(|_| random_message(&mut seq, &topics)).collect(); //peer sends us one ihave for each message in order for message in &messages { - gs.handle_ihave(&peer, vec![(topics[0].clone(), vec![id_fn(message)])]); + gs.handle_ihave( + &peer, + vec![(topics[0].clone(), vec![config.message_id(message)])], + ); } - let first_ten: HashSet<_> = messages.iter().take(10).map(|m| id_fn(m)).collect(); + let first_ten: HashSet<_> = messages + .iter() + .take(10) + .map(|m| config.message_id(m)) + .collect(); //we send iwant only for the first 10 messages assert_eq!( @@ -4182,7 +4197,10 @@ mod tests { //after a heartbeat everything is forgotten gs.heartbeat(); for message in messages[10..].iter() { - gs.handle_ihave(&peer, vec![(topics[0].clone(), vec![id_fn(message)])]); + gs.handle_ihave( + &peer, + vec![(topics[0].clone(), vec![config.message_id(message)])], + ); } //we sent iwant for all 20 messages @@ -4225,9 +4243,8 @@ mod tests { //peer has 20 messages let mut seq = 0; - let id_fn = config.message_id_fn(); let message_ids: Vec<_> = (0..20) - .map(|_| id_fn(&random_message(&mut seq, &topics))) + .map(|_| config.message_id(&random_message(&mut seq, &topics))) .collect(); //peer sends us three ihaves @@ -4415,7 +4432,6 @@ mod tests { let mut first_messages = Vec::new(); let mut second_messages = Vec::new(); let mut seq = 0; - let id_fn = config.message_id_fn(); for peer in &other_peers { for _ in 0..2 { let msg1 = random_message(&mut seq, &topics); @@ -4424,7 +4440,10 @@ mod tests { second_messages.push(msg2.clone()); gs.handle_ihave( peer, - vec![(topics[0].clone(), vec![id_fn(&msg1), id_fn(&msg2)])], + vec![( + topics[0].clone(), + vec![config.message_id(&msg1), config.message_id(&msg2)], + )], ); } } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index a698c47a61e..b1eac499bf9 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -23,7 +23,7 @@ use std::time::Duration; use libp2p_core::PeerId; -use crate::types::{GossipsubMessage, MessageId}; +use crate::types::{GenericGossipsubMessage, MessageId}; /// The types of message validation that can be employed by gossipsub. #[derive(Debug, Clone)] @@ -49,7 +49,7 @@ pub enum ValidationMode { /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] -pub struct GossipsubConfig { +pub struct GenericGossipsubConfig { /// The protocol id prefix to negotiate this protocol. The protocol id is of the form /// `//`. As gossipsub supports version 1.0 and 1.1, there are two /// protocol id's supported. @@ -135,7 +135,7 @@ pub struct GossipsubConfig { /// /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as /// the message id. - message_id_fn: fn(&GossipsubMessage) -> MessageId, + message_id_fn: fn(&GenericGossipsubMessage) -> MessageId, /// By default, gossipsub will reject messages that are sent to us that has the same message /// source as we have specified locally. Enabling this, allows these messages and prevents @@ -222,7 +222,10 @@ pub struct GossipsubConfig { published_message_ids_cache_time: Duration, } -impl GossipsubConfig { +// for backwards compatibility +pub type GossipsubConfig = GenericGossipsubConfig>; + +impl GenericGossipsubConfig { //all the getters /// The protocol id prefix to negotiate this protocol. The protocol id is of the form @@ -344,8 +347,8 @@ impl GossipsubConfig { /// /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as /// the message id. - pub fn message_id_fn(&self) -> fn(&GossipsubMessage) -> MessageId { - self.message_id_fn + pub fn message_id(&self, message: &GenericGossipsubMessage) -> MessageId { + (self.message_id_fn)(message) } /// By default, gossipsub will reject messages that are sent to us that has the same message @@ -465,39 +468,42 @@ impl GossipsubConfig { } } -impl Default for GossipsubConfig { - fn default() -> GossipsubConfig { +impl Default for GenericGossipsubConfig { + fn default() -> Self { //use GossipsubConfigBuilder to also validate defaults - GossipsubConfigBuilder::new() + GenericGossipsubConfigBuilder::new() .build() .expect("Default config parameters should be valid parameters") } } /// The builder struct for constructing a gossipsub configuration. -pub struct GossipsubConfigBuilder { - config: GossipsubConfig, +pub struct GenericGossipsubConfigBuilder { + config: GenericGossipsubConfig, } -impl Default for GossipsubConfigBuilder { - fn default() -> GossipsubConfigBuilder { - GossipsubConfigBuilder { - config: GossipsubConfig::default(), +//for backwards compatibility +pub type GossipsubConfigBuilder = GenericGossipsubConfigBuilder>; + +impl Default for GenericGossipsubConfigBuilder { + fn default() -> Self { + GenericGossipsubConfigBuilder { + config: GenericGossipsubConfig::default(), } } } -impl From for GossipsubConfigBuilder { - fn from(config: GossipsubConfig) -> Self { - GossipsubConfigBuilder { config } +impl From> for GenericGossipsubConfigBuilder { + fn from(config: GenericGossipsubConfig) -> Self { + GenericGossipsubConfigBuilder { config } } } -impl GossipsubConfigBuilder { +impl GenericGossipsubConfigBuilder { // set default values - pub fn new() -> GossipsubConfigBuilder { - GossipsubConfigBuilder { - config: GossipsubConfig { + pub fn new() -> Self { + GenericGossipsubConfigBuilder { + config: GenericGossipsubConfig { protocol_id_prefix: Cow::Borrowed("meshsub"), history_length: 5, history_gossip: 3, @@ -673,7 +679,10 @@ impl GossipsubConfigBuilder { /// /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as /// the message id. - pub fn message_id_fn(&mut self, id_fn: fn(&GossipsubMessage) -> MessageId) -> &mut Self { + pub fn message_id_fn( + &mut self, + id_fn: fn(&GenericGossipsubMessage) -> MessageId, + ) -> &mut Self { self.config.message_id_fn = id_fn; self } @@ -801,7 +810,7 @@ impl GossipsubConfigBuilder { } /// Constructs a `GossipsubConfig` from the given configuration and validates the settings. - pub fn build(&self) -> Result { + pub fn build(&self) -> Result, &str> { // check all constraints on config if self.config.max_transmit_size < 100 { @@ -832,7 +841,7 @@ impl GossipsubConfigBuilder { } } -impl std::fmt::Debug for GossipsubConfig { +impl std::fmt::Debug for GenericGossipsubConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix); diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 5a5c5c6e9f8..cc285f0f942 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -21,7 +21,7 @@ use crate::config::ValidationMode; use crate::error::{GossipsubHandlerError, ValidationError}; use crate::protocol::{GossipsubCodec, ProtocolConfig}; -use crate::types::{GossipsubMessage, GossipsubRpc, PeerKind}; +use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage}; use futures::prelude::*; use futures::StreamExt; use futures_codec::Framed; @@ -50,7 +50,7 @@ pub enum HandlerEvent { rpc: GossipsubRpc, /// Any invalid messages that were received in the RPC, along with the associated /// validation error. - invalid_messages: Vec<(GossipsubMessage, ValidationError)>, + invalid_messages: Vec<(RawGossipsubMessage, ValidationError)>, }, /// An inbound or outbound substream has been established with the peer and this informs over /// which protocol. This message only occurs once per connection. diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 78166cbd717..cdd287ef371 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -153,13 +153,20 @@ mod rpc_proto { include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); } -pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity}; -pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode}; +pub use self::behaviour::{ + GenericGossipsub, GenericGossipsubEvent, Gossipsub, GossipsubEvent, MessageAuthenticity, +}; +pub use self::config::{ + GenericGossipsubConfig, GenericGossipsubConfigBuilder, GossipsubConfig, GossipsubConfigBuilder, + ValidationMode, +}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, }; pub use self::topic::{Hasher, Topic, TopicHash}; -pub use self::types::{GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId}; +pub use self::types::{ + GenericGossipsubMessage, GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId, +}; pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index fa1cc0b8f12..286b7ab340f 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -19,9 +19,10 @@ // DEALINGS IN THE SOFTWARE. use crate::topic::TopicHash; -use crate::types::{GossipsubMessage, MessageId}; +use crate::types::{GossipsubMessageWithId, MessageId}; use libp2p_core::PeerId; use log::debug; +use std::fmt::Debug; use std::{collections::HashMap, fmt}; /// CacheEntry stored in the history. @@ -33,16 +34,15 @@ pub struct CacheEntry { /// MessageCache struct holding history of messages. #[derive(Clone)] -pub struct MessageCache { - msgs: HashMap, +pub struct MessageCache { + msgs: HashMap>, /// For every message and peer the number of times this peer asked for the message iwant_counts: HashMap>, history: Vec>, gossip: usize, - msg_id: fn(&GossipsubMessage) -> MessageId, } -impl fmt::Debug for MessageCache { +impl> fmt::Debug for MessageCache { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("MessageCache") .field("msgs", &self.msgs) @@ -53,33 +53,28 @@ impl fmt::Debug for MessageCache { } /// Implementation of the MessageCache. -impl MessageCache { - pub fn new( - gossip: usize, - history_capacity: usize, - msg_id: fn(&GossipsubMessage) -> MessageId, - ) -> MessageCache { +impl MessageCache { + pub fn new(gossip: usize, history_capacity: usize) -> Self { MessageCache { gossip, msgs: HashMap::default(), iwant_counts: HashMap::default(), history: vec![Vec::new(); history_capacity], - msg_id, } } /// Put a message into the memory cache. /// /// Returns the message if it already exists. - pub fn put(&mut self, msg: GossipsubMessage) -> Option { - let message_id = (self.msg_id)(&msg); - debug!("Put message {:?} in mcache", &message_id); + pub fn put(&mut self, msg: GossipsubMessageWithId) -> Option> { + let message_id = msg.message_id(); + debug!("Put message {:?} in mcache", message_id); let cache_entry = CacheEntry { mid: message_id.clone(), topics: msg.topics.clone(), }; - let seen_message = self.msgs.insert(message_id, msg); + let seen_message = self.msgs.insert(message_id.clone(), msg); if seen_message.is_none() { // Don't add duplicate entries to the cache. self.history[0].push(cache_entry); @@ -88,7 +83,7 @@ impl MessageCache { } /// Get a message with `message_id` - pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessage> { + pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessageWithId> { self.msgs.get(message_id) } @@ -98,7 +93,7 @@ impl MessageCache { &mut self, message_id: &MessageId, peer: &PeerId, - ) -> Option<(&GossipsubMessage, u32)> { + ) -> Option<(&GossipsubMessageWithId, u32)> { let iwant_counts = &mut self.iwant_counts; self.msgs.get(message_id).map(|message| { (message, { @@ -114,7 +109,7 @@ impl MessageCache { } /// Gets and validates a message with `message_id`. - pub fn validate(&mut self, message_id: &MessageId) -> Option<&GossipsubMessage> { + pub fn validate(&mut self, message_id: &MessageId) -> Option<&GossipsubMessageWithId> { self.msgs.get_mut(message_id).map(|message| { message.validated = true; &*message @@ -174,7 +169,7 @@ impl MessageCache { } /// Removes a message from the cache and returns it if existent - pub fn remove(&mut self, message_id: &MessageId) -> Option { + pub fn remove(&mut self, message_id: &MessageId) -> Option> { //We only remove the message from msgs and iwant_count and keep the message_id in the // history vector. Zhe id in the history vector will simply be ignored on popping. @@ -186,36 +181,38 @@ impl MessageCache { #[cfg(test)] mod tests { use super::*; - use crate::{IdentTopic as Topic, TopicHash}; + use crate::types::RawGossipsubMessage; + use crate::{GossipsubMessage, IdentTopic as Topic, TopicHash}; use libp2p_core::PeerId; fn gen_testm(x: u64, topics: Vec) -> GossipsubMessage { + let default_id = |message: &RawGossipsubMessage| { + // default message id is: source + sequence number + let mut source_string = message.source.as_ref().unwrap().to_base58(); + source_string.push_str(&message.sequence_number.unwrap().to_string()); + MessageId::from(source_string) + }; let u8x: u8 = x as u8; let source = Some(PeerId::random()); let data: Vec = vec![u8x]; let sequence_number = Some(x); - let m = GossipsubMessage { + let m = RawGossipsubMessage { source, data, sequence_number, topics, signature: None, key: None, - validated: true, + validated: false, }; - m - } - fn new_cache(gossip_size: usize, history: usize) -> MessageCache { - let default_id = |message: &GossipsubMessage| { - // default message id is: source + sequence number - let mut source_string = message.source.as_ref().unwrap().to_base58(); - source_string.push_str(&message.sequence_number.unwrap().to_string()); - MessageId::from(source_string) - }; + let id = default_id(&m); + GossipsubMessage::new(m, id) + } - MessageCache::new(gossip_size, history, default_id) + fn new_cache(gossip_size: usize, history: usize) -> MessageCache> { + MessageCache::new(gossip_size, history) } #[test] @@ -241,7 +238,7 @@ mod tests { assert!(mc.history[0].len() == 1); - let fetched = mc.get(&(mc.msg_id)(&m)); + let fetched = mc.get(m.message_id()); assert_eq!(fetched.is_none(), false); assert_eq!(fetched.is_some(), true); @@ -291,7 +288,7 @@ mod tests { let m = gen_testm(1, vec![]); mc.put(m.clone()); - let fetched = mc.get(&(mc.msg_id)(&m)); + let fetched = mc.get(m.message_id()); // Make sure it is the same fetched message match fetched { diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index feffba13d53..46ed0fe7473 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -22,7 +22,7 @@ //! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour. use crate::time_cache::TimeCache; -use crate::{GossipsubMessage, MessageId, TopicHash}; +use crate::{MessageId, TopicHash}; use libp2p_core::PeerId; use log::{debug, trace, warn}; use std::collections::{hash_map, HashMap, HashSet}; @@ -31,6 +31,7 @@ use std::time::{Duration, Instant}; mod params; use crate::error::ValidationError; +use crate::types::GossipsubMessageWithId; pub use params::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, @@ -50,8 +51,6 @@ pub(crate) struct PeerScore { peer_ips: HashMap>, /// Message delivery tracking. This is a time-cache of `DeliveryRecord`s. deliveries: TimeCache, - /// The message id function. - msg_id: fn(&GossipsubMessage) -> MessageId, /// callback for monitoring message delivery times message_delivery_time_callback: Option, } @@ -199,13 +198,12 @@ impl Default for DeliveryRecord { impl PeerScore { /// Creates a new `PeerScore` using a given set of peer scoring parameters. - pub fn new(params: PeerScoreParams, msg_id: fn(&GossipsubMessage) -> MessageId) -> Self { - Self::new_with_message_delivery_time_callback(params, msg_id, None) + pub fn new(params: PeerScoreParams) -> Self { + Self::new_with_message_delivery_time_callback(params, None) } pub fn new_with_message_delivery_time_callback( params: PeerScoreParams, - msg_id: fn(&GossipsubMessage) -> MessageId, callback: Option, ) -> Self { PeerScore { @@ -213,7 +211,6 @@ impl PeerScore { peer_stats: HashMap::new(), peer_ips: HashMap::new(), deliveries: TimeCache::new(Duration::from_secs(TIME_CACHE_DURATION)), - msg_id, message_delivery_time_callback: callback, } } @@ -557,10 +554,10 @@ impl PeerScore { } } - pub fn validate_message(&mut self, _from: &PeerId, _msg: &GossipsubMessage) { + pub fn validate_message(&mut self, _from: &PeerId, _msg: &GossipsubMessageWithId) { // adds an empty record with the message id self.deliveries - .entry((self.msg_id)(_msg)) + .entry(_msg.message_id().clone()) .or_insert_with(|| DeliveryRecord::default()); if let Some(callback) = self.message_delivery_time_callback { @@ -578,12 +575,12 @@ impl PeerScore { } } - pub fn deliver_message(&mut self, from: &PeerId, msg: &GossipsubMessage) { + pub fn deliver_message(&mut self, from: &PeerId, msg: &GossipsubMessageWithId) { self.mark_first_message_delivery(from, msg); let record = self .deliveries - .entry((self.msg_id)(msg)) + .entry(msg.message_id().clone()) .or_insert_with(|| DeliveryRecord::default()); // this should be the first delivery trace @@ -603,7 +600,12 @@ impl PeerScore { } } - pub fn reject_message(&mut self, from: &PeerId, msg: &GossipsubMessage, reason: RejectReason) { + pub fn reject_message( + &mut self, + from: &PeerId, + msg: &GossipsubMessageWithId, + reason: RejectReason, + ) { match reason { // these messages are not tracked, but the peer is penalized as they are invalid RejectReason::ValidationError(_) | RejectReason::SelfOrigin => { @@ -624,7 +626,7 @@ impl PeerScore { let peers: Vec<_> = { let mut record = self .deliveries - .entry((self.msg_id)(msg)) + .entry(msg.message_id().clone()) .or_insert_with(|| DeliveryRecord::default()); // this should be the first delivery trace @@ -653,10 +655,10 @@ impl PeerScore { } } - pub fn duplicated_message(&mut self, from: &PeerId, msg: &GossipsubMessage) { + pub fn duplicated_message(&mut self, from: &PeerId, msg: &GossipsubMessageWithId) { let record = self .deliveries - .entry((self.msg_id)(msg)) + .entry(msg.message_id().clone()) .or_insert_with(|| DeliveryRecord::default()); if record.peers.get(from).is_some() { @@ -752,7 +754,11 @@ impl PeerScore { /// Increments the "invalid message deliveries" counter for all scored topics the message /// is published in. - fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, msg: &GossipsubMessage) { + fn mark_invalid_message_delivery( + &mut self, + peer_id: &PeerId, + msg: &GossipsubMessageWithId, + ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { for topic_hash in msg.topics.iter() { if let Some(topic_stats) = @@ -772,7 +778,11 @@ impl PeerScore { /// Increments the "first message deliveries" counter for all scored topics the message is /// published in, as well as the "mesh message deliveries" counter, if the peer is in the /// mesh for the topic. - fn mark_first_message_delivery(&mut self, peer_id: &PeerId, msg: &GossipsubMessage) { + fn mark_first_message_delivery( + &mut self, + peer_id: &PeerId, + msg: &GossipsubMessageWithId, + ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { for topic_hash in msg.topics.iter() { if let Some(topic_stats) = @@ -813,10 +823,10 @@ impl PeerScore { /// Increments the "mesh message deliveries" counter for messages we've seen before, as long the /// message was received within the P3 window. - fn mark_duplicate_message_delivery( + fn mark_duplicate_message_delivery( &mut self, peer_id: &PeerId, - msg: &GossipsubMessage, + msg: &GossipsubMessageWithId, validated_time: Option, ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index a2ff07c7bdc..73d5ab67c14 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -21,7 +21,8 @@ /// A collection of unit tests mostly ported from the go implementation. use super::*; -use crate::IdentTopic as Topic; +use crate::types::RawGossipsubMessage; +use crate::{GossipsubMessage, IdentTopic as Topic}; // estimates a value within variance fn within_variance(value: f64, expected: f64, variance: f64) -> bool { @@ -33,7 +34,7 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool { // generates a random gossipsub message with sequence number i fn make_test_message(seq: u64) -> GossipsubMessage { - GossipsubMessage { + let m = RawGossipsubMessage { source: Some(PeerId::random()), data: vec![12, 34, 56], sequence_number: Some(seq), @@ -41,10 +42,13 @@ fn make_test_message(seq: u64) -> GossipsubMessage { signature: None, key: None, validated: true, - } + }; + + let id = default_message_id()(&m); + GossipsubMessage::new(m, id) } -fn default_message_id() -> fn(&GossipsubMessage) -> MessageId { +fn default_message_id() -> fn(&RawGossipsubMessage) -> MessageId { |message| { // default message id is: source + sequence number // NOTE: If either the peer_id or source is not provided, we set to 0; @@ -78,7 +82,7 @@ fn test_score_time_in_mesh() { let peer_id = PeerId::random(); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); // Peer score should start at 0 peer_score.add_peer(peer_id.clone()); @@ -124,7 +128,7 @@ fn test_score_time_in_mesh_cap() { let peer_id = PeerId::random(); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); // Peer score should start at 0 peer_score.add_peer(peer_id.clone()); @@ -173,7 +177,7 @@ fn test_score_first_message_deliveries() { let peer_id = PeerId::random(); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); // Peer score should start at 0 peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic); @@ -217,7 +221,7 @@ fn test_score_first_message_deliveries_cap() { let peer_id = PeerId::random(); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); // Peer score should start at 0 peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic); @@ -259,7 +263,7 @@ fn test_score_first_message_deliveries_decay() { params.topics.insert(topic_hash, topic_params.clone()); let peer_id = PeerId::random(); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic); @@ -319,7 +323,7 @@ fn test_score_mesh_message_deliveries() { topic_params.mesh_failure_penalty_weight = 0.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); // peer A always delivers the message first. // peer B delivers next (within the delivery window). @@ -422,7 +426,7 @@ fn test_score_mesh_message_deliveries_decay() { topic_params.mesh_failure_penalty_weight = 0.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); peer_score.add_peer(peer_id_a.clone()); @@ -489,7 +493,7 @@ fn test_score_mesh_failure_penalty() { topic_params.mesh_failure_penalty_decay = 1.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); let peer_id_b = PeerId::random(); @@ -566,7 +570,7 @@ fn test_score_invalid_message_deliveries() { topic_params.invalid_message_deliveries_decay = 1.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); peer_score.add_peer(peer_id_a.clone()); @@ -612,7 +616,7 @@ fn test_score_invalid_message_deliveris_decay() { topic_params.invalid_message_deliveries_decay = 0.9; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); peer_score.add_peer(peer_id_a.clone()); @@ -666,7 +670,7 @@ fn test_score_reject_message_deliveries() { topic_params.invalid_message_deliveries_decay = 1.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); let peer_id_b = PeerId::random(); @@ -779,7 +783,7 @@ fn test_application_score() { topic_params.invalid_message_deliveries_decay = 1.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); peer_score.add_peer(peer_id_a.clone()); @@ -817,7 +821,7 @@ fn test_score_ip_colocation() { topic_params.invalid_message_deliveries_weight = 0.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); let peer_id_b = PeerId::random(); @@ -882,7 +886,7 @@ fn test_score_behaviour_penality() { topic_params.invalid_message_deliveries_weight = 0.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); @@ -931,7 +935,7 @@ fn test_score_retention() { topic_params.time_in_mesh_weight = 0.0; params.topics.insert(topic_hash, topic_params.clone()); - let mut peer_score = PeerScore::new(params, default_message_id()); + let mut peer_score = PeerScore::new(params); let peer_id_a = PeerId::random(); peer_score.add_peer(peer_id_a.clone()); diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 21103643574..fba52354ff7 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -24,8 +24,8 @@ use crate::handler::HandlerEvent; use crate::rpc_proto; use crate::topic::TopicHash; use crate::types::{ - GossipsubControlAction, GossipsubMessage, GossipsubRpc, GossipsubSubscription, - GossipsubSubscriptionAction, MessageId, PeerInfo, PeerKind, + GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction, + MessageId, PeerInfo, PeerKind, RawGossipsubMessage, }; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; @@ -330,7 +330,7 @@ impl Decoder for GossipsubCodec { // If the initial validation logic failed, add the message to invalid messages and // continue processing the others. if let Some(validation_error) = invalid_kind.take() { - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -354,7 +354,7 @@ impl Decoder for GossipsubCodec { // Build the invalid message (ignoring further validation of sequence number // and source) - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -383,7 +383,7 @@ impl Decoder for GossipsubCodec { seq_no, seq_no.len() ); - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -406,7 +406,7 @@ impl Decoder for GossipsubCodec { } else { // sequence number was not present debug!("Sequence number not present but expected"); - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -436,7 +436,7 @@ impl Decoder for GossipsubCodec { Err(_) => { // invalid peer id, add to invalid messages debug!("Message source has an invalid PeerId"); - let message = GossipsubMessage { + let message = RawGossipsubMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number, @@ -464,7 +464,7 @@ impl Decoder for GossipsubCodec { }; // This message has passed all validation, add it to the validated messages. - messages.push(GossipsubMessage { + messages.push(RawGossipsubMessage { source, data: message.data.unwrap_or_default(), sequence_number, @@ -573,14 +573,15 @@ impl Decoder for GossipsubCodec { #[cfg(test)] mod tests { use super::*; + use crate::config::GossipsubConfig; + use crate::Gossipsub; use crate::IdentTopic as Topic; - use crate::{Gossipsub, GossipsubConfig}; use libp2p_core::identity::Keypair; use quickcheck::*; use rand::Rng; #[derive(Clone, Debug)] - struct Message(GossipsubMessage); + struct Message(RawGossipsubMessage); impl Arbitrary for Message { fn arbitrary(g: &mut G) -> Self { diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 57b051e905a..0c9e5cc2064 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -23,6 +23,7 @@ use crate::rpc_proto; use crate::TopicHash; use libp2p_core::PeerId; use std::fmt; +use std::fmt::Debug; #[derive(Debug)] /// Validation kinds from the application for received messages. @@ -79,12 +80,12 @@ pub enum PeerKind { /// A message received by the gossipsub system. #[derive(Clone, PartialEq, Eq, Hash)] -pub struct GossipsubMessage { +pub struct GenericGossipsubMessage { /// Id of the peer that published this message. pub source: Option, /// Content of the message. Its meaning is out of scope of this library. - pub data: Vec, + pub data: T, /// A random sequence number. pub sequence_number: Option, @@ -104,12 +105,69 @@ pub struct GossipsubMessage { pub validated: bool, } -impl fmt::Debug for GossipsubMessage { +impl GenericGossipsubMessage { + pub fn from>(m: GenericGossipsubMessage) -> Self { + Self { + source: m.source, + data: m.data.into(), + sequence_number: m.sequence_number, + topics: m.topics, + signature: m.signature, + key: m.key, + validated: m.validated, + } + } +} + +pub type RawGossipsubMessage = GenericGossipsubMessage>; + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct DataWithId { + pub id: MessageId, + pub data: T, +} + +impl>> Into> for DataWithId { + fn into(self) -> Vec { + self.data.into() + } +} + +impl> AsRef<[u8]> for DataWithId { + fn as_ref(&self) -> &[u8] { + self.data.as_ref() + } +} + +pub type GossipsubMessageWithId = GenericGossipsubMessage>; + +impl GossipsubMessageWithId { + pub fn new(m: GenericGossipsubMessage, id: MessageId) -> Self { + Self { + source: m.source, + data: DataWithId { id, data: m.data }, + sequence_number: m.sequence_number, + topics: m.topics, + signature: m.signature, + key: m.key, + validated: m.validated, + } + } + + pub fn message_id(&self) -> &MessageId { + &self.data.id + } +} + +// for backwards compatibility +pub type GossipsubMessage = GossipsubMessageWithId>; + +impl> fmt::Debug for GenericGossipsubMessage { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("GossipsubMessage") .field( "data", - &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)), + &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data.as_ref())), ) .field("source", &self.source) .field("sequence_number", &self.sequence_number) @@ -179,7 +237,7 @@ pub enum GossipsubControlAction { #[derive(Clone, PartialEq, Eq, Hash)] pub struct GossipsubRpc { /// List of messages that were part of this RPC query. - pub messages: Vec, + pub messages: Vec, /// List of subscriptions. pub subscriptions: Vec, /// List of Gossipsub control messages. From e7a62f911120ec8979158c2a38c65214b74f8e10 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 8 Oct 2020 13:34:39 +0200 Subject: [PATCH 2/4] fix docs --- protocols/gossipsub/src/behaviour.rs | 10 +++++----- protocols/gossipsub/src/config.rs | 10 +++++----- protocols/gossipsub/src/lib.rs | 12 ++++++------ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 37b009633ef..d4b2478a3e6 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -73,7 +73,7 @@ mod tests; /// Without signing, a number of privacy preserving modes can be selected. /// /// NOTE: The default validation settings are to require signatures. The [`ValidationMode`] -/// should be updated in the [`GossipsubConfig`] to allow for unsigned messages. +/// should be updated in the [`GenericGossipsubConfig`] to allow for unsigned messages. #[derive(Clone)] pub enum MessageAuthenticity { /// Message signing is enabled. The author will be the owner of the key and the sequence number @@ -94,7 +94,7 @@ pub enum MessageAuthenticity { /// The author of the message and the sequence numbers are excluded from the message. /// /// NOTE: Excluding these fields may make these messages invalid by other nodes who - /// enforce validation of these fields. See [`ValidationMode`] in the `GossipsubConfig` + /// enforce validation of these fields. See [`ValidationMode`] in the `GenericGossipsubConfig` /// for how to customise this for rust-libp2p gossipsub. A custom `message_id` /// function will need to be set to prevent all messages from a peer being filtered /// as duplicates. @@ -203,7 +203,7 @@ impl From for PublishConfig { /// Network behaviour that handles the gossipsub protocol. /// -/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If message signing is +/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GenericGossipsubConfig`] instance. If message signing is /// disabled, the [`ValidationMode`] in the config should be adjusted to an appropriate level to /// accept unsigned messages. pub struct GenericGossipsub> { @@ -290,7 +290,7 @@ pub struct GenericGossipsub> { pub type Gossipsub = GenericGossipsub>; impl> + From> + AsRef<[u8]>> GenericGossipsub { - /// Creates a `Gossipsub` struct given a set of parameters specified via a `GossipsubConfig`. + /// Creates a `GenericGossipsub` struct given a set of parameters specified via a `GenericGossipsubConfig`. pub fn new( privacy: MessageAuthenticity, config: GenericGossipsubConfig, @@ -2186,7 +2186,7 @@ impl> + From> + AsRef<[u8]>> GenericGossipsub } } - /// Constructs a `GossipsubMessage` performing message signing if required. + /// Constructs a `GenericGossipsubMessage` performing message signing if required. pub(crate) fn build_message( &self, topics: Vec, diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index b1eac499bf9..03f73bdcf07 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -133,8 +133,8 @@ pub struct GenericGossipsubConfig { /// addressing, where this function may be set to `hash(message)`. This would prevent messages /// of the same content from being duplicated. /// - /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as - /// the message id. + /// The function takes a `GenericGossipsubMessage` as input and outputs a String to be + /// interpreted as the message id. message_id_fn: fn(&GenericGossipsubMessage) -> MessageId, /// By default, gossipsub will reject messages that are sent to us that has the same message @@ -345,7 +345,7 @@ impl GenericGossipsubConfig { /// addressing, where this function may be set to `hash(message)`. This would prevent messages /// of the same content from being duplicated. /// - /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as + /// The function takes a `GenericGossipsubMessage` as input and outputs a String to be interpreted as /// the message id. pub fn message_id(&self, message: &GenericGossipsubMessage) -> MessageId { (self.message_id_fn)(message) @@ -677,7 +677,7 @@ impl GenericGossipsubConfigBuilder { /// addressing, where this function may be set to `hash(message)`. This would prevent messages /// of the same content from being duplicated. /// - /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as + /// The function takes a `GenericGossipsubMessage` as input and outputs a String to be interpreted as /// the message id. pub fn message_id_fn( &mut self, @@ -809,7 +809,7 @@ impl GenericGossipsubConfigBuilder { self } - /// Constructs a `GossipsubConfig` from the given configuration and validates the settings. + /// Constructs a `GenericGossipsubConfig` from the given configuration and validates the settings. pub fn build(&self) -> Result, &str> { // check all constraints on config diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index cdd287ef371..9899c9b292f 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -55,10 +55,10 @@ //! //! ## GossipsubConfig //! -//! The [`GossipsubConfig`] struct specifies various network performance/tuning configuration +//! The [`GenericGossipsubConfig`] struct specifies various network performance/tuning configuration //! parameters. Specifically it specifies: //! -//! [`GossipsubConfig`]: struct.GossipsubConfig.html +//! [`GenericGossipsubConfig`]: struct.GenericGossipsubConfig.html //! //! - `protocol_id` - The protocol id that this implementation will accept connections on. //! - `history_length` - The number of heartbeats which past messages are kept in cache (default: 5). @@ -83,16 +83,16 @@ //! propagate the message to peers. //! //! This struct implements the `Default` trait and can be initialised via -//! `GossipsubConfig::default()`. +//! `GenericGossipsubConfig::default()`. //! //! //! ## Gossipsub //! -//! The [`Gossipsub`] struct implements the `NetworkBehaviour` trait allowing it to act as the +//! The [`GenericGossipsub`] struct implements the `NetworkBehaviour` trait allowing it to act as the //! routing behaviour in a `Swarm`. This struct requires an instance of `PeerId` and -//! [`GossipsubConfig`]. +//! [`GenericGossipsubConfig`]. //! -//! [`Gossipsub`]: struct.Gossipsub.html +//! [`GenericGossipsub`]: struct.Gossipsub.html //! ## Example //! From 72761e9cee080e7bbeb36ae878c76821f35b3241 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 8 Oct 2020 13:40:13 +0200 Subject: [PATCH 3/4] fix examples --- examples/gossipsub-chat.rs | 2 +- examples/ipfs-private.rs | 2 +- protocols/gossipsub/src/types.rs | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 3dbed0af495..ca2fadf1144 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -154,7 +154,7 @@ fn main() -> Result<(), Box> { message, } => println!( "Got message: {} with id: {} from peer: {:?}", - String::from_utf8_lossy(&message.data), + String::from_utf8_lossy(message.data()), id, peer_id ), diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index d9ddf972997..3cfede8049d 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -195,7 +195,7 @@ fn main() -> Result<(), Box> { message, } => println!( "Got message: {} with id: {} from peer: {:?}", - String::from_utf8_lossy(&message.data), + String::from_utf8_lossy(message.data()), id, peer_id ), diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 0c9e5cc2064..aab3af8518b 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -157,6 +157,10 @@ impl GossipsubMessageWithId { pub fn message_id(&self) -> &MessageId { &self.data.id } + + pub fn data(&self) -> &T { + &self.data.data + } } // for backwards compatibility From 8fd87d240ec3c0fa1747887e062f1b12a786494c Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 8 Oct 2020 13:46:07 +0200 Subject: [PATCH 4/4] fix examples --- examples/gossipsub-chat.rs | 4 ++-- protocols/gossipsub/src/lib.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index ca2fadf1144..180c61be1c4 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -51,7 +51,7 @@ use env_logger::{Builder, Env}; use futures::prelude::*; use libp2p::gossipsub::MessageId; use libp2p::gossipsub::{ - GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode, + GossipsubEvent, RawGossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode, }; use libp2p::{gossipsub, identity, PeerId}; use std::collections::hash_map::DefaultHasher; @@ -79,7 +79,7 @@ fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { // To content-address message, we can take the hash of message and use it as an ID. - let message_id_fn = |message: &GossipsubMessage| { + let message_id_fn = |message: &RawGossipsubMessage| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); MessageId::from(s.finish().to_string()) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 9899c9b292f..b9bf9ffe136 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -166,7 +166,8 @@ pub use self::peer_score::{ }; pub use self::topic::{Hasher, Topic, TopicHash}; pub use self::types::{ - GenericGossipsubMessage, GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId, + GenericGossipsubMessage, GossipsubMessage, RawGossipsubMessage, GossipsubRpc, MessageAcceptance, + MessageId, }; pub type IdentTopic = Topic; pub type Sha256Topic = Topic;