Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

added Kad mode separatation logic #21

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ mod client;
mod event_loop;

use crate::{
p2p::{
client::{Client, Command},
event_loop::IdentityData,
},
p2p::client::{Client, Command},
types::{LibP2PConfig, SecretKey},
};
use event_loop::EventLoop;
Expand All @@ -45,7 +42,7 @@ pub fn init(cfg: LibP2PConfig, id_keys: Keypair) -> Result<(Client, EventLoop)>

// Use identify protocol_version as Kademlia protocol name
let kademlia_protocol_name =
StreamProtocol::try_from_owned(cfg.identify_protocol_version.clone())
StreamProtocol::try_from_owned(cfg.identify.protocol_version.clone())
.expect("Invalid Kademlia protocol name");

let mut swarm = SwarmBuilder::with_existing_identity(id_keys)
Expand All @@ -68,8 +65,8 @@ pub fn init(cfg: LibP2PConfig, id_keys: Keypair) -> Result<(Client, EventLoop)>

// create Identify Protocol Config
let identify_cfg =
identify::Config::new(cfg.identify_protocol_version.clone(), key.public())
.with_agent_version(cfg.identify_agent_version.clone());
identify::Config::new(cfg.identify.protocol_version.clone(), key.public())
.with_agent_version(cfg.identify.agent_version.to_string());

// create AutoNAT Server Config
let autonat_cfg = autonat::Config {
Expand Down Expand Up @@ -104,10 +101,7 @@ pub fn init(cfg: LibP2PConfig, id_keys: Keypair) -> Result<(Client, EventLoop)>
swarm,
command_receiver,
cfg.bootstrap_interval,
IdentityData {
agent_version: cfg.identify_agent_version.clone(),
protocol_version: cfg.identify_protocol_version.clone(),
},
cfg.identify,
),
))
}
Expand Down
48 changes: 26 additions & 22 deletions src/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use libp2p::{
swarm::{ConnectionError, SwarmEvent},
Multiaddr, PeerId, Swarm,
};
use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, str::FromStr, time::Duration};
use tokio::{
sync::{mpsc, oneshot},
time::{interval_at, Instant, Interval},
};
use tracing::{debug, trace};

use crate::types::{AgentVersion, IdentifyConfig};

use super::{client::Command, Behaviour, BehaviourEvent};

