From dea351de6e2f2077af0933ca4d8ca4caa47a4441 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 14 Oct 2020 19:48:59 +0200 Subject: [PATCH 1/2] remove support for multi-topic messages --- protocols/gossipsub/build.rs | 2 +- protocols/gossipsub/src/behaviour.rs | 55 +++++++++--------- protocols/gossipsub/src/behaviour/tests.rs | 47 ++++++---------- protocols/gossipsub/src/compat.proto | 12 ++++ protocols/gossipsub/src/lib.rs | 4 +- protocols/gossipsub/src/mcache.rs | 8 +-- protocols/gossipsub/src/peer_score/mod.rs | 10 ++-- protocols/gossipsub/src/peer_score/tests.rs | 2 +- protocols/gossipsub/src/protocol.rs | 43 +++----------- protocols/gossipsub/src/rpc.proto | 2 +- protocols/gossipsub/src/rpc_proto.rs | 62 +++++++++++++++++++++ protocols/gossipsub/src/types.rs | 18 ++---- 12 files changed, 144 insertions(+), 121 deletions(-) create mode 100644 protocols/gossipsub/src/compat.proto create mode 100644 protocols/gossipsub/src/rpc_proto.rs diff --git a/protocols/gossipsub/build.rs b/protocols/gossipsub/build.rs index a3de99880dc..a0c81052bdc 100644 --- a/protocols/gossipsub/build.rs +++ b/protocols/gossipsub/build.rs @@ -19,5 +19,5 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap(); + prost_build::compile_protos(&["src/rpc.proto", "src/compat.proto"], &["src"]).unwrap(); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 99e1476d15e..dc5478b1143 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -23,7 +23,7 @@ use std::{ collections::HashSet, collections::VecDeque, collections::{BTreeSet, HashMap}, - fmt, iter, + fmt, iter::FromIterator, net::IpAddr, sync::Arc, @@ -496,23 +496,13 @@ where Ok(true) } - /// Publishes a message to the network. + /// Publishes a message with multiple topics to the network. pub fn publish( &mut self, topic: Topic, data: impl Into, ) -> Result { - self.publish_many(iter::once(topic), data) - } - - /// Publishes a message with multiple topics to the network. - pub fn publish_many( - &mut self, - topics: impl IntoIterator>, - data: impl Into, - ) -> Result { - let message = - self.build_message(topics.into_iter().map(|t| t.hash()).collect(), data.into())?; + let message = self.build_message(topic.into(), data.into())?; let msg_id = self.config.message_id(&message); let event = Arc::new( @@ -562,7 +552,7 @@ where !self.config.flood_publish() && self.forward_msg(message.clone(), None)?; let mut recipient_peers = HashSet::new(); - for topic_hash in &message.topics { + for topic_hash in &message.topic { if let Some(set) = self.topic_peers.get(&topic_hash) { if self.config.flood_publish() { // Forward to all peers above score and all explicit peers @@ -1520,7 +1510,12 @@ where self.mcache.put(msg.clone()); // Dispatch the message to the user if we are subscribed to any of the topics - if self.mesh.keys().any(|t| msg.topics.iter().any(|u| t == u)) { + if msg + .topic + .as_ref() + .map(|t| self.mesh.contains_key(&t)) + .unwrap_or(false) + { debug!("Sending received message to user"); self.events.push_back(NetworkBehaviourAction::GenerateEvent( GenericGossipsubEvent::Message { @@ -1531,8 +1526,8 @@ where )); } else { debug!( - "Received message on a topic we are not subscribed to. Topics {:?}", - msg.topics.iter().collect::>() + "Received message on a topic we are not subscribed to: {:?}", + msg.topic ); return; } @@ -2265,7 +2260,7 @@ where let mut recipient_peers = HashSet::new(); // add mesh peers - for topic in &message.topics { + for topic in &message.topic { // mesh if let Some(mesh_peers) = self.mesh.get(&topic) { for peer_id in mesh_peers { @@ -2283,7 +2278,11 @@ where if let Some(topics) = self.peer_topics.get(p) { if Some(p) != propagation_source && Some(p) != message.source.as_ref() - && message.topics.iter().any(|t| topics.contains(t)) + && message + .topic + .as_ref() + .map(|t| topics.contains(t)) + .unwrap_or(false) { recipient_peers.insert(p.clone()); } @@ -2315,7 +2314,7 @@ where /// Constructs a `GenericGossipsubMessage` performing message signing if required. pub(crate) fn build_message( &self, - topics: Vec, + topic: TopicHash, data: T, ) -> Result, SigningError> { match &self.publish_config { @@ -2332,11 +2331,7 @@ where from: Some(author.clone().into_bytes()), data: Some(data.clone().into()), seqno: Some(sequence_number.to_be_bytes().to_vec()), - topic_ids: topics - .clone() - .into_iter() - .map(TopicHash::into_string) - .collect(), + topic: Some(topic.clone().into_string()), signature: None, key: None, }; @@ -2358,7 +2353,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(sequence_number), - topics, + topic: Some(topic), signature, key: inline_key.clone(), validated: true, // all published messages are valid @@ -2371,7 +2366,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(rand::random()), - topics, + topic: Some(topic), signature: None, key: None, validated: true, // all published messages are valid @@ -2384,7 +2379,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(rand::random()), - topics, + topic: Some(topic), signature: None, key: None, validated: true, // all published messages are valid @@ -2397,7 +2392,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: None, - topics, + topic: Some(topic), signature: None, key: None, validated: true, // all published messages are valid @@ -3135,7 +3130,7 @@ mod local_test { source: Some(PeerId::random()), data: vec![0; 100], sequence_number: None, - topics: vec![], + topic: None, signature: None, key: None, validated: false, diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 839190cbc41..8707e28acdd 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -238,11 +238,7 @@ mod tests { source: message.from.map(|x| PeerId::from_bytes(x).unwrap()), data: message.data.unwrap_or_default(), sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: message.signature, // don't inform the application key: None, validated: false, @@ -938,7 +934,7 @@ mod tests { source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), - topics: Vec::new(), + topic: None, signature: None, key: None, validated: true, @@ -987,7 +983,7 @@ mod tests { source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], sequence_number: Some(shift), - topics: Vec::new(), + topic: None, signature: None, key: None, validated: true, @@ -1465,7 +1461,7 @@ mod tests { source: Some(peers[1].clone()), data: vec![12], sequence_number: Some(0), - topics: vec![topic_hashes[0].clone()], + topic: topic_hashes.get(0).cloned(), signature: None, key: None, validated: true, @@ -1623,7 +1619,7 @@ mod tests { source: Some(peers[1].clone()), data: vec![], sequence_number: Some(0), - topics: vec![topic_hashes[0].clone()], + topic: topic_hashes.get(0).cloned(), signature: None, key: None, validated: true, @@ -1980,16 +1976,9 @@ mod tests { .to_subscribe(true) .create_network(); - let other_topic = Topic::new("test2"); - - // subscribe an additional new peer to test2 - gs.subscribe(&other_topic).unwrap(); - add_peer(&mut gs, &vec![other_topic.hash()], false, false); - //publish message let publish_data = vec![0; 42]; - gs.publish_many(vec![Topic::new(topic), other_topic.clone()], publish_data) - .unwrap(); + gs.publish(Topic::new(topic), publish_data).unwrap(); // Collect all publish messages let publishes = gs @@ -2013,7 +2002,7 @@ mod tests { let config = GossipsubConfig::default(); assert_eq!( publishes.len(), - config.mesh_n_high() + 10 + 1, + config.mesh_n_high() + 10, "Should send a publish message to all known peers" ); @@ -2040,7 +2029,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topics: vec![topic_hashes[0].clone()], + topic: topic_hashes.get(0).cloned(), signature: None, key: None, validated: true, @@ -2082,7 +2071,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topics: vec![topic_hashes[0].clone()], + topic: topic_hashes.get(0).cloned(), signature: None, key: None, validated: true, @@ -2450,7 +2439,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topics: vec![topics[0].clone()], + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -2521,7 +2510,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topics: vec![topics[0].clone()], + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -2598,7 +2587,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topics: vec![topics[0].clone()], + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -2771,7 +2760,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), - topics: topics.clone(), + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -2781,7 +2770,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5], sequence_number: Some(2u64), - topics: topics.clone(), + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -2791,7 +2780,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6], sequence_number: Some(3u64), - topics: topics.clone(), + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -2801,7 +2790,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6, 7], sequence_number: Some(4u64), - topics: topics.clone(), + topic: topics.get(0).cloned(), signature: None, key: None, validated: true, @@ -3071,7 +3060,7 @@ mod tests { .map(|_| rng.gen()) .collect(), sequence_number: Some(*seq), - topics: topics.clone(), + topic: topics.get(rng.gen_range(0, topics.len())).cloned(), signature: None, key: None, validated: true, @@ -4969,7 +4958,7 @@ mod tests { source: None, data: counters_address.to_be_bytes().to_vec(), sequence_number: None, - topics: vec![topic_hashes[0].clone()], + topic: topic_hashes.get(0).cloned(), signature: None, key: None, validated: true, diff --git a/protocols/gossipsub/src/compat.proto b/protocols/gossipsub/src/compat.proto new file mode 100644 index 00000000000..b2753bf7e41 --- /dev/null +++ b/protocols/gossipsub/src/compat.proto @@ -0,0 +1,12 @@ +syntax = "proto2"; + +package compat.pb; + +message Message { + optional bytes from = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topic_ids = 4; + optional bytes signature = 5; + optional bytes key = 6; +} \ No newline at end of file diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 137517e2452..42ba0c1a439 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -154,9 +154,7 @@ mod types; #[macro_use] extern crate derive_builder; -mod rpc_proto { - include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); -} +mod rpc_proto; pub use self::behaviour::{ GenericGossipsub, GenericGossipsubEvent, Gossipsub, GossipsubEvent, MessageAuthenticity, diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index e2f272ee908..e4ea5542dca 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -29,7 +29,7 @@ use std::{collections::HashMap, fmt}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CacheEntry { mid: MessageId, - topics: Vec, + topic: Option, } /// MessageCache struct holding history of messages. @@ -71,7 +71,7 @@ impl MessageCache { debug!("Put message {:?} in mcache", message_id); let cache_entry = CacheEntry { mid: message_id.clone(), - topics: msg.topics.clone(), + topic: msg.topic.clone(), }; let seen_message = self.msgs.insert(message_id.clone(), msg); @@ -126,7 +126,7 @@ impl MessageCache { let mut found_entries: Vec = entries .iter() .filter_map(|entry| { - if entry.topics.iter().any(|t| t == topic) { + if entry.topic.iter().any(|t| t == topic) { let mid = &entry.mid; // Only gossip validated messages if let Some(true) = self.msgs.get(mid).map(|msg| msg.validated) { @@ -202,7 +202,7 @@ mod tests { source, data, sequence_number, - topics, + topic: topics.get(0).cloned(), signature: None, key: None, validated: false, diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index 3ec4c69c424..97ec2a81c58 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -562,7 +562,7 @@ impl PeerScore { .or_insert_with(|| DeliveryRecord::default()); if let Some(callback) = self.message_delivery_time_callback { - for topic in &_msg.topics { + for topic in &_msg.topic { if self .peer_stats .get(_from) @@ -673,7 +673,7 @@ impl PeerScore { } else { 0.0 }; - for topic in &msg.topics { + for topic in &msg.topic { if self .peer_stats .get(from) @@ -761,7 +761,7 @@ impl PeerScore { msg: &GossipsubMessageWithId, ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { - for topic_hash in msg.topics.iter() { + for topic_hash in msg.topic.iter() { if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { @@ -785,7 +785,7 @@ impl PeerScore { msg: &GossipsubMessageWithId, ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { - for topic_hash in msg.topics.iter() { + for topic_hash in msg.topic.iter() { if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { @@ -836,7 +836,7 @@ impl PeerScore { } else { None }; - for topic_hash in msg.topics.iter() { + for topic_hash in msg.topic.iter() { if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index 73d5ab67c14..8f7b7a79c98 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -38,7 +38,7 @@ fn make_test_message(seq: u64) -> GossipsubMessage { source: Some(PeerId::random()), data: vec![12, 34, 56], sequence_number: Some(seq), - topics: vec![Topic::new("test").hash()], + topic: Some(Topic::new("test").hash()), signature: None, key: None, validated: true, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index fba52354ff7..862bde5dcf3 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -334,11 +334,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: None, // don't inform the application key: message.key, validated: false, @@ -358,11 +354,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: None, // don't inform the application key: message.key, validated: false, @@ -387,11 +379,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -410,11 +398,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -440,11 +424,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number, - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -468,11 +448,7 @@ impl Decoder for GossipsubCodec { source, data: message.data.unwrap_or_default(), sequence_number, - topics: message - .topic_ids - .into_iter() - .map(TopicHash::from_raw) - .collect(), + topic: message.topic.map(TopicHash::from_raw), signature: message.signature, key: message.key, validated: false, @@ -595,11 +571,8 @@ mod tests { ) .unwrap(); let data = (0..g.gen_range(10, 10024)).map(|_| g.gen()).collect(); - let topics = Vec::arbitrary(g) - .into_iter() - .map(|id: TopicId| id.0) - .collect(); - Message(gs.build_message(topics, data).unwrap()) + let topic_id = TopicId::arbitrary(g).0; + Message(gs.build_message(topic_id, data).unwrap()) } } diff --git a/protocols/gossipsub/src/rpc.proto b/protocols/gossipsub/src/rpc.proto index c8d1838a890..f745b98646a 100644 --- a/protocols/gossipsub/src/rpc.proto +++ b/protocols/gossipsub/src/rpc.proto @@ -18,7 +18,7 @@ message Message { optional bytes from = 1; optional bytes data = 2; optional bytes seqno = 3; - repeated string topic_ids = 4; + optional string topic = 4; optional bytes signature = 5; optional bytes key = 6; } diff --git a/protocols/gossipsub/src/rpc_proto.rs b/protocols/gossipsub/src/rpc_proto.rs new file mode 100644 index 00000000000..9c56856bc17 --- /dev/null +++ b/protocols/gossipsub/src/rpc_proto.rs @@ -0,0 +1,62 @@ +include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); + +#[cfg(test)] +mod test { + use crate::IdentTopic as Topic; + use libp2p_core::PeerId; + use prost::Message; + use rand::Rng; + + mod compat_proto { + include!(concat!(env!("OUT_DIR"), "/compat.pb.rs")); + } + + #[test] + fn test_multi_topic_message_compatibility() { + let topic1 = Topic::new("t1").hash(); + let topic2 = Topic::new("t2").hash(); + + let new_message1 = super::Message { + from: Some(PeerId::random().as_bytes().to_vec()), + data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()), + topic: Some(topic1.clone().into_string()), + signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + }; + let old_message1 = compat_proto::Message { + from: Some(PeerId::random().as_bytes().to_vec()), + data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()), + topic_ids: vec![topic1.clone().into_string()], + signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + }; + let old_message2 = compat_proto::Message { + from: Some(PeerId::random().as_bytes().to_vec()), + data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()), + topic_ids: vec![topic1.clone().into_string(), topic2.clone().into_string()], + signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + }; + + let mut new_message1b = Vec::with_capacity(new_message1.encoded_len()); + new_message1.encode(&mut new_message1b).unwrap(); + + let mut old_message1b = Vec::with_capacity(old_message1.encoded_len()); + old_message1.encode(&mut old_message1b).unwrap(); + + let mut old_message2b = Vec::with_capacity(old_message2.encoded_len()); + old_message2.encode(&mut old_message2b).unwrap(); + + let new_message = super::Message::decode(&old_message1b[..]).unwrap(); + assert_eq!(new_message.topic, Some(topic1.clone().into_string())); + + let new_message = super::Message::decode(&old_message2b[..]).unwrap(); + assert_eq!(new_message.topic, Some(topic2.clone().into_string())); + + let old_message = compat_proto::Message::decode(&new_message1b[..]).unwrap(); + assert_eq!(old_message.topic_ids, vec![topic1.into_string()]); + } +} diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 2e5230034b1..7e34531e714 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -100,10 +100,8 @@ pub struct GenericGossipsubMessage { /// A random sequence number. pub sequence_number: Option, - /// List of topics this message belongs to. - /// - /// Each message can belong to multiple topics at once. - pub topics: Vec, + /// The topic this message belongs to + pub topic: Option, /// The signature of the message if it's signed. pub signature: Option>, @@ -121,7 +119,7 @@ impl GenericGossipsubMessage { source: m.source, data: m.data.into(), sequence_number: m.sequence_number, - topics: m.topics, + topic: m.topic, signature: m.signature, key: m.key, validated: m.validated, @@ -157,7 +155,7 @@ impl GossipsubMessageWithId { source: m.source, data: DataWithId { id, data: m.data }, sequence_number: m.sequence_number, - topics: m.topics, + topic: m.topic, signature: m.signature, key: m.key, validated: m.validated, @@ -185,7 +183,7 @@ impl> fmt::Debug for GenericGossipsubMessage { ) .field("source", &self.source) .field("sequence_number", &self.sequence_number) - .field("topics", &self.topics) + .field("topic", &self.topic) .finish() } } @@ -277,11 +275,7 @@ impl Into for GossipsubRpc { from: message.source.map(|m| m.into_bytes()), data: Some(message.data), seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()), - topic_ids: message - .topics - .into_iter() - .map(TopicHash::into_string) - .collect(), + topic: message.topic.map(TopicHash::into_string), signature: message.signature, key: message.key, }; From 90d0ffae6cf800e8220e94b16495e6e2db6582fe Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Mon, 19 Oct 2020 15:18:40 +0200 Subject: [PATCH 2/2] make message topic required --- protocols/gossipsub/src/behaviour.rs | 68 ++++----- protocols/gossipsub/src/behaviour/tests.rs | 32 ++--- protocols/gossipsub/src/mcache.rs | 45 ++---- protocols/gossipsub/src/peer_score/mod.rs | 152 ++++++++++---------- protocols/gossipsub/src/peer_score/tests.rs | 2 +- protocols/gossipsub/src/protocol.rs | 12 +- protocols/gossipsub/src/rpc.proto | 6 +- protocols/gossipsub/src/rpc_proto.rs | 6 +- protocols/gossipsub/src/types.rs | 4 +- 9 files changed, 144 insertions(+), 183 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index dc5478b1143..0f831ff7010 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -552,21 +552,19 @@ where !self.config.flood_publish() && self.forward_msg(message.clone(), None)?; let mut recipient_peers = HashSet::new(); - for topic_hash in &message.topic { - if let Some(set) = self.topic_peers.get(&topic_hash) { - if self.config.flood_publish() { - // Forward to all peers above score and all explicit peers - recipient_peers.extend( - set.iter() - .filter(|p| { - self.explicit_peers.contains(*p) - || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0 - }) - .map(|p| p.clone()), - ); - continue; - } - + let topic_hash = &message.topic; + if let Some(set) = self.topic_peers.get(&topic_hash) { + if self.config.flood_publish() { + // Forward to all peers above score and all explicit peers + recipient_peers.extend( + set.iter() + .filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0 + }) + .map(|p| p.clone()), + ); + } else { // Explicit peers for peer in &self.explicit_peers { if set.contains(peer) { @@ -1510,12 +1508,7 @@ where self.mcache.put(msg.clone()); // Dispatch the message to the user if we are subscribed to any of the topics - if msg - .topic - .as_ref() - .map(|t| self.mesh.contains_key(&t)) - .unwrap_or(false) - { + if self.mesh.contains_key(&msg.topic) { debug!("Sending received message to user"); self.events.push_back(NetworkBehaviourAction::GenerateEvent( GenericGossipsubEvent::Message { @@ -2260,15 +2253,12 @@ where let mut recipient_peers = HashSet::new(); // add mesh peers - for topic in &message.topic { - // mesh - if let Some(mesh_peers) = self.mesh.get(&topic) { - for peer_id in mesh_peers { - if Some(peer_id) != propagation_source - && Some(peer_id) != message.source.as_ref() - { - recipient_peers.insert(peer_id.clone()); - } + let topic = &message.topic; + // mesh + if let Some(mesh_peers) = self.mesh.get(&topic) { + for peer_id in mesh_peers { + if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() { + recipient_peers.insert(peer_id.clone()); } } } @@ -2278,11 +2268,7 @@ where if let Some(topics) = self.peer_topics.get(p) { if Some(p) != propagation_source && Some(p) != message.source.as_ref() - && message - .topic - .as_ref() - .map(|t| topics.contains(t)) - .unwrap_or(false) + && topics.contains(&message.topic) { recipient_peers.insert(p.clone()); } @@ -2331,7 +2317,7 @@ where from: Some(author.clone().into_bytes()), data: Some(data.clone().into()), seqno: Some(sequence_number.to_be_bytes().to_vec()), - topic: Some(topic.clone().into_string()), + topic: topic.clone().into_string(), signature: None, key: None, }; @@ -2353,7 +2339,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(sequence_number), - topic: Some(topic), + topic, signature, key: inline_key.clone(), validated: true, // all published messages are valid @@ -2366,7 +2352,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(rand::random()), - topic: Some(topic), + topic, signature: None, key: None, validated: true, // all published messages are valid @@ -2379,7 +2365,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: Some(rand::random()), - topic: Some(topic), + topic, signature: None, key: None, validated: true, // all published messages are valid @@ -2392,7 +2378,7 @@ where // To be interoperable with the go-implementation this is treated as a 64-bit // big-endian uint. sequence_number: None, - topic: Some(topic), + topic, signature: None, key: None, validated: true, // all published messages are valid @@ -3130,7 +3116,7 @@ mod local_test { source: Some(PeerId::random()), data: vec![0; 100], sequence_number: None, - topic: None, + topic: TopicHash::from_raw("test_topic"), signature: None, key: None, validated: false, diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 8707e28acdd..855a2c5b4de 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -238,7 +238,7 @@ mod tests { source: message.from.map(|x| PeerId::from_bytes(x).unwrap()), data: message.data.unwrap_or_default(), sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application key: None, validated: false, @@ -934,7 +934,7 @@ mod tests { source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), - topic: None, + topic: TopicHash::from_raw("topic"), signature: None, key: None, validated: true, @@ -983,7 +983,7 @@ mod tests { source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], sequence_number: Some(shift), - topic: None, + topic: TopicHash::from_raw("topic"), signature: None, key: None, validated: true, @@ -1461,7 +1461,7 @@ mod tests { source: Some(peers[1].clone()), data: vec![12], sequence_number: Some(0), - topic: topic_hashes.get(0).cloned(), + topic: topic_hashes[0].clone(), signature: None, key: None, validated: true, @@ -1619,7 +1619,7 @@ mod tests { source: Some(peers[1].clone()), data: vec![], sequence_number: Some(0), - topic: topic_hashes.get(0).cloned(), + topic: topic_hashes[0].clone(), signature: None, key: None, validated: true, @@ -2029,7 +2029,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topic: topic_hashes.get(0).cloned(), + topic: topic_hashes[0].clone(), signature: None, key: None, validated: true, @@ -2071,7 +2071,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topic: topic_hashes.get(0).cloned(), + topic: topic_hashes[0].clone(), signature: None, key: None, validated: true, @@ -2439,7 +2439,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -2510,7 +2510,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -2587,7 +2587,7 @@ mod tests { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -2760,7 +2760,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -2770,7 +2770,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5], sequence_number: Some(2u64), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -2780,7 +2780,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6], sequence_number: Some(3u64), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -2790,7 +2790,7 @@ mod tests { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6, 7], sequence_number: Some(4u64), - topic: topics.get(0).cloned(), + topic: topics[0].clone(), signature: None, key: None, validated: true, @@ -3060,7 +3060,7 @@ mod tests { .map(|_| rng.gen()) .collect(), sequence_number: Some(*seq), - topic: topics.get(rng.gen_range(0, topics.len())).cloned(), + topic: topics[rng.gen_range(0, topics.len())].clone(), signature: None, key: None, validated: true, @@ -4958,7 +4958,7 @@ mod tests { source: None, data: counters_address.to_be_bytes().to_vec(), sequence_number: None, - topic: topic_hashes.get(0).cloned(), + topic: topic_hashes[0].clone(), signature: None, key: None, validated: true, diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index e4ea5542dca..6db0c8eda69 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -29,7 +29,7 @@ use std::{collections::HashMap, fmt}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CacheEntry { mid: MessageId, - topic: Option, + topic: TopicHash, } /// MessageCache struct holding history of messages. @@ -126,7 +126,7 @@ impl MessageCache { let mut found_entries: Vec = entries .iter() .filter_map(|entry| { - if entry.topic.iter().any(|t| t == topic) { + if &entry.topic == topic { let mid = &entry.mid; // Only gossip validated messages if let Some(true) = self.msgs.get(mid).map(|msg| msg.validated) { @@ -186,7 +186,7 @@ mod tests { use crate::{GossipsubMessage, IdentTopic as Topic, TopicHash}; use libp2p_core::PeerId; - fn gen_testm(x: u64, topics: Vec) -> GossipsubMessage { + fn gen_testm(x: u64, topic: TopicHash) -> GossipsubMessage { let default_id = |message: &RawGossipsubMessage| { // default message id is: source + sequence number let mut source_string = message.source.as_ref().unwrap().to_base58(); @@ -202,7 +202,7 @@ mod tests { source, data, sequence_number, - topic: topics.get(0).cloned(), + topic: topic, signature: None, key: None, validated: false, @@ -231,9 +231,7 @@ mod tests { let mut mc = new_cache(10, 15); let topic1_hash = Topic::new("topic1").hash().clone(); - let topic2_hash = Topic::new("topic2").hash().clone(); - - let m = gen_testm(10, vec![topic1_hash, topic2_hash]); + let m = gen_testm(10, topic1_hash); mc.put(m.clone()); @@ -257,9 +255,7 @@ mod tests { let mut mc = new_cache(10, 15); let topic1_hash = Topic::new("topic1").hash().clone(); - let topic2_hash = Topic::new("topic2").hash().clone(); - - let m = gen_testm(10, vec![topic1_hash, topic2_hash]); + let m = gen_testm(10, topic1_hash); mc.put(m.clone()); @@ -280,35 +276,16 @@ mod tests { assert_eq!(fetched.is_none(), true); } - #[test] - /// Test adding a message with no topics. - fn test_no_topic_put() { - let mut mc = new_cache(3, 5); - - // Build the message - let m = gen_testm(1, vec![]); - mc.put(m.clone()); - - let fetched = mc.get(m.message_id()); - - // Make sure it is the same fetched message - match fetched { - Some(x) => assert_eq!(*x, m), - _ => assert!(false), - } - } - #[test] /// Test shift mechanism. fn test_shift() { let mut mc = new_cache(1, 5); let topic1_hash = Topic::new("topic1").hash().clone(); - let topic2_hash = Topic::new("topic2").hash().clone(); // Build the message for i in 0..10 { - let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]); + let m = gen_testm(i, topic1_hash.clone()); mc.put(m.clone()); } @@ -328,10 +305,10 @@ mod tests { let mut mc = new_cache(1, 5); let topic1_hash = Topic::new("topic1").hash().clone(); - let topic2_hash = Topic::new("topic2").hash().clone(); + // Build the message for i in 0..10 { - let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]); + let m = gen_testm(i, topic1_hash.clone()); mc.put(m.clone()); } @@ -354,10 +331,10 @@ mod tests { let mut mc = new_cache(4, 5); let topic1_hash = Topic::new("topic1").hash().clone(); - let topic2_hash = Topic::new("topic2").hash().clone(); + // Build the message for i in 0..10 { - let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]); + let m = gen_testm(i, topic1_hash.clone()); mc.put(m.clone()); } diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index 97ec2a81c58..ba81e04b958 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -562,16 +562,15 @@ impl PeerScore { .or_insert_with(|| DeliveryRecord::default()); if let Some(callback) = self.message_delivery_time_callback { - for topic in &_msg.topic { - if self - .peer_stats - .get(_from) - .and_then(|s| s.topics.get(topic)) - .map(|ts| ts.in_mesh()) - .unwrap_or(false) - { - callback(_from, topic, 0.0); - } + let topic = &_msg.topic; + if self + .peer_stats + .get(_from) + .and_then(|s| s.topics.get(topic)) + .map(|ts| ts.in_mesh()) + .unwrap_or(false) + { + callback(_from, topic, 0.0); } } } @@ -673,16 +672,15 @@ impl PeerScore { } else { 0.0 }; - for topic in &msg.topic { - if self - .peer_stats - .get(from) - .and_then(|s| s.topics.get(topic)) - .map(|ts| ts.in_mesh()) - .unwrap_or(false) - { - callback(from, topic, time); - } + let topic = &msg.topic; + if self + .peer_stats + .get(from) + .and_then(|s| s.topics.get(topic)) + .map(|ts| ts.in_mesh()) + .unwrap_or(false) + { + callback(from, topic, time); } } @@ -761,17 +759,16 @@ impl PeerScore { msg: &GossipsubMessageWithId, ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { - for topic_hash in msg.topic.iter() { - if let Some(topic_stats) = - peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) - { - debug!( - "Peer {} delivered an invalid message in topic {} and gets penalized \ + let topic_hash = &msg.topic; + if let Some(topic_stats) = + peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) + { + debug!( + "Peer {} delivered an invalid message in topic {} and gets penalized \ for it", - peer_id, topic_hash - ); - topic_stats.invalid_message_deliveries += 1f64; - } + peer_id, topic_hash + ); + topic_stats.invalid_message_deliveries += 1f64; } } } @@ -785,38 +782,37 @@ impl PeerScore { msg: &GossipsubMessageWithId, ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { - for topic_hash in msg.topic.iter() { - if let Some(topic_stats) = - peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) - { + let topic_hash = &msg.topic; + if let Some(topic_stats) = + peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) + { + let cap = self + .params + .topics + .get(topic_hash) + .expect("Topic must exist if there are known topic_stats") + .first_message_deliveries_cap; + topic_stats.first_message_deliveries = + if topic_stats.first_message_deliveries + 1f64 > cap { + cap + } else { + topic_stats.first_message_deliveries + 1f64 + }; + + if let MeshStatus::Active { .. } = topic_stats.mesh_status { let cap = self .params .topics .get(topic_hash) .expect("Topic must exist if there are known topic_stats") - .first_message_deliveries_cap; - topic_stats.first_message_deliveries = - if topic_stats.first_message_deliveries + 1f64 > cap { + .mesh_message_deliveries_cap; + + topic_stats.mesh_message_deliveries = + if topic_stats.mesh_message_deliveries + 1f64 > cap { cap } else { - topic_stats.first_message_deliveries + 1f64 + topic_stats.mesh_message_deliveries + 1f64 }; - - if let MeshStatus::Active { .. } = topic_stats.mesh_status { - let cap = self - .params - .topics - .get(topic_hash) - .expect("Topic must exist if there are known topic_stats") - .mesh_message_deliveries_cap; - - topic_stats.mesh_message_deliveries = - if topic_stats.mesh_message_deliveries + 1f64 > cap { - cap - } else { - topic_stats.mesh_message_deliveries + 1f64 - }; - } } } } @@ -836,32 +832,34 @@ impl PeerScore { } else { None }; - for topic_hash in msg.topic.iter() { - if let Some(topic_stats) = - peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) - { - if let MeshStatus::Active { .. } = topic_stats.mesh_status { - let topic_params = self - .params - .topics - .get(topic_hash) - .expect("Topic must exist if there are known topic_stats"); - - // check against the mesh delivery window -- if the validated time is passed as 0, then - // the message was received before we finished validation and thus falls within the mesh - // delivery window. - if let Some(validated_time) = validated_time { - if let Some(now) = &now { - //should always be true - let window_time = validated_time - .checked_add(topic_params.mesh_message_deliveries_window) - .unwrap_or_else(|| now.clone()); - if now > &window_time { - continue; - } + let topic_hash = &msg.topic; + if let Some(topic_stats) = + peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) + { + if let MeshStatus::Active { .. } = topic_stats.mesh_status { + let topic_params = self + .params + .topics + .get(topic_hash) + .expect("Topic must exist if there are known topic_stats"); + + // check against the mesh delivery window -- if the validated time is passed as 0, then + // the message was received before we finished validation and thus falls within the mesh + // delivery window. + let mut falls_in_mesh_deliver_window = true; + if let Some(validated_time) = validated_time { + if let Some(now) = &now { + //should always be true + let window_time = validated_time + .checked_add(topic_params.mesh_message_deliveries_window) + .unwrap_or_else(|| now.clone()); + if now > &window_time { + falls_in_mesh_deliver_window = false; } } + } + if falls_in_mesh_deliver_window { let cap = topic_params.mesh_message_deliveries_cap; topic_stats.mesh_message_deliveries = if topic_stats.mesh_message_deliveries + 1f64 > cap { diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index 8f7b7a79c98..330221db863 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -38,7 +38,7 @@ fn make_test_message(seq: u64) -> GossipsubMessage { source: Some(PeerId::random()), data: vec![12, 34, 56], sequence_number: Some(seq), - topic: Some(Topic::new("test").hash()), + topic: Topic::new("test").hash(), signature: None, key: None, validated: true, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 862bde5dcf3..f2fbebf01da 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -334,7 +334,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: None, // don't inform the application key: message.key, validated: false, @@ -354,7 +354,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: None, // don't inform the application key: message.key, validated: false, @@ -379,7 +379,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -398,7 +398,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -424,7 +424,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number, - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -448,7 +448,7 @@ impl Decoder for GossipsubCodec { source, data: message.data.unwrap_or_default(), sequence_number, - topic: message.topic.map(TopicHash::from_raw), + topic: TopicHash::from_raw(message.topic), signature: message.signature, key: message.key, validated: false, diff --git a/protocols/gossipsub/src/rpc.proto b/protocols/gossipsub/src/rpc.proto index f745b98646a..2ce12f3f37f 100644 --- a/protocols/gossipsub/src/rpc.proto +++ b/protocols/gossipsub/src/rpc.proto @@ -18,9 +18,9 @@ message Message { optional bytes from = 1; optional bytes data = 2; optional bytes seqno = 3; - optional string topic = 4; - optional bytes signature = 5; - optional bytes key = 6; + required string topic = 4; + optional bytes signature = 5; + optional bytes key = 6; } message ControlMessage { diff --git a/protocols/gossipsub/src/rpc_proto.rs b/protocols/gossipsub/src/rpc_proto.rs index 9c56856bc17..3c0f6b3df4d 100644 --- a/protocols/gossipsub/src/rpc_proto.rs +++ b/protocols/gossipsub/src/rpc_proto.rs @@ -20,7 +20,7 @@ mod test { from: Some(PeerId::random().as_bytes().to_vec()), data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()), - topic: Some(topic1.clone().into_string()), + topic: topic1.clone().into_string(), signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), }; @@ -51,10 +51,10 @@ mod test { old_message2.encode(&mut old_message2b).unwrap(); let new_message = super::Message::decode(&old_message1b[..]).unwrap(); - assert_eq!(new_message.topic, Some(topic1.clone().into_string())); + assert_eq!(new_message.topic, topic1.clone().into_string()); let new_message = super::Message::decode(&old_message2b[..]).unwrap(); - assert_eq!(new_message.topic, Some(topic2.clone().into_string())); + assert_eq!(new_message.topic, topic2.clone().into_string()); let old_message = compat_proto::Message::decode(&new_message1b[..]).unwrap(); assert_eq!(old_message.topic_ids, vec![topic1.into_string()]); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 7e34531e714..b15e503f40e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -101,7 +101,7 @@ pub struct GenericGossipsubMessage { pub sequence_number: Option, /// The topic this message belongs to - pub topic: Option, + pub topic: TopicHash, /// The signature of the message if it's signed. pub signature: Option>, @@ -275,7 +275,7 @@ impl Into for GossipsubRpc { from: message.source.map(|m| m.into_bytes()), data: Some(message.data), seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()), - topic: message.topic.map(TopicHash::into_string), + topic: TopicHash::into_string(message.topic), signature: message.signature, key: message.key, };