Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update libp2p and import used features only #259

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
738 changes: 386 additions & 352 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion fuel-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ bincode = "1.3"
futures = "0.3"
futures-timer = "3.0"
ip_network = "0.4"
libp2p = "0.41"
libp2p = { version = "0.43", default-features = false, features = [
"dns-async-std", "gossipsub", "identify", "kad", "mdns", "mplex", "noise",
"ping", "request-response", "secp256k1", "tcp-async-io", "yamux", "websocket"
] }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
sha2 = "0.9"
Expand Down
2 changes: 1 addition & 1 deletion fuel-p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl FuelBehaviour {
) -> Poll<
NetworkBehaviourAction<
<Self as NetworkBehaviour>::OutEvent,
<Self as NetworkBehaviour>::ProtocolsHandler,
<Self as NetworkBehaviour>::ConnectionHandler,
>,
> {
match self.events.pop_front() {
Expand Down
48 changes: 47 additions & 1 deletion fuel-p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use libp2p::{Multiaddr, PeerId};
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
identity::Keypair,
mplex, noise, yamux, Multiaddr, PeerId, Transport,
};
use std::{net::IpAddr, time::Duration};

pub const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20);

/// Maximum number of frames buffered per substream.
const MAX_NUM_OF_FRAMES_BUFFERED: usize = 256;

/// Adds a timeout to the setup and protocol upgrade process for all
/// inbound and outbound connections established through the transport.
const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20);

#[derive(Clone, Debug)]
pub struct P2PConfig {
/// Name of the Network
Expand Down Expand Up @@ -34,3 +45,38 @@ pub struct P2PConfig {
/// Sets the keep-alive timeout of idle connections.
pub set_connection_keep_alive: Option<Duration>,
}

/// Transport for libp2p communication:
/// TCP/IP, Websocket
/// Noise as encryption layer
/// mplex or yamux for multiplexing
pub async fn build_transport(local_keypair: Keypair) -> Boxed<(PeerId, StreamMuxerBox)> {
let transport = {
let tcp = libp2p::tcp::TcpConfig::new().nodelay(true);
let ws_tcp = libp2p::websocket::WsConfig::new(tcp.clone()).or_transport(tcp);
libp2p::dns::DnsConfig::system(ws_tcp).await.unwrap()
};

let auth_config = {
let dh_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&local_keypair)
.expect("Noise key generation failed");

noise::NoiseConfig::xx(dh_keys).into_authenticated()
};

let multiplex_config = {
let mut mplex_config = mplex::MplexConfig::new();
mplex_config.set_max_buffer_size(MAX_NUM_OF_FRAMES_BUFFERED);

let yamux_config = yamux::YamuxConfig::default();
libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
};

transport
.upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(auth_config)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
}
42 changes: 22 additions & 20 deletions fuel-p2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use libp2p::{
mdns::MdnsEvent,
multiaddr::Protocol,
swarm::{
DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
ProtocolsHandler,
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -82,11 +82,11 @@ impl DiscoveryBehaviour {
}

impl NetworkBehaviour for DiscoveryBehaviour {
type ProtocolsHandler = KademliaHandlerProto<QueryId>;
type ConnectionHandler = KademliaHandlerProto<QueryId>;
type OutEvent = DiscoveryEvent;

// Initializes new handler on a new opened connection
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
// in our case we just return KademliaHandlerProto
self.kademlia.new_handler()
}
Expand All @@ -96,7 +96,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
self.kademlia.inject_event(peer_id, connection, event);
}
Expand All @@ -106,7 +106,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(next_event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(next_event));
}
Expand Down Expand Up @@ -251,6 +251,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.connected_peers_count += 1;

Expand All @@ -259,34 +260,35 @@ impl NetworkBehaviour for DiscoveryBehaviour {
connection_id,
endpoint,
failed_addresses,
)
}
other_established,
);

fn inject_connected(&mut self, peer_id: &PeerId) {
let addresses = self.addresses_of_peer(peer_id);

self.events
.push_back(DiscoveryEvent::Connected(*peer_id, addresses));
self.kademlia.inject_connected(peer_id);
trace!("Connected to a peer {:?}", peer_id);
}

fn inject_connection_closed(
&mut self,
_peer_id: &PeerId,
_connection_id: &ConnectionId,
_connection_point: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
peer_id: &PeerId,
connection_id: &ConnectionId,
connection_point: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
other_established: usize,
) {
self.connected_peers_count -= 1;
// no need to pass it to kademlia.inject_connection_closed() since it does nothing
}
self.kademlia.inject_connection_closed(
peer_id,
connection_id,
connection_point,
handler,
other_established,
);

fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.events
.push_back(DiscoveryEvent::Disconnected(*peer_id));

self.kademlia.inject_disconnected(peer_id)
}

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
Expand All @@ -300,7 +302,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
err: &DialError,
) {
self.kademlia.inject_dial_failure(peer_id, handler, err)
Expand Down
3 changes: 2 additions & 1 deletion fuel-p2p/src/discovery/mdns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ impl MdnsWrapper {
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<MdnsEvent, <Mdns as NetworkBehaviour>::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<MdnsEvent, <Mdns as NetworkBehaviour>::ConnectionHandler>>
{
loop {
match self {
Self::Instantiating(fut) => {
Expand Down
73 changes: 39 additions & 34 deletions fuel-p2p/src/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use libp2p::{
identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo},
ping::{Event as PingEvent, Ping, PingConfig, PingSuccess},
swarm::{
IntoProtocolsHandler, IntoProtocolsHandlerSelect, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
ConnectionHandler, IntoConnectionHandler, IntoConnectionHandlerSelect, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -132,14 +132,14 @@ impl PeerInfoBehaviour {
}

impl NetworkBehaviour for PeerInfoBehaviour {
type ProtocolsHandler = IntoProtocolsHandlerSelect<
<Ping as NetworkBehaviour>::ProtocolsHandler,
<Identify as NetworkBehaviour>::ProtocolsHandler,
type ConnectionHandler = IntoConnectionHandlerSelect<
<Ping as NetworkBehaviour>::ConnectionHandler,
<Identify as NetworkBehaviour>::ConnectionHandler,
>;
type OutEvent = PeerInfoEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
IntoProtocolsHandler::select(self.ping.new_handler(), self.identify.new_handler())
fn new_handler(&mut self) -> Self::ConnectionHandler {
IntoConnectionHandler::select(self.ping.new_handler(), self.identify.new_handler())
}

fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<libp2p::Multiaddr> {
Expand All @@ -154,34 +154,53 @@ impl NetworkBehaviour for PeerInfoBehaviour {
connection_id: &ConnectionId,
connected_point: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.ping.inject_connection_established(
peer_id,
connection_id,
connected_point,
failed_addresses,
other_established,
);

self.identify.inject_connection_established(
peer_id,
connection_id,
connected_point,
failed_addresses,
other_established,
);

self.insert_peer(peer_id, connected_point.clone());
}

fn inject_connected(&mut self, peer_id: &PeerId) {
let addresses = self.addresses_of_peer(peer_id);
self.insert_peer_addresses(peer_id, addresses);
self.ping.inject_connected(peer_id);
self.identify.inject_connected(peer_id);
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.ping.inject_disconnected(peer_id);
self.identify.inject_disconnected(peer_id);
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify.inject_connection_closed(
peer_id,
conn,
endpoint,
identity_handler,
remaining_established,
);
self.ping.inject_connection_closed(
peer_id,
conn,
endpoint,
ping_handler,
remaining_established,
);

// todo: we could keep it in a cache for a while
self.peers.remove(peer_id);
Expand All @@ -191,7 +210,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
loop {
match self.ping.poll(cx, params) {
Poll::Pending => break,
Expand Down Expand Up @@ -223,7 +242,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
}
Poll::Ready(NetworkBehaviourAction::Dial { handler, opts }) => {
let handler =
IntoProtocolsHandler::select(handler, self.identify.new_handler());
IntoConnectionHandler::select(handler, self.identify.new_handler());

return Poll::Ready(NetworkBehaviourAction::Dial { handler, opts });
}
Expand Down Expand Up @@ -269,7 +288,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
})
}
Poll::Ready(NetworkBehaviourAction::Dial { handler, opts }) => {
let handler = IntoProtocolsHandler::select(self.ping.new_handler(), handler);
let handler = IntoConnectionHandler::select(self.ping.new_handler(), handler);
return Poll::Ready(NetworkBehaviourAction::Dial { handler, opts });
}
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => match event {
Expand Down Expand Up @@ -316,7 +335,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
match event {
EitherOutput::First(ping_event) => {
Expand All @@ -329,20 +348,6 @@ impl NetworkBehaviour for PeerInfoBehaviour {
}
}

fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
.inject_connection_closed(peer_id, conn, endpoint, identity_handler);
self.ping
.inject_connection_closed(peer_id, conn, endpoint, ping_handler);
}

fn inject_address_change(
&mut self,
peer_id: &PeerId,
Expand All @@ -357,7 +362,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
let (ping_handler, identity_handler) = handler.into_inner();
Expand Down Expand Up @@ -395,7 +400,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
Expand Down
Loading