Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm)!: Allow NetworkBehaviours to manage incoming connections #3099

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d501019
Introduce `NetworkBehaviour::DialPayload`
thomaseizinger Nov 9, 2022
77c85f6
WIP Migrate production code
thomaseizinger Nov 9, 2022
118ffb6
Deprecate `new_handler`
thomaseizinger Nov 9, 2022
377c351
WIP: Completely remove `IntoConnectionHandler`
thomaseizinger Nov 14, 2022
9972eb4
Set supported protocols upon connection establishment
thomaseizinger Nov 15, 2022
fb9cdbc
Remove TODOs
thomaseizinger Nov 15, 2022
a96780d
Fix bad boolean logic
thomaseizinger Nov 15, 2022
3c9d98e
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 15, 2022
900edef
Fix gossipsub tests
thomaseizinger Nov 15, 2022
d1eea3a
Fix clippy warning
thomaseizinger Nov 15, 2022
98bf927
Update docs
thomaseizinger Nov 15, 2022
1cef941
Reduce diff
thomaseizinger Nov 15, 2022
ca3ef3e
Fix clippy errors
thomaseizinger Nov 15, 2022
a7f0685
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 16, 2022
d95b038
Update swarm/src/behaviour.rs
thomaseizinger Nov 16, 2022
edbbe41
Add changelog entry
thomaseizinger Nov 17, 2022
fef8d85
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 18, 2022
a942248
Remove unnecessary bounds
thomaseizinger Nov 18, 2022
0578134
Remove old example
thomaseizinger Nov 18, 2022
f7def2b
fmt
thomaseizinger Nov 18, 2022
a5a728b
Make `new_handler` fallible
thomaseizinger Nov 18, 2022
27e1b2d
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 19, 2022
19348e9
Allow each `NetworkBehaviour` to have their own `ConnectionDenied` re…
thomaseizinger Nov 22, 2022
9c55601
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 23, 2022
527eeda
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 29, 2022
a269e9d
Revert "Allow each `NetworkBehaviour` to have their own `ConnectionDe…
thomaseizinger Dec 7, 2022
dcb4f96
Always box cause for denied connection
thomaseizinger Dec 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,6 @@ use std::{

const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;

// impl<T: Clone + fmt::Debug + Send + 'static + Unpin> IntoConnectionHandler
// for KademliaHandlerProto<T>
// {
// type Handler = KademliaHandler<T>;
//
// fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
// KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id)
// }
//
// fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
// if self.config.allow_listening {
// upgrade::EitherUpgrade::A(self.config.protocol_config.clone())
// } else {
// upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)
// }
// }
// }

/// Protocol handler that manages substreams for the Kademlia protocol
/// on a single connection with a peer.
///
Expand Down
30 changes: 15 additions & 15 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::MdnsConfig;
use futures::Stream;
use if_watch::{IfEvent, IfWatcher};
use libp2p_core::transport::ListenerId;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{
dummy, ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
Expand Down Expand Up @@ -122,7 +122,7 @@ where
type ConnectionHandler = dummy::ConnectionHandler;
type OutEvent = MdnsEvent;

fn new_handler(&mut self, _: &PeerId, _: &ConnectedPoint) -> Self::ConnectionHandler {
fn new_handler(&mut self, _: &PeerId, _: &libp2p_core::ConnectedPoint) -> Self::ConnectionHandler {
dummy::ConnectionHandler
}

Expand All @@ -134,19 +134,6 @@ where
.collect()
}

fn inject_connection_closed(
&mut self,
peer: &PeerId,
_: &libp2p_core::connection::ConnectionId,
_: &libp2p_core::ConnectedPoint,
_: Self::ConnectionHandler,
remaining_established: usize,
) {
if remaining_established == 0 {
self.expire_node(peer);
}
}

fn inject_event(
&mut self,
_: PeerId,
Expand All @@ -163,6 +150,19 @@ where
}
}

fn inject_connection_closed(
&mut self,
peer: &PeerId,
_: &libp2p_core::connection::ConnectionId,
_: &libp2p_core::ConnectedPoint,
_: Self::ConnectionHandler,
remaining_established: usize,
) {
if remaining_established == 0 {
self.expire_node(peer);
}
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
184 changes: 4 additions & 180 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,188 +288,12 @@ pub enum NetworkBehaviourAction<TOutEvent, TInEvent> {

/// Instructs the swarm to start a dial.
///
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
/// On success, [`NetworkBehaviour::new_handler`] is called with the [`PeerId`] and [`ConnectedPoint`]
/// of the newly established connection.
///
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
/// can be included in the handler, and thus directly send on connection success or extracted by
/// the [`NetworkBehaviour`] on connection failure.
/// Once the handler is constructed, [`NetworkBehaviour::inject_connection_established`] will be called.
///
/// # Example carrying state in the handler
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we replace this example? Now that new_handler gives you a PeerId, I think it is quite easy to realize that you can store data for a future connection in the behaviour and just pass it into the handler here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • NetworkBehaviours can track state for future connections within themselves (indexed by PeerId) and pass it into the ConnectionHandler upon construction. This removes the need for passing along a handler in NetworkBehaviourAction::Dial.

Say there are two NetworkBehaviour implementations, each requesting a new connection to a new peer. In such case, when NetworkBehaviour::inject_connection_established is called, neither of them knows whether this new connection corresponds to their dial request.

We could still allow the user to provide user data via DialOpts. For the case of Swarm::dial and NetworkBehaviour::inject_dial_failure we could wrap this in an Option.

I am not sure whether we need to design for the above race condition. Your proposal might be just fine. What do folks think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ipfs-embed I validate advertised peer addresses by dialling them — and closing such connections upon success. This is only reliably possible if I can detect whether the connection resulted from my own dialling attempt. OTOH, another cause may have resulted in dialling that same peer with that same address for other reasons, which would then possibly have said “nah, attempt is already underway”. But all of this would happen in addition to existing connections, so it might not be a huge problem.

Copy link
Contributor Author

@thomaseizinger thomaseizinger Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to actively close them? Assuming reasonable keep_alive implementations of your ConnectionHandlers, the connection should get closed automatically if it is not in use.

OTOH, another cause may have resulted in dialling that same peer with that same address for other reasons, which would then possibly have said “nah, attempt is already underway”.

Can this be expressed with PeerCondition::NotDialing?

Note that new_handler also gives you access to ConnectedPoint. So in addition to storing data in your behaviour based on PeerId, you could also index it by the dialed address. new_handler being called for an address you wanted to validate IS the validation that a connection was made to this address. If you have a dedicated AddressValidationBehaviour, that behaviour could then straight up deny that connection which instantly closes it again.

///
/// ```rust
/// # use futures::executor::block_on;
/// # use futures::stream::StreamExt;
/// # use libp2p_core::connection::ConnectionId;
/// # use libp2p_core::identity;
/// # use libp2p_core::transport::{MemoryTransport, Transport};
/// # use libp2p_core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
/// # use libp2p_core::PeerId;
/// # use libp2p_plaintext::PlainText2Config;
/// # use libp2p_swarm::{
/// # DialError, IntoConnectionHandler, KeepAlive, NegotiatedSubstream,
/// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionHandler,
/// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent,
/// # };
/// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
/// # use libp2p_yamux as yamux;
/// # use std::collections::VecDeque;
/// # use std::task::{Context, Poll};
/// # use void::Void;
/// #
/// # let local_key = identity::Keypair::generate_ed25519();
/// # let local_public_key = local_key.public();
/// # let local_peer_id = PeerId::from(local_public_key.clone());
/// #
/// # let transport = MemoryTransport::default()
/// # .upgrade(upgrade::Version::V1)
/// # .authenticate(PlainText2Config { local_public_key })
/// # .multiplex(yamux::YamuxConfig::default())
/// # .boxed();
/// #
/// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id);
/// #
/// // Super precious message that we should better not lose.
/// let message = PreciousMessage("My precious message".to_string());
///
/// // Unfortunately this peer is offline, thus sending our message to it will fail.
/// let offline_peer = PeerId::random();
///
/// // Let's send it anyways. We should get it back in case connecting to the peer fails.
/// swarm.behaviour_mut().send(offline_peer, message);
///
/// block_on(async {
/// // As expected, sending failed. But great news, we got our message back.
/// matches!(
/// swarm.next().await.expect("Infinite stream"),
/// SwarmEvent::Behaviour(PreciousMessage(_))
/// );
/// });
///
/// #[derive(Default)]
/// struct MyBehaviour {
/// outbox_to_swarm: VecDeque<NetworkBehaviourAction<PreciousMessage, MyHandler>>,
/// }
///
/// impl MyBehaviour {
/// fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) {
/// self.outbox_to_swarm
/// .push_back(NetworkBehaviourAction::Dial {
/// opts: DialOpts::peer_id(peer_id)
/// .condition(PeerCondition::Always)
/// .build(),
/// handler: MyHandler { message: Some(msg) },
/// });
/// }
/// }
/// #
/// impl NetworkBehaviour for MyBehaviour {
/// # type ConnectionHandler = MyHandler;
/// # type OutEvent = PreciousMessage;
/// #
/// # fn new_handler(&mut self) -> Self::ConnectionHandler {
/// # MyHandler { message: None }
/// # }
/// #
/// #
/// # fn inject_event(
/// # &mut self,
/// # _: PeerId,
/// # _: ConnectionId,
/// # _: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
/// # ) {
/// # unreachable!();
/// # }
/// #
/// /// /// /// fn inject_dial_failure(
/// &mut self,
/// _peer_id: Option<PeerId>,
/// _error: &DialError,
/// ) {
/// /// /// // As expected, sending the message failed. But lucky us, we got the handler back, thus
/// // the precious message is not lost and we can return it back to the user.
/// let msg = handler.message.unwrap();
/// self.outbox_to_swarm
/// .push_back(NetworkBehaviourAction::GenerateEvent(msg))
/// }
/// #
/// # fn poll(
/// # &mut self,
/// # _: &mut Context<'_>,
/// # _: &mut impl PollParameters,
/// # ) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self::ConnectionHandler>>> {
/// # if let Some(action) = self.outbox_to_swarm.pop_front() {
/// # return Poll::Ready(action);
/// # }
/// # Poll::Pending
/// # }
/// }
///
/// # struct MyHandler {
/// # message: Option<PreciousMessage>,
/// # }
/// #
/// # impl ConnectionHandler for MyHandler {
/// # type InEvent = Void;
/// # type OutEvent = Void;
/// # type Error = Void;
/// # type InboundProtocol = DeniedUpgrade;
/// # type OutboundProtocol = DeniedUpgrade;
/// # type InboundOpenInfo = ();
/// # type OutboundOpenInfo = Void;
/// #
/// # fn listen_protocol(
/// # &self,
/// # ) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
/// # SubstreamProtocol::new(DeniedUpgrade, ())
/// # }
/// #
/// # fn inject_fully_negotiated_inbound(
/// # &mut self,
/// # _: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::InboundOpenInfo,
/// # ) {
/// # }
/// #
/// # fn inject_fully_negotiated_outbound(
/// # &mut self,
/// # _: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::OutboundOpenInfo,
/// # ) {
/// # }
/// #
/// # fn inject_event(&mut self, _event: Self::InEvent) {}
/// #
/// # fn inject_dial_upgrade_error(
/// # &mut self,
/// # _: Self::OutboundOpenInfo,
/// # _: ConnectionHandlerUpgrErr<Void>,
/// # ) {
/// # }
/// #
/// # fn connection_keep_alive(&self) -> KeepAlive {
/// # KeepAlive::Yes
/// # }
/// #
/// # fn poll(
/// # &mut self,
/// # _: &mut Context<'_>,
/// # ) -> Poll<
/// # ConnectionHandlerEvent<
/// # Self::OutboundProtocol,
/// # Self::OutboundOpenInfo,
/// # Self::OutEvent,
/// # Self::Error,
/// # >,
/// # > {
/// # todo!("If `Self::message.is_some()` send the message to the remote.")
/// # }
/// # }
/// # #[derive(Debug, PartialEq, Eq)]
/// # struct PreciousMessage(String);
/// ```
/// In case the dial fails, the behaviour is notified via [`NetworkBehaviour::inject_dial_failure`].
Dial { opts: DialOpts },

/// Instructs the `Swarm` to send an event to the handler dedicated to a
Expand Down