Skip to content

Commit

Permalink
feat(swarm): replace address scoring with explicit candidates
Browse files Browse the repository at this point in the history
Previously, a `NetworkBehaviour` could report an `AddressScore` for an external address. This score was a `u32` and addresses would be ranked amongst those.

In reality, an address is either confirmed to be publicly reachable (via a protocol such as AutoNAT) or merely represents a candidate that might be an external address. In a way, addresses are guilty (private) until proven innocent (publicly reachable).

When a `NetworkBehaviour` reports an address candidate, we perform address translation on it to potentially correct for ephemeral ports of TCP. These candidates are then injected back into the `NetworkBehaviour`. Protocols such as AutoNAT can use these addresses as a source for probing their NAT status. Once confirmed, they can emit a `ToSwarm::ExternalAddrConfirmed` event which again will be passed to all `NetworkBehaviour`s.

This simplified approach will allow us implement Kademlia's client-mode (#2032) without additional configuration options: As soon as an address is reported as publicly reachable, we can activate server-mode for that connection.

Related: #3877.
Related: #3953.
Related: #2032.
Related: libp2p/go-libp2p#2229.

Co-authored-by: Max Inden <mail@max-inden.de>

Pull-Request: #3954.
  • Loading branch information
thomaseizinger authored May 24, 2023
1 parent a424310 commit 5e8f2e8
Show file tree
Hide file tree
Showing 32 changed files with 273 additions and 822 deletions.
4 changes: 2 additions & 2 deletions examples/rendezvous/src/bin/rzv-register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::StreamExt;
use libp2p::{
core::transport::upgrade::Version,
identity, noise, ping, rendezvous,
swarm::{keep_alive, AddressScore, NetworkBehaviour, SwarmBuilder, SwarmEvent},
swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent},
tcp, yamux, Multiaddr, PeerId, Transport,
};
use std::time::Duration;
Expand Down Expand Up @@ -55,7 +55,7 @@ async fn main() {
// In production the external address should be the publicly facing IP address of the rendezvous point.
// This address is recorded in the registration entry by the rendezvous point.
let external_address = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
swarm.add_external_address(external_address, AddressScore::Infinite);
swarm.add_external_address(external_address);

log::info!("Local peer id: {}", swarm.local_peer_id());

Expand Down
20 changes: 5 additions & 15 deletions misc/allow-block-list/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ where
FromSwarm::ExpiredListenAddr(_) => {}
FromSwarm::ListenerError(_) => {}
FromSwarm::ListenerClosed(_) => {}
FromSwarm::NewExternalAddr(_) => {}
FromSwarm::ExpiredExternalAddr(_) => {}
FromSwarm::NewExternalAddrCandidate(_) => {}
FromSwarm::ExternalAddrExpired(_) => {}
FromSwarm::ExternalAddrConfirmed(_) => {}
}
}

