diff --git a/.gitignore b/.gitignore index 9102759836a5..a08c9b2a3097 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ /target **/*.rs.bk /Cargo.lock -.idea/ \ No newline at end of file +.idea +.DS_STORE +.vscode +.idea/ diff --git a/node/Cargo.toml b/node/Cargo.toml index ae795409273e..154103685f80 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -7,6 +7,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +network = { path = "network" } +ferret-libp2p = { path = "ferret-libp2p"} + + +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } +tokio = "0.1.22" +futures = "0.1.29" clap = "2.33.0" dirs = "2.0.2" toml = "0.5.5" diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml new file mode 100644 index 000000000000..4806d396a6fd --- /dev/null +++ b/node/ferret-libp2p/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ferret-libp2p" +version = "0.1.0" +authors = ["ChainSafe Systems "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } +tokio = "0.1.22" +futures = "0.1.29" +log = "0.4.8" +slog = "2.5.2" \ No newline at end of file diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs new file mode 100644 index 000000000000..0ad2c81e69c0 --- /dev/null +++ b/node/ferret-libp2p/src/behaviour.rs @@ -0,0 +1,94 @@ +use futures::Async; +use libp2p::core::identity::Keypair; +use libp2p::core::PeerId; +use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; +use libp2p::mdns::{Mdns, MdnsEvent}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; +use libp2p::tokio_io::{AsyncRead, AsyncWrite}; +use libp2p::NetworkBehaviour; + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")] +pub struct MyBehaviour { + pub gossipsub: Gossipsub, + pub mdns: Mdns, + #[behaviour(ignore)] + events: Vec, +} + +pub enum MyBehaviourEvent { + DiscoveredPeer(PeerId), + ExpiredPeer(PeerId), + GossipMessage { + source: PeerId, + topics: Vec, + message: Vec, + }, +} + +impl NetworkBehaviourEventProcess + for MyBehaviour +{ + fn inject_event(&mut self, event: MdnsEvent) { + match event { + MdnsEvent::Discovered(list) => { + for (peer, _) in list { + self.events.push(MyBehaviourEvent::DiscoveredPeer(peer)) + } + } + MdnsEvent::Expired(list) => { + for (peer, _) in list { + if !self.mdns.has_node(&peer) { + self.events.push(MyBehaviourEvent::ExpiredPeer(peer)) + } + } + } + } + } +} + +impl NetworkBehaviourEventProcess + for MyBehaviour +{ + fn inject_event(&mut self, message: GossipsubEvent) { + if let GossipsubEvent::Message(_, message) = message { + self.events.push(MyBehaviourEvent::GossipMessage { + source: message.source, + topics: message.topics, + message: message.data, + }) + } + } +} + +impl MyBehaviour { + /// Consumes the events list when polled. + fn poll( + &mut self, + ) -> Async> { + if !self.events.is_empty() { + return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } + Async::NotReady + } +} + +impl MyBehaviour { + pub fn new(local_key: &Keypair) -> Self { + let local_peer_id = local_key.public().into_peer_id(); + let gossipsub_config = GossipsubConfig::default(); + MyBehaviour { + gossipsub: Gossipsub::new(local_peer_id, gossipsub_config), + mdns: Mdns::new().expect("Failed to create mDNS service"), + events: vec![], + } + } + + pub fn publish(&mut self, topic: &Topic, data: impl Into>) { + self.gossipsub.publish(topic, data); + } + + pub fn subscribe(&mut self, topic: Topic) -> bool { + self.gossipsub.subscribe(topic) + } +} diff --git a/node/ferret-libp2p/src/config.rs b/node/ferret-libp2p/src/config.rs new file mode 100644 index 000000000000..66438a2e2d69 --- /dev/null +++ b/node/ferret-libp2p/src/config.rs @@ -0,0 +1,20 @@ +use libp2p::gossipsub::Topic; + +pub struct Libp2pConfig { + pub listening_multiaddr: String, + pub pubsub_topics: Vec, + pub bootstrap_peers: Vec, +} + +impl Default for Libp2pConfig { + fn default() -> Self { + Libp2pConfig { + listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(), + pubsub_topics: vec![ + Topic::new("/fil/blocks".to_owned()), + Topic::new("/fil/messages".to_owned()), + ], + bootstrap_peers: vec![], + } + } +} diff --git a/node/ferret-libp2p/src/lib.rs b/node/ferret-libp2p/src/lib.rs new file mode 100644 index 000000000000..9caf6c2a5388 --- /dev/null +++ b/node/ferret-libp2p/src/lib.rs @@ -0,0 +1,3 @@ +pub mod behaviour; +pub mod config; +pub mod service; diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs new file mode 100644 index 000000000000..4e8aeb2919a7 --- /dev/null +++ b/node/ferret-libp2p/src/service.rs @@ -0,0 +1,120 @@ +use super::behaviour::{MyBehaviour, MyBehaviourEvent}; +use super::config::Libp2pConfig; +use futures::{Async, Stream}; +use libp2p::{ + core, core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed, + gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, Swarm, Transport, +}; +use slog::{debug, error, info, Logger}; +use std::io::{Error, ErrorKind}; +use std::time::Duration; +type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; +type Libp2pBehaviour = MyBehaviour>; + +/// The Libp2pService listens to events from the Libp2p swarm. +pub struct Libp2pService { + pub swarm: Swarm, +} + +impl Libp2pService { + /// Constructs a Libp2pService + pub fn new(log: &Logger, config: &Libp2pConfig) -> Self { + // TODO @Greg do local storage + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + info!(log, "Local peer id: {:?}", local_peer_id); + + let transport = build_transport(local_key.clone()); + + let mut swarm = { + let be = MyBehaviour::new(&local_key); + Swarm::new(transport, be, local_peer_id) + }; + + for node in config.bootstrap_peers.clone() { + match node.parse() { + Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) { + Ok(_) => debug!(log, "Dialed {:?}", node), + Err(e) => debug!(log, "Dial {:?} failed: {:?}", node, e), + }, + Err(err) => error!(log, "Failed to parse address to dial: {:?}", err), + } + } + + Swarm::listen_on( + &mut swarm, + config + .listening_multiaddr + .parse() + .expect("Incorrect MultiAddr Format"), + ) + .unwrap(); + + for topic in config.pubsub_topics.clone() { + swarm.subscribe(topic); + } + + Libp2pService { swarm } + } +} + +impl Stream for Libp2pService { + type Item = NetworkEvent; + type Error = (); + + /// Continuously polls the Libp2p swarm to get events + fn poll(&mut self) -> Result>, Self::Error> { + loop { + match self.swarm.poll() { + Ok(Async::Ready(Some(event))) => match event { + MyBehaviourEvent::DiscoveredPeer(peer) => { + libp2p::Swarm::dial(&mut self.swarm, peer); + } + MyBehaviourEvent::ExpiredPeer(_) => {} + MyBehaviourEvent::GossipMessage { + source, + topics, + message, + } => { + return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage { + source, + topics, + message, + }))); + } + }, + Ok(Async::Ready(None)) => break, + Ok(Async::NotReady) => break, + _ => break, + } + } + Ok(Async::NotReady) + } +} + +/// Events emitted by this Service to be listened by the NetworkService. +#[derive(Clone)] +pub enum NetworkEvent { + PubsubMessage { + source: PeerId, + topics: Vec, + message: Vec, + }, +} + +fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { + let transport = libp2p::tcp::TcpConfig::new().nodelay(true); + let transport = libp2p::dns::DnsConfig::new(transport); + + transport + .upgrade(core::upgrade::Version::V1) + .authenticate(secio::SecioConfig::new(local_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux::Config::default(), + mplex::MplexConfig::new(), + )) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) + .map_err(|err| Error::new(ErrorKind::Other, err)) + .boxed() +} diff --git a/node/network/Cargo.toml b/node/network/Cargo.toml new file mode 100644 index 000000000000..10d21e2eb851 --- /dev/null +++ b/node/network/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "network" +version = "0.1.0" +authors = ["ChainSafe Systems "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ferret-libp2p = { path = "../ferret-libp2p" } +futures = "0.1.29" +tokio = "0.1.22" +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } +log = "0.4.8" +slog = "2.5.2" \ No newline at end of file diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs new file mode 100644 index 000000000000..1f278a4d5194 --- /dev/null +++ b/node/network/src/lib.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/node/network/src/service.rs b/node/network/src/service.rs new file mode 100644 index 000000000000..193f94ad52b6 --- /dev/null +++ b/node/network/src/service.rs @@ -0,0 +1,126 @@ +use ferret_libp2p::config::Libp2pConfig; +use ferret_libp2p::service::{Libp2pService, NetworkEvent}; +use futures::stream::Stream; +use futures::{Async, Future}; +use libp2p::gossipsub::Topic; +use slog::{warn, Logger}; +use std::sync::{Arc, Mutex}; +use tokio::runtime::TaskExecutor; +use tokio::sync::mpsc; + +/// Ingress events to the NetworkService +pub enum NetworkMessage { + PubsubMessage { topics: Topic, message: Vec }, +} + +/// The NetworkService receives commands through a channel which communicates with Libp2p. +/// It also listens to the Libp2p service for +pub struct NetworkService { + pub libp2p: Arc>, +} + +impl NetworkService { + /// Starts a Libp2pService with a given config, UnboundedSender, and tokio executor. + /// Returns an UnboundedSender channel so messages can come in. + pub fn new( + config: &Libp2pConfig, + log: &Logger, + outbound_transmitter: mpsc::UnboundedSender, + executor: &TaskExecutor, + ) -> ( + Self, + mpsc::UnboundedSender, + tokio::sync::oneshot::Sender, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + + let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log, config))); + + let exit_tx = start( + log.clone(), + libp2p_service.clone(), + executor, + outbound_transmitter, + rx, + ); + + ( + NetworkService { + libp2p: libp2p_service, + }, + tx, + exit_tx, + ) + } +} + +enum Error {} + +/// Spawns the NetworkService service. +fn start( + log: Logger, + libp2p_service: Arc>, + executor: &TaskExecutor, + outbound_transmitter: mpsc::UnboundedSender, + message_receiver: mpsc::UnboundedReceiver, +) -> tokio::sync::oneshot::Sender { + let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); + executor.spawn( + poll(log, libp2p_service, outbound_transmitter, message_receiver) + .select(exit_rx.then(|_| Ok(()))) + .then(move |_| Ok(())), + ); + + network_exit +} + +fn poll( + log: Logger, + libp2p_service: Arc>, + mut outbound_transmitter: mpsc::UnboundedSender, + mut message_receiver: mpsc::UnboundedReceiver, +) -> impl futures::Future { + futures::future::poll_fn(move || -> Result<_, _> { + loop { + match message_receiver.poll() { + Ok(Async::Ready(Some(event))) => match event { + NetworkMessage::PubsubMessage { topics, message } => { + libp2p_service + .lock() + .unwrap() + .swarm + .publish(&topics, message); + } + }, + Ok(Async::NotReady) => break, + _ => break, + } + } + loop { + match libp2p_service.lock().unwrap().poll() { + Ok(Async::Ready(Some(event))) => match event { + NetworkEvent::PubsubMessage { + source, + topics, + message, + } => { + if outbound_transmitter + .try_send(NetworkEvent::PubsubMessage { + source, + topics, + message, + }) + .is_err() + { + warn!(log, "Cant handle message"); + } + } + }, + Ok(Async::Ready(None)) => unreachable!("Stream never ends"), + Ok(Async::NotReady) => break, + _ => break, + } + } + Ok(Async::NotReady) + }) +} diff --git a/node/src/main.rs b/node/src/main.rs index f4ac7d415870..5687cc848733 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -4,8 +4,34 @@ use slog::*; use cli::cli; +use tokio::sync::mpsc; + +use ferret_libp2p::config::Libp2pConfig; +use ferret_libp2p::service::NetworkEvent; +use network::service::*; + +use futures::prelude::*; + +use tokio; + +use tokio::runtime::Runtime; + fn main() { let log = log::setup_logging(); info!(log, "Starting Ferret"); cli(&log); + + // Create the tokio runtime + let rt = Runtime::new().unwrap(); + + // Create the channel so we can receive messages from NetworkService + let (tx, _rx) = mpsc::unbounded_channel::(); + // Create the default libp2p config + let netcfg = Libp2pConfig::default(); + // Start the NetworkService. Returns net_tx so you can pass messages in. + let (_network_service, _net_tx, _exit_tx) = + NetworkService::new(&netcfg, &log, tx, &rt.executor()); + + rt.shutdown_on_idle().wait().unwrap(); + info!(log, "Ferret finish shutdown"); } diff --git a/node/tests/clock_test.rs b/node/tests/clock_test.rs index 179cd56709b2..89607d4e14ca 100644 --- a/node/tests/clock_test.rs +++ b/node/tests/clock_test.rs @@ -2,7 +2,7 @@ use node::clock::ChainEpochClock; #[test] fn create_chain_epoch_clock() { - let utc_timestamp = 1574286946904; + let utc_timestamp = 1_574_286_946_904; let clock = ChainEpochClock::new(utc_timestamp); assert_eq!(clock.get_time().timestamp(), utc_timestamp); }