Skip to content

Commit

Permalink
Expose API for PublicAddresses (#212)
Browse files Browse the repository at this point in the history
This PR exposes an object for handling public addresses.
- `add_address` - adds an address to the given set, inserting the local
peer ID or returning an error on different peer Ids
- `remove_address` - removes an exact address from the set
- `get_addresses` - list of actual addresses

Users of litep2p can obtain the given list via litep2p::public_addresses
method.
The lists are exposed internally by a similar API on the transport
handler (the handler that communicates with the transport manager).

The identify protocol provides a concatenated list of:
- user provided addresses via the config
- listen addresses 
- external addresses

This ensures that Substrate can use a custom heuristic to update the
list of external addresses.

### Testing Done
- Added an integration test with 2 litep2p nodes

Closes: #191

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Sep 4, 2024
1 parent e3a22d5 commit 25ddf91
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 68 deletions.
159 changes: 159 additions & 0 deletions src/addresses.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2024 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{collections::HashSet, sync::Arc};

use multiaddr::{Multiaddr, Protocol};
use parking_lot::RwLock;

use crate::PeerId;

/// Set of the public addresses of the local node.
///
/// The format of the addresses stored in the set contain the local peer ID.
/// This requirement is enforced by the [`PublicAddresses::add_address`] method,
/// that will add the local peer ID to the address if it is missing.
///
/// # Note
///
/// - The addresses are reported to the identify protocol and are used by other nodes
/// to establish a connection with the local node.
///
/// - Users must ensure that the addresses are reachable from the network.
#[derive(Debug, Clone)]
pub struct PublicAddresses {
pub(crate) inner: Arc<RwLock<HashSet<Multiaddr>>>,
local_peer_id: PeerId,
}

impl PublicAddresses {
/// Creates new [`PublicAddresses`] from the given peer ID.
pub(crate) fn new(local_peer_id: PeerId) -> Self {
Self {
inner: Arc::new(RwLock::new(HashSet::new())),
local_peer_id,
}
}

/// Add a public address to the list of addresses.
///
/// The address must contain the local peer ID, otherwise an error is returned.
/// In case the address does not contain any peer ID, it will be added.
///
/// Returns true if the address was added, false if it was already present.
pub fn add_address(&self, address: Multiaddr) -> Result<bool, InsertionError> {
let address = ensure_local_peer(address, self.local_peer_id)?;
Ok(self.inner.write().insert(address))
}

/// Remove the exact public address.
///
/// The provided address must contain the local peer ID.
pub fn remove_address(&self, address: &Multiaddr) -> bool {
self.inner.write().remove(address)
}

/// Returns a vector of the available listen addresses.
pub fn get_addresses(&self) -> Vec<Multiaddr> {
self.inner.read().iter().cloned().collect()
}
}

/// Check if the address contains the local peer ID.
///
/// If the address does not contain any peer ID, it will be added.
fn ensure_local_peer(
mut address: Multiaddr,
local_peer_id: PeerId,
) -> Result<Multiaddr, InsertionError> {
if address.is_empty() {
return Err(InsertionError::EmptyAddress);
}

// Verify the peer ID from the address corresponds to the local peer ID.
if let Some(peer_id) = PeerId::try_from_multiaddr(&address) {
if peer_id != local_peer_id {
return Err(InsertionError::DifferentPeerId);
}
} else {
address.push(Protocol::P2p(local_peer_id.into()));
}

Ok(address)
}

/// The error returned when an address cannot be inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertionError {
/// The address is empty.
EmptyAddress,
/// The address contains a different peer ID than the local peer ID.
DifferentPeerId,
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn add_remove_contains() {
let peer_id = PeerId::random();
let addresses = PublicAddresses::new(peer_id);
let address = Multiaddr::from_str("/dns/domain1.com/tcp/30333").unwrap();
let peer_address = Multiaddr::from_str("/dns/domain1.com/tcp/30333")
.unwrap()
.with(Protocol::P2p(peer_id.into()));

assert!(!addresses.get_addresses().contains(&address));

assert!(addresses.add_address(address.clone()).unwrap());
// Adding the address a second time returns Ok(false).
assert!(!addresses.add_address(address.clone()).unwrap());

assert!(!addresses.get_addresses().contains(&address));
assert!(addresses.get_addresses().contains(&peer_address));

addresses.remove_address(&peer_address);
assert!(!addresses.get_addresses().contains(&peer_address));
}

#[test]
fn get_addresses() {
let peer_id = PeerId::random();
let addresses = PublicAddresses::new(peer_id);
let address1 = Multiaddr::from_str("/dns/domain1.com/tcp/30333").unwrap();
let address2 = Multiaddr::from_str("/dns/domain2.com/tcp/30333").unwrap();
// Addresses different than the local peer ID are ignored.
let address3 = Multiaddr::from_str(
"/dns/domain2.com/tcp/30333/p2p/12D3KooWSueCPH3puP2PcvqPJdNaDNF3jMZjtJtDiSy35pWrbt5h",
)
.unwrap();

assert!(addresses.add_address(address1.clone()).unwrap());
assert!(addresses.add_address(address2.clone()).unwrap());
addresses.add_address(address3.clone()).unwrap_err();

let addresses = addresses.get_addresses();
assert_eq!(addresses.len(), 2);
assert!(addresses.contains(&address1.with(Protocol::P2p(peer_id.into()))));
assert!(addresses.contains(&address2.with(Protocol::P2p(peer_id.into()))));
}
}
15 changes: 11 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
#![allow(clippy::match_like_matches_macro)]

