Skip to content

Commit

Permalink
Upgrade libp2p from 0.52.4 to 0.54.1 (#6248)
Browse files Browse the repository at this point in the history
# Description

Fixes #5996

https://github.com/libp2p/rust-libp2p/releases/tag/libp2p-v0.53.0
https://github.com/libp2p/rust-libp2p/blob/master/CHANGELOG.md

## Integration

Nothing special is needed, just note that `yamux_window_size` is no
longer applicable to libp2p (litep2p seems to still have it though).

## Review Notes

There are a few simplifications and improvements done in libp2p 0.53
regarding swarm interface, I'll list a few key/applicable here.

libp2p/rust-libp2p#4788 removed
`write_length_prefixed` function, so I inlined its code instead.

libp2p/rust-libp2p#4120 introduced new
`libp2p::SwarmBuilder` instead of now deprecated
`libp2p::swarm::SwarmBuilder`, the transition is straightforward and
quite ergonomic (can be seen in tests).

libp2p/rust-libp2p#4581 is the most annoying
change I have seen that basically makes many enums `#[non_exhaustive]`.
I mapped some, but those that couldn't be mapped I dealt with by
printing log messages once they are hit (the best solution I could come
up with, at least with stable Rust).

libp2p/rust-libp2p#4306 makes connection close
as soon as there are no handler using it, so I had to replace
`KeepAlive::Until` with an explicit future that flips internal boolean
after timeout, achieving the old behavior, though it should ideally be
removed completely at some point.

`yamux_window_size` is no longer used by libp2p thanks to
libp2p/rust-libp2p#4970 and generally Yamux
should have a higher performance now.

I have resolved and cleaned up all deprecations related to libp2p except
`BandwidthSinks`. Libp2p deprecated it (though it is still present in
0.54.1, which is why I didn't handle it just yet). Ideally Substrate
would finally [switch to the official Prometheus
client](paritytech/substrate#12699), in which
case we'd get metrics for free. Otherwise a bit of code will need to be
copy-pasted to maintain current behavior with `BandwidthSinks` gone,
which I left a TODO about.

The biggest change in 0.54.0 is
libp2p/rust-libp2p#4568 that changed transport
APIs and enabled unconditional potential port reuse, which can lead to
very confusing errors if running two Substrate nodes on the same machine
without changing listening port explicitly.

Overall nothing scary here, but testing is always appreciated.

# Checklist

* [x] My PR includes a detailed description as outlined in the
"Description" and its two subsections above.
* [x] My PR follows the [labeling requirements](

https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process
) of this project (at minimum one label for `T` required)
* External contributors: ask maintainers to put the right label on your
PR.

---

Polkadot Address: 1vSxzbyz2cJREAuVWjhXUT1ds8vBzoxn2w4asNpusQKwjJd

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
  • Loading branch information
nazar-pc and dmitry-markin authored Dec 12, 2024
1 parent c10e25a commit bfbb46f
Show file tree
Hide file tree
Showing 20 changed files with 846 additions and 1,068 deletions.
682 changes: 284 additions & 398 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ kvdb-shared-tests = { version = "0.11.0" }
landlock = { version = "0.3.0" }
libc = { version = "0.2.155" }
libfuzzer-sys = { version = "0.4" }
libp2p = { version = "0.52.4" }
libp2p = { version = "0.54.1" }
libp2p-identity = { version = "0.2.9" }
libsecp256k1 = { version = "0.7.0", default-features = false }
linked-hash-map = { version = "0.5.4" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_6248.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Upgrade libp2p to 0.54.1

doc:
- audience: [Node Dev, Node Operator]
description: |
Upgrade libp2p from 0.52.4 to 0.54.1

crates:
- name: sc-network
bump: major
- name: sc-network-types
bump: minor
- name: sc-network-sync
bump: patch
- name: sc-telemetry
bump: minor
1 change: 1 addition & 0 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct Behaviour<B: BlockT> {
}

/// Event generated by `Behaviour`.
#[derive(Debug)]
pub enum BehaviourOut {
/// Started a random iterative Kademlia discovery query.
RandomKademliaStarted,
Expand Down
172 changes: 82 additions & 90 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::{
core::{Endpoint, Multiaddr},
core::{transport::PortUse, Endpoint, Multiaddr},
kad::{
self,
record::store::{MemoryStore, RecordStore},
store::{MemoryStore, RecordStore},
Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
Event, GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
RecordKey,
},
mdns::{self, tokio::Behaviour as TokioMdns},
Expand All @@ -68,8 +68,8 @@ use libp2p::{
toggle::{Toggle, ToggleConnectionHandler},
DialFailure, ExternalAddrConfirmed, FromSwarm,
},
ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters,
StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
PeerId,
};
Expand Down Expand Up @@ -214,23 +214,14 @@ impl DiscoveryConfig {
enable_mdns,
kademlia_disjoint_query_paths,
kademlia_protocol,
kademlia_legacy_protocol,
kademlia_legacy_protocol: _,
kademlia_replication_factor,
} = self;

let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
let mut config = KademliaConfig::default();
let mut config = KademliaConfig::new(kademlia_protocol.clone());

config.set_replication_factor(kademlia_replication_factor);
// Populate kad with both the legacy and the new protocol names.
// Remove the legacy protocol:
// https://github.com/paritytech/polkadot-sdk/issues/504
let kademlia_protocols = if let Some(legacy_protocol) = kademlia_legacy_protocol {
vec![kademlia_protocol.clone(), legacy_protocol]
} else {
vec![kademlia_protocol.clone()]
};
config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());

config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);

Expand Down Expand Up @@ -613,12 +604,14 @@ impl NetworkBehaviour for DiscoveryBehaviour {
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
self.kademlia.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)
}

Expand Down Expand Up @@ -690,7 +683,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
Ok(list.into_iter().collect())
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(e) => {
self.num_connections += 1;
Expand Down Expand Up @@ -777,6 +770,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {

self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
},
event => {
debug!(target: "sub-libp2p", "New unknown `FromSwarm` libp2p event: {event:?}");
self.kademlia.on_swarm_event(event);
},
}
}

Expand All @@ -789,11 +786,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
}

fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Immediately process the content of `discovered`.
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(ev))
Expand Down Expand Up @@ -836,7 +829,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
while let Poll::Ready(ev) = self.kademlia.poll(cx) {
match ev {
ToSwarm::GenerateEvent(ev) => match ev {
KademliaEvent::RoutingUpdated { peer, .. } => {
Expand Down Expand Up @@ -1019,30 +1012,38 @@ impl NetworkBehaviour for DiscoveryBehaviour {
e.key(), e,
),
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::Bootstrap(res),
..
} => match res {
Ok(ok) => debug!(
target: "sub-libp2p",
"Libp2p => DHT bootstrap progressed: {ok:?}",
),
Err(e) => warn!(
target: "sub-libp2p",
"Libp2p => DHT bootstrap error: {e:?}",
),
},
// We never start any other type of query.
KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
Event::ModeChanged { new_mode } => {
debug!(target: "sub-libp2p", "Libp2p => Kademlia mode changed: {new_mode}")
},
},
ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
ToSwarm::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
ToSwarm::CloseConnection { peer_id, connection } =>
return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
ToSwarm::NewExternalAddrCandidate(observed) =>
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
ToSwarm::ExternalAddrConfirmed(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
ToSwarm::ExternalAddrExpired(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
ToSwarm::RemoveListener { id } =>
return Poll::Ready(ToSwarm::RemoveListener { id }),
event => {
return Poll::Ready(event.map_out(|_| {
unreachable!("`GenerateEvent` is handled in a branch above; qed")
}));
},
}
}

// Poll mDNS.
while let Poll::Ready(ev) = self.mdns.poll(cx, params) {
while let Poll::Ready(ev) = self.mdns.poll(cx) {
match ev {
ToSwarm::GenerateEvent(event) => match event {
mdns::Event::Discovered(list) => {
Expand All @@ -1064,17 +1065,17 @@ impl NetworkBehaviour for DiscoveryBehaviour {
},
// `event` is an enum with no variant
ToSwarm::NotifyHandler { event, .. } => match event {},
ToSwarm::CloseConnection { peer_id, connection } =>
return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
ToSwarm::NewExternalAddrCandidate(observed) =>
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
ToSwarm::ExternalAddrConfirmed(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
ToSwarm::ExternalAddrExpired(addr) =>
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
ToSwarm::RemoveListener { id } =>
return Poll::Ready(ToSwarm::RemoveListener { id }),
event => {
return Poll::Ready(
event
.map_in(|_| {
unreachable!("`NotifyHandler` is handled in a branch above; qed")
})
.map_out(|_| {
unreachable!("`GenerateEvent` is handled in a branch above; qed")
}),
);
},
}
}

Expand Down Expand Up @@ -1117,21 +1118,14 @@ mod tests {
},
identity::Keypair,
noise,
swarm::{Executor, Swarm, SwarmEvent},
swarm::{Swarm, SwarmEvent},
yamux, Multiaddr,
};
use sp_core::hash::H256;
use std::{collections::HashSet, pin::Pin, task::Poll};
use std::{collections::HashSet, task::Poll, time::Duration};

struct TokioExecutor(tokio::runtime::Runtime);
impl Executor for TokioExecutor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.0.spawn(f);
}
}

#[test]
fn discovery_working() {
#[tokio::test]
async fn discovery_working() {
let mut first_swarm_peer_id_and_addr = None;

let genesis_hash = H256::from_low_u64_be(1);
Expand All @@ -1142,42 +1136,40 @@ mod tests {
// the first swarm via `with_permanent_addresses`.
let mut swarms = (0..25)
.map(|i| {
let keypair = Keypair::generate_ed25519();

let transport = MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&keypair).unwrap())
.multiplex(yamux::Config::default())
.boxed();

let behaviour = {
let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
config
.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
.allow_private_ip(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.with_kademlia(genesis_hash, fork_id, &protocol_id);

config.finish()
};

let runtime = tokio::runtime::Runtime::new().unwrap();
#[allow(deprecated)]
let mut swarm = libp2p::swarm::SwarmBuilder::with_executor(
transport,
behaviour,
keypair.public().to_peer_id(),
TokioExecutor(runtime),
)
.build();
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_other_transport(|keypair| {
MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&keypair).unwrap())
.multiplex(yamux::Config::default())
.boxed()
})
.unwrap()
.with_behaviour(|keypair| {
let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
config
.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
.allow_private_ip(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.with_kademlia(genesis_hash, fork_id, &protocol_id);

config.finish()
})
.unwrap()
.with_swarm_config(|config| {
// This is taken care of by notification protocols in non-test environment
config.with_idle_connection_timeout(Duration::from_secs(10))
})
.build();

let listen_addr: Multiaddr =
format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

if i == 0 {
first_swarm_peer_id_and_addr =
Some((keypair.public().to_peer_id(), listen_addr.clone()))
Some((*swarm.local_peer_id(), listen_addr.clone()))
}

swarm.listen_on(listen_addr.clone()).unwrap();
Expand Down Expand Up @@ -1264,7 +1256,7 @@ mod tests {
}
});

futures::executor::block_on(fut);
fut.await
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub enum Endpoint {
impl From<ConnectedPoint> for PeerEndpoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { address, role_override } =>
ConnectedPoint::Dialer { address, role_override, port_use: _ } =>
Self::Dialing(address, role_override.into()),
ConnectedPoint::Listener { local_addr, send_back_addr } =>
Self::Listening { local_addr, send_back_addr },
Expand Down
Loading

0 comments on commit bfbb46f

Please sign in to comment.