Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement constructor, initial bootstrap, and tests, for PropagationNetwork. #41

Merged
merged 21 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ serde_json = "1.0"
futures = "0.3"
log = "0.4"
simperby-common = { version = "0.0.0", path = "../common" }
libp2p = "0.46.1"
libp2p = "0.46.1"

[dev-dependencies]
rand = "0.8.5"
6 changes: 3 additions & 3 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use simperby_common::crypto::*;
use std::net::SocketAddrV4;
use tokio::sync::mpsc;
use tokio::sync::broadcast;

pub type BroadcastToken = u64;

Expand Down Expand Up @@ -35,7 +35,7 @@ pub trait AuthorizedNetwork: Send + Sync {
/// Gets the current status of a broadcasting message.
async fn get_broadcast_status(&self, token: BroadcastToken) -> Result<BroadcastStatus, String>;
/// Creates a receiver for every message broadcasted to the network, except the one sent by this instance.
async fn create_recv_queue(&self) -> Result<mpsc::Receiver<Vec<u8>>, ()>;
async fn create_recv_queue(&self) -> Result<broadcast::Receiver<Vec<u8>>, ()>;
/// Provides the estimated list of live nodes that are eligible and identified by their public keys.
async fn get_live_list(&self) -> Result<Vec<PublicKey>, ()>;
}
Expand All @@ -56,7 +56,7 @@ pub trait UnauthorizedNetwork: Send + Sync {
/// Broadcasts a message to the network.
async fn broadcast(&self, message: &[u8]) -> Result<(), String>;
/// Creates a receiver for every message broadcasted to the network, except the one sent by this instance.
async fn create_recv_queue(&self) -> Result<mpsc::Receiver<Vec<u8>>, ()>;
async fn create_recv_queue(&self) -> Result<broadcast::Receiver<Vec<u8>>, ()>;
/// Provides the estimated list of live nodes identified by their IP addresses
async fn get_live_list(&self) -> Result<Vec<std::net::SocketAddrV4>, ()>;
}
49 changes: 41 additions & 8 deletions network/src/propagation_network/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,63 @@
use libp2p::{
floodsub::{Floodsub, FloodsubEvent},
identify::{Identify, IdentifyEvent},
kad::{store::MemoryStore, Kademlia, KademliaEvent},
identify::{Identify, IdentifyConfig, IdentifyEvent},
identity::PublicKey,
kad::{store::MemoryStore, Kademlia, KademliaConfig, KademliaEvent},
NetworkBehaviour,
};
use std::time::Duration;

/// A libp2p network behaviour.
///
/// It collects other network behaviours to extend their functionalities,
/// and implements [`libp2p::swarm::NetworkBehaviour`] as well.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
pub struct Behaviour {
/// A network behaviour that identifies connected peers.
/// The network behaviour that identifies connected peers.
///
/// Information of the identified peer contains its public key, listen addresses, etc.
identify: Identify,
/// A network behaviour that implement Kademlia Distributed Hash Table (Kademlia DHT).
pub(crate) identify: Identify,
/// The network behaviour that implement Kademlia Distributed Hash Table (Kademlia DHT).
///
/// Storing and retrieving items in/from the DHT do not occur in this crate.
/// Instead, kademlia continuously discovers k closest peers
/// to maintain k connections with its neighbors.
kademlia: Kademlia<MemoryStore>,
/// A network behaviour that implements PubSub message passing protocol.
pub(crate) kademlia: Kademlia<MemoryStore>,
/// The network behaviour that implements PubSub message passing protocol.
///
/// It tries to propagate a message to all peers that it has connections with,
/// thus flooding the network with messages.
floodsub: Floodsub,
pub(crate) floodsub: Floodsub,
}