Expand Down Expand Up @@ -410,13 +411,7 @@ mod tests {
dialer
.dial(
DialOpts::unknown_peer_id()
.address(
listener
.external_addresses()
.map(|a| a.addr.clone())
.next()
.unwrap(),
)
.address(listener.external_addresses().next().cloned().unwrap())
.build(),
)
.unwrap();
Expand Down Expand Up @@ -470,12 +465,7 @@ mod tests {
{
dialer.dial(
DialOpts::peer_id(*listener.local_peer_id())
.addresses(
listener
.external_addresses()
.map(|a| a.addr.clone())
.collect(),
)
.addresses(listener.external_addresses().cloned().collect())
.build(),
)
}
Expand Down
5 changes: 3 additions & 2 deletions misc/connection-limits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ impl NetworkBehaviour for Behaviour {
FromSwarm::ExpiredListenAddr(_) => {}
FromSwarm::ListenerError(_) => {}
FromSwarm::ListenerClosed(_) => {}
FromSwarm::NewExternalAddr(_) => {}
FromSwarm::ExpiredExternalAddr(_) => {}
FromSwarm::NewExternalAddrCandidate(_) => {}
FromSwarm::ExternalAddrExpired(_) => {}
FromSwarm::ExternalAddrConfirmed(_) => {}
}
}

Expand Down
33 changes: 21 additions & 12 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ use libp2p_request_response::{
};
use libp2p_swarm::{
behaviour::{
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr,
ExpiredListenAddr, FromSwarm,
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
ExternalAddrExpired, FromSwarm,
},
ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour,
ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, NewExternalAddrCandidate,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use std::{
Expand Down Expand Up @@ -214,7 +214,7 @@ pub struct Behaviour {
probe_id: ProbeId,

listen_addresses: ListenAddresses,
external_addresses: ExternalAddresses,
other_candidates: HashSet<Multiaddr>,
}

impl Behaviour {
Expand All @@ -240,7 +240,7 @@ impl Behaviour {
pending_actions: VecDeque::new(),
probe_id: ProbeId(0),
listen_addresses: Default::default(),
external_addresses: Default::default(),
other_candidates: Default::default(),
}
}

Expand Down Expand Up @@ -279,6 +279,12 @@ impl Behaviour {
self.servers.retain(|p| p != peer);
}

/// Explicitly probe the provided address for external reachability.
pub fn probe_address(&mut self, candidate: Multiaddr) {
self.other_candidates.insert(candidate);
self.as_client().on_new_address();
}

fn as_client(&mut self) -> AsClient {
AsClient {
inner: &mut self.inner,
Expand All @@ -294,7 +300,7 @@ impl Behaviour {
last_probe: &mut self.last_probe,
schedule_probe: &mut self.schedule_probe,
listen_addresses: &self.listen_addresses,
external_addresses: &self.external_addresses,
other_candidates: &self.other_candidates,
}
}

Expand Down Expand Up @@ -532,7 +538,6 @@ impl NetworkBehaviour for Behaviour {

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.listen_addresses.on_swarm_event(&event);
self.external_addresses.on_swarm_event(&event);

match event {
FromSwarm::ConnectionEstablished(connection_established) => {
Expand Down Expand Up @@ -561,14 +566,17 @@ impl NetworkBehaviour for Behaviour {
}));
self.as_client().on_expired_address(addr);
}
FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) => {
FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }) => {
self.inner
.on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }));
.on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
self.as_client().on_expired_address(addr);
}
external_addr @ FromSwarm::NewExternalAddr(_) => {
self.inner.on_swarm_event(external_addr);
self.as_client().on_new_address();
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
self.inner
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(
NewExternalAddrCandidate { addr },
));
self.probe_address(addr.to_owned());
}
listen_failure @ FromSwarm::ListenFailure(_) => {
self.inner.on_swarm_event(listen_failure)
Expand All @@ -580,6 +588,7 @@ impl NetworkBehaviour for Behaviour {
listener_closed @ FromSwarm::ListenerClosed(_) => {
self.inner.on_swarm_event(listener_closed)
}
confirmed @ FromSwarm::ExternalAddrConfirmed(_) => self.inner.on_swarm_event(confirmed),
}
}

Expand Down
24 changes: 5 additions & 19 deletions protocols/autonat/src/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ use instant::Instant;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_request_response::{self as request_response, OutboundFailure, RequestId};
use libp2p_swarm::{
AddressScore, ConnectionId, ExternalAddresses, ListenAddresses, PollParameters, ToSwarm,
};
use libp2p_swarm::{ConnectionId, ListenAddresses, PollParameters, ToSwarm};
use rand::{seq::SliceRandom, thread_rng};
use std::{
collections::{HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -97,13 +95,13 @@ pub(crate) struct AsClient<'a> {
pub(crate) last_probe: &'a mut Option<Instant>,
pub(crate) schedule_probe: &'a mut Delay,
pub(crate) listen_addresses: &'a ListenAddresses,
pub(crate) external_addresses: &'a ExternalAddresses,
pub(crate) other_candidates: &'a HashSet<Multiaddr>,
}

impl<'a> HandleInnerEvent for AsClient<'a> {
fn handle_event(
&mut self,
params: &mut impl PollParameters,
_: &mut impl PollParameters,
event: request_response::Event<DialRequest, DialResponse>,
) -> VecDeque<Action> {
match event {
Expand Down Expand Up @@ -147,19 +145,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
}

if let Ok(address) = response.result {
// Update observed address score if it is finite.
#[allow(deprecated)]
// TODO: Fix once we report `AddressScore` through `FromSwarm` event.
let score = params
.external_addresses()
.find_map(|r| (r.addr == address).then_some(r.score))
.unwrap_or(AddressScore::Finite(0));
if let AddressScore::Finite(finite_score) = score {
actions.push_back(ToSwarm::ReportObservedAddr {
address,
score: AddressScore::Finite(finite_score + 1),
});
}
actions.push_back(ToSwarm::ExternalAddrConfirmed(address));
}

actions
Expand Down Expand Up @@ -201,7 +187,7 @@ impl<'a> AsClient<'a> {
self.schedule_probe.reset(self.config.retry_interval);

let addresses = self
.external_addresses
.other_candidates
.iter()
.chain(self.listen_addresses.iter())
.cloned()
Expand Down
50 changes: 5 additions & 45 deletions protocols/autonat/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use libp2p_autonat::{
};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_swarm::{AddressScore, Swarm, SwarmEvent};
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt as _;
use std::time::Duration;

Expand Down Expand Up @@ -70,48 +70,6 @@ async fn test_auto_probe() {
assert!(client.behaviour().public_address().is_none());
assert_eq!(client.behaviour().confidence(), 0);

// Test Private NAT Status

// Artificially add a faulty address.
let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite);

let id = match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
}
other => panic!("Unexpected behaviour event: {other:?}."),
};

match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error {
probe_id,
peer,
error,
}) => {
assert_eq!(peer.unwrap(), server_id);
assert_eq!(probe_id, id);
assert_eq!(
error,
OutboundProbeError::Response(ResponseError::DialError)
);
}
other => panic!("Unexpected behaviour event: {other:?}."),
}

