Skip to content

Commit

Permalink
Merge branch 'libp2p/master' into handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 19, 2021
2 parents 682f6be + 1e9fcf9 commit 9262c03
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 85 deletions.
3 changes: 3 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
`PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError,
DialError}` (see [PR 2191]).

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195

# 0.29.0 [2021-07-12]

Expand Down
12 changes: 7 additions & 5 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

use crate::{
connection::{

handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
manager::{self, Manager, ManagerConfig},
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
IncomingInfo, IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo,
IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
network::DialError,
ConnectedPoint, PeerId,
};
use either::Either;
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
future: TFut,
handler: THandler,
info: OutgoingInfo<'_>,
) -> Result<ConnectionId, ConnectionLimit>
) -> Result<ConnectionId, DialError<THandler>>
where
TFut: Future<Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>>
+ Send
Expand All @@ -252,7 +252,9 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_pending_outgoing()?;
if let Err(limit) = self.counters.check_max_pending_outgoing() {
return Err(DialError::ConnectionLimit { limit, handler });
};
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
Expand Down
69 changes: 52 additions & 17 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod event;
pub mod peer;

pub use crate::connection::{ConnectionCounters, ConnectionLimits};
pub use event::{IncomingConnection, NetworkEvent, DialAttemptsRemaining};
pub use event::{DialAttemptsRemaining, IncomingConnection, NetworkEvent};
pub use peer::Peer;

use crate::{
Expand Down Expand Up @@ -202,7 +202,7 @@ where
&mut self,
address: &Multiaddr,
handler: THandler,
) -> Result<ConnectionId, DialError>
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
Expand Down Expand Up @@ -235,15 +235,11 @@ where
Ok(f) => {
let f =
f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool
.add_outgoing(f, handler, info)
.map_err(DialError::ConnectionLimit)
self.pool.add_outgoing(f, handler, info)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool
.add_outgoing(f, handler, info)
.map_err(DialError::ConnectionLimit)
self.pool.add_outgoing(f, handler, info)
}
}
}
Expand Down Expand Up @@ -473,7 +469,10 @@ where
}

/// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>) -> Result<ConnectionId, DialError>
fn dial_peer(
&mut self,
opts: DialingOpts<PeerId, THandler>,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
Expand Down Expand Up @@ -505,7 +504,7 @@ fn dial_peer_impl<TMuxer, THandler, TTrans>(
pool: &mut Pool<THandler, TTrans::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>,
) -> Result<ConnectionId, DialError>
) -> Result<ConnectionId, DialError<THandler>>
where
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
Expand All @@ -520,7 +519,15 @@ where
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;
let addr = match p2p_addr(opts.peer, opts.address) {
Ok(address) => address,
Err(address) => {
return Err(DialError::InvalidAddress {
address,
handler: opts.handler,
})
}
};

let result = match transport.dial(addr.clone()) {
Ok(fut) => {
Expand All @@ -530,7 +537,6 @@ where
peer_id: Some(&opts.peer),
};
pool.add_outgoing(fut, opts.handler, info)
.map_err(DialError::ConnectionLimit)
}
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
Expand All @@ -539,7 +545,6 @@ where
peer_id: Some(&opts.peer),
};
pool.add_outgoing(fut, opts.handler, info)
.map_err(DialError::ConnectionLimit)
}
};

Expand Down Expand Up @@ -752,13 +757,43 @@ fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
}

/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone, Debug)]
pub enum DialError {
#[derive(Clone)]
pub enum DialError<THandler> {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit(ConnectionLimit),
ConnectionLimit {
limit: ConnectionLimit,
handler: THandler,
},
/// The address being dialed is invalid, e.g. if it refers to a different
/// remote peer than the one being dialed.
InvalidAddress(Multiaddr),
InvalidAddress {
address: Multiaddr,
handler: THandler,
},
LocalPeerId {
handler: THandler,
},
}

impl<THandler> fmt::Debug for DialError<THandler> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
DialError::ConnectionLimit { limit, handler: _ } => f
.debug_struct("DialError::ConnectionLimit")
.field("limit", limit)
.finish(),
DialError::InvalidAddress {
address,
handler: _,
} => f
.debug_struct("DialError::InvalidAddress")
.field("address", address)
.finish(),
DialError::LocalPeerId { handler: _ } => {
f.debug_struct("DialError::LocalPeerId").finish()
}
}
}
}