impl Behaviour {
/// A constructor with default configuration.
pub fn new(local_public_key: PublicKey) -> Self {
let local_peer_id = local_public_key.to_peer_id();

let identify_config =
IdentifyConfig::new("/simperby/identify".to_string(), local_public_key);

// Create a key-value store, which will not be used in this crate, for Kademlia DHT.
let store = MemoryStore::new(local_peer_id);

// Note: The default configuration for Kademlia is subject to change.
let mut kademlia_config = KademliaConfig::default();
kademlia_config
.set_protocol_name("/simperby/kademlia".as_bytes())
.set_connection_idle_timeout(Duration::from_secs(30))
.set_query_timeout(Duration::from_secs(20));

Self {
identify: Identify::new(identify_config),
kademlia: Kademlia::with_config(local_peer_id, store, kademlia_config),
floodsub: Floodsub::new(local_peer_id),
}
}
}

#[derive(Debug)]
/// Network events captured from other network behaviours in [`Behaviour`].
pub enum Event {
Identify(IdentifyEvent),
Expand Down
85 changes: 85 additions & 0 deletions network/src/propagation_network/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::{
default::Default,
net::{Ipv4Addr, SocketAddrV4},
time::Duration,
};

use libp2p::{multiaddr::Protocol, Multiaddr};

/// A set of configurations of [`PropagationNetwork`].
///
/// To customize it, call `default` and chain the methods named like `with_<fieldname>`.
pub struct PropagationNetworkConfig {
/// The addresses to listen on to handle incoming connection requests.
pub(crate) listen_address: Multiaddr,
/// The timeout parameter for listener creation.
pub(crate) listener_creation_timeout: Duration,
/// The timeout parameter for initial bootstrap.
pub(crate) initial_bootstrap_timeout: Duration,
/// The interval for the guaranteed lock aquisition for swarm.
///
/// It is the maximal delay until the [`PropagationNetwork`] aquires
/// all of the resources needed to serve a job assigned from its public interface.
pub(crate) lock_release_interval: Duration,
/// The interval for the regular peer discovery routine.
pub(crate) peer_discovery_interval: Duration,
/// The capacity for the message queue that passes messages from other nodes
/// to its simperby node.
pub(crate) message_queue_capacity: usize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is common to refer struct fields with The in the doc comments.
(e.g., The addresses to ..)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied it to all structs.

}

impl Default for PropagationNetworkConfig {
fn default() -> Self {
Self {
listen_address: Self::convert_socketaddr_to_multiaddr(SocketAddrV4::new(
Ipv4Addr::new(0, 0, 0, 0),
0,
)),
listener_creation_timeout: Duration::from_millis(1000),
initial_bootstrap_timeout: Duration::from_millis(3000),
lock_release_interval: Duration::from_millis(30),
peer_discovery_interval: Duration::from_millis(10000),
message_queue_capacity: 100,
}
}
}

impl PropagationNetworkConfig {
pub fn with_listen_address(&mut self, listen_address: SocketAddrV4) -> &mut Self {
self.listen_address = Self::convert_socketaddr_to_multiaddr(listen_address);
self
}

pub fn with_listener_creation_timeout(
&mut self,
listener_creation_timeout: Duration,
) -> &mut Self {
self.listener_creation_timeout = listener_creation_timeout;
self
}

pub fn with_initial_bootstrap_timeout(
&mut self,
initial_bootstrap_timeout: Duration,
) -> &mut Self {
self.initial_bootstrap_timeout = initial_bootstrap_timeout;
self
}

pub fn with_peer_discovery_interval(&mut self, peer_discovery_interval: Duration) -> &mut Self {
self.peer_discovery_interval = peer_discovery_interval;
self
}

pub fn with_message_queue_capacity(&mut self, message_queue_capacity: usize) -> &mut Self {
self.message_queue_capacity = message_queue_capacity;
self
}

fn convert_socketaddr_to_multiaddr(socket_addr: SocketAddrV4) -> Multiaddr {
Multiaddr::from_iter([
Protocol::Ip4(*socket_addr.ip()),
Protocol::Tcp(socket_addr.port()),
])
}
}
Loading