match client.next_behaviour_event().await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert_eq!(new, NatStatus::Private);
}
other => panic!("Unexpected behaviour event: {other:?}."),
}

assert_eq!(client.behaviour().confidence(), 0);
assert_eq!(client.behaviour().nat_status(), NatStatus::Private);
assert!(client.behaviour().public_address().is_none());

// Test new public listening address
client.listen().await;

Expand Down Expand Up @@ -142,12 +100,14 @@ async fn test_auto_probe() {
}
SwarmEvent::Behaviour(Event::StatusChanged { old, new }) => {
// Expect to flip status to public
assert_eq!(old, NatStatus::Private);
assert_eq!(old, NatStatus::Unknown);
assert!(matches!(new, NatStatus::Public(_)));
assert!(new.is_public());
break;
}
SwarmEvent::IncomingConnection { .. }
| SwarmEvent::ConnectionEstablished { .. }
| SwarmEvent::Dialing { .. }
| SwarmEvent::NewListenAddr { .. }
| SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
Expand Down Expand Up @@ -198,7 +158,7 @@ async fn test_confidence() {
client.listen().await;
} else {
let unreachable_addr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(unreachable_addr, AddressScore::Infinite);
client.behaviour_mut().probe_address(unreachable_addr);
}

for i in 0..MAX_CONFIDENCE + 1 {
Expand Down
16 changes: 7 additions & 9 deletions protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use libp2p_autonat::{
use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::DialError;
use libp2p_swarm::{AddressScore, Swarm, SwarmEvent};
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt as _;
use std::{num::NonZeroU32, time::Duration};

Expand Down Expand Up @@ -131,10 +131,9 @@ async fn test_dial_back() {
async fn test_dial_error() {
let (mut server, server_id, server_addr) = new_server_swarm(None).await;
let (mut client, client_id) = new_client_swarm(server_id, server_addr).await;
client.add_external_address(
"/ip4/127.0.0.1/tcp/12345".parse().unwrap(),
AddressScore::Infinite,
);
client
.behaviour_mut()
.probe_address("/ip4/127.0.0.1/tcp/12345".parse().unwrap());
async_std::task::spawn(client.loop_on_next());

let request_probe_id = match server.next_behaviour_event().await {
Expand Down Expand Up @@ -274,10 +273,9 @@ async fn test_dial_multiple_addr() {

let (mut client, client_id) = new_client_swarm(server_id, server_addr.clone()).await;
client.listen().await;
client.add_external_address(
"/ip4/127.0.0.1/tcp/12345".parse().unwrap(),
AddressScore::Infinite,
);
client
.behaviour_mut()
.probe_address("/ip4/127.0.0.1/tcp/12345".parse().unwrap());
async_std::task::spawn(client.loop_on_next());

let dial_addresses = match server.next_behaviour_event().await {
Expand Down
5 changes: 3 additions & 2 deletions protocols/dcutr/src/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,9 @@ impl NetworkBehaviour for Behaviour {
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {}
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,9 @@ impl NetworkBehaviour for Floodsub {
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {}
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3500,8 +3500,9 @@ where
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {}
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
}
}
Expand Down
Loading

0 comments on commit 5e8f2e8

Please sign in to comment.