diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index b38f456972b..fb5958dc019 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -208,7 +208,6 @@ mod network { use super::*; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; - use libp2p::core::either::EitherError; use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}; use libp2p::identity; use libp2p::identity::ed25519; @@ -216,7 +215,7 @@ mod network { use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult}; use libp2p::multiaddr::Protocol; use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel}; - use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; + use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; @@ -401,13 +400,7 @@ mod network { } } - async fn handle_event( - &mut self, - event: SwarmEvent< - ComposedEvent, - EitherError, io::Error>, - >, - ) { + async fn handle_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( KademliaEvent::OutboundQueryProgressed { diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 688c67a6190..a9b89f40734 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -193,8 +193,8 @@ impl super::Recorder for Metrics { } } -impl super::Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { if let libp2p_swarm::SwarmEvent::ConnectionClosed { peer_id, num_established, diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 351887260df..6ebab878e0e 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -142,8 +142,8 @@ impl Recorder for Metrics { } } -impl Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { self.swarm.record(event); #[cfg(feature = "identify")] diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index a003ab56570..4de23402033 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -153,8 +153,8 @@ impl Metrics { } } -impl super::Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { match event { libp2p_swarm::SwarmEvent::Behaviour(_) => {} libp2p_swarm::SwarmEvent::ConnectionEstablished { diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index f0fdf5930ac..5bcb7e8a2dc 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -52,7 +52,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = void::Void; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; @@ -94,14 +93,8 @@ impl ConnectionHandler for Handler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if !self.reported { self.reported = true; return Poll::Ready(ConnectionHandlerEvent::Custom( diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 301f2ee3d82..0e08bee3ee6 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -28,8 +28,8 @@ use libp2p_core::multiaddr::Multiaddr; use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError}; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, @@ -143,7 +143,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::OutEvent, - ::Error, >, >, /// Inbound connect, accepted by the behaviour, pending completion. @@ -247,15 +246,24 @@ impl Handler { }, )); } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| { - e.map_err(|e| match e { - EitherError::A(e) => EitherError::A(e), - EitherError::B(v) => void::unreachable(v), - }) - })); + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(p), + )) => { + self.queued_events + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + p, + ))); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + self.queued_events + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(never))) => { + void::unreachable(never) } } } @@ -289,10 +297,31 @@ impl Handler { }, )); } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B))); + + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + e @ ConnectionHandlerUpgrErr::Timer => { + self.queued_events + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(p), + )) => { + self.queued_events + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + p, + ))); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + self.queued_events + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))); } } } @@ -301,9 +330,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Command; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; type InboundProtocol = upgrade::EitherUpgrade; type OutboundProtocol = protocol::outbound::Upgrade; type OutboundOpenInfo = u8; // Number of upgrade attempts. @@ -364,18 +390,15 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + err, + ))); } // Return queued events. @@ -392,9 +415,10 @@ impl ConnectionHandler for Handler { )); } Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close( - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))), - )) + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))) } } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 68bcf912975..fe455e6dfa1 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -27,9 +27,9 @@ use futures::StreamExt; use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, - SubstreamProtocol, + CloseReason, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, + ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + KeepAlive, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; use log::{error, trace, warn}; @@ -248,7 +248,6 @@ impl GossipsubHandler { impl ConnectionHandler for GossipsubHandler { type InEvent = GossipsubHandlerIn; type OutEvent = HandlerEvent; - type Error = GossipsubHandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; type OutboundOpenInfo = crate::rpc_proto::Rpc; @@ -283,14 +282,8 @@ impl ConnectionHandler for GossipsubHandler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Handle any upgrade errors if let Some(error) = self.upgrade_errors.pop_front() { let reported_error = match error { @@ -327,7 +320,10 @@ impl ConnectionHandler for GossipsubHandler { // If there was a fatal error, close the connection. if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + error, + ))); } } @@ -342,9 +338,10 @@ impl ConnectionHandler for GossipsubHandler { if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, end the connection. - return Poll::Ready(ConnectionHandlerEvent::Close( + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", GossipsubHandlerError::MaxInboundSubstreams, - )); + ))); } // determine if we need to create the stream @@ -353,9 +350,10 @@ impl ConnectionHandler for GossipsubHandler { && !self.outbound_substream_establishing { if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - return Poll::Ready(ConnectionHandlerEvent::Close( + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", GossipsubHandlerError::MaxOutboundSubstreams, - )); + ))); } let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); @@ -478,13 +476,18 @@ impl ConnectionHandler for GossipsubHandler { } Err(e) => { error!("Error sending message: {}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + return Poll::Ready(ConnectionHandlerEvent::Close( + CloseReason::new("gossipsub", e), + )); } } } Poll::Ready(Err(e)) => { error!("Outbound substream error while sending output: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + e, + ))); } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -506,7 +509,10 @@ impl ConnectionHandler for GossipsubHandler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(e)) + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + e, + ))) } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -528,13 +534,13 @@ impl ConnectionHandler for GossipsubHandler { } Poll::Ready(Err(e)) => { warn!("Outbound substream error while closing: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close( + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", io::Error::new( io::ErrorKind::BrokenPipe, "Failed to close outbound substream", - ) - .into(), - )); + ), + ))); } Poll::Pending => { self.keep_alive = KeepAlive::No; diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 21063acc661..def57df6de0 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -38,7 +38,7 @@ use libp2p_swarm::{ use log::warn; use smallvec::SmallVec; use std::collections::VecDeque; -use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{pin::Pin, task::Context, task::Poll, time::Duration}; pub struct Proto { initial_delay: Duration, @@ -101,8 +101,7 @@ pub struct Handler { inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< - [ConnectionHandlerEvent>, (), Event, io::Error>; - 4], + [ConnectionHandlerEvent>, (), Event>; 4], >, /// Streams awaiting `BehaviourInfo` to then send identify requests. @@ -274,7 +273,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = InEvent; type OutEvent = Event; - type Error = io::Error; type InboundProtocol = SelectUpgrade>; type OutboundProtocol = EitherUpgrade>; type OutboundOpenInfo = (); @@ -333,9 +331,7 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent, - > { + ) -> Poll> { if !self.events.is_empty() { return Poll::Ready(self.events.remove(0)); } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index d012c94d2e0..2fb6752ba14 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -632,7 +632,6 @@ where { type InEvent = KademliaHandlerIn; type OutEvent = KademliaHandlerEvent; - type Error = io::Error; // TODO: better error type? type InboundProtocol = upgrade::EitherUpgrade; type OutboundProtocol = KademliaProtocolConfig; // Message of the request to send to the remote, and user data if we expect an answer. @@ -749,14 +748,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let ProtocolStatus::Confirmed = self.protocol_status { self.protocol_status = ProtocolStatus::Reported; return Poll::Ready(ConnectionHandlerEvent::Custom( @@ -844,7 +837,6 @@ where KademliaProtocolConfig, (KadRequestMsg, Option), KademliaHandlerEvent, - io::Error, >; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -986,7 +978,6 @@ where KademliaProtocolConfig, (KadRequestMsg, Option), KademliaHandlerEvent, - io::Error, >; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 2703b274c77..aae03bb1ee7 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -25,7 +25,7 @@ use futures_timer::Delay; use libp2p_core::upgrade::ReadyUpgrade; use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, @@ -256,7 +256,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Void; type OutEvent = crate::Result; - type Error = Failure; type InboundProtocol = ReadyUpgrade<&'static [u8]>; type OutboundProtocol = ReadyUpgrade<&'static [u8]>; type OutboundOpenInfo = (); @@ -279,8 +278,7 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, (), crate::Result, Self::Error>> - { + ) -> Poll, (), crate::Result>> { match self.state { State::Inactive { reported: true } => { return Poll::Pending; // nothing to do on this connection @@ -325,7 +323,9 @@ impl ConnectionHandler for Handler { if self.failures > 1 || self.config.max_failures.get() > 1 { if self.failures >= self.config.max_failures.get() { log::debug!("Too many failures ({}). Closing connection.", self.failures); - return Poll::Ready(ConnectionHandlerEvent::Close(error)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "ping", error, + ))); } return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error))); diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 5d01cf9dbce..a4b7f235e7c 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -32,8 +32,8 @@ use libp2p_core::either::EitherError; use libp2p_core::multiaddr::Protocol; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, }; use libp2p_swarm::{ dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, @@ -186,7 +186,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::OutEvent, - ::Error, >, >, @@ -509,9 +508,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; type InboundProtocol = inbound_stop::Upgrade; type OutboundProtocol = outbound_hop::Upgrade; type OutboundOpenInfo = OutboundOpenInfo; @@ -554,18 +550,15 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "relay-client", + err, + ))); } // Return queued events. diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index ef8b40755b2..fc8228c09d3 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -34,8 +34,8 @@ use libp2p_core::connection::ConnectionId; use libp2p_core::either::EitherError; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, }; use libp2p_swarm::{ dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, @@ -390,7 +390,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::OutEvent, - ::Error, >, >, @@ -623,9 +622,6 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; type InboundProtocol = inbound_hop::Upgrade; type OutboundProtocol = outbound_stop::Upgrade; type OutboundOpenInfo = OutboundOpenInfo; @@ -743,18 +739,14 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "relay", err, + ))); } // Return queued events. diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index f57dfded6c9..6556f20bf36 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -357,7 +357,6 @@ where { type InEvent = InEvent; type OutEvent = OutEvent; - type Error = Void; type InboundProtocol = PassthroughProtocol; type OutboundProtocol = PassthroughProtocol; type InboundOpenInfo = (); @@ -447,14 +446,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let Some(open_info) = self.new_substreams.pop_front() { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: TOutboundSubstreamHandler::upgrade(open_info), diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 460df22b812..dc32809fe8e 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -64,11 +64,11 @@ fn get_rand_memory_address() -> Multiaddr { .unwrap() } -pub async fn await_event_or_timeout( - swarm: &mut (impl Stream> + FusedStream + Unpin), -) -> SwarmEvent +pub async fn await_event_or_timeout( + swarm: &mut (impl Stream> + FusedStream + Unpin), +) -> SwarmEvent where - SwarmEvent: Debug, + SwarmEvent: Debug, { tokio::time::timeout( Duration::from_secs(30), @@ -80,13 +80,13 @@ where .expect("network behaviour to emit an event within 30 seconds") } -pub async fn await_events_or_timeout( - swarm_1: &mut (impl Stream> + FusedStream + Unpin), - swarm_2: &mut (impl Stream> + FusedStream + Unpin), -) -> (SwarmEvent, SwarmEvent) +pub async fn await_events_or_timeout( + swarm_1: &mut (impl Stream> + FusedStream + Unpin), + swarm_2: &mut (impl Stream> + FusedStream + Unpin), +) -> (SwarmEvent, SwarmEvent) where - SwarmEvent: Debug, - SwarmEvent: Debug, + SwarmEvent: Debug, + SwarmEvent: Debug, { tokio::time::timeout( Duration::from_secs(30), diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 3eba5b10c77..2c0d957d264 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -24,8 +24,8 @@ use crate::codec::Codec; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, }; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; @@ -284,7 +284,6 @@ where { type InEvent = RequestProtocol; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; @@ -336,12 +335,14 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, RequestId, Self::OutEvent, Self::Error>> - { + ) -> Poll, RequestId, Self::OutEvent>> { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "request-response", + err, + ))); } // Drain pending events. diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 141d6c3efbd..f9db354b6d0 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -574,7 +574,6 @@ pub enum NetworkBehaviourAction< /// # impl ConnectionHandler for MyHandler { /// # type InEvent = Void; /// # type OutEvent = Void; - /// # type Error = Void; /// # type InboundProtocol = DeniedUpgrade; /// # type OutboundProtocol = DeniedUpgrade; /// # type InboundOpenInfo = (); @@ -620,8 +619,7 @@ pub enum NetworkBehaviourAction< /// # ConnectionHandlerEvent< /// # Self::OutboundProtocol, /// # Self::OutboundOpenInfo, - /// # Self::OutEvent, - /// # Self::Error, + /// # Self::OutEvent /// # >, /// # > { /// # todo!("If `Self::message.is_some()` send the message to the remote.") diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 81255a40274..b1097005ee7 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -229,7 +229,6 @@ where { type InEvent = TInner::InEvent; type OutEvent = TInner::OutEvent; - type Error = TInner::Error; type InboundProtocol = EitherUpgrade, SendWrapper>; type OutboundProtocol = TInner::OutboundProtocol; @@ -268,14 +267,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let Some(inner) = self.inner.as_mut() { inner.poll(cx) } else { diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 272f78f3a6b..b71c5f53896 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -165,7 +165,7 @@ where pub fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, ConnectionError>> { + ) -> Poll, ConnectionError>> { let Self { requested_substreams, muxing, @@ -745,7 +745,6 @@ mod tests { impl ConnectionHandler for MockConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -793,12 +792,7 @@ mod tests { &mut self, _: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, + ConnectionHandlerEvent, > { if self.outbound_requested { self.outbound_requested = false; diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 541d458df0c..fcb7dfaade1 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::handler::CloseReason; use crate::transport::TransportError; use crate::Multiaddr; use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; @@ -25,7 +26,7 @@ use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. #[derive(Debug)] -pub enum ConnectionError { +pub enum ConnectionError { /// An I/O error occurred on the connection. // TODO: Eventually this should also be a custom error? IO(io::Error), @@ -33,29 +34,23 @@ pub enum ConnectionError { /// The connection keep-alive timeout expired. KeepAliveTimeout, - /// The connection handler produced an error. - Handler(THandlerErr), + /// The connection handler actively closed the connection. + Handler(CloseReason), } -impl fmt::Display for ConnectionError -where - THandlerErr: fmt::Display, -{ +impl fmt::Display for ConnectionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {err}"), ConnectionError::KeepAliveTimeout => { write!(f, "Connection closed due to expired keep-alive timeout.") } - ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {err}"), + ConnectionError::Handler(_) => Ok(()), // `CloseReason` prints enough context. } } } -impl std::error::Error for ConnectionError -where - THandlerErr: std::error::Error + 'static, -{ +impl std::error::Error for ConnectionError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { ConnectionError::IO(err) => Some(err), @@ -65,7 +60,7 @@ where } } -impl From for ConnectionError { +impl From for ConnectionError { fn from(error: io::Error) -> Self { ConnectionError::IO(error) } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 7a81c57e2df..0409d9915cf 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -261,7 +261,7 @@ where connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option::Error>>, + error: Option, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, handler: THandler::Handler, diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 8e1129d8cae..b6273509a57 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -92,7 +92,7 @@ pub enum EstablishedConnectionEvent { Closed { id: ConnectionId, peer_id: PeerId, - error: Option>, + error: Option, handler: THandler, }, } diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 4ec58581c2e..bb8d4a22151 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -58,7 +58,6 @@ pub struct ConnectionHandler; impl crate::handler::ConnectionHandler for ConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -79,14 +78,8 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { Poll::Pending } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 2a4e1d44483..6bf25c8ca8a 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -96,8 +96,6 @@ pub trait ConnectionHandler: Send + 'static { type InEvent: fmt::Debug + Send + 'static; /// Custom event that can be produced by the handler and that will be returned to the outside. type OutEvent: fmt::Debug + Send + 'static; - /// The type of errors returned by [`ConnectionHandler::poll`]. - type Error: error::Error + fmt::Debug + Send + 'static; /// The inbound upgrade for the protocol(s) used by the handler. type InboundProtocol: InboundUpgradeSend; /// The outbound upgrade for the protocol(s) used by the handler. @@ -238,14 +236,7 @@ pub trait ConnectionHandler: Send + 'static { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - >; + ) -> Poll>; /// Adds a closure that turns the input event into something else. fn map_in_event(self, map: TMap) -> MapInEvent @@ -426,8 +417,8 @@ impl SubstreamProtocol { } /// Event produced by a handler. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ConnectionHandlerEvent { +#[derive(Debug)] +pub enum ConnectionHandlerEvent { /// Request a new outbound substream to be opened with the remote. OutboundSubstreamRequest { /// The protocol(s) to apply on the substream. @@ -442,22 +433,22 @@ pub enum ConnectionHandlerEvent - ConnectionHandlerEvent +impl + ConnectionHandlerEvent { /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a /// `TOutboundOpenInfo` to something else. pub fn map_outbound_open_info( self, map: F, - ) -> ConnectionHandlerEvent + ) -> ConnectionHandlerEvent where F: FnOnce(TOutboundOpenInfo) -> I, { @@ -474,10 +465,7 @@ impl /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) /// to something else. - pub fn map_protocol( - self, - map: F, - ) -> ConnectionHandlerEvent + pub fn map_protocol(self, map: F) -> ConnectionHandlerEvent where F: FnOnce(TConnectionUpgrade) -> I, { @@ -496,7 +484,7 @@ impl pub fn map_custom( self, map: F, - ) -> ConnectionHandlerEvent + ) -> ConnectionHandlerEvent where F: FnOnce(TCustom) -> I, { @@ -508,25 +496,56 @@ impl ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), } } +} - /// If this is a `Close` event, maps the content to something else. - pub fn map_close( - self, - map: F, - ) -> ConnectionHandlerEvent - where - F: FnOnce(TErr) -> I, - { - match self { - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } - } - ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), - ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)), +/// Encapsulates the reason for why a specific [`ConnectionHandler`] closed a connection. +#[derive(Debug)] +pub struct CloseReason { + protocol: &'static str, + source: Box, +} + +impl CloseReason { + /// Construct a new [`CloseReason`]. + /// + /// The first parameter should be a meaningful identifier for the component / protocol that is closing the connection. + /// The given `source` is returned from [`Error::source`](error::Error::source) and can be printed by iterating the source of this error. + /// + /// If you don't want to iterate over the sources yourself, you can use an error handling library like `anyhow` or `eyre`. + /// + /// # Example + /// + /// ```rust + /// # use libp2p_swarm::handler::CloseReason; + /// + /// # fn main() { + /// let source = std::io::Error::from(std::io::ErrorKind::UnexpectedEof); // Imagine this being the result of reading from a stream. + /// let reason = CloseReason::new("ping", source); + /// + /// assert_eq!(reason.to_string(), "connection closed by 'ping' protocol") + /// # } + /// + /// ``` + pub fn new(protocol: &'static str, source: impl error::Error + Send + 'static) -> Self { + Self { + protocol, + source: Box::new(source), } } } +impl fmt::Display for CloseReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "connection closed by '{}' protocol", self.protocol) + } +} + +impl error::Error for CloseReason { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + Some(self.source.as_ref()) + } +} + /// Error that can happen on an outbound substream opening attempt. #[derive(Debug)] pub enum ConnectionHandlerUpgrErr { diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index e6d16ed1133..dba1eb2f4f4 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -99,7 +99,6 @@ where { type InEvent = Either; type OutEvent = Either; - type Error = Either; type InboundProtocol = EitherUpgrade, SendWrapper>; type OutboundProtocol = @@ -140,23 +139,15 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { let event = match self { Either::Left(handler) => futures::ready!(handler.poll(cx)) .map_custom(Either::Left) - .map_close(Either::Left) .map_protocol(|p| EitherUpgrade::A(SendWrapper(p))) .map_outbound_open_info(Either::Left), Either::Right(handler) => futures::ready!(handler.poll(cx)) .map_custom(Either::Right) - .map_close(Either::Right) .map_protocol(|p| EitherUpgrade::B(SendWrapper(p))) .map_outbound_open_info(Either::Right), }; diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index 326a6f8f4f9..f3bf633ce5c 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -53,7 +53,6 @@ where { type InEvent = TNewIn; type OutEvent = TConnectionHandler::OutEvent; - type Error = TConnectionHandler::Error; type InboundProtocol = TConnectionHandler::InboundProtocol; type OutboundProtocol = TConnectionHandler::OutboundProtocol; type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; @@ -77,14 +76,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { self.inner.poll(cx) } diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index 87306dc48c6..a3f1abf8e7e 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -48,7 +48,6 @@ where { type InEvent = TConnectionHandler::InEvent; type OutEvent = TNewOut; - type Error = TConnectionHandler::Error; type InboundProtocol = TConnectionHandler::InboundProtocol; type OutboundProtocol = TConnectionHandler::OutboundProtocol; type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; @@ -70,14 +69,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { self.inner.poll(cx).map(|ev| match ev { ConnectionHandlerEvent::Custom(ev) => ConnectionHandlerEvent::Custom((self.map)(ev)), ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err), diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index d80b51c52a8..ddc7ec8af80 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -93,7 +93,6 @@ where { type InEvent = (K, ::InEvent); type OutEvent = (K, ::OutEvent); - type Error = ::Error; type InboundProtocol = Upgrade::InboundProtocol>; type OutboundProtocol = ::OutboundProtocol; type InboundOpenInfo = Info::InboundOpenInfo>; @@ -293,14 +292,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Calling `gen_range(0, 0)` (see below) would panic, so we have return early to avoid // that situation. if self.handlers.is_empty() { diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index e8cd03ebed8..423d35c4c1e 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, - SubstreamProtocol, + CloseReason, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, + ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + KeepAlive, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; use instant::Instant; @@ -123,7 +123,6 @@ where { type InEvent = TOutbound; type OutEvent = TEvent; - type Error = ConnectionHandlerUpgrErr<::Error>; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); @@ -144,16 +143,12 @@ where fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let Some(err) = self.pending_error.take() { - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "oneshot", err, + ))); } if !self.events_out.is_empty() { diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index 2efa949dd71..99a88732033 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -42,7 +42,6 @@ impl PendingConnectionHandler { impl ConnectionHandler for PendingConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = PendingUpgrade; type OutboundProtocol = PendingUpgrade; type OutboundOpenInfo = Void; @@ -63,14 +62,8 @@ impl ConnectionHandler for PendingConnectionHandler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { Poll::Pending } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 65508c0b6a5..7619341be37 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -333,7 +333,6 @@ where { type InEvent = EitherOutput; type OutEvent = EitherOutput; - type Error = EitherError; type InboundProtocol = SelectUpgrade< SendWrapper<::InboundProtocol>, SendWrapper<::InboundProtocol>, @@ -374,20 +373,14 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { match self.proto1.poll(cx) { Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::First(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(EitherError::A(event))); + return Poll::Ready(ConnectionHandlerEvent::Close(event)); } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { @@ -404,7 +397,7 @@ where return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::Second(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(EitherError::B(event))); + return Poll::Ready(ConnectionHandlerEvent::Close(event)); } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index bd1ed812b8b..5a23d2b6e14 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -63,7 +63,6 @@ pub struct ConnectionHandler; impl crate::handler::ConnectionHandler for ConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -84,14 +83,8 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { Poll::Pending } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cf6051e1e85..2d22882352c 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -170,13 +170,9 @@ type THandlerInEvent = type THandlerOutEvent = < as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent; -/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -type THandlerErr = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::Error; - /// Event generated by the `Swarm`. #[derive(Debug)] -pub enum SwarmEvent { +pub enum SwarmEvent { /// Event generated by the `NetworkBehaviour`. Behaviour(TBehaviourOutEvent), /// A connection to the given peer has been opened. @@ -206,7 +202,7 @@ pub enum SwarmEvent { num_established: u32, /// Reason for the disconnection, if it was not a successful /// active close. - cause: Option>, + cause: Option, }, /// A new connection arrived on a listener and is in the process of protocol negotiation. /// @@ -803,7 +799,7 @@ where fn handle_pool_event( &mut self, event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, - ) -> Option>> { + ) -> Option> { match event { PoolEvent::ConnectionEstablished { peer_id, @@ -978,7 +974,7 @@ where as Transport>::ListenerUpgrade, io::Error, >, - ) -> Option>> { + ) -> Option> { match event { TransportEvent::Incoming { listener_id: _, @@ -1082,7 +1078,7 @@ where fn handle_behaviour_event( &mut self, event: NetworkBehaviourAction, - ) -> Option>> { + ) -> Option> { match event { NetworkBehaviourAction::GenerateEvent(event) => { return Some(SwarmEvent::Behaviour(event)) @@ -1168,7 +1164,7 @@ where fn poll_next_event( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll> { // We use a `this` variable because the compiler can't mutably borrow multiple times // across a `Deref`. let this = &mut *self; @@ -1363,7 +1359,7 @@ impl Stream for Swarm where TBehaviour: NetworkBehaviour, { - type Item = SwarmEvent, THandlerErr>; + type Item = SwarmEvent>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().poll_next_event(cx).map(Some)