Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: p2p peer management and discovery mechanism #299

Merged
merged 1 commit into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 4 additions & 14 deletions crates/grpc/src/uopool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,23 +471,13 @@ where
mempool_channels.push((ep, waiting_to_pub_rv, p2p_userop_sd))
}

let listen_addrs = config.listen_addr.to_multi_addr();
let mut p2p_network =
Network::new(config.clone(), mempool_channels).expect("p2p network init failed");

for listen_addr in listen_addrs.into_iter() {
info!("P2P node listened on {}", listen_addr);
p2p_network.listen_on(listen_addr).expect("Listen on p2p network failed");
}

if config.bootnodes.is_empty() {
info!("Start p2p mode without bootnodes");
info!("Starting p2p mode without bootnodes");
}

for enr in config.bootnodes.into_iter() {
info!("Trying to dial p2p node {enr:}");
p2p_network.dial(enr).expect("Dial bootnode failed");
}
let mut p2p_network = Network::new(config.clone(), mempool_channels)
.await
.expect("p2p network init failed");

tokio::spawn(async move {
loop {
Expand Down
2 changes: 2 additions & 0 deletions crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ libp2p-mplex = { version = "0.41.0" }

# cryptography
sha2 = "0.10.8"
tiny-keccak = "2"

# async
async-trait = { workspace = true }
Expand All @@ -54,6 +55,7 @@ tokio-util = { version = "0.7.10", features = ["codec"] }
delay_map = "0.3.0"
eyre = { workspace = true }
lazy_static = { workspace = true }
lru = "0.12"
snap = "1.1.1"
thiserror = { workspace = true }
tracing = { workspace = true }
Expand Down
33 changes: 32 additions & 1 deletion crates/p2p/src/discovery/enr_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use discv5::{enr::CombinedPublicKey, Enr};
use libp2p::{
identity::{ed25519, secp256k1, PublicKey},
identity::{ed25519, secp256k1, KeyType, PublicKey},
multiaddr::Protocol,
Multiaddr, PeerId,
};
use tiny_keccak::{Hasher, Keccak};

pub trait EnrExt {
/// PeerId of the ENR
Expand Down Expand Up @@ -58,3 +59,33 @@ impl CombinedPublicKeyExt for CombinedPublicKey {
}
}
}

pub fn peer_id_to_node_id(peer_id: &PeerId) -> Result<discv5::enr::NodeId, String> {
let pk_bytes = &peer_id.to_bytes()[2..];

let public_key = PublicKey::try_decode_protobuf(pk_bytes)
.map_err(|e| format!(" Cannot parse libp2p public key public key from peer id: {e}"))?;

match public_key.key_type() {
KeyType::Secp256k1 => {
let pk = public_key.clone().try_into_secp256k1().expect("right key type");
let uncompressed_key_bytes = &pk.to_bytes_uncompressed()[1..];
let mut output = [0_u8; 32];
let mut hasher = Keccak::v256();
hasher.update(uncompressed_key_bytes);
hasher.finalize(&mut output);
Ok(discv5::enr::NodeId::parse(&output).expect("Must be correct length"))
}
KeyType::Ed25519 => {
let pk = public_key.clone().try_into_ed25519().expect("right key type");
let uncompressed_key_bytes = pk.to_bytes();
let mut output = [0_u8; 32];
let mut hasher = Keccak::v256();
hasher.update(&uncompressed_key_bytes);
hasher.finalize(&mut output);
Ok(discv5::enr::NodeId::parse(&output).expect("Must be correct length"))
}

_ => Err(format!("Unsupported public key from peer {peer_id}")),
}
}
160 changes: 117 additions & 43 deletions crates/p2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
pub mod enr;
pub mod enr_ext;

use crate::config::Config;
use self::enr_ext::{peer_id_to_node_id, EnrExt};
use crate::{config::Config, types::globals::NetworkGlobals};
use discv5::{
enr::{CombinedKey, NodeId},
ConfigBuilder, Discv5, Enr, Event,
Discv5, Enr, Event,
};
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt};
use libp2p::swarm::{dummy::ConnectionHandler, NetworkBehaviour};
use std::{collections::HashSet, pin::Pin, task::Poll};
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt, TryFutureExt};
use libp2p::{
core::Endpoint,
swarm::{
dummy::ConnectionHandler, ConnectionDenied, ConnectionId, DialError, DialFailure,
FromSwarm, NetworkBehaviour, THandler, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use lru::LruCache;
use std::{num::NonZeroUsize, pin::Pin, sync::Arc, task::Poll};
use tokio::sync::mpsc;
use tracing::{debug, warn};

type QueryResult = Result<Vec<Enr>, discv5::QueryError>;

#[derive(Debug)]
pub struct DiscoveredPeers {
pub peers: Vec<Enr>,
}

pub enum EventStream {
/// Awaiting an event stream to be generated. This is required due to the poll nature of
/// `Discovery`
Expand All @@ -26,30 +40,51 @@ pub enum EventStream {

pub struct Discovery {
/// Core discv5 service.
pub discovery: Discv5,
discovery: Discv5,

/// Network globals.
_network_globals: Arc<NetworkGlobals>,

/// Active discovery queries.
active_queries: FuturesUnordered<Pin<Box<dyn Future<Output = QueryResult> + Send>>>,

/// A cache of discovered ENRs.
cached_enrs: HashSet<Enr>,
cached_enrs: LruCache<PeerId, Enr>,

/// The event stream of discv5.
event_stream: EventStream,
}

impl Discovery {
pub fn new(enr: Enr, key: CombinedKey, config: Config) -> eyre::Result<Self> {
let config = ConfigBuilder::new(config.listen_addr.to_listen_config()).build();
let discovery: Discv5<_> = Discv5::new(enr, key, config).map_err(|e| eyre::anyhow!(e))?;
pub async fn new(
key: CombinedKey,
config: Config,
network_globals: Arc<NetworkGlobals>,
) -> eyre::Result<Self> {
let enr = network_globals.local_enr();

let mut discovery: Discv5<_> =
Discv5::new(enr, key, config.discv5_config).map_err(|e| eyre::anyhow!(e))?;

// adding bootnodes
for bootnode in config.bootnodes {
if bootnode.peer_id() == network_globals.peer_id() {
continue;
}

let event_stream_fut = discovery.event_stream().boxed();
let _ = discovery.add_enr(bootnode);
}

// start the discv5 service
discovery.start().map_err(|e| eyre::format_err!(e.to_string())).await?;
let event_stream = EventStream::Awaiting(Box::pin(discovery.event_stream()));

Ok(Self {
discovery,
_network_globals: network_globals,
active_queries: Default::default(),
cached_enrs: HashSet::new(),
event_stream: EventStream::Awaiting(event_stream_fut),
cached_enrs: LruCache::new(NonZeroUsize::new(50).expect("50 is a valid value")),
event_stream,
})
}

Expand All @@ -58,9 +93,25 @@ impl Discovery {
self.discovery.local_enr()
}

/// Adds an ENR to the discovery.
pub fn add_enr(&mut self, enr: Enr) -> eyre::Result<()> {
self.discovery.add_enr(enr).map_err(|e| eyre::eyre!(e.to_string()))
}

/// Return cached ENRs.
pub fn cached_enrs(&self) -> impl Iterator<Item = &Enr> {
self.cached_enrs.iter().map(|(_, enr)| enr)
}

/// Remove cached ENR.
pub fn remove_enr(&mut self, peer_id: &PeerId) {
self.cached_enrs.pop(peer_id);
}

/// Discovers peers on the network.
pub fn discover_peers(&mut self, target_peers: usize) {
debug!("Starting a peer discovery request target_peers {target_peers:}");

// Generate a random target node id.
let random_node = NodeId::random();
let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
Expand All @@ -71,11 +122,28 @@ impl Discovery {

self.active_queries.push(Box::pin(query_future));
}
}

#[derive(Debug)]
pub struct DiscoveredPeers {
pub peers: Vec<Enr>,
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
if let Ok(node_id) = peer_id_to_node_id(peer_id) {
self.discovery.disconnect_node(&node_id);
}
self.cached_enrs.pop(peer_id);
}

pub fn on_dial_failure(&mut self, peer_id: Option<PeerId>, error: &DialError) {
if let Some(peer_id) = peer_id {
match error {
DialError::LocalPeerId { .. } |
DialError::Denied { .. } |
DialError::NoAddresses |
DialError::Transport(_) |
DialError::WrongPeerId { .. } => {
self.disconnect_peer(&peer_id);
}
DialError::DialPeerConditionFalse(_) | DialError::Aborted => {}
}
}
}
}

impl NetworkBehaviour for Discovery {
Expand All @@ -88,10 +156,11 @@ impl NetworkBehaviour for Discovery {
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) {
match query_result {
Ok(enrs) => {
for enr in enrs.into_iter() {
self.cached_enrs.insert(enr);
Ok(peers) => {
for enr in peers.iter() {
self.cached_enrs.put(enr.peer_id(), enr.clone());
}
return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers }));
}
Err(e) => warn!("Discovery query failed: {:?}", e),
}
Expand Down Expand Up @@ -123,55 +192,60 @@ impl NetworkBehaviour for Discovery {
}
EventStream::InActive => {}
};

Poll::Pending
}

fn on_swarm_event(&mut self, _event: libp2p::swarm::FromSwarm) {}
fn on_swarm_event(&mut self, event: FromSwarm) {
if let FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) = event {
self.on_dial_failure(peer_id, error)
}
}

fn on_connection_handler_event(
&mut self,
_peer_id: libp2p::PeerId,
_connection_id: libp2p::swarm::ConnectionId,
_event: libp2p::swarm::THandlerOutEvent<Self>,
_peer_id: PeerId,
_connection_id: ConnectionId,
_event: THandlerOutEvent<Self>,
) {
}

fn handle_established_inbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_peer: libp2p::PeerId,
_local_addr: &libp2p::Multiaddr,
_remote_addr: &libp2p::Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_peer: libp2p::PeerId,
_addr: &libp2p::Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
}

fn handle_pending_inbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_local_addr: &libp2p::Multiaddr,
_remote_addr: &libp2p::Multiaddr,
) -> Result<(), libp2p::swarm::ConnectionDenied> {
_connection_id: ConnectionId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
Ok(())
}

fn handle_pending_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_maybe_peer: Option<libp2p::PeerId>,
_addresses: &[libp2p::Multiaddr],
_effective_role: libp2p::core::Endpoint,
) -> Result<Vec<libp2p::Multiaddr>, libp2p::swarm::ConnectionDenied> {
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(Vec::new())
}
}
Loading
Loading