Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Commit

Permalink
added Kad mode separatation logic (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c authored Dec 27, 2023
1 parent 3fa6987 commit 92a313c
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 44 deletions.
16 changes: 5 additions & 11 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ mod client;
mod event_loop;

use crate::{
p2p::{
client::{Client, Command},
event_loop::IdentityData,
},
p2p::client::{Client, Command},
types::{LibP2PConfig, SecretKey},
};
use event_loop::EventLoop;
Expand All @@ -45,7 +42,7 @@ pub fn init(cfg: LibP2PConfig, id_keys: Keypair) -> Result<(Client, EventLoop)>

// Use identify protocol_version as Kademlia protocol name
let kademlia_protocol_name =
StreamProtocol::try_from_owned(cfg.identify_protocol_version.clone())
StreamProtocol::try_from_owned(cfg.identify.protocol_version.clone())
.expect("Invalid Kademlia protocol name");

let mut swarm = SwarmBuilder::with_existing_identity(id_keys)
Expand All @@ -68,8 +65,8 @@ pub fn init(cfg: LibP2PConfig, id_keys: Keypair) -> Result<(Client, EventLoop)>

// create Identify Protocol Config
let identify_cfg =
identify::Config::new(cfg.identify_protocol_version.clone(), key.public())
.with_agent_version(cfg.identify_agent_version.clone());
identify::Config::new(cfg.identify.protocol_version.clone(), key.public())
.with_agent_version(cfg.identify.agent_version.to_string());

// create AutoNAT Server Config
let autonat_cfg = autonat::Config {
Expand Down Expand Up @@ -104,10 +101,7 @@ pub fn init(cfg: LibP2PConfig, id_keys: Keypair) -> Result<(Client, EventLoop)>
swarm,
command_receiver,
cfg.bootstrap_interval,
IdentityData {
agent_version: cfg.identify_agent_version.clone(),
protocol_version: cfg.identify_protocol_version.clone(),
},
cfg.identify,
),
))
}
Expand Down
48 changes: 26 additions & 22 deletions src/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use libp2p::{
swarm::{ConnectionError, SwarmEvent},
Multiaddr, PeerId, Swarm,
};
use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, str::FromStr, time::Duration};
use tokio::{
sync::{mpsc, oneshot},
time::{interval_at, Instant, Interval},
};
use tracing::{debug, trace};

use crate::types::{AgentVersion, IdentifyConfig};

use super::{client::Command, Behaviour, BehaviourEvent};