#[cfg(test)]
Expand Down
13 changes: 4 additions & 9 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use super::{DialError, DialingOpts, Network};
use crate::{
connection::{
handler::THandlerInEvent, pool::Pool, ConnectedPoint, ConnectionHandler, ConnectionId,
ConnectionLimit, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler,
PendingConnection, Substream,
EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection,
Substream,
},
Multiaddr, PeerId, StreamMuxer, Transport,
};
Expand Down Expand Up @@ -163,20 +163,15 @@ where
address: Multiaddr,
remaining: I,
handler: THandler,
) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError>
) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError<THandler>>
where
I: IntoIterator<Item = Multiaddr>,
{
let (peer_id, network) = match self {
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => {
return Err(DialError::ConnectionLimit(ConnectionLimit {
current: 0,
limit: 0,
}))
}
Peer::Local => return Err(DialError::LocalPeerId { handler }),
};

let id = network.dial_peer(DialingOpts {
Expand Down
8 changes: 6 additions & 2 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ mod tests {

// check that our subscriptions are sent to each of the peers
// collect all the SendEvents
let send_events: Vec<&NetworkBehaviourAction<Arc<GossipsubHandlerIn>, GossipsubEvent>> = gs
let send_events: Vec<_> = gs
.events
.iter()
.filter(|e| match e {
Expand Down Expand Up @@ -1336,13 +1336,14 @@ mod tests {
//add peer as explicit peer
gs.add_explicit_peer(&peer);

let dial_events: Vec<&NetworkBehaviourAction<Arc<GossipsubHandlerIn>, GossipsubEvent>> = gs
let dial_events: Vec<_> = gs
.events
.iter()
.filter(|e| match e {
NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler: _,
} => peer_id == &peer,
_ => false,
})
Expand Down Expand Up @@ -1388,6 +1389,7 @@ mod tests {
NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler: _,
} => peer_id == peer,
_ => false,
})
Expand All @@ -1406,6 +1408,7 @@ mod tests {
NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler: _,
} => peer_id == peer,
_ => false,
})
Expand Down Expand Up @@ -1819,6 +1822,7 @@ mod tests {
NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler: _,
} => Some(peer_id.clone()),
_ => None,
})
Expand Down
1 change: 1 addition & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ wasm-timer = "0.2"

[dev-dependencies]
async-std = "1.6.2"
env_logger = "0.9.0"
libp2p-noise = { path = "../../transports/noise" }
libp2p-tcp = { path = "../../transports/tcp" }
libp2p-yamux = { path = "../../muxers/yamux" }
Expand Down
12 changes: 8 additions & 4 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ use futures::channel::oneshot;
use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -303,7 +304,7 @@ impl RequestResponseConfig {
/// A request/response protocol for some message codec.
pub struct RequestResponse<TCodec>
where
TCodec: RequestResponseCodec + Send,
TCodec: RequestResponseCodec + Clone + Send + 'static,
{
/// The supported inbound protocols.
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
Expand Down Expand Up @@ -336,7 +337,7 @@ where

impl<TCodec> RequestResponse<TCodec>
where
TCodec: RequestResponseCodec + Clone,
TCodec: RequestResponseCodec + Clone + Send + 'static,
{
/// Creates a new `RequestResponse` behaviour for the given
/// protocols, codec and configuration.
Expand Down Expand Up @@ -403,10 +404,12 @@ where
};

if let Some(request) = self.try_send_request(peer, request) {
let handler = self.new_handler();
self.pending_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: *peer,
condition: DialPeerCondition::Disconnected,
handler,
});
self.pending_outbound_requests
.entry(*peer)
Expand Down Expand Up @@ -639,6 +642,7 @@ where
peer_id: &PeerId,
conn: &ConnectionId,
_: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
let connections = self
.connected
Expand Down Expand Up @@ -682,7 +686,7 @@ where
self.connected.remove(peer);
}

fn inject_dial_failure(&mut self, peer: &PeerId) {
fn inject_dial_failure(&mut self, peer: &PeerId, _: Self::ProtocolsHandler) {
// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
Expand Down
Loading

0 comments on commit 9262c03

Please sign in to comment.