diff --git a/blockchain/blocks/src/gossip_block.rs b/blockchain/blocks/src/gossip_block.rs index f7d72a77162f..154a56b3e006 100644 --- a/blockchain/blocks/src/gossip_block.rs +++ b/blockchain/blocks/src/gossip_block.rs @@ -6,7 +6,7 @@ use cid::Cid; use encoding::{tuple::*, Cbor}; /// Block message used as serialized gossipsub messages for blocks topic -#[derive(Serialize_tuple, Deserialize_tuple)] +#[derive(Clone, Debug, Serialize_tuple, Deserialize_tuple)] pub struct GossipBlock { pub header: BlockHeader, pub bls_messages: Vec, diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index f45e4543aa66..60cd577452cf 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -8,6 +8,7 @@ use crate::hello::{HelloRequest, HelloResponse}; use async_std::sync::{channel, Receiver, Sender}; use async_std::{stream, task}; use forest_cid::{multihash::Blake2b256, Cid}; +use forest_encoding::from_slice; use futures::channel::oneshot::Sender as OneShotSender; use futures::select; use futures_util::stream::StreamExt; @@ -16,7 +17,6 @@ use libp2p::{ core, core::muxing::StreamMuxerBox, core::transport::boxed::Boxed, - gossipsub::TopicHash, identity::{ed25519, Keypair}, mplex, noise, yamux, PeerId, Swarm, Transport, }; @@ -28,6 +28,8 @@ use std::sync::Arc; use std::time::Duration; use utils::read_file_to_vec; +use forest_blocks::GossipBlock; +use forest_message::SignedMessage; pub use libp2p::gossipsub::Topic; pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks"; @@ -40,8 +42,7 @@ const PUBSUB_TOPICS: [&str; 2] = [PUBSUB_BLOCK_STR, PUBSUB_MSG_STR]; pub enum NetworkEvent { PubsubMessage { source: Option, - topics: Vec, - message: Vec, + message: PubsubMessage, }, HelloRequest { request: HelloRequest, @@ -67,6 +68,15 @@ pub enum NetworkEvent { }, } +/// Message types that can come over GossipSub +#[derive(Debug, Clone)] +pub enum PubsubMessage { + /// Messages that come over the block topic + Block(GossipBlock), + /// Messages that come over the message topic + Message(SignedMessage), +} + /// Events into this Service #[derive(Debug)] pub enum NetworkMessage { @@ -94,6 +104,7 @@ pub struct Libp2pService { network_sender_in: Sender, network_receiver_out: Receiver, network_sender_out: Sender, + network_name: String, } impl Libp2pService @@ -120,7 +131,8 @@ where // Subscribe to gossipsub topics with the network name suffix for topic in PUBSUB_TOPICS.iter() { - swarm.subscribe(Topic::new(format!("{}/{}", topic, network_name))); + let t = Topic::new(format!("{}/{}", topic, network_name)); + swarm.subscribe(t); } // Bootstrap with Kademlia @@ -139,6 +151,7 @@ where network_sender_in, network_receiver_out, network_sender_out, + network_name: network_name.to_owned(), } } @@ -147,6 +160,8 @@ where let mut swarm_stream = self.swarm.fuse(); let mut network_stream = self.network_receiver_in.fuse(); let mut interval = stream::interval(Duration::from_secs(10)).fuse(); + let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, self.network_name); + let pubsub_msg_str = format!("{}/{}", PUBSUB_MSG_STR, self.network_name); loop { select! { @@ -167,11 +182,37 @@ where message, } => { trace!("Got a Gossip Message from {:?}", source); - emit_event(&self.network_sender_out, NetworkEvent::PubsubMessage { - source, - topics, - message - }).await; + // there should only be one topic associated with any particular gossip message + let topic = match topics.get(0) { + Some(t) => t.as_str(), + None => { + warn!("received gossipsub message without topic from {:?}", source); + continue; + }, + }; + if topic == pubsub_block_str { + match from_slice::(&message) { + Ok(b) => { + emit_event(&self.network_sender_out, NetworkEvent::PubsubMessage{ + source, + message: PubsubMessage::Block(b), + }).await; + } + Err(e) => warn!("Gossip Block from peer {:?} could not be deserialized: {}", source, e) + } + } else if topic == pubsub_msg_str { + match from_slice::(&message) { + Ok(m) => { + emit_event(&self.network_sender_out, NetworkEvent::PubsubMessage{ + source, + message: PubsubMessage::Message(m), + }).await; + } + Err(e) => warn!("Gossip Message from peer {:?} could not be deserialized: {}", source, e) + } + } else { + warn!("Getting gossip messages from unknown topic: {}", topic); + } } ForestBehaviourEvent::HelloRequest { request, channel, .. } => { debug!("Received hello request: {:?}", request);