Skip to content

Commit

Permalink
Merge branch 'master' into grpc-server-enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Oct 25, 2023
2 parents 4fa8079 + c6b9a7b commit e15b070
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 160 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/addressmanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ parking_lot.workspace = true
borsh.workspace = true
log.workspace = true
tokio.workspace = true

thiserror.workspace = true
local-ip-address = "0.5.3"
igd-next = { version = "0.14.2", features = ["aio_tokio"] }

Expand Down
267 changes: 166 additions & 101 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ mod port_mapping_extender;
mod stores;
extern crate self as address_manager;

use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};

use address_manager::port_mapping_extender::Extender;
use igd_next::{self as igd, aio::tokio::Tokio, AddPortError, Gateway};
use itertools::Itertools;
use igd_next::{
self as igd, aio::tokio::Tokio, AddAnyPortError, AddPortError, Gateway, GetExternalIpError, GetGenericPortMappingEntryError,
SearchError,
};
use itertools::{
Either::{Left, Right},
Itertools,
};
use kaspa_consensus_core::config::Config;
use kaspa_core::task::tick::TickService;
use kaspa_core::{debug, info, time::unix_now, warn};
use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
use kaspa_database::prelude::{StoreResultExtensions, DB};
use kaspa_utils::networking::IpAddress;
use local_ip_address::list_afinet_netifas;
use parking_lot::Mutex;
use stores::banned_address_store::{BannedAddressesStore, BannedAddressesStoreReader, ConnectionBanTimestamp, DbBannedAddressesStore};
use thiserror::Error;

pub use stores::NetAddress;

Expand All @@ -33,6 +39,18 @@ struct ExtendHelper {
external_port: u16,
}

#[derive(Error, Debug)]
pub enum UpnpError {
#[error(transparent)]
AddPortError(#[from] AddPortError),
#[error(transparent)]
AddAnyPortError(#[from] AddAnyPortError),
#[error(transparent)]
SearchError(#[from] SearchError),
#[error(transparent)]
GetExternalIpError(#[from] GetExternalIpError),
}

pub struct AddressManager {
banned_address_store: DbBannedAddressesStore,
address_store: address_store_with_cache::Store,
Expand All @@ -55,124 +73,171 @@ impl AddressManager {
}

fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
if let Some((net_addr, extend)) = self.configured_address() {
self.local_net_addresses.push(net_addr);

if let Some(ExtendHelper { gateway, local_addr, external_port }) = extend {
let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
None
}
self.local_net_addresses = self.local_addresses().collect();

let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
Err(err) => {
warn!("[UPnP] Error adding port mapping: {err}");
return None;
}
Ok(None) => return None,
Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
};
self.local_net_addresses.push(net_address);

let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
self.add_routable_addresses_from_net_interfaces();
None
}
};

self.local_net_addresses.iter().for_each(|net_addr| {
info!("Publicly routable local address {} added to store", net_addr);
});
extender
}

fn configured_address(&self) -> Option<(NetAddress, Option<ExtendHelper>)> {
fn local_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
match self.config.externalip {
// An external IP was passed, we will try to bind that if it's valid
Some(local_net_address) if local_net_address.ip.is_publicly_routable() => {
info!("External address {} added to store", local_net_address);
Some((local_net_address, None))
info!("External address is publicly routable {}", local_net_address);
return Left(iter::once(local_net_address));
}
Some(local_net_address) => {
info!("Non-publicly routable external address {} not added to store", local_net_address);
None
info!("External address is not publicly routable {}", local_net_address);
}
None if !self.config.disable_upnp => {
let gateway = igd::search_gateway(Default::default()).ok()?;
let ip = IpAddress::new(gateway.get_external_ip().ok()?);
if !ip.is_publicly_routable() {
info!("Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return None;
}
info!("Got external ip from gateway using upnp: {ip}");

let default_port = self.config.default_p2p_port();
None => {}
};

let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
normalized_p2p_listen_address.into()
};
Right(self.routable_addresses_from_net_interfaces())
}

match gateway.add_port(
igd::PortMappingProtocol::TCP,
default_port,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
) {
Ok(_) => {
info!("Added port mapping to default external port: {ip}:{default_port}");
Some((
NetAddress { ip, port: default_port },
Some(ExtendHelper { gateway, local_addr, external_port: default_port }),
))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway
.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)
.ok()?;
info!("Added port mapping to random external port: {ip}:{port}");
Some((NetAddress { ip, port }, Some(ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => {
warn!("error adding port: {err}");
None
}
}
}
None => None,
fn routable_addresses_from_net_interfaces(&self) -> impl Iterator<Item = NetAddress> + '_ {
// check whatever was passed as listen address (if routable)
// otherwise(listen_address === 0.0.0.0) check all interfaces
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
if listen_address.ip.is_publicly_routable() {
info!("Publicly routable local address found: {}", listen_address.ip);
Left(Left(iter::once(listen_address)))
} else if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();
let Ok(network_interfaces) = network_interfaces else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
return Left(Right(iter::empty()));
};
// TODO: Add Check IPv4 or IPv6 match from Go code
Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
|ip| {
info!("Publicly routable local address found: {}", ip);
NetAddress::new(ip, self.config.default_p2p_port())
},
))
} else {
Left(Right(iter::empty()))
}
}
fn add_routable_addresses_from_net_interfaces(&mut self) {
// If listen_address === 0.0.0.0, bind all interfaces
// else, bind whatever was passed as listen address (if routable)
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());