enum QueryChannel {
Expand All @@ -34,28 +36,22 @@ struct BootstrapState {
timer: Interval,
}

// Identity strings used for peer filtering
pub struct IdentityData {
pub agent_version: String,
pub protocol_version: String,
}

pub struct EventLoop {
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
pending_kad_queries: HashMap<QueryId, QueryChannel>,
pending_kad_routing: HashMap<PeerId, oneshot::Sender<Result<()>>>,
pending_swarm_events: HashMap<PeerId, SwarmChannel>,
bootstrap: BootstrapState,
identity_data: IdentityData,
identity_data: IdentifyConfig,
}

impl EventLoop {
pub fn new(
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
bootstrap_interval: Duration,
identity_data: IdentityData,
identity_data: IdentifyConfig,
) -> Self {
Self {
swarm,
Expand Down Expand Up @@ -144,22 +140,30 @@ impl EventLoop {
..
},
})) => {
debug!("Identity received from: {peer_id:?} on listen address: {listen_addrs:?}");
// interested in addresses with actual Multiaddresses
// containing proper 'p2p' protocol tag
if agent_version != self.identity_data.agent_version
&& protocol_version != self.identity_data.protocol_version
{
trace!("Identity Received from: {peer_id:?} on listen address: {listen_addrs:?}");
let incoming_peer_agent_version = match AgentVersion::from_str(&agent_version) {
Ok(agent) => agent,
Err(e) => {
debug!("Error parsing incoming agent version: {e}");
return;
}
};
if protocol_version == self.identity_data.protocol_version {
// Add peer to routing table only if it's in Kademlia server mode
if incoming_peer_agent_version.kademlia_mode == "server".to_string() {
trace!("Adding peer {peer_id} to routing table.");
for addr in listen_addrs {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
}
}
} else {
// Block and remove non-Avail peers
debug!("Removing and blocking non-avail peer from routing table. Peer: {peer_id}. Agent: {agent_version}. Protocol: {protocol_version}");
self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
self.swarm.behaviour_mut().blocked_peers.block_peer(peer_id);
} else {
for addr in listen_addrs {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::AutoNat(autonat_event)) => match autonat_event {
Expand Down
89 changes: 78 additions & 11 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use anyhow::Context;
use serde::{Deserialize, Serialize};
use std::{fmt::Display, net::SocketAddr, str::FromStr, time::Duration};
use std::{
fmt::{self, Display},
net::SocketAddr,
str::FromStr,
time::Duration,
};

pub const IDENTITY_PROTOCOL: &str = "/avail_kad/id/1.0.0";
pub const IDENTITY_AGENT_BASE: &str = "avail-light-client";
pub const IDENTITY_AGENT_CLIENT_TYPE: &str = "rust-client";

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
Expand All @@ -25,10 +34,6 @@ pub struct RuntimeConfig {
/// Sets the amount of time to keep connections alive when they're idle. (default: 30s).
/// NOTE: libp2p default value is 10s, but because of Avail block time of 20s the value has been increased
pub connection_idle_timeout: u64,
/// Sets application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
pub identify_protocol: String,
/// Sets agent version that is sent to peers in the network. (default: "avail-light-client/rust-client")
pub identify_agent: String,
/// Configures AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
pub autonat_only_global_ips: bool,
/// Sets the timeout for a single Kademlia query. (default: 60s).
Expand All @@ -46,13 +51,14 @@ pub struct RuntimeConfig {
/// Default bootstrap peerID is 12D3KooWStAKPADXqJ7cngPYXd2mSANpdgh1xQ34aouufHA2xShz
pub secret_key: Option<SecretKey>,
pub origin: String,
/// Genesis hash of the network to be connected to. Set to a string beginning with "DEV" to connect to any network.
pub genesis_hash: String,
}

pub struct LibP2PConfig {
pub port: u16,
pub autonat_only_global_ips: bool,
pub identify_agent_version: String,
pub identify_protocol_version: String,
pub identify: IdentifyConfig,
pub kademlia: KademliaConfig,
pub secret_key: Option<SecretKey>,
pub bootstrap_interval: Duration,
Expand All @@ -64,8 +70,7 @@ impl From<&RuntimeConfig> for LibP2PConfig {
Self {
port: rtcfg.port,
autonat_only_global_ips: rtcfg.autonat_only_global_ips,
identify_agent_version: rtcfg.identify_agent.clone(),
identify_protocol_version: rtcfg.identify_protocol.clone(),
identify: rtcfg.into(),
kademlia: rtcfg.into(),
secret_key: rtcfg.secret_key.clone(),
bootstrap_interval: Duration::from_secs(rtcfg.bootstrap_period),
Expand Down Expand Up @@ -99,14 +104,13 @@ impl Default for RuntimeConfig {
}),
port: 39000,
autonat_only_global_ips: false,
identify_protocol: "/avail_kad/id/1.0.0".to_string(),
identify_agent: "avail-light-client/rust-client".to_string(),
connection_idle_timeout: 30,
kad_query_timeout: 60,
bootstrap_period: 300,
ot_collector_endpoint: "http://127.0.0.1:4317".to_string(),
metrics_network_dump_interval: 15,
origin: "external".to_string(),
genesis_hash: "DEV".to_owned(),
}
}
}
Expand Down Expand Up @@ -138,3 +142,66 @@ impl Display for Addr {
write!(f, "{}:{}", self.host, self.port)
}
}

pub struct IdentifyConfig {
pub agent_version: AgentVersion,
/// Contains Avail genesis hash
pub protocol_version: String,
}

pub struct AgentVersion {
pub base_version: String,
pub client_type: String,
// Kademlia client or server mode
pub kademlia_mode: String,
}

impl fmt::Display for AgentVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{}/{}",
self.base_version, self.client_type, self.kademlia_mode
)
}
}

impl FromStr for AgentVersion {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let parts: Vec<&str> = s.split('/').collect();
if parts.len() != 3 {
return Err("Failed to parse agent version".to_owned());
}

Ok(AgentVersion {
base_version: parts[0].to_string(),
client_type: parts[1].to_string(),
kademlia_mode: parts[2].to_string(),
})
}
}

impl From<&RuntimeConfig> for IdentifyConfig {
fn from(val: &RuntimeConfig) -> Self {
let mut genhash_short = val.genesis_hash.trim_start_matches("0x").to_string();
genhash_short.truncate(6);

let agent_version = AgentVersion {
base_version: IDENTITY_AGENT_BASE.to_string(),
client_type: IDENTITY_AGENT_CLIENT_TYPE.to_string(),
// Bootstrap should only be in server mode
kademlia_mode: "server".to_string(),
};

Self {
agent_version,
protocol_version: format!(
"{id}-{gen_hash}",
id = IDENTITY_PROTOCOL,
gen_hash = genhash_short
),
}
}
}