enum QueryChannel {
Expand All @@ -34,28 +36,22 @@ struct BootstrapState {
timer: Interval,
}

// Identity strings used for peer filtering
pub struct IdentityData {
pub agent_version: String,
pub protocol_version: String,
}

pub struct EventLoop {
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
pending_kad_queries: HashMap<QueryId, QueryChannel>,
pending_kad_routing: HashMap<PeerId, oneshot::Sender<Result<()>>>,
pending_swarm_events: HashMap<PeerId, SwarmChannel>,
bootstrap: BootstrapState,
identity_data: IdentityData,
identity_data: IdentifyConfig,
}

impl EventLoop {
pub fn new(
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
bootstrap_interval: Duration,
identity_data: IdentityData,
identity_data: IdentifyConfig,
) -> Self {
Self {
swarm,
Expand Down Expand Up @@ -144,22 +140,30 @@ impl EventLoop {
..
},
})) => {
debug!("Identity received from: {peer_id:?} on listen address: {listen_addrs:?}");
// interested in addresses with actual Multiaddresses
// containing proper 'p2p' protocol tag
if agent_version != self.identity_data.agent_version
&& protocol_version != self.identity_data.protocol_version
{
trace!("Identity Received from: {peer_id:?} on listen address: {listen_addrs:?}");
let incoming_peer_agent_version = match AgentVersion::from_str(&agent_version) {
Ok(agent) => agent,
Err(e) => {
debug!("Error parsing incoming agent version: {e}");
return;
}
};
if protocol_version == self.identity_data.protocol_version {
// Add peer to routing table only if it's in Kademlia server mode
if incoming_peer_agent_version.kademlia_mode == "server".to_string() {
trace!("Adding peer {peer_id} to routing table.");
for addr in listen_addrs {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
}
}
} else {
// Block and remove non-Avail peers
debug!("Removing and blocking non-avail peer from routing table. Peer: {peer_id}. Agent: {agent_version}. Protocol: {protocol_version}");
self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
self.swarm.behaviour_mut().blocked_peers.block_peer(peer_id);
} else {
for addr in listen_addrs {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::AutoNat(autonat_event)) => match autonat_event {
Expand Down
89 changes: 78 additions & 11 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use anyhow::Context;
use serde::{Deserialize, Serialize};
use std::{fmt::Display, net::SocketAddr, str::FromStr, time::Duration};
use std::{
fmt::{self, Display},
net::SocketAddr,
str::FromStr,
time::Duration,
};

pub const IDENTITY_PROTOCOL: &str = "/avail_kad/id/1.0.0";
pub const IDENTITY_AGENT_BASE: &str = "avail-light-client";
pub const IDENTITY_AGENT_CLIENT_TYPE: &str = "rust-client";

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
Expand All @@ -25,10 +34,6 @@ pub struct RuntimeConfig {
/// Sets the amount of time to keep connections alive when they're idle. (default: 30s).
/// NOTE: libp2p default value is 10s, but because of Avail block time of 20s the value has been increased
pub connection_idle_timeout: u64,
/// Sets application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
pub identify_protocol: String,
/// Sets agent version that is sent to peers in the network. (default: "avail-light-client/rust-client")
pub identify_agent: String,
/// Configures AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
pub autonat_only_global_ips: bool,
/// Sets the timeout for a single Kademlia query. (default: 60s).
Expand All @@ -46,13 +51,14 @@ pub struct RuntimeConfig {
/// Default bootstrap peerID is 12D3KooWStAKPADXqJ7cngPYXd2mSANpdgh1xQ34aouufHA2xShz
pub secret_key: Option<SecretKey>,
pub origin: String,
/// Genesis hash of the network to be connected to. Set to a string beginning with "DEV" to connect to any network.
pub genesis_hash: String,
}

pub struct LibP2PConfig {
pub port: u16,
pub autonat_only_global_ips: bool,
pub identify_agent_version: String,
pub identify_protocol_version: String,
pub identify: IdentifyConfig,
pub kademlia: KademliaConfig,
pub secret_key: Option<SecretKey>,
pub bootstrap_interval: Duration,
Expand All @@ -64,8 +70,7 @@ impl From<&RuntimeConfig> for LibP2PConfig {
Self {
port: rtcfg.port,
autonat_only_global_ips: rtcfg.autonat_only_global_ips,
identify_agent_version: rtcfg.identify_agent.clone(),
identify_protocol_version: rtcfg.identify_protocol.clone(),
identify: rtcfg.into(),
kademlia: rtcfg.into(),
secret_key: rtcfg.secret_key.clone(),
bootstrap_interval: Duration::from_secs(rtcfg.bootstrap_period),
Expand Down Expand Up @@ -99,14 +104,13 @@ impl Default for RuntimeConfig {
}),
port: 39000,
autonat_only_global_ips: false,
identify_protocol: "/avail_kad/id/1.0.0".to_string(),
identify_agent: "avail-light-client/rust-client".to_string(),
connection_idle_timeout: 30,
kad_query_timeout: 60,
bootstrap_period: 300,
ot_collector_endpoint: "http://127.0.0.1:4317".to_string(),
metrics_network_dump_interval: 15,
origin: "external".to_string(),
genesis_hash: "DEV".to_owned(),
}
}
}
Expand Down Expand Up @@ -138,3 +142,66 @@ impl Display for Addr {
write!(f, "{}:{}", self.host, self.port)
}
}

pub struct IdentifyConfig {
pub agent_version: AgentVersion,
/// Contains Avail genesis hash
pub protocol_version: String,
}

pub struct AgentVersion {
pub base_version: String,
pub client_type: String,
// Kademlia client or server mode
pub kademlia_mode: String,
}

impl fmt::Display for AgentVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{}/{}",
self.base_version, self.client_type, self.kademlia_mode
)
}
}

impl FromStr for AgentVersion {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let parts: Vec<&str> = s.split('/').collect();
if parts.len() != 3 {
return Err("Failed to parse agent version".to_owned());
}

Ok(AgentVersion {
base_version: parts[0].to_string(),
client_type: parts[1].to_string(),
kademlia_mode: parts[2].to_string(),
})
}
}

impl From<&RuntimeConfig> for IdentifyConfig {
fn from(val: &RuntimeConfig) -> Self {
let mut genhash_short = val.genesis_hash.trim_start_matches("0x").to_string();
genhash_short.truncate(6);

let agent_version = AgentVersion {
base_version: IDENTITY_AGENT_BASE.to_string(),
client_type: IDENTITY_AGENT_CLIENT_TYPE.to_string(),
// Bootstrap should only be in server mode
kademlia_mode: "server".to_string(),
};

Self {
agent_version,
protocol_version: format!(
"{id}-{gen_hash}",
id = IDENTITY_PROTOCOL,
gen_hash = genhash_short
),
}
}
}

0 comments on commit 92a313c

Please sign in to comment.