if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();
fn upnp(&self) -> Result<Option<(NetAddress, ExtendHelper)>, UpnpError> {
info!("[UPnP] Attempting to register upnp... (to disable run the node with --disable-upnp)");
let gateway = igd::search_gateway(Default::default())?;
let ip = IpAddress::new(gateway.get_external_ip()?);
if !ip.is_publicly_routable() {
info!("[UPnP] Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return Ok(None);
}
info!("[UPnP] Got external ip from gateway using upnp: {ip}");

if let Ok(network_interfaces) = network_interfaces {
for (_, ip) in network_interfaces.iter() {
let curr_ip = IpAddress::new(*ip);
let default_port = self.config.default_p2p_port();

// TODO: Add Check IPv4 or IPv6 match from Go code
if curr_ip.is_publicly_routable() {
info!("Publicly routable local address {} added to store", curr_ip);
self.local_net_addresses.push(NetAddress { ip: curr_ip, port: self.config.default_p2p_port() });
} else {
debug!("Non-publicly routable interface address {} not added to store", curr_ip);
let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
normalized_p2p_listen_address.into()
};

// This loop checks for existing port mappings in the UPnP-enabled gateway.
//
// The goal of this loop is to identify if the desired external port (`default_port`) is
// already mapped to any device inside the local network. This is crucial because, in
// certain scenarios, gateways might not throw the `PortInUse` error but rather might
// silently remap the external port when there's a conflict. By iterating through the
// current mappings, we can make an informed decision about whether to attempt using
// the default port or request a new random one.
//
// The loop goes through all existing port mappings one-by-one:
// - If a mapping is found that uses the desired external port, the loop breaks with `already_in_use` set to true.
// - If the index is not valid (i.e., we've iterated through all the mappings), the loop breaks with `already_in_use` set to false.
// - Any other errors during fetching of port mappings are handled accordingly, but the end result is to exit the loop with the `already_in_use` flag set appropriately.
let mut index = 0;
let already_in_use = loop {
match gateway.get_generic_port_mapping_entry(index) {
Ok(entry) => {
if entry.enabled && entry.external_port == default_port {
info!("[UPnP] Found existing mapping that uses the same external port. Description: {}, external port: {}, internal port: {}, client: {}, lease duration: {}", entry.port_mapping_description, entry.external_port, entry.internal_port, entry.internal_client, entry.lease_duration);
break true;
}
index += 1;
}
} else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
Err(GetGenericPortMappingEntryError::ActionNotAuthorized) => {
index += 1;
continue;
}
Err(GetGenericPortMappingEntryError::RequestError(err)) => {
warn!("[UPnP] request existing port mapping err: {:?}", err);
break false;
}
Err(GetGenericPortMappingEntryError::SpecifiedArrayIndexInvalid) => break false,
}
} else if listen_address.ip.is_publicly_routable() {
info!("Publicly routable P2P listen address {} added to store", listen_address.ip);
self.local_net_addresses.push(listen_address);
} else {
debug!("Non-publicly routable listen address {} not added to store.", listen_address.ip);
};
if already_in_use {
let port =
gateway.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)?;
info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
return Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })));
}

match gateway.add_port(
igd::PortMappingProtocol::TCP,
default_port,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
) {
Ok(_) => {
info!("[UPnP] Added port mapping to default external port: {ip}:{default_port}");
Ok(Some((NetAddress { ip, port: default_port }, ExtendHelper { gateway, local_addr, external_port: default_port })))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway.add_any_port(
igd::PortMappingProtocol::TCP,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
)?;
info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => Err(err.into()),
}
}

pub fn best_local_address(&mut self) -> Option<NetAddress> {
if self.local_net_addresses.is_empty() {
None
Expand Down
11 changes: 8 additions & 3 deletions components/addressmanager/src/port_mapping_extender.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use igd_next::{aio::tokio::Tokio, AddPortError};
use kaspa_core::{
debug, error,
debug, error, info,
task::{
service::{AsyncService, AsyncServiceFuture},
tick::{TickReason, TickService},
Expand Down Expand Up @@ -49,9 +49,9 @@ impl Extender {
)
.await
{
warn!("extend external ip mapping err: {e:?}");
warn!("[UPnP] Extend external ip mapping err: {e:?}");
} else {
debug!("extend external ip mapping");
debug!("[UPnP] Extend external ip mapping");
}
}
// Let the system print final logs before exiting
Expand Down Expand Up @@ -81,6 +81,11 @@ impl AsyncService for Extender {

fn stop(self: Arc<Self>) -> AsyncServiceFuture {
Box::pin(async move {
if let Err(err) = self.gateway.remove_port(igd_next::PortMappingProtocol::TCP, self.external_port).await {
warn!("[UPnP] Remove port mapping err: {err:?}");
} else {
info!("[UPnP] Successfully removed port mapping, external port: {}", self.external_port);
}
trace!("{} stopped", SERVICE_NAME);
Ok(())
})
Expand Down
Loading

0 comments on commit e15b070

Please sign in to comment.