Skip to content

Commit

Permalink
Create Networking Service (#49)
Browse files Browse the repository at this point in the history
* update gitignore

* basic gossipsub mdns chat

* broke up into service

* doesnt work, would appreciate a look

* it compiles! need  to handle events  now

* msgs sending, cant  peer though

* can bootstrap  now

* got the loop to work.

* linting

* more linting

* more linting

* add config

* remove some prints

* docs

* PR suggestions

* more suggestions

* even more suggestions

* more suggestions

* added topics into default config

* lint

* remove types

* lint

* clippy fixes

* remove CLI portion of app. if you wanna have a chat, just checkout the previous commit

* fix PR suggesttions

* add logger to network and libp2p  crates

* fix doctest

* linting

* remove doctest caused more headaches than was beneficial

* pr suggestions

* borrow instead of cloning logger where it doesnt need to be cloned

* linting

* pr suggestions

* more pr suggestions

* add vscode to gitignore
  • Loading branch information
ec2 authored Dec 3, 2019
1 parent acb00bb commit bf60880
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 2 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
/target
**/*.rs.bk
/Cargo.lock
.idea/
.idea
.DS_STORE
.vscode
.idea/
7 changes: 7 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
network = { path = "network" }
ferret-libp2p = { path = "ferret-libp2p"}


libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" }
tokio = "0.1.22"
futures = "0.1.29"
clap = "2.33.0"
dirs = "2.0.2"
toml = "0.5.5"
Expand Down
14 changes: 14 additions & 0 deletions node/ferret-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "ferret-libp2p"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" }
tokio = "0.1.22"
futures = "0.1.29"
log = "0.4.8"
slog = "2.5.2"
94 changes: 94 additions & 0 deletions node/ferret-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use futures::Async;
use libp2p::core::identity::Keypair;
use libp2p::core::PeerId;
use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess};
use libp2p::tokio_io::{AsyncRead, AsyncWrite};
use libp2p::NetworkBehaviour;

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")]
pub struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
pub gossipsub: Gossipsub<TSubstream>,
pub mdns: Mdns<TSubstream>,
#[behaviour(ignore)]
events: Vec<MyBehaviourEvent>,
}

pub enum MyBehaviourEvent {
DiscoveredPeer(PeerId),
ExpiredPeer(PeerId),
GossipMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Vec<u8>,
},
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent>
for MyBehaviour<TSubstream>
{
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.events.push(MyBehaviourEvent::DiscoveredPeer(peer))
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.events.push(MyBehaviourEvent::ExpiredPeer(peer))
}
}
}
}
}
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for MyBehaviour<TSubstream>
{
fn inject_event(&mut self, message: GossipsubEvent) {
if let GossipsubEvent::Message(_, message) = message {
self.events.push(MyBehaviourEvent::GossipMessage {
source: message.source,
topics: message.topics,
message: message.data,
})
}
}
}

impl<TSubstream: AsyncRead + AsyncWrite> MyBehaviour<TSubstream> {
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, MyBehaviourEvent>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
}
}

impl<TSubstream: AsyncRead + AsyncWrite> MyBehaviour<TSubstream> {
pub fn new(local_key: &Keypair) -> Self {
let local_peer_id = local_key.public().into_peer_id();
let gossipsub_config = GossipsubConfig::default();
MyBehaviour {
gossipsub: Gossipsub::new(local_peer_id, gossipsub_config),
mdns: Mdns::new().expect("Failed to create mDNS service"),
events: vec![],
}
}

pub fn publish(&mut self, topic: &Topic, data: impl Into<Vec<u8>>) {
self.gossipsub.publish(topic, data);
}

pub fn subscribe(&mut self, topic: Topic) -> bool {
self.gossipsub.subscribe(topic)
}
}
20 changes: 20 additions & 0 deletions node/ferret-libp2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use libp2p::gossipsub::Topic;

pub struct Libp2pConfig {
pub listening_multiaddr: String,
pub pubsub_topics: Vec<Topic>,
pub bootstrap_peers: Vec<String>,
}

impl Default for Libp2pConfig {
fn default() -> Self {
Libp2pConfig {
listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(),
pubsub_topics: vec![
Topic::new("/fil/blocks".to_owned()),
Topic::new("/fil/messages".to_owned()),
],
bootstrap_peers: vec![],
}
}
}
3 changes: 3 additions & 0 deletions node/ferret-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod behaviour;
pub mod config;
pub mod service;
120 changes: 120 additions & 0 deletions node/ferret-libp2p/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use super::behaviour::{MyBehaviour, MyBehaviourEvent};
use super::config::Libp2pConfig;
use futures::{Async, Stream};
use libp2p::{
core, core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed,
gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, Swarm, Transport,
};
use slog::{debug, error, info, Logger};
use std::io::{Error, ErrorKind};
use std::time::Duration;
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour = MyBehaviour<Substream<StreamMuxerBox>>;

/// The Libp2pService listens to events from the Libp2p swarm.
pub struct Libp2pService {
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour>,
}

impl Libp2pService {
/// Constructs a Libp2pService
pub fn new(log: &Logger, config: &Libp2pConfig) -> Self {
// TODO @Greg do local storage
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
info!(log, "Local peer id: {:?}", local_peer_id);

let transport = build_transport(local_key.clone());

let mut swarm = {
let be = MyBehaviour::new(&local_key);
Swarm::new(transport, be, local_peer_id)
};

for node in config.bootstrap_peers.clone() {
match node.parse() {
Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) {
Ok(_) => debug!(log, "Dialed {:?}", node),
Err(e) => debug!(log, "Dial {:?} failed: {:?}", node, e),
},
Err(err) => error!(log, "Failed to parse address to dial: {:?}", err),
}
}

Swarm::listen_on(
&mut swarm,
config
.listening_multiaddr
.parse()
.expect("Incorrect MultiAddr Format"),
)
.unwrap();

for topic in config.pubsub_topics.clone() {
swarm.subscribe(topic);
}

Libp2pService { swarm }
}
}

impl Stream for Libp2pService {
type Item = NetworkEvent;
type Error = ();

/// Continuously polls the Libp2p swarm to get events
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(event))) => match event {
MyBehaviourEvent::DiscoveredPeer(peer) => {
libp2p::Swarm::dial(&mut self.swarm, peer);
}
MyBehaviourEvent::ExpiredPeer(_) => {}
MyBehaviourEvent::GossipMessage {
source,
topics,
message,
} => {
return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage {
source,
topics,
message,
})));
}
},
Ok(Async::Ready(None)) => break,
Ok(Async::NotReady) => break,
_ => break,
}
}
Ok(Async::NotReady)
}
}

/// Events emitted by this Service to be listened by the NetworkService.
#[derive(Clone)]
pub enum NetworkEvent {
PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Vec<u8>,
},
}

fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
let transport = libp2p::tcp::TcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport);

transport
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(local_key))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux::Config::default(),
mplex::MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()
}
15 changes: 15 additions & 0 deletions node/network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "network"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ferret-libp2p = { path = "../ferret-libp2p" }
futures = "0.1.29"
tokio = "0.1.22"
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" }
log = "0.4.8"
slog = "2.5.2"
1 change: 1 addition & 0 deletions node/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod service;
Loading

0 comments on commit bf60880

Please sign in to comment.