use crate::{
addresses::PublicAddresses,
config::Litep2pConfig,
error::DialError,
protocol::{
libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
mdns::Mdns,
Expand All @@ -50,7 +52,6 @@ use crate::transport::webrtc::WebRtcTransport;
#[cfg(feature = "websocket")]
use crate::transport::websocket::WebSocketTransport;

use error::DialError;
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
use transport::Endpoint;
Expand All @@ -65,6 +66,7 @@ pub use types::protocol::ProtocolName;

pub(crate) mod peer_id;

pub mod addresses;
pub mod codec;
pub mod config;
pub mod crypto;
Expand Down Expand Up @@ -387,7 +389,7 @@ impl Litep2p {
// if identify was enabled, give it the enabled protocols and listen addresses and start it
if let Some((service, mut identify_config)) = identify_info.take() {
identify_config.protocols = transport_manager.protocols().cloned().collect();
let identify = Identify::new(service, identify_config, listen_addresses.clone());
let identify = Identify::new(service, identify_config);

litep2p_config.executor.run(Box::pin(async move {
let _ = identify.run().await;
Expand Down Expand Up @@ -450,7 +452,12 @@ impl Litep2p {
&self.local_peer_id
}

/// Get listen address of litep2p.
/// Get the list of public addresses of the node.
pub fn public_addresses(&self) -> PublicAddresses {
self.transport_manager.public_addresses()
}

/// Get the list of listen addresses of the node.
pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
self.listen_addresses.iter()
}
Expand All @@ -473,7 +480,7 @@ impl Litep2p {
/// Add one ore more known addresses for peer.
///
/// Return value denotes how many addresses were added for the peer.
// Addresses belonging to disabled/unsupported transports will be ignored.
/// Addresses belonging to disabled/unsupported transports will be ignored.
pub fn add_known_address(
&mut self,
peer: PeerId,
Expand Down
97 changes: 78 additions & 19 deletions src/protocol/libp2p/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ pub struct Config {
/// Protocols supported by the local node, filled by `Litep2p`.
pub(crate) protocols: Vec<ProtocolName>,

/// Public addresses.
pub(crate) public_addresses: Vec<Multiaddr>,

/// Protocol version.
pub(crate) protocol_version: String,

Expand All @@ -97,15 +94,13 @@ impl Config {
pub fn new(
protocol_version: String,
user_agent: Option<String>,
public_addresses: Vec<Multiaddr>,
) -> (Self, Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>) {
let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);

(
Self {
tx_event,
public: None,
public_addresses,
protocol_version,
user_agent,
codec: ProtocolCodec::UnsignedVarint(Some(IDENTIFY_PAYLOAD_SIZE)),
Expand Down Expand Up @@ -182,9 +177,6 @@ pub(crate) struct Identify {
/// User agent.
user_agent: String,

/// Public addresses.
listen_addresses: HashSet<Multiaddr>,

/// Protocols supported by the local node, filled by `Litep2p`.
protocols: Vec<String>,

Expand All @@ -200,16 +192,11 @@ pub(crate) struct Identify {

impl Identify {
/// Create new [`Identify`] protocol.
pub(crate) fn new(
service: TransportService,
config: Config,
listen_addresses: Vec<Multiaddr>,
) -> Self {
pub(crate) fn new(service: TransportService, config: Config) -> Self {
Self {
service,
tx: config.tx_event,
peers: HashMap::new(),
listen_addresses: config.public_addresses.into_iter().chain(listen_addresses).collect(),
public: config.public.expect("public key to be supplied"),
protocol_version: config.protocol_version,
user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
Expand Down Expand Up @@ -265,15 +252,16 @@ impl Identify {
}
};

let mut listen_addr: HashSet<_> =
self.service.listen_addresses().into_iter().map(|addr| addr.to_vec()).collect();
listen_addr
.extend(self.service.public_addresses().inner.read().iter().map(|addr| addr.to_vec()));

let identify = identify_schema::Identify {
protocol_version: Some(self.protocol_version.clone()),
agent_version: Some(self.user_agent.clone()),
public_key: Some(self.public.to_protobuf_encoding()),
listen_addrs: self
.listen_addresses
.iter()
.map(|address| address.to_vec())
.collect::<Vec<_>>(),
listen_addrs: listen_addr.into_iter().collect(),
observed_addr,
protocols: self.protocols.clone(),
};
Expand Down Expand Up @@ -413,3 +401,74 @@ impl Identify {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{config::ConfigBuilder, transport::tcp::config::Config as TcpConfig, Litep2p};
use multiaddr::{Multiaddr, Protocol};

fn create_litep2p() -> (
Litep2p,
Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
PeerId,
) {
let (identify_config, identify) =
Config::new("1.0.0".to_string(), Some("litep2p/1.0.0".to_string()));

let keypair = crate::crypto::ed25519::Keypair::generate();
let peer = PeerId::from_public_key(&crate::crypto::PublicKey::Ed25519(keypair.public()));
let config = ConfigBuilder::new()
.with_keypair(keypair)
.with_tcp(TcpConfig {
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
..Default::default()
})
.with_libp2p_identify(identify_config)
.build();

(Litep2p::new(config).unwrap(), identify, peer)
}

#[tokio::test]
async fn update_identify_addresses() {
// Create two instances of litep2p
let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
let litep2p1_address = litep2p1.listen_addresses().into_iter().next().unwrap();

let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
// Litep2p1 is now reporting the new address.
assert!(litep2p1.public_addresses().add_address(multiaddr.clone()).unwrap());

// Dial `litep2p1`
litep2p2.dial_address(litep2p1_address.clone()).await.unwrap();

let expected_multiaddr = multiaddr.with(Protocol::P2p(peer1.into()));

tokio::spawn(async move {
loop {
tokio::select! {
_ = litep2p1.next_event() => {}
_event = event_stream1.next() => {}
}
}
});

loop {
tokio::select! {
_ = litep2p2.next_event() => {}
event = event_stream2.next() => match event {
Some(IdentifyEvent::PeerIdentified {
listen_addresses,
..
}) => {
assert!(listen_addresses.iter().any(|address| address == &expected_multiaddr));
break;
}
_ => {}
}
}
}
}
}
Loading

0 comments on commit 25ddf91

Please sign in to comment.