Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{core/,swarm/}: Dial with handler and return handler on error and closed #2191

Merged
merged 43 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a332591
core/: Return handler on connection error and closed
mxinden Aug 12, 2021
46904a6
swarm/: Inject handler on connection error and closed
mxinden Aug 12, 2021
d756be1
swarm/src/behaviour: Provide handler with Dial and DialAddr
mxinden Aug 12, 2021
595623f
Merge branch 'libp2p/master' into handler
mxinden Aug 18, 2021
705842f
swarm/src/behaviour: Add default trait para on NetworkBehaviourAction
mxinden Aug 18, 2021
d744b4a
core/src/connection/manager: Fully close a task on disconnect
mxinden Aug 18, 2021
5c2aef6
core/: Remove DisconnectedPeer::set_connected and Pool::add
mxinden Aug 18, 2021
ce0d278
core/src/connection: Report ConnectionLimit through task
mxinden Aug 18, 2021
425e777
core/: Emit Event::Failed on aborted pending connection
mxinden Aug 19, 2021
5ff6397
core/tests: Adjust to type changes
mxinden Aug 19, 2021
7d9285f
core/CHANGELOG: Add entry for ConnectionLimit change
mxinden Aug 19, 2021
682f6be
protocols/*: Update
mxinden Aug 19, 2021
9262c03
Merge branch 'libp2p/master' into handler
mxinden Aug 19, 2021
62c5e13
protocols/*: Update
mxinden Aug 19, 2021
174693a
swarm-derive: Adjust to changes
mxinden Aug 20, 2021
a78de13
core/: Fix ConectionClose and PendingAborted reporting
mxinden Aug 20, 2021
d4960b7
*: Format with rustfmt
mxinden Aug 20, 2021
b73139a
core/src/connection: Remove outdated doc comment
mxinden Aug 20, 2021
aa02e5f
swarm/src/toggle: Fix TODOs
mxinden Aug 20, 2021
2c9f0d3
protocols/: Remove unused imports
mxinden Aug 23, 2021
6d7c73a
Merge branch 'libp2p/master' into handler
mxinden Aug 23, 2021
a56980e
core/src/network/event: Use NoZeroU32
mxinden Aug 24, 2021
1c3ed2e
swarm/src/protocols_handler: Rename to into_protocols_handler
mxinden Aug 24, 2021
32fc84e
swarm/src/behaviour: Introduce NetworkBehaviour::inject_listen_failure
mxinden Aug 24, 2021
7ea4908
swarm/src/lib: Inject handler on DialPeerCondition false
mxinden Aug 24, 2021
fbd4681
core/src/connection: Assume manager to always close handler
mxinden Aug 25, 2021
6787e77
swarm-derive: Add comments
mxinden Aug 25, 2021
b864133
swarm: Add documentation
mxinden Aug 25, 2021
b2bf380
*: Format with rustfmt
mxinden Aug 25, 2021
7d342b6
swar/src/behaviour: Link to NotifyHandler not SendEvent
mxinden Aug 25, 2021
5853890
*: Update changelogs
mxinden Aug 25, 2021
4d0faf9
swarm-derive: Fix typo
mxinden Aug 25, 2021
7a45e7b
Apply suggestions from code review
mxinden Aug 26, 2021
cef949c
core/src/network: Revert map_err
mxinden Aug 26, 2021
ceb77e5
core/src/network: Use custom method on DialAttemptsRemaining
mxinden Aug 26, 2021
814ff4b
swarm: Add doc example for carrying state in handler
mxinden Aug 26, 2021
90df72a
Merge branch 'libp2p/master' into handler
mxinden Aug 26, 2021
60c7261
swarm/src/lib: Remove use_handler_to_carry_state
mxinden Aug 30, 2021
a2f1819
core/tests/network_dial_error: Use get_attempts
mxinden Aug 30, 2021
b905545
swarm/src/behaviour.rs: Use heading for doc example
mxinden Aug 30, 2021
99f81d0
core/tests: Format with rustfmt
mxinden Aug 30, 2021
1c5eb8e
Merge branch 'master' into handler
mxinden Aug 30, 2021
c09198e
protocols/gossipsub/src/behaviour.rs: Remove unnecesary assignment
mxinden Aug 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,8 @@ where
continue 'poll;
}
Poll::Ready(None) => {
todo!("Safe to assume that this should never happen?");
// The manager has dropped the task or disappeared; abort.
// TODO: Should we return the handler in this case?
// return Poll::Ready(());
// The manager has disappeared; abort.
return Poll::Ready(());
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,8 @@ pub enum DialError<THandler> {
address: Multiaddr,
handler: THandler,
},
LocalPeerId {
handler: THandler,
},
/// The dialing attempt is rejected because the peer being dialed is the local peer.
LocalPeerId { handler: THandler },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe SelfDial is a more descriptive name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to stress the fact that the peer ids match. This would e.g. not work by matching multiaddresses. All that said, I don't think LocalPeerId is the perfect name either.

}

impl<THandler> fmt::Debug for DialError<THandler> {
Expand Down
6 changes: 3 additions & 3 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use libp2p_core::{
ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
use libp2p_swarm::{
AddressScore, DialPeerCondition, IntoProtocolsHandler, NegotiatedSubstream, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
AddressScore, DialError, DialPeerCondition, IntoProtocolsHandler, NegotiatedSubstream,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
ProtocolsHandlerUpgrErr,
};
use std::{
Expand Down Expand Up @@ -223,7 +223,7 @@ impl NetworkBehaviour for Identify {
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler) {
fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler, _: DialError) {
if !self.connected.contains_key(peer_id) {
self.pending_push.remove(peer_id);
}
Expand Down
32 changes: 28 additions & 4 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use libp2p_core::{
ConnectedPoint, Multiaddr, PeerId,
};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
DialError, DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters,
};
use log::{debug, info, warn};
use smallvec::SmallVec;
Expand Down Expand Up @@ -1864,9 +1865,32 @@ where
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler) {
for query in self.queries.iter_mut() {
query.on_failure(peer_id);
fn inject_dial_failure(
&mut self,
peer_id: &PeerId,
_: Self::ProtocolsHandler,
error: DialError,
) {
match error {
DialError::Banned
| DialError::ConnectionLimit(_)
| DialError::InvalidAddress(_)
| DialError::UnreachableAddr(_)
| DialError::LocalPeerId
| DialError::NoAddresses => {
for query in self.queries.iter_mut() {
query.on_failure(peer_id);
}
}
DialError::DialPeerConditionFalse(
DialPeerCondition::Disconnected | DialPeerCondition::NotDialing,
) => {
// We might (still) be connected, or about to be connected, thus do not report the
// failure to the queries.
}
DialError::DialPeerConditionFalse(DialPeerCondition::Always) => {
unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
}
}
}

Expand Down
17 changes: 15 additions & 2 deletions protocols/relay/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use libp2p_core::connection::{ConnectedPoint, ConnectionId, ListenerId};
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::PeerId;
use libp2p_swarm::{
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
DialError, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -302,7 +302,20 @@ impl NetworkBehaviour for Relay {
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler) {
fn inject_dial_failure(
&mut self,
peer_id: &PeerId,
_: Self::ProtocolsHandler,
error: DialError,
) {
if let DialError::DialPeerConditionFalse(
DialPeerCondition::Disconnected | DialPeerCondition::NotDialing,
) = error
{
// Return early. The dial, that this dial was canceled for, might still succeed.
return;
}

if let Entry::Occupied(o) = self.listeners.entry(*peer_id) {
if matches!(o.get(), RelayListener::Connecting { .. }) {
// By removing the entry, the channel to the listener is dropped and thus the
Expand Down
4 changes: 2 additions & 2 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use futures::channel::oneshot;
use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
DialError, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
};
use smallvec::SmallVec;
Expand Down Expand Up @@ -686,7 +686,7 @@ where
self.connected.remove(peer);
}

fn inject_dial_failure(&mut self, peer: &PeerId, _: Self::ProtocolsHandler) {
fn inject_dial_failure(&mut self, peer: &PeerId, _: Self::ProtocolsHandler, _: DialError) {
// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
Expand Down
11 changes: 8 additions & 3 deletions protocols/request-response/src/throttled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use codec::{Codec, Message, ProtocolWrapper, Type};
use futures::ready;
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use lru::LruCache;
use std::{cmp::max, num::NonZeroU16};
Expand Down Expand Up @@ -493,8 +493,13 @@ where
self.behaviour.inject_disconnected(p)
}

fn inject_dial_failure(&mut self, p: &PeerId, handler: Self::ProtocolsHandler) {
self.behaviour.inject_dial_failure(p, handler)
fn inject_dial_failure(
&mut self,
p: &PeerId,
handler: Self::ProtocolsHandler,
error: DialError,
) {
self.behaviour.inject_dial_failure(p, handler, error)
}

fn inject_event(
Expand Down
71 changes: 60 additions & 11 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let connection_id = quote! {::libp2p::core::connection::ConnectionId};
let connected_point = quote! {::libp2p::core::ConnectedPoint};
let listener_id = quote! {::libp2p::core::connection::ListenerId};
let dial_error = quote! {::libp2p::swarm::DialError};

let poll_parameters = quote! {::libp2p::swarm::PollParameters};

Expand Down Expand Up @@ -227,11 +228,13 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.fields
.iter()
.enumerate()
// The outmost handler belongs to the last behaviour.
.rev()
.filter(|f| !is_ignored(&f.1))
.enumerate()
.map(move |(enum_n, (field_n, field))| {
let handler = if field_n == 0 {
// Given that the iterator is reversed, this is the innermost handler only.
quote! { let handler = handlers }
} else {
quote! {
Expand Down Expand Up @@ -271,11 +274,13 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.fields
.iter()
.enumerate()
// The outmost handler belongs to the last behaviour.
.rev()
.filter(|f| !is_ignored(&f.1))
.enumerate()
.map(move |(enum_n, (field_n, field))| {
let handler = if field_n == 0 {
// Given that the iterator is reversed, this is the innermost handler only.
quote! { let handler = handlers }
} else {
quote! {
Expand All @@ -284,8 +289,42 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
};

let inject = match field.ident {
Some(ref i) => quote! { self.#i.inject_dial_failure(peer_id, handler) },
None => quote! { self.#enum_n.inject_dial_failure(peer_id, handler) },
Some(ref i) => {
quote! { self.#i.inject_dial_failure(peer_id, handler, error.clone()) }
}
None => {
quote! { self.#enum_n.inject_dial_failure(peer_id, handler, error.clone()) }
}
};

quote! {
#handler;
#inject;
}
})
};

// Build the list of statements to put in the body of `inject_listen_failure()`.
let inject_listen_failure_stmts = {
data_struct
.fields
.iter()
.enumerate()
.rev()
.filter(|f| !is_ignored(&f.1))
.enumerate()
.map(move |(enum_n, (field_n, field))| {
let handler = if field_n == 0 {
quote! { let handler = handlers }
} else {
quote! {
let (handlers, handler) = handlers.into_inner()
}
};

let inject = match field.ident {
Some(ref i) => quote! { self.#i.inject_listen_failure(local_addr, send_back_addr, handler) },
None => quote! { self.#enum_n.inject_listen_failure(local_addr, send_back_addr, handler) },
};

quote! {
Expand Down Expand Up @@ -485,7 +524,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}
}

out_handler.unwrap_or(quote! {()}) // TODO: incorrect
out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`.
};

// The method to use to poll.
Expand Down Expand Up @@ -529,7 +568,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
wrapped_event = quote!{ #either_ident::First(#wrapped_event) };
}

let wrapped_handler = {
// `DialPeer` and `DialAddress` each provide a handler of the specific
// behaviour triggering the event. Though in order for the final handler
// to be able to handle protocols of all behaviours, the provided
// handler needs to be combined with handlers of all other behaviours.
let provided_handler_and_new_handlers = {
let mut out_handler = None;

for (f_n, f) in data_struct.fields.iter().enumerate() {
Expand All @@ -543,7 +586,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
};

let builder = if field_n == f_n {
quote! { handler }
// The behaviour that triggered the event. Thus, instead of
// creating a new handler, use the provided handler.
quote! { provided_handler }
} else {
quote! { #f_name.new_handler() }
};
Expand All @@ -556,7 +601,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}
}

out_handler.unwrap_or(quote! {()}) // TODO: incorrect
out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`.
};

let generate_event_match_arm = if event_process {
Expand All @@ -577,11 +622,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
loop {
match #trait_to_impl::poll(&mut #field_name, cx, poll_params) {
#generate_event_match_arm
std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler: #wrapped_handler });
std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, providedhandler }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler: #provided_handler_and_new_handlers });
}
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler: #wrapped_handler });
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, provided_handler }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler: #provided_handler_and_new_handlers });
}
std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => {
return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler {
Expand Down Expand Up @@ -645,10 +690,14 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_addr_reach_failure_stmts);*
}

fn inject_dial_failure(&mut self, peer_id: &#peer_id, handlers: Self::ProtocolsHandler) {
fn inject_dial_failure(&mut self, peer_id: &#peer_id, handlers: Self::ProtocolsHandler, error: #dial_error) {
#(#inject_dial_failure_stmts);*
}

fn inject_listen_failure(&mut self, local_addr: &#multiaddr, send_back_addr: &#multiaddr, handlers: Self::ProtocolsHandler) {
#(#inject_listen_failure_stmts);*
}

fn inject_new_listener(&mut self, id: #listener_id) {
#(#inject_new_listener_stmts);*
}
Expand Down
2 changes: 2 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
- Implement `ProtocolsHandler` on `either::Either`representing either of two
`ProtocolsHandler` implementations (see [PR 2192]).

TODO: Mention `inject_dial_failure` now called on DialPeerCondition not met.

[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150
[PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
Expand Down
Loading