Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DiscoveryEvent and use KademliaEvent instead #1377

Merged
merged 7 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Description of the upcoming release here.

### Changed

- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`
- [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha
- [#1358](https://github.com/FuelLabs/fuel-core/pull/1358): Upgraded the Rust version used in CI to 1.72.0. Also includes associated Clippy changes.
- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Modified block synchronization to use asynchronous task execution when retrieving block headers.
Expand Down
8 changes: 4 additions & 4 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
discovery::{
DiscoveryBehaviour,
DiscoveryConfig,
DiscoveryEvent,
},
gossipsub::{
config::build_gossipsub_behaviour,
Expand Down Expand Up @@ -40,10 +39,11 @@ use libp2p::{
Multiaddr,
PeerId,
};
use libp2p_kad::KademliaEvent;

#[derive(Debug)]
pub enum FuelBehaviourEvent {
Discovery(DiscoveryEvent),
Discovery(KademliaEvent),
PeerReport(PeerReportEvent),
Gossipsub(GossipsubEvent),
RequestResponse(RequestResponseEvent<RequestMessage, NetworkResponse>),
Expand Down Expand Up @@ -190,8 +190,8 @@ impl<Codec: NetworkCodec> FuelBehaviour<Codec> {
}
}

impl From<DiscoveryEvent> for FuelBehaviourEvent {
fn from(event: DiscoveryEvent) -> Self {
impl From<KademliaEvent> for FuelBehaviourEvent {
fn from(event: KademliaEvent) -> Self {
FuelBehaviourEvent::Discovery(event)
}
}
Expand Down
118 changes: 28 additions & 90 deletions crates/services/p2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use libp2p::{
handler::KademliaHandlerProto,
store::MemoryStore,
Kademlia,
KademliaEvent,
QueryId,
},
mdns::Event as MdnsEvent,
Expand All @@ -27,11 +26,9 @@ use libp2p::{
Multiaddr,
PeerId,
};
use libp2p_kad::KademliaEvent;
use std::{
collections::{
HashSet,
VecDeque,
},
collections::HashSet,
pin::Pin,
task::{
Context,
Expand All @@ -46,19 +43,6 @@ pub use discovery_config::DiscoveryConfig;

const SIXTY_SECONDS: Duration = Duration::from_secs(60);

/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryEvent {
/// Notify the swarm of an UnroutablePeer
UnroutablePeer(PeerId),

/// Notify the swarm of a connected peer and its addresses
PeerInfoOnConnect {
peer_id: PeerId,
addresses: Vec<Multiaddr>,
},
}

/// NetworkBehavior for discovery of nodes
pub struct DiscoveryBehaviour {
/// List of bootstrap nodes and their addresses
Expand All @@ -70,9 +54,6 @@ pub struct DiscoveryBehaviour {
/// Track the connected peers
connected_peers: HashSet<PeerId>,

/// Events to report to the swarm
pending_events: VecDeque<DiscoveryEvent>,

/// For discovery on local network, optionally available
mdns: MdnsWrapper,

Expand Down Expand Up @@ -103,7 +84,7 @@ impl DiscoveryBehaviour {

impl NetworkBehaviour for DiscoveryBehaviour {
type ConnectionHandler = KademliaHandlerProto<QueryId>;
type OutEvent = DiscoveryEvent;
type OutEvent = KademliaEvent;

// Initializes new handler on a new opened connection
fn new_handler(&mut self) -> Self::ConnectionHandler {
Expand Down Expand Up @@ -131,13 +112,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}) => {
if *other_established == 0 {
self.connected_peers.insert(*peer_id);
let addresses = self.addresses_of_peer(peer_id);

self.pending_events
.push_back(DiscoveryEvent::PeerInfoOnConnect {
peer_id: *peer_id,
addresses,
});

trace!("Connected to a peer {:?}", peer_id);
}
Expand All @@ -163,10 +137,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(next_event) = self.pending_events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(next_event))
}

// if random walk is enabled poll the stream that will fire when random walk is scheduled
if let Some(next_kad_random_query) = self.next_kad_random_walk.as_mut() {
while next_kad_random_query.poll_unpin(cx).is_ready() {
Expand All @@ -184,51 +154,20 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

// poll Kademlia behaviour
while let Poll::Ready(kad_action) = self.kademlia.poll(cx, params) {
// poll sub-behaviors
if let Poll::Ready(kad_action) = self.kademlia.poll(cx, params) {
match kad_action {
NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer },
) => {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
DiscoveryEvent::UnroutablePeer(peer),
))
}

NetworkBehaviourAction::Dial { handler, opts } => {
return Poll::Ready(NetworkBehaviourAction::Dial { handler, opts })
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => {
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
})
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
})
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
score,
})
}
_ => {}
NetworkBehaviourAction::GenerateEvent(event) => match event {
KademliaEvent::UnroutablePeer { peer } => {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer },
))
}
_ => Poll::Pending,
},
_ => Poll::Ready(kad_action),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can simply return Poll::Ready(kad_action)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that before, but technically that's not what the code was doing before because it was only matching on the one event variant.

I can switch it if you think it's safe (I think it is too).

}
}

while let Poll::Ready(mdns_event) = self.mdns.poll(cx, params) {
} else if let Poll::Ready(mdns_event) = self.mdns.poll(cx, params) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the previous iteration would iterate over all mdns events. Let's say, if we had many NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(list)), we would process them for one loop.

While right not it is not true. I'm okay to use if statement for Kademlia, but for mdns it is better to use while as we did before=)

match mdns_event {
NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(list)) => {
// inform kademlia of newly discovered local peers
Expand All @@ -238,27 +177,26 @@ impl NetworkBehaviour for DiscoveryBehaviour {
self.kademlia.add_address(&peer_id, multiaddr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if list.len() + self.connected_peers.len() > self.max_peers_connected we will not handle this case=) After looking at how add_address works, I think it is okay to call add_address without any limitation(I think we can remoev if self.connected_peers.len() < self.max_peers_connected check) because it only updates the routing table.

Copy link
Member Author

@MitchTurner MitchTurner Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.

                    if self.connected_peers.len() < self.max_peers_connected {
                        for (peer_id, multiaddr) in list {
                            self.kademlia.add_address(&peer_id, multiaddr);
                        }
                    }

doesn't even work as intended, I think, because it only checks before you add the first peer.

}
}
Poll::Pending
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
score,
})
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => {
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
})
}
_ => {}
} => Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
}),
_ => Poll::Pending,
}
} else {
Poll::Pending
}

Poll::Pending
}

/// return list of known addresses for a given peer
Expand Down Expand Up @@ -311,8 +249,8 @@ mod tests {
use super::{
DiscoveryBehaviour,
DiscoveryConfig,
KademliaEvent,
};
use crate::discovery::DiscoveryEvent;
use futures::{
future::poll_fn,
StreamExt,
Expand Down Expand Up @@ -443,9 +381,9 @@ mod tests {
// if peer has connected - remove it from the set
left_to_discover[swarm_index].remove(&peer_id);
}
SwarmEvent::Behaviour(DiscoveryEvent::UnroutablePeer(
peer_id,
)) => {
SwarmEvent::Behaviour(KademliaEvent::UnroutablePeer {
peer: peer_id,
}) => {
// kademlia discovered a peer but does not have it's address
// we simulate Identify happening and provide the address
let unroutable_peer_addr = discovery_swarms
Expand Down
6 changes: 1 addition & 5 deletions crates/services/p2p/src/discovery/discovery_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use libp2p::{
PeerId,
};
use std::{
collections::{
HashSet,
VecDeque,
},
collections::HashSet,
time::Duration,
};
use tracing::warn;
Expand Down Expand Up @@ -183,7 +180,6 @@ impl DiscoveryConfig {
bootstrap_nodes,
reserved_nodes,
connected_peers: HashSet::new(),
pending_events: VecDeque::new(),
kademlia,
next_kad_random_walk,
duration_to_next_kad: Duration::from_secs(1),
Expand Down