From 677e15127eb9a38fe65389167570948544312b87 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 21 Aug 2024 18:47:22 +0300 Subject: [PATCH] errors: Add `DialError` error and `ListDialFailures` event for better error reporting (#206) The purpose of this PR is to pave the way for making the Identify protocol more robust, which is currently linked with the low number of peers and connective issues over a long period of time - https://github.com/paritytech/polkadot-sdk/issues/4925 This PR adds a coherent `DialError` that exposes the minimal information users need to know about dial failures. - https://github.com/paritytech/polkadot-sdk/issues/5239 A new litep2p event is added for reporting multiple dial errors that occur on different protocols back to the user: ```rust /// A list of multiple dial failures. ListDialFailures { /// List of errors. /// /// Depending on the transport, the address might be different for each error. errors: Vec<(Multiaddr, DialError)>, }, ``` This event eases the debugging of substrate connectivity issues. At the same time, it can be used in a future PR to inform back to the Identify protocol which self-reported addresses of some peers are unreachable: - https://github.com/paritytech/litep2p/issues/203 ### Next Steps - Add more tests - Warp sync + sync full nodes since this is touching individual transports ### Future Work - The overarching `litep2p::Error` needs a closer look and a refactoring: - https://github.com/paritytech/litep2p/issues/204 - https://github.com/paritytech/litep2p/issues/128 - ConnectionError event for individual transports can be simplified: - https://github.com/paritytech/litep2p/issues/205 - I've observed some inconsistencies in handling TCP vs WebSocket connection timeouts. I believe that we can have another pass and share even more code between them: - https://github.com/paritytech/litep2p/issues/70 --------- Signed-off-by: Alexandru Vasile Co-authored-by: Dmitry Markin --- src/crypto/ed25519.rs | 15 +- src/crypto/mod.rs | 24 +-- src/crypto/noise/mod.rs | 71 +++---- src/error.rs | 197 +++++++++++++++--- src/lib.rs | 17 +- src/multistream_select/dialer_select.rs | 34 ++- src/multistream_select/negotiated.rs | 31 +-- src/multistream_select/protocol.rs | 36 +--- src/protocol/libp2p/kademlia/mod.rs | 8 +- src/protocol/mod.rs | 4 +- src/protocol/notification/mod.rs | 8 +- .../notification/tests/notification.rs | 15 +- src/protocol/protocol_set.rs | 42 ++-- src/protocol/request_response/mod.rs | 6 +- src/protocol/request_response/tests.rs | 7 +- src/transport/common/listener.rs | 37 ++-- src/transport/dummy.rs | 4 +- src/transport/manager/mod.rs | 136 +++++++++++- src/transport/mod.rs | 7 +- src/transport/quic/connection.rs | 23 +- src/transport/quic/listener.rs | 46 ++-- src/transport/quic/mod.rs | 104 +++++---- src/transport/tcp/connection.rs | 73 +++---- src/transport/tcp/mod.rs | 65 ++++-- src/transport/webrtc/connection.rs | 12 +- src/transport/webrtc/util.rs | 11 +- src/transport/websocket/connection.rs | 39 ++-- src/transport/websocket/mod.rs | 149 ++++++------- tests/connection/mod.rs | 8 +- 29 files changed, 755 insertions(+), 474 deletions(-) diff --git a/src/crypto/ed25519.rs b/src/crypto/ed25519.rs index af10a20c..d95aaa39 100644 --- a/src/crypto/ed25519.rs +++ b/src/crypto/ed25519.rs @@ -21,7 +21,10 @@ //! Ed25519 keys. -use crate::{error::Error, PeerId}; +use crate::{ + error::{Error, ParseError}, + PeerId, +}; use ed25519_dalek::{self as ed25519, Signer as _, Verifier as _}; use std::fmt; @@ -131,11 +134,13 @@ impl PublicKey { /// Try to parse a public key from a byte array containing the actual key as produced by /// `to_bytes`. - pub fn try_from_bytes(k: &[u8]) -> crate::Result { - let k = <[u8; 32]>::try_from(k) - .map_err(|e| Error::Other(format!("Failed to parse ed25519 public key: {e}")))?; + pub fn try_from_bytes(k: &[u8]) -> Result { + let k = <[u8; 32]>::try_from(k).map_err(|_| ParseError::InvalidPublicKey)?; + + // The error type of the verifying key is deliberately opaque as to avoid side-channel + // leakage. We can't provide a more specific error type here. ed25519::VerifyingKey::from_bytes(&k) - .map_err(|e| Error::Other(format!("Failed to parse ed25519 public key: {e}"))) + .map_err(|_| ParseError::InvalidPublicKey) .map(PublicKey) } diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index b565639a..f98ab70f 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -21,7 +21,7 @@ //! Crypto-related code. -use crate::{error::*, peer_id::*}; +use crate::{error::ParseError, peer_id::*}; pub mod ed25519; pub(crate) mod noise; @@ -65,11 +65,10 @@ impl PublicKey { /// Decode a public key from a protobuf structure, e.g. read from storage /// or received from another node. - pub fn from_protobuf_encoding(bytes: &[u8]) -> crate::Result { + pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { use prost::Message; - let pubkey = keys_proto::PublicKey::decode(bytes) - .map_err(|error| Error::Other(format!("Invalid Protobuf: {error:?}")))?; + let pubkey = keys_proto::PublicKey::decode(bytes)?; pubkey.try_into() } @@ -92,19 +91,16 @@ impl From<&PublicKey> for keys_proto::PublicKey { } impl TryFrom for PublicKey { - type Error = Error; + type Error = ParseError; fn try_from(pubkey: keys_proto::PublicKey) -> Result { let key_type = keys_proto::KeyType::try_from(pubkey.r#type) - .map_err(|_| Error::Other(format!("Unknown key type: {}", pubkey.r#type)))?; - - match key_type { - keys_proto::KeyType::Ed25519 => - Ok(ed25519::PublicKey::try_from_bytes(&pubkey.data).map(PublicKey::Ed25519)?), - _ => Err(Error::Other(format!( - "Unsupported key type: {}", - key_type.as_str_name() - ))), + .map_err(|_| ParseError::UnknownKeyType(pubkey.r#type))?; + + if key_type == keys_proto::KeyType::Ed25519 { + Ok(ed25519::PublicKey::try_from_bytes(&pubkey.data).map(PublicKey::Ed25519)?) + } else { + Err(ParseError::UnknownKeyType(key_type as i32)) } } } diff --git a/src/crypto/noise/mod.rs b/src/crypto/noise/mod.rs index f130042c..8254121c 100644 --- a/src/crypto/noise/mod.rs +++ b/src/crypto/noise/mod.rs @@ -24,7 +24,8 @@ use crate::{ config::Role, crypto::{ed25519::Keypair, PublicKey}, - error, PeerId, + error::{NegotiationError, ParseError}, + PeerId, }; use bytes::{Buf, Bytes, BytesMut}; @@ -103,7 +104,7 @@ impl NoiseContext { keypair: snow::Keypair, id_keys: &Keypair, role: Role, - ) -> crate::Result { + ) -> Result { let noise_payload = handshake_schema::NoiseHandshakePayload { identity_key: Some(PublicKey::Ed25519(id_keys.public()).to_protobuf_encoding()), identity_sig: Some( @@ -113,7 +114,7 @@ impl NoiseContext { }; let mut payload = Vec::with_capacity(noise_payload.encoded_len()); - noise_payload.encode(&mut payload)?; + noise_payload.encode(&mut payload).map_err(ParseError::from)?; Ok(Self { noise: NoiseState::Handshake(noise), @@ -123,7 +124,7 @@ impl NoiseContext { }) } - pub fn new(keypair: &Keypair, role: Role) -> crate::Result { + pub fn new(keypair: &Keypair, role: Role) -> Result { tracing::trace!(target: LOG_TARGET, ?role, "create new noise configuration"); let builder: Builder<'_> = Builder::with_resolver( @@ -144,7 +145,7 @@ impl NoiseContext { /// Create new [`NoiseContext`] with prologue. #[cfg(feature = "webrtc")] - pub fn with_prologue(id_keys: &Keypair, prologue: Vec) -> crate::Result { + pub fn with_prologue(id_keys: &Keypair, prologue: Vec) -> Result { let noise: Builder<'_> = Builder::with_resolver( NOISE_PARAMETERS.parse().expect("qed; Valid noise pattern"), Box::new(protocol::Resolver), @@ -162,35 +163,36 @@ impl NoiseContext { /// Get remote public key from the received Noise payload. #[cfg(feature = "webrtc")] - pub fn get_remote_public_key(&mut self, reply: &[u8]) -> crate::Result { + pub fn get_remote_public_key(&mut self, reply: &[u8]) -> Result { let (len_slice, reply) = reply.split_at(2); - let len = u16::from_be_bytes(len_slice.try_into().map_err(|_| error::Error::InvalidData)?) - as usize; + let len = u16::from_be_bytes( + len_slice + .try_into() + .map_err(|_| NegotiationError::ParseError(ParseError::InvalidPublicKey))?, + ) as usize; let mut buffer = vec![0u8; len]; let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read the second handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let res = noise.read_message(reply, &mut buffer)?; buffer.truncate(res); - let payload = handshake_schema::NoiseHandshakePayload::decode(buffer.as_slice())?; + let payload = handshake_schema::NoiseHandshakePayload::decode(buffer.as_slice()) + .map_err(|err| NegotiationError::ParseError(err.into()))?; - PublicKey::from_protobuf_encoding(&payload.identity_key.ok_or( - error::Error::NegotiationError(error::NegotiationError::PeerIdMissing), - )?) + let identity = payload.identity_key.ok_or(NegotiationError::PeerIdMissing)?; + PublicKey::from_protobuf_encoding(&identity).map_err(|err| err.into()) } /// Get first message. /// /// Listener only sends one message (the payload) - pub fn first_message(&mut self, role: Role) -> crate::Result> { + pub fn first_message(&mut self, role: Role) -> Result, NegotiationError> { match role { Role::Dialer => { tracing::trace!(target: LOG_TARGET, "get noise dialer first message"); @@ -198,9 +200,7 @@ impl NoiseContext { let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read the first handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let mut buffer = vec![0u8; 256]; @@ -220,15 +220,13 @@ impl NoiseContext { /// Get second message. /// /// Only the dialer sends the second message. - pub fn second_message(&mut self) -> crate::Result> { + pub fn second_message(&mut self) -> Result, NegotiationError> { tracing::trace!(target: LOG_TARGET, "get noise paylod message"); let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read the first handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let mut buffer = vec![0u8; 2048]; @@ -246,7 +244,7 @@ impl NoiseContext { async fn read_handshake_message( &mut self, io: &mut T, - ) -> crate::Result { + ) -> Result { let mut size = BytesMut::zeroed(2); io.read_exact(&mut size).await?; let size = size.get_u16(); @@ -260,9 +258,7 @@ impl NoiseContext { let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let nread = noise.read_message(&message, &mut out)?; @@ -286,13 +282,10 @@ impl NoiseContext { } /// Convert Noise into transport mode. - fn into_transport(self) -> crate::Result { + fn into_transport(self) -> Result { let transport = match self.noise { NoiseState::Handshake(noise) => noise.into_transport_mode()?, - NoiseState::Transport(_) => - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )), + NoiseState::Transport(_) => return Err(NegotiationError::StateMismatch), }; Ok(NoiseContext { @@ -664,15 +657,15 @@ impl AsyncWrite for NoiseSocket { } /// Try to parse `PeerId` from received `NoiseHandshakePayload` -fn parse_peer_id(buf: &[u8]) -> crate::Result { +fn parse_peer_id(buf: &[u8]) -> Result { match handshake_schema::NoiseHandshakePayload::decode(buf) { Ok(payload) => { - let public_key = PublicKey::from_protobuf_encoding(&payload.identity_key.ok_or( - error::Error::NegotiationError(error::NegotiationError::PeerIdMissing), - )?)?; + let identity = payload.identity_key.ok_or(NegotiationError::PeerIdMissing)?; + + let public_key = PublicKey::from_protobuf_encoding(&identity)?; Ok(PeerId::from_public_key(&public_key)) } - Err(err) => Err(From::from(err)), + Err(err) => Err(ParseError::from(err).into()), } } @@ -683,7 +676,7 @@ pub async fn handshake( role: Role, max_read_ahead_factor: usize, max_write_buffer_size: usize, -) -> crate::Result<(NoiseSocket, PeerId)> { +) -> Result<(NoiseSocket, PeerId), NegotiationError> { tracing::debug!(target: LOG_TARGET, ?role, "start noise handshake"); let mut noise = NoiseContext::new(keypair, role)?; @@ -797,7 +790,7 @@ mod tests { #[test] fn invalid_peer_id_schema() { match parse_peer_id(&vec![1, 2, 3, 4]).unwrap_err() { - crate::Error::ParseError(_) => {} + NegotiationError::ParseError(_) => {} _ => panic!("invalid error"), } } diff --git a/src/error.rs b/src/error.rs index 947f2239..f05a83c3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,15 +48,15 @@ pub enum Error { #[error("Protocol `{0}` not supported")] ProtocolNotSupported(String), #[error("Address error: `{0}`")] - AddressError(AddressError), + AddressError(#[from] AddressError), #[error("Parse error: `{0}`")] ParseError(ParseError), #[error("I/O error: `{0}`")] IoError(ErrorKind), #[error("Negotiation error: `{0}`")] - NegotiationError(NegotiationError), + NegotiationError(#[from] NegotiationError), #[error("Substream error: `{0}`")] - SubstreamError(SubstreamError), + SubstreamError(#[from] SubstreamError), #[error("Substream error: `{0}`")] NotificationError(NotificationError), #[error("Essential task closed")] @@ -127,24 +127,57 @@ pub enum Error { ConnectionLimit(ConnectionLimitsError), } +/// Error type for address parsing. #[derive(Debug, thiserror::Error)] pub enum AddressError { - #[error("Invalid protocol")] + /// The provided address does not correspond to the transport protocol. + /// + /// For example, this can happen when the address used the UDP protocol but + /// the handling transport only allows TCP connections. + #[error("Invalid address for protocol")] InvalidProtocol, + /// The provided address is not a valid URL. + #[error("Invalid URL")] + InvalidUrl, + /// The provided address does not include a peer ID. #[error("`PeerId` missing from the address")] PeerIdMissing, + /// No address is available for the provided peer ID. #[error("Address not available")] AddressNotAvailable, + /// The provided address contains an invalid multihash. + #[error("Multihash does not contain a valid peer ID : `{0:?}`")] + InvalidPeerId(Multihash), } #[derive(Debug, thiserror::Error)] pub enum ParseError { - #[error("Invalid multihash: `{0:?}`")] - InvalidMultihash(Multihash), + /// The provided probuf message cannot be decoded. #[error("Failed to decode protobuf message: `{0:?}`")] - ProstDecodeError(prost::DecodeError), + ProstDecodeError(#[from] prost::DecodeError), + /// The provided protobuf message cannot be encoded. #[error("Failed to encode protobuf message: `{0:?}`")] - ProstEncodeError(prost::EncodeError), + ProstEncodeError(#[from] prost::EncodeError), + /// The protobuf message contains an unexpected key type. + /// + /// This error can happen when: + /// - The provided key type is not recognized. + /// - The provided key type is recognized but not supported. + #[error("Unknown key type from protobuf message: `{0}`")] + UnknownKeyType(i32), + /// The public key bytes are invalid and cannot be parsed. + /// + /// This error can happen when: + /// - The received number of bytes is not equal to the expected number of bytes (32 bytes). + /// - The bytes are not a valid Ed25519 public key. + /// - Length of the public key is not represented by 2 bytes (WebRTC specific). + #[error("Invalid public key")] + InvalidPublicKey, + /// The provided date has an invalid format. + /// + /// This error is protocol specific. + #[error("Invalid data")] + InvalidData, } #[derive(Debug, thiserror::Error)] @@ -152,23 +185,51 @@ pub enum SubstreamError { #[error("Connection closed")] ConnectionClosed, #[error("yamux error: `{0}`")] - YamuxError(crate::yamux::ConnectionError), + YamuxError(crate::yamux::ConnectionError, Direction), #[error("Failed to read from substream, substream id `{0:?}`")] ReadFailure(Option), #[error("Failed to write to substream, substream id `{0:?}`")] WriteFailure(Option), + #[error("Negotiation error: `{0:?}`")] + NegotiationError(#[from] NegotiationError), } +/// Error during the negotiation phase. #[derive(Debug, thiserror::Error)] pub enum NegotiationError { + /// Error occurred during the multistream-select phase of the negotiation. #[error("multistream-select error: `{0:?}`")] - MultistreamSelectError(crate::multistream_select::NegotiationError), + MultistreamSelectError(#[from] crate::multistream_select::NegotiationError), + /// Error occurred during the Noise handshake negotiation. #[error("multistream-select error: `{0:?}`")] - SnowError(snow::Error), - #[error("Connection closed while negotiating")] - ConnectionClosed, + SnowError(#[from] snow::Error), + /// The peer ID was not provided by the noise handshake. #[error("`PeerId` missing from Noise handshake")] PeerIdMissing, + /// The negotiation operation timed out. + #[error("Operation timed out")] + Timeout, + /// The message provided over the wire has an invalid format or is unsupported. + #[error("Parse error: `{0}`")] + ParseError(#[from] ParseError), + /// An I/O error occurred during the negotiation process. + #[error("I/O error: `{0}`")] + IoError(ErrorKind), + /// Expected a different state during the negotiation process. + #[error("Expected a different state")] + StateMismatch, + /// The noise handshake provided a different peer ID than the one expected in the dialing + /// address. + #[error("Peer ID mismatch: expected `{0}`, got `{1}`")] + PeerIdMismatch(PeerId, PeerId), + /// Error specific to the QUIC transport. + #[cfg(feature = "quic")] + #[error("QUIC error: `{0}`")] + Quic(#[from] QuicError), + /// Error specific to the WebSocket transport. + #[cfg(feature = "websocket")] + #[error("WebSocket error: `{0}`")] + WebSocket(#[from] tokio_tungstenite::tungstenite::error::Error), } #[derive(Debug, thiserror::Error)] @@ -183,19 +244,62 @@ pub enum NotificationError { NotificationStreamClosed(PeerId), } +/// The error type for dialing a peer. +/// +/// This error is reported via the litep2p events. #[derive(Debug, thiserror::Error)] pub enum DialError { - #[error("Tried to dial self")] - TriedToDialSelf, - #[error("Already connected to peer")] - AlreadyConnected, - #[error("Peer doens't have any known addresses")] - NoAddressAvailable(PeerId), + /// The dialing operation timed out. + /// + /// This error indicates that the `connection_open_timeout` from the protocol configuration + /// was exceeded. + #[error("Dial timed out")] + Timeout, + /// The provided address for dialing is invalid. + #[error("Address error: `{0}`")] + AddressError(#[from] AddressError), + /// An error occurred during DNS lookup operation. + /// + /// The address provided may be valid, however it failed to resolve to a concrete IP address. + /// This error may be recoverable. + #[error("DNS lookup error for `{0}`")] + DnsError(#[from] DnsError), + /// An error occurred during the negotiation process. + #[error("Negotiation error: `{0}`")] + NegotiationError(#[from] NegotiationError), +} + +/// Error during the QUIC transport negotiation. +#[cfg(feature = "quic")] +#[derive(Debug, thiserror::Error)] +pub enum QuicError { + /// The provided certificate is invalid. + #[error("Invalid certificate")] + InvalidCertificate, + /// The connection was lost. + #[error("Failed to negotiate QUIC: `{0}`")] + ConnectionError(#[from] quinn::ConnectionError), + /// The connection could not be established. + #[error("Failed to connect to peer: `{0}`")] + ConnectError(#[from] quinn::ConnectError), +} + +/// Error during DNS resolution. +#[derive(Debug, thiserror::Error)] +pub enum DnsError { + /// The DNS resolution failed to resolve the provided URL. + #[error("DNS failed to resolve url `{0}`")] + ResolveError(String), + /// The DNS expected a different IP address version. + /// + /// For example, DNSv4 was expected but DNSv6 was provided. + #[error("DNS type is different from the provided IP address")] + IpVersionMismatch, } impl From> for Error { fn from(hash: MultihashGeneric<64>) -> Self { - Error::ParseError(ParseError::InvalidMultihash(hash)) + Error::AddressError(AddressError::InvalidPeerId(hash)) } } @@ -205,6 +309,12 @@ impl From for Error { } } +impl From for DialError { + fn from(error: io::Error) -> Self { + DialError::NegotiationError(NegotiationError::IoError(error.kind())) + } +} + impl From for Error { fn from(error: crate::multistream_select::NegotiationError) -> Error { Error::NegotiationError(NegotiationError::MultistreamSelectError(error)) @@ -241,6 +351,24 @@ impl From for Error { } } +impl From for NegotiationError { + fn from(error: io::Error) -> Self { + NegotiationError::IoError(error.kind()) + } +} + +impl From for Error { + fn from(error: ParseError) -> Self { + Error::ParseError(error) + } +} + +impl From> for AddressError { + fn from(hash: MultihashGeneric<64>) -> Self { + AddressError::InvalidPeerId(hash) + } +} + #[cfg(feature = "quic")] impl From for Error { fn from(error: quinn::ConnectionError) -> Self { @@ -251,6 +379,23 @@ impl From for Error { } } +#[cfg(feature = "quic")] +impl From for DialError { + fn from(error: quinn::ConnectionError) -> Self { + match error { + quinn::ConnectionError::TimedOut => DialError::Timeout, + error => DialError::NegotiationError(NegotiationError::Quic(error.into())), + } + } +} + +#[cfg(feature = "quic")] +impl From for DialError { + fn from(error: quinn::ConnectError) -> Self { + DialError::NegotiationError(NegotiationError::Quic(error.into())) + } +} + impl From for Error { fn from(error: ConnectionLimitsError) -> Self { Error::ConnectionLimit(error) @@ -264,18 +409,6 @@ mod tests { #[tokio::test] async fn try_from_errors() { - tracing::trace!("{:?}", NotificationError::InvalidState); - tracing::trace!("{:?}", DialError::AlreadyConnected); - tracing::trace!( - "{:?}", - SubstreamError::YamuxError(crate::yamux::ConnectionError::Closed) - ); - tracing::trace!("{:?}", AddressError::PeerIdMissing); - tracing::trace!( - "{:?}", - ParseError::InvalidMultihash(Multihash::from(PeerId::random())) - ); - let (tx, rx) = channel(1); drop(rx); diff --git a/src/lib.rs b/src/lib.rs index 6222c241..c42d6fda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ use crate::transport::webrtc::WebRtcTransport; #[cfg(feature = "websocket")] use crate::transport::websocket::WebSocketTransport; +use error::DialError; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; use transport::Endpoint; @@ -112,12 +113,22 @@ pub enum Litep2pEvent { }, /// Failed to dial peer. + /// + /// This error can originate from dialing a single peer address. DialFailure { /// Address of the peer. address: Multiaddr, /// Dial error. - error: Error, + error: DialError, + }, + + /// A list of multiple dial failures. + ListDialFailures { + /// List of errors. + /// + /// Depending on the transport, the address might be different for each error. + errors: Vec<(Multiaddr, DialError)>, }, } @@ -489,6 +500,10 @@ impl Litep2p { }), TransportEvent::DialFailure { address, error, .. } => return Some(Litep2pEvent::DialFailure { address, error }), + + TransportEvent::OpenFailure { errors, .. } => { + return Some(Litep2pEvent::ListDialFailures { errors }); + } _ => {} } } diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index d28435a3..0bd9c259 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -22,7 +22,7 @@ use crate::{ codec::unsigned_varint::UnsignedVarint, - error::{self, Error}, + error::{self, Error, ParseError}, multistream_select::{ protocol::{ encode_multistream_message, HeaderLine, Message, MessageIO, Protocol, ProtocolError, @@ -354,19 +354,23 @@ impl DialerState { } /// Register response to [`DialerState`]. - pub fn register_response(&mut self, payload: Vec) -> crate::Result { + pub fn register_response( + &mut self, + payload: Vec, + ) -> Result { let Message::Protocols(protocols) = - Message::decode(payload.into()).map_err(|_| Error::InvalidData)? + Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)? else { - return Err(Error::NegotiationError( - error::NegotiationError::MultistreamSelectError(NegotiationError::Failed), + return Err(crate::error::NegotiationError::MultistreamSelectError( + NegotiationError::Failed, )); }; let mut protocol_iter = protocols.into_iter(); loop { match (&self.state, protocol_iter.next()) { - (HandshakeState::WaitingResponse, None) => return Err(Error::InvalidState), + (HandshakeState::WaitingResponse, None) => + return Err(crate::error::NegotiationError::StateMismatch), (HandshakeState::WaitingResponse, Some(protocol)) => { let header = Protocol::try_from(&b"/multistream/1.0.0"[..]) .expect("valid multitstream-select header"); @@ -374,10 +378,8 @@ impl DialerState { if protocol == header { self.state = HandshakeState::WaitingProtocol; } else { - return Err(Error::NegotiationError( - error::NegotiationError::MultistreamSelectError( - NegotiationError::Failed, - ), + return Err(crate::error::NegotiationError::MultistreamSelectError( + NegotiationError::Failed, )); } } @@ -392,8 +394,8 @@ impl DialerState { } } - return Err(Error::NegotiationError( - error::NegotiationError::MultistreamSelectError(NegotiationError::Failed), + return Err(crate::error::NegotiationError::MultistreamSelectError( + NegotiationError::Failed, )); } (HandshakeState::WaitingProtocol, None) => { @@ -816,9 +818,7 @@ mod tests { DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); match dialer_state.register_response(bytes.freeze().to_vec()) { - Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError( - NegotiationError::Failed, - ))) => {} + Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} event => panic!("invalid event: {event:?}"), } } @@ -837,9 +837,7 @@ mod tests { DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); match dialer_state.register_response(bytes.freeze().to_vec()) { - Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError( - NegotiationError::Failed, - ))) => {} + Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} event => panic!("invalid event: {event:?}"), } } diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 8eee8459..846399aa 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -350,21 +350,17 @@ where } /// Error that can happen when negotiating a protocol with the remote. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum NegotiationError { /// A protocol error occurred during the negotiation. - ProtocolError(ProtocolError), + #[error("A protocol error occurred during the negotiation: `{0:?}`")] + ProtocolError(#[from] ProtocolError), /// Protocol negotiation failed because no protocol could be agreed upon. + #[error("Protocol negotiation failed.")] Failed, } -impl From for NegotiationError { - fn from(err: ProtocolError) -> NegotiationError { - NegotiationError::ProtocolError(err) - } -} - impl From for NegotiationError { fn from(err: io::Error) -> NegotiationError { ProtocolError::from(err).into() @@ -379,22 +375,3 @@ impl From for io::Error { io::Error::new(io::ErrorKind::Other, err) } } - -impl Error for NegotiationError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - NegotiationError::ProtocolError(err) => Some(err), - _ => None, - } - } -} - -impl fmt::Display for NegotiationError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - NegotiationError::ProtocolError(p) => - fmt.write_fmt(format_args!("Protocol error: {p}")), - NegotiationError::Failed => fmt.write_str("Protocol negotiation failed."), - } - } -} diff --git a/src/multistream_select/protocol.rs b/src/multistream_select/protocol.rs index b23df5fc..3c02f794 100644 --- a/src/multistream_select/protocol.rs +++ b/src/multistream_select/protocol.rs @@ -422,25 +422,27 @@ where } /// A protocol error. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum ProtocolError { /// I/O error. - IoError(io::Error), + #[error("I/O error: `{0}`")] + IoError(#[from] io::Error), /// Received an invalid message from the remote. + #[error("Received an invalid message from the remote.")] InvalidMessage, /// A protocol (name) is invalid. + #[error("A protocol (name) is invalid.")] InvalidProtocol, /// Too many protocols have been returned by the remote. + #[error("Too many protocols have been returned by the remote.")] TooManyProtocols, -} -impl From for ProtocolError { - fn from(err: io::Error) -> ProtocolError { - ProtocolError::IoError(err) - } + /// The protocol is not supported. + #[error("The protocol is not supported.")] + ProtocolNotSupported, } impl From for io::Error { @@ -457,23 +459,3 @@ impl From for ProtocolError { Self::from(io::Error::new(io::ErrorKind::InvalidData, err.to_string())) } } - -impl Error for ProtocolError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match *self { - ProtocolError::IoError(ref err) => Some(err), - _ => None, - } - } -} - -impl fmt::Display for ProtocolError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - ProtocolError::IoError(e) => write!(fmt, "I/O error: {e}"), - ProtocolError::InvalidMessage => write!(fmt, "Received an invalid message."), - ProtocolError::InvalidProtocol => write!(fmt, "A protocol (name) is invalid."), - ProtocolError::TooManyProtocols => write!(fmt, "Too many protocols received."), - } - } -} diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 9dc2c347..b69689e6 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -21,7 +21,7 @@ //! [`/ipfs/kad/1.0.0`](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) implementation. use crate::{ - error::Error, + error::{Error, SubstreamError}, protocol::{ libp2p::kademlia::{ bucket::KBucketEntry, @@ -492,7 +492,11 @@ impl Kademlia { } /// Failed to open substream to remote peer. - async fn on_substream_open_failure(&mut self, substream_id: SubstreamId, error: Error) { + async fn on_substream_open_failure( + &mut self, + substream_id: SubstreamId, + error: SubstreamError, + ) { tracing::trace!( target: LOG_TARGET, ?substream_id, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 94043904..87b92f84 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -22,7 +22,7 @@ use crate::{ codec::ProtocolCodec, - error::Error, + error::SubstreamError, substream::Substream, transport::Endpoint, types::{protocol::ProtocolName, SubstreamId}, @@ -125,7 +125,7 @@ pub enum TransportEvent { substream: SubstreamId, /// Error that occurred when the substream was being opened. - error: Error, + error: SubstreamError, }, } diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index eefe5904..984771b3 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -21,7 +21,7 @@ //! Notification protocol implementation. use crate::{ - error::Error, + error::{Error, SubstreamError}, executor::Executor, protocol::{ self, @@ -813,7 +813,11 @@ impl NotificationProtocol { /// /// If the substream was initiated by the local node, it must be reported that the substream /// failed to open. Otherwise the peer state can silently be converted to `Closed`. - async fn on_substream_open_failure(&mut self, substream_id: SubstreamId, error: Error) { + async fn on_substream_open_failure( + &mut self, + substream_id: SubstreamId, + error: SubstreamError, + ) { tracing::debug!( target: LOG_TARGET, protocol = %self.protocol, diff --git a/src/protocol/notification/tests/notification.rs b/src/protocol/notification/tests/notification.rs index 52b12f07..f766fc44 100644 --- a/src/protocol/notification/tests/notification.rs +++ b/src/protocol/notification/tests/notification.rs @@ -31,7 +31,7 @@ use crate::{ ConnectionState, InboundState, NotificationProtocol, OutboundState, PeerContext, PeerState, ValidationResult, }, - InnerTransportEvent, ProtocolCommand, + InnerTransportEvent, ProtocolCommand, SubstreamError, }, substream::Substream, transport::Endpoint, @@ -225,7 +225,9 @@ async fn substream_open_failure_for_unknown_substream() { let (mut notif, _handle, _sender, _tx) = make_notification_protocol(); - notif.on_substream_open_failure(SubstreamId::new(), Error::Unknown).await; + notif + .on_substream_open_failure(SubstreamId::new(), SubstreamError::ConnectionClosed) + .await; } #[tokio::test] @@ -313,7 +315,9 @@ async fn substream_open_failure_for_unknown_peer() { let substream_id = SubstreamId::from(1337usize); notif.pending_outbound.insert(substream_id, peer); - notif.on_substream_open_failure(substream_id, Error::Unknown).await; + notif + .on_substream_open_failure(substream_id, SubstreamError::ConnectionClosed) + .await; } #[tokio::test] @@ -895,7 +899,10 @@ async fn open_failure_reported_once() { notif.pending_outbound.insert(SubstreamId::from(1337usize), peer); notif - .on_substream_open_failure(SubstreamId::from(1337usize), Error::Unknown) + .on_substream_open_failure( + SubstreamId::from(1337usize), + SubstreamError::ConnectionClosed, + ) .await; match handle.next().await { diff --git a/src/protocol/protocol_set.rs b/src/protocol/protocol_set.rs index a0458146..736cb7f7 100644 --- a/src/protocol/protocol_set.rs +++ b/src/protocol/protocol_set.rs @@ -20,7 +20,10 @@ use crate::{ codec::ProtocolCodec, - error::Error, + error::{Error, NegotiationError, SubstreamError}, + multistream_select::{ + NegotiationError as MultiStreamNegotiationError, ProtocolError as MultiStreamProtocolError, + }, protocol::{ connection::{ConnectionHandle, Permit}, Direction, TransportEvent, @@ -131,7 +134,7 @@ pub enum InnerTransportEvent { substream: SubstreamId, /// Error that occurred when the substream was being opened. - error: Error, + error: SubstreamError, }, } @@ -274,7 +277,7 @@ impl ProtocolSet { protocol: ProtocolName, direction: Direction, substream: Substream, - ) -> crate::Result<()> { + ) -> Result<(), SubstreamError> { tracing::debug!(target: LOG_TARGET, %protocol, ?peer, ?direction, "substream opened"); let (protocol, fallback) = match self.fallback_names.get(&protocol) { @@ -282,19 +285,28 @@ impl ProtocolSet { None => (protocol, None), }; - self.protocols - .get_mut(&protocol) - .ok_or(Error::ProtocolNotSupported(protocol.to_string()))? + let Some(protocol_context) = self.protocols.get(&protocol) else { + return Err(NegotiationError::MultistreamSelectError( + MultiStreamNegotiationError::ProtocolError( + MultiStreamProtocolError::ProtocolNotSupported, + ), + ) + .into()); + }; + + let event = InnerTransportEvent::SubstreamOpened { + peer, + protocol: protocol.clone(), + fallback, + direction, + substream, + }; + + protocol_context .tx - .send(InnerTransportEvent::SubstreamOpened { - peer, - protocol: protocol.clone(), - fallback, - direction, - substream, - }) + .send(event) .await - .map_err(From::from) + .map_err(|_| SubstreamError::ConnectionClosed) } /// Get codec used by the protocol. @@ -312,7 +324,7 @@ impl ProtocolSet { &mut self, protocol: ProtocolName, substream: SubstreamId, - error: Error, + error: SubstreamError, ) -> crate::Result<()> { tracing::debug!( target: LOG_TARGET, diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 16b3c468..3125f71c 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -21,7 +21,7 @@ //! Request-response protocol implementation. use crate::{ - error::{Error, NegotiationError}, + error::{Error, NegotiationError, SubstreamError}, multistream_select::NegotiationError::Failed as MultistreamFailed, protocol::{ request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand}, @@ -623,7 +623,7 @@ impl RequestResponseProtocol { async fn on_substream_open_failure( &mut self, substream: SubstreamId, - error: Error, + error: SubstreamError, ) -> crate::Result<()> { let Some(RequestContext { request_id, peer, .. @@ -660,7 +660,7 @@ impl RequestResponseProtocol { peer, request_id, error: match error { - Error::NegotiationError(NegotiationError::MultistreamSelectError( + SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError( MultistreamFailed, )) => RequestResponseError::UnsupportedProtocol, _ => RequestResponseError::Rejected, diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 9cb842f6..107b87ca 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -26,7 +26,7 @@ use crate::{ ConfigBuilder, DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle, RequestResponseProtocol, }, - InnerTransportEvent, TransportService, + InnerTransportEvent, SubstreamError, TransportService, }, substream::Substream, transport::{ @@ -138,7 +138,10 @@ async fn unknown_substream_open_failure() { let (mut protocol, _handle, _manager, _tx) = protocol(); match protocol - .on_substream_open_failure(SubstreamId::from(1338usize), Error::Unknown) + .on_substream_open_failure( + SubstreamId::from(1338usize), + SubstreamError::ConnectionClosed, + ) .await { Err(Error::InvalidState) => {} diff --git a/src/transport/common/listener.rs b/src/transport/common/listener.rs index d9afe729..fbd0ddb6 100644 --- a/src/transport/common/listener.rs +++ b/src/transport/common/listener.rs @@ -20,7 +20,10 @@ //! Shared socket listener between TCP and WebSocket. -use crate::{error::AddressError, Error, PeerId}; +use crate::{ + error::{AddressError, DnsError}, + PeerId, +}; use futures::Stream; use multiaddr::{Multiaddr, Protocol}; @@ -70,7 +73,7 @@ pub enum DnsType { impl AddressType { /// Resolve the address to a concrete IP. - pub async fn lookup_ip(self) -> crate::Result { + pub async fn lookup_ip(self) -> Result { let (url, port, dns_type) = match self { // We already have the IP address. AddressType::Socket(address) => return Ok(address), @@ -95,7 +98,7 @@ impl AddressType { url ); - return Err(Error::Other(format!("Failed to resolve DNS address {url}"))); + return Err(DnsError::ResolveError(url)); } }; @@ -109,10 +112,7 @@ impl AddressType { "Multiaddr DNS type does not match IP version `{}`", url ); - - return Err(Error::Other(format!( - "Miss-match in DNS address IP version {url}" - ))); + return Err(DnsError::IpVersionMismatch); }; Ok(SocketAddr::new(ip, port)) @@ -185,7 +185,7 @@ pub trait GetSocketAddr { /// The `PeerId` is optional and may not be present. fn multiaddr_to_socket_address( address: &Multiaddr, - ) -> crate::Result<(AddressType, Option)>; + ) -> Result<(AddressType, Option), AddressError>; /// Convert concrete `SocketAddr` to `Multiaddr`. fn socket_address_to_multiaddr(address: &SocketAddr) -> Multiaddr; @@ -197,7 +197,7 @@ pub struct TcpAddress; impl GetSocketAddr for TcpAddress { fn multiaddr_to_socket_address( address: &Multiaddr, - ) -> crate::Result<(AddressType, Option)> { + ) -> Result<(AddressType, Option), AddressError> { multiaddr_to_socket_address(address, SocketListenerType::Tcp) } @@ -214,7 +214,7 @@ pub struct WebSocketAddress; impl GetSocketAddr for WebSocketAddress { fn multiaddr_to_socket_address( address: &Multiaddr, - ) -> crate::Result<(AddressType, Option)> { + ) -> Result<(AddressType, Option), AddressError> { multiaddr_to_socket_address(address, SocketListenerType::WebSocket) } @@ -352,7 +352,7 @@ enum SocketListenerType { fn multiaddr_to_socket_address( address: &Multiaddr, ty: SocketListenerType, -) -> crate::Result<(AddressType, Option)> { +) -> Result<(AddressType, Option), AddressError> { tracing::trace!(target: LOG_TARGET, ?address, "parse multi address"); let mut iter = address.iter(); @@ -370,7 +370,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid transport protocol, expected `Tcp`", ); - Err(Error::AddressError(AddressError::InvalidProtocol)) + Err(AddressError::InvalidProtocol) } }; @@ -384,7 +384,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid transport protocol, expected `Tcp`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, Some(Protocol::Ip4(address)) => match iter.next() { @@ -396,7 +396,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid transport protocol, expected `Tcp`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, Some(Protocol::Dns(address)) => handle_dns_type(address.into(), DnsType::Dns, iter.next())?, @@ -406,7 +406,7 @@ fn multiaddr_to_socket_address( handle_dns_type(address.into(), DnsType::Dns6, iter.next())?, protocol => { tracing::error!(target: LOG_TARGET, ?protocol, "invalid transport protocol"); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; @@ -423,14 +423,15 @@ fn multiaddr_to_socket_address( ?protocol, "invalid protocol, expected `Ws` or `Wss`" ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; } } let maybe_peer = match iter.next() { - Some(Protocol::P2p(multihash)) => Some(PeerId::from_multihash(multihash)?), + Some(Protocol::P2p(multihash)) => + Some(PeerId::from_multihash(multihash).map_err(AddressError::InvalidPeerId)?), None => None, protocol => { tracing::error!( @@ -438,7 +439,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid protocol, expected `P2p` or `None`" ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; diff --git a/src/transport/dummy.rs b/src/transport/dummy.rs index f8c07571..29e9570d 100644 --- a/src/transport/dummy.rs +++ b/src/transport/dummy.rs @@ -104,7 +104,7 @@ impl Transport for DummyTransport { #[cfg(test)] mod tests { use super::*; - use crate::{transport::Endpoint, Error, PeerId}; + use crate::{error::DialError, transport::Endpoint, PeerId}; use futures::StreamExt; #[tokio::test] @@ -114,7 +114,7 @@ mod tests { transport.inject_event(TransportEvent::DialFailure { connection_id: ConnectionId::from(1338usize), address: Multiaddr::empty(), - error: Error::Unknown, + error: DialError::Timeout, }); let peer = PeerId::random(); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 33cc8f5b..a47d309e 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -21,7 +21,7 @@ use crate::{ codec::ProtocolCodec, crypto::ed25519::Keypair, - error::{AddressError, Error}, + error::{AddressError, DialError, Error}, executor::Executor, protocol::{InnerTransportEvent, TransportService}, transport::{ @@ -248,6 +248,9 @@ pub struct TransportManager { /// Connection limits. connection_limits: limits::ConnectionLimits, + + /// Opening connections errors. + opening_errors: HashMap>, } impl TransportManager { @@ -292,6 +295,7 @@ impl TransportManager { next_substream_id: Arc::new(AtomicUsize::new(0usize)), next_connection_id: Arc::new(AtomicUsize::new(0usize)), connection_limits: limits::ConnectionLimits::new(connection_limits_config), + opening_errors: HashMap::new(), }, handle, ) @@ -1603,6 +1607,7 @@ impl TransportManager { } } TransportEvent::ConnectionEstablished { peer, endpoint } => { + self.opening_errors.remove(&endpoint.connection_id()); match self.on_connection_established(peer, &endpoint) { Err(error) => { tracing::debug!( @@ -1655,6 +1660,8 @@ impl TransportManager { } } TransportEvent::ConnectionOpened { connection_id, address } => { + self.opening_errors.remove(&connection_id); + if let Err(error) = self.on_connection_opened(transport, connection_id, address) { tracing::debug!( target: LOG_TARGET, @@ -1664,7 +1671,7 @@ impl TransportManager { ); } } - TransportEvent::OpenFailure { connection_id } => { + TransportEvent::OpenFailure { connection_id, errors } => { match self.on_open_failure(transport, connection_id) { Err(error) => tracing::debug!( target: LOG_TARGET, @@ -1709,13 +1716,19 @@ impl TransportManager { }; } - return Some(TransportEvent::DialFailure { - connection_id, - address: Multiaddr::empty(), - error: Error::Unknown, - }) + let mut grouped_errors = self.opening_errors.remove(&connection_id).unwrap_or_default(); + grouped_errors.extend(errors); + return Some(TransportEvent::OpenFailure { connection_id, errors: grouped_errors }); + } + Ok(None) => { + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + "open failure, but not the last transport", + ); + + self.opening_errors.entry(connection_id).or_default().extend(errors); } - Ok(None) => {} } }, TransportEvent::PendingInboundConnection { connection_id } => { @@ -1764,6 +1777,7 @@ mod tests { transport::{dummy::DummyTransport, KEEP_ALIVE_TIMEOUT}, }; use std::{ + borrow::Cow, net::{Ipv4Addr, Ipv6Addr}, sync::Arc, }; @@ -4067,4 +4081,110 @@ mod tests { assert!(!peer_context.addresses.contains(&second_address)); } } + + #[tokio::test] + async fn opening_errors_are_reported() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + BandwidthSink::new(), + 8usize, + ConnectionLimitsConfig::default(), + ); + let peer = PeerId::random(); + let connection_id = ConnectionId::from(0); + + // Setup TCP transport. + let dial_address_tcp = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8888)) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + let transport = Box::new({ + let mut transport = DummyTransport::new(); + transport.inject_event(TransportEvent::OpenFailure { + connection_id, + errors: vec![(dial_address_tcp.clone(), DialError::Timeout)], + }); + transport + }); + manager.register_transport(SupportedTransport::Tcp, transport); + manager.add_known_address( + peer, + vec![Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5))) + .with(Protocol::Tcp(8888)) + .with(Protocol::P2p(Multihash::from(peer)))] + .into_iter(), + ); + + // Setup WebSockets transport. + let dial_address_ws = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8889)) + .with(Protocol::Ws(Cow::Borrowed("/"))) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + + let transport = Box::new({ + let mut transport = DummyTransport::new(); + transport.inject_event(TransportEvent::OpenFailure { + connection_id, + errors: vec![(dial_address_ws.clone(), DialError::Timeout)], + }); + transport + }); + manager.register_transport(SupportedTransport::WebSocket, transport); + manager.add_known_address( + peer, + vec![Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5))) + .with(Protocol::Tcp(8889)) + .with(Protocol::Ws(Cow::Borrowed("/"))) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + ))] + .into_iter(), + ); + + // Dial the peer on both transports. + assert!(manager.dial(peer).await.is_ok()); + assert!(!manager.pending_connections.is_empty()); + + { + let peers = manager.peers.read(); + + match peers.get(&peer) { + Some(PeerContext { + state: PeerState::Opening { .. }, + .. + }) => {} + state => panic!("invalid state for peer: {state:?}"), + } + } + + match manager.next().await.unwrap() { + TransportEvent::OpenFailure { + connection_id, + errors, + } => { + assert_eq!(connection_id, ConnectionId::from(0)); + assert_eq!(errors.len(), 2); + let tcp = errors.iter().find(|(addr, _)| addr == &dial_address_tcp).unwrap(); + assert!(std::matches!(tcp.1, DialError::Timeout)); + + let ws = errors.iter().find(|(addr, _)| addr == &dial_address_ws).unwrap(); + assert!(std::matches!(ws.1, DialError::Timeout)); + } + event => panic!("invalid event: {event:?}"), + } + assert!(manager.pending_connections.is_empty()); + assert!(manager.opening_errors.is_empty()); + } } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 792508cc..1d61ca9d 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -20,7 +20,7 @@ //! Transport protocol implementations provided by [`Litep2p`](`crate::Litep2p`). -use crate::{transport::manager::TransportHandle, types::ConnectionId, Error, PeerId}; +use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId}; use futures::Stream; use multiaddr::Multiaddr; @@ -159,13 +159,16 @@ pub(crate) enum TransportEvent { address: Multiaddr, /// Error. - error: Error, + error: DialError, }, /// Open failure for an unnegotiated set of connections. OpenFailure { /// Connection ID. connection_id: ConnectionId, + + /// Errors. + errors: Vec<(Multiaddr, DialError)>, }, } diff --git a/src/transport/quic/connection.rs b/src/transport/quic/connection.rs index ec14d772..52c198e7 100644 --- a/src/transport/quic/connection.rs +++ b/src/transport/quic/connection.rs @@ -24,7 +24,7 @@ use std::time::Duration; use crate::{ config::Role, - error::Error, + error::{Error, NegotiationError, SubstreamError}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -63,7 +63,7 @@ enum ConnectionError { substream_id: Option, /// Error. - error: Error, + error: SubstreamError, }, } @@ -138,13 +138,14 @@ impl QuicConnection { stream: S, role: &Role, protocols: Vec<&str>, - ) -> crate::Result<(Negotiated, ProtocolName)> { + ) -> Result<(Negotiated, ProtocolName), NegotiationError> { tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols"); let (protocol, socket) = match role { - Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await?, - Role::Listener => listener_select_proto(stream, protocols).await?, - }; + Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await, + Role::Listener => listener_select_proto(stream, protocols).await, + } + .map_err(NegotiationError::MultistreamSelectError)?; tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated"); @@ -158,12 +159,12 @@ impl QuicConnection { substream_id: SubstreamId, protocol: ProtocolName, fallback_names: Vec, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream"); let stream = match handle.open_bi().await { Ok((send_stream, recv_stream)) => NegotiatingSubstream::new(send_stream, recv_stream), - Err(error) => return Err(Error::Quinn(error)), + Err(error) => return Err(NegotiationError::Quic(error.into()).into()), }; // TODO: protocols don't change after they've been initialized so this should be done only @@ -200,7 +201,7 @@ impl QuicConnection { protocols: Vec, substream_id: SubstreamId, permit: Permit, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?substream_id, @@ -258,7 +259,7 @@ impl QuicConnection { Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { protocol: None, substream_id: None, - error, + error: SubstreamError::NegotiationError(error), }), Err(_) => Err(ConnectionError::Timeout { protocol: None, @@ -283,7 +284,7 @@ impl QuicConnection { let (protocol, substream_id, error) = match error { ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, Error::Timeout) + (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) } ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { (protocol, substream_id, error) diff --git a/src/transport/quic/listener.rs b/src/transport/quic/listener.rs index 7f6c3ad0..e8550ffb 100644 --- a/src/transport/quic/listener.rs +++ b/src/transport/quic/listener.rs @@ -20,7 +20,7 @@ use crate::{ crypto::{ed25519::Keypair, tls::make_server_config}, - error::{AddressError, Error}, + error::AddressError, PeerId, }; @@ -101,7 +101,9 @@ impl QuicListener { } /// Extract socket address and `PeerId`, if found, from `address`. - pub fn get_socket_address(address: &Multiaddr) -> crate::Result<(SocketAddr, Option)> { + pub fn get_socket_address( + address: &Multiaddr, + ) -> Result<(SocketAddr, Option), AddressError> { tracing::trace!(target: LOG_TARGET, ?address, "parse multi address"); let mut iter = address.iter(); @@ -114,7 +116,7 @@ impl QuicListener { ?protocol, "invalid transport protocol, expected `QuicV1`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, Some(Protocol::Ip4(address)) => match iter.next() { @@ -125,19 +127,19 @@ impl QuicListener { ?protocol, "invalid transport protocol, expected `QuicV1`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, protocol => { tracing::error!(target: LOG_TARGET, ?protocol, "invalid transport protocol"); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; // verify that quic exists match iter.next() { Some(Protocol::QuicV1) => {} - _ => return Err(Error::AddressError(AddressError::InvalidProtocol)), + _ => return Err(AddressError::InvalidProtocol), } let maybe_peer = match iter.next() { @@ -149,7 +151,7 @@ impl QuicListener { ?protocol, "invalid protocol, expected `P2p` or `None`" ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::PeerIdMissing); } }; @@ -268,12 +270,10 @@ mod tests { let crypto_config = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config = ClientConfig::new(crypto_config); - let client = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection = client .connect_with(client_config, format!("[::1]:{port}").parse().unwrap(), "l") - .map_err(|error| Error::Other(error.to_string())) .unwrap(); let (res1, res2) = tokio::join!( @@ -318,31 +318,27 @@ mod tests { let crypto_config1 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config1 = ClientConfig::new(crypto_config1); - let client1 = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client1 = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection1 = client1 .connect_with( client_config1, format!("[::1]:{port1}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); let crypto_config2 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config2 = ClientConfig::new(crypto_config2); - let client2 = Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client2 = + Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).unwrap(); let connection2 = client2 .connect_with( client_config2, format!("127.0.0.1:{port2}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); tokio::spawn(async move { @@ -391,31 +387,27 @@ mod tests { let crypto_config1 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config1 = ClientConfig::new(crypto_config1); - let client1 = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client1 = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection1 = client1 .connect_with( client_config1, format!("[::1]:{port}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); let crypto_config2 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config2 = ClientConfig::new(crypto_config2); - let client2 = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client2 = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection2 = client2 .connect_with( client_config2, format!("[::1]:{port}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); tokio::spawn(async move { diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index d274a977..ad03674d 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -24,7 +24,7 @@ use crate::{ crypto::tls::make_client_config, - error::{AddressError, Error}, + error::{AddressError, DialError, Error, QuicError}, transport::{ manager::TransportHandle, quic::{config::Config as QuicConfig, connection::QuicConnection, listener::QuicListener}, @@ -84,15 +84,22 @@ pub(crate) struct QuicTransport { pending_inbound_connections: HashMap, /// Pending connections. - pending_connections: - FuturesUnordered)>>, + pending_connections: FuturesUnordered< + BoxFuture<'static, (ConnectionId, Result)>, + >, /// Negotiated connections waiting for validation. pending_open: HashMap, /// Pending raw, unnegotiated connections. pending_raw_connections: FuturesUnordered< - BoxFuture<'static, Result<(ConnectionId, Multiaddr, NegotiatedConnection), ConnectionId>>, + BoxFuture< + 'static, + Result< + (ConnectionId, Multiaddr, NegotiatedConnection), + (ConnectionId, Vec<(Multiaddr, DialError)>), + >, + >, >, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. @@ -118,11 +125,14 @@ impl QuicTransport { self.pending_connections.push(Box::pin(async move { let connection = match connection.await { Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), + Err(error) => return (connection_id, Err(DialError::from(error))), }; let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); + return ( + connection_id, + Err(crate::error::NegotiationError::Quic(QuicError::InvalidCertificate).into()), + ); }; (connection_id, Ok(NegotiatedConnection { peer, connection })) @@ -133,7 +143,7 @@ impl QuicTransport { fn on_connection_established( &mut self, connection_id: ConnectionId, - result: crate::Result, + result: Result, ) -> Option { tracing::debug!(target: LOG_TARGET, ?connection_id, success = result.is_ok(), "connection established"); @@ -257,14 +267,18 @@ impl Transport for QuicTransport { ); self.pending_dials.insert(connection_id, address); + self.pending_connections.push(Box::pin(async move { let connection = match connection.await { Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), + Err(error) => return (connection_id, Err(DialError::from(error))), }; let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); + return ( + connection_id, + Err(crate::error::NegotiationError::Quic(QuicError::InvalidCertificate).into()), + ); }; (connection_id, Ok(NegotiatedConnection { peer, connection })) @@ -332,21 +346,19 @@ impl Transport for QuicTransport { connection_id: ConnectionId, addresses: Vec, ) -> crate::Result<()> { + let num_addresses = addresses.len(); let mut futures: FuturesUnordered<_> = addresses .into_iter() .map(|address| { let keypair = self.context.keypair.clone(); let connection_open_timeout = self.config.connection_open_timeout; + let addr = address.clone(); - async move { - let Ok((socket_address, Some(peer))) = - QuicListener::get_socket_address(&address) - else { - return ( - connection_id, - Err(Error::AddressError(AddressError::PeerIdMissing)), - ); - }; + let future = async move { + let (socket_address, peer) = QuicListener::get_socket_address(&address) + .map_err(DialError::AddressError)?; + let peer = + peer.ok_or_else(|| DialError::AddressError(AddressError::PeerIdMissing))?; let crypto_config = Arc::new(make_client_config(&keypair, Some(peer)).expect("to succeed")); @@ -362,59 +374,58 @@ impl Transport for QuicTransport { SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), Some(Protocol::Ip4(_)) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), - _ => - return ( - connection_id, - Err(Error::AddressError(AddressError::InvalidProtocol)), - ), + _ => return Err(AddressError::InvalidProtocol.into()), }; let client = match Endpoint::client(client_listen_address) { Ok(client) => client, Err(error) => { - return (connection_id, Err(Error::Other(error.to_string()))); + return Err(DialError::from(error)); } }; let connection = match client.connect_with(client_config, socket_address, "l") { Ok(connection) => connection, - Err(error) => { - return (connection_id, Err(Error::Other(error.to_string()))); - } + Err(error) => return Err(DialError::from(error)), }; let connection = match connection.await { Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), + Err(error) => return Err(DialError::from(error)), }; let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); + return Err(crate::error::NegotiationError::Quic( + QuicError::InvalidCertificate, + ) + .into()); }; - ( - connection_id, - Ok((address, NegotiatedConnection { peer, connection })), - ) - } + Ok(NegotiatedConnection { peer, connection }) + }; + + async move { future.await.map(|ok| (addr.clone(), ok)).map_err(|err| (addr, err)) } }) .collect(); self.pending_raw_connections.push(Box::pin(async move { - while let Some(result) = futures.next().await { - let (connection_id, result) = result; + let mut errors = Vec::with_capacity(num_addresses); + while let Some(result) = futures.next().await { match result { Ok((address, connection)) => return Ok((connection_id, address, connection)), - Err(error) => tracing::debug!( - target: LOG_TARGET, - ?connection_id, - ?error, - "failed to open connection", - ), + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error) + } } } - Err(connection_id) + Err((connection_id, errors)) })); Ok(()) @@ -478,9 +489,12 @@ impl Stream for QuicTransport { })); } } - Err(connection_id) => + Err((connection_id, errors)) => if !self.canceled.remove(&connection_id) { - return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id })); + return Poll::Ready(Some(TransportEvent::OpenFailure { + connection_id, + errors, + })); }, } } diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index 3d3511e0..cb752244 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -24,7 +24,7 @@ use crate::{ ed25519::Keypair, noise::{self, NoiseSocket}, }, - error::{Error, NegotiationError}, + error::{Error, NegotiationError, SubstreamError}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -101,7 +101,7 @@ enum ConnectionError { substream_id: Option, /// Error. - error: Error, + error: SubstreamError, }, } @@ -222,7 +222,7 @@ impl TcpConnection { max_write_buffer_size: usize, connection_open_timeout: Duration, substream_open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::debug!( target: LOG_TARGET, ?address, @@ -249,7 +249,7 @@ impl TcpConnection { { Err(_) => { tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation"); - Err(Error::Timeout) + Err(NegotiationError::Timeout) } Ok(result) => result, } @@ -263,7 +263,7 @@ impl TcpConnection { protocol: ProtocolName, fallback_names: Vec, open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream"); let stream = match control.open_stream().await { @@ -278,7 +278,10 @@ impl TcpConnection { ?error, "failed to open substream" ); - return Err(Error::YamuxError(Direction::Outbound(substream_id), error)); + return Err(SubstreamError::YamuxError( + error, + Direction::Outbound(substream_id), + )); } }; @@ -311,7 +314,7 @@ impl TcpConnection { max_write_buffer_size: usize, connection_open_timeout: Duration, substream_open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?address, "accept connection"); match tokio::time::timeout(connection_open_timeout, async move { @@ -331,7 +334,7 @@ impl TcpConnection { }) .await { - Err(_) => Err(Error::Timeout), + Err(_) => Err(NegotiationError::Timeout), Ok(result) => result, } } @@ -343,7 +346,7 @@ impl TcpConnection { substream_id: SubstreamId, protocols: Vec, open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?substream_id, @@ -375,7 +378,7 @@ impl TcpConnection { role: &Role, protocols: Vec<&str>, substream_open_timeout: Duration, - ) -> crate::Result<(Negotiated, ProtocolName)> { + ) -> Result<(Negotiated, ProtocolName), NegotiationError> { tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols"); match tokio::time::timeout(substream_open_timeout, async move { @@ -386,10 +389,8 @@ impl TcpConnection { }) .await { - Err(_) => Err(Error::Timeout), - Ok(Err(error)) => Err(Error::NegotiationError( - NegotiationError::MultistreamSelectError(error), - )), + Err(_) => Err(NegotiationError::Timeout), + Ok(Err(error)) => Err(NegotiationError::MultistreamSelectError(error)), Ok(Ok((protocol, socket))) => { tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated"); @@ -410,7 +411,7 @@ impl TcpConnection { max_read_ahead_factor: usize, max_write_buffer_size: usize, substream_open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?role, @@ -442,7 +443,7 @@ impl TcpConnection { if let Some(dialed_peer) = dialed_peer { if dialed_peer != peer { tracing::debug!(target: LOG_TARGET, ?dialed_peer, ?peer, "peer id mismatch"); - return Err(Error::PeerIdMismatch(dialed_peer, peer)); + return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer)); } } @@ -521,7 +522,7 @@ impl TcpConnection { Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { protocol: None, substream_id: None, - error, + error: SubstreamError::NegotiationError(error), }), Err(_) => Err(ConnectionError::Timeout { protocol: None, @@ -561,7 +562,7 @@ impl TcpConnection { let (protocol, substream_id, error) = match error { ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, Error::Timeout) + (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) } ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { (protocol, substream_id, error) @@ -717,11 +718,11 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::ProtocolError( crate::multistream_select::ProtocolError::InvalidMessage, ), - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -759,11 +760,11 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::ProtocolError( crate::multistream_select::ProtocolError::InvalidMessage, ), - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -812,9 +813,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -856,9 +857,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -903,7 +904,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -954,7 +955,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1000,7 +1001,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1040,7 +1041,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1095,9 +1096,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("{error:?}"), } } @@ -1155,9 +1156,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("{error:?}"), } } @@ -1208,7 +1209,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1265,7 +1266,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 35c8da59..856c9410 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -23,7 +23,7 @@ use crate::{ config::Role, - error::Error, + error::{DialError, Error}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, TcpAddress}, manager::TransportHandle, @@ -91,12 +91,19 @@ pub(crate) struct TcpTransport { pending_inbound_connections: HashMap, /// Pending opening connections. - pending_connections: - FuturesUnordered>>, + pending_connections: FuturesUnordered< + BoxFuture<'static, Result>, + >, /// Pending raw, unnegotiated connections. pending_raw_connections: FuturesUnordered< - BoxFuture<'static, Result<(ConnectionId, Multiaddr, TcpStream), ConnectionId>>, + BoxFuture< + 'static, + Result< + (ConnectionId, Multiaddr, TcpStream), + (ConnectionId, Vec<(Multiaddr, DialError)>), + >, + >, >, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. @@ -145,7 +152,7 @@ impl TcpTransport { substream_open_timeout, ) .await - .map_err(|error| (connection_id, error)) + .map_err(|error| (connection_id, error.into())) })); } @@ -155,8 +162,9 @@ impl TcpTransport { dial_addresses: DialAddresses, connection_open_timeout: Duration, nodelay: bool, - ) -> crate::Result<(Multiaddr, TcpStream)> { + ) -> Result<(Multiaddr, TcpStream), DialError> { let (socket_address, _) = TcpAddress::multiaddr_to_socket_address(&address)?; + let remote_address = match tokio::time::timeout(connection_open_timeout, socket_address.lookup_ip()).await { Err(_) => { @@ -166,9 +174,9 @@ impl TcpTransport { ?connection_open_timeout, "failed to resolve address within timeout", ); - return Err(Error::Timeout); + return Err(DialError::Timeout); } - Ok(Err(error)) => return Err(error), + Ok(Err(error)) => return Err(error.into()), Ok(Ok(address)) => address, }; @@ -225,7 +233,7 @@ impl TcpTransport { ?connection_open_timeout, "failed to connect within timeout", ); - Err(Error::Timeout) + Err(DialError::Timeout) } Ok(Err(error)) => Err(error.into()), Ok(Ok((address, stream))) => { @@ -316,7 +324,7 @@ impl Transport for TcpTransport { substream_open_timeout, ) .await - .map_err(|error| (connection_id, error)) + .map_err(|error| (connection_id, error.into())) })); Ok(()) @@ -383,6 +391,7 @@ impl Transport for TcpTransport { connection_id: ConnectionId, addresses: Vec, ) -> crate::Result<()> { + let num_addresses = addresses.len(); let mut futures: FuturesUnordered<_> = addresses .into_iter() .map(|address| { @@ -392,30 +401,35 @@ impl Transport for TcpTransport { async move { TcpTransport::dial_peer( - address, + address.clone(), dial_addresses, connection_open_timeout, nodelay, ) .await + .map_err(|error| (address, error)) } }) .collect(); self.pending_raw_connections.push(Box::pin(async move { + let mut errors = Vec::with_capacity(num_addresses); while let Some(result) = futures.next().await { match result { Ok((address, stream)) => return Ok((connection_id, address, stream)), - Err(error) => tracing::debug!( - target: LOG_TARGET, - ?connection_id, - ?error, - "failed to open connection", - ), + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error) + } } } - Err(connection_id) + Err((connection_id, errors)) })); Ok(()) @@ -459,11 +473,11 @@ impl Transport for TcpTransport { substream_open_timeout, ) .await - .map_err(|error| (connection_id, error)) + .map_err(|error| (connection_id, error.into())) }) .await { - Err(_) => Err((connection_id, Error::Timeout)), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(connection)) => Ok(connection), } @@ -528,9 +542,12 @@ impl Stream for TcpTransport { })); } } - Err(connection_id) => + Err((connection_id, errors)) => if !self.canceled.remove(&connection_id) { - return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id })); + return Poll::Ready(Some(TransportEvent::OpenFailure { + connection_id, + errors, + })); }, } } @@ -554,6 +571,8 @@ impl Stream for TcpTransport { address, error, })); + } else { + tracing::debug!(target: LOG_TARGET, ?error, ?connection_id, "Pending inbound connection failed"); } } } @@ -900,7 +919,7 @@ mod tests { assert!(!transport.pending_dials.is_empty()); transport.pending_connections.push(Box::pin(async move { - Err((ConnectionId::from(0usize), Error::Unknown)) + Err((ConnectionId::from(0usize), DialError::Timeout)) })); assert!(std::matches!( diff --git a/src/transport/webrtc/connection.rs b/src/transport/webrtc/connection.rs index d7abfb4a..f31a48b2 100644 --- a/src/transport/webrtc/connection.rs +++ b/src/transport/webrtc/connection.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - error::Error, + error::{Error, ParseError, SubstreamError}, multistream_select::{listener_negotiate, DialerState, HandshakeResult, ListenerSelectResult}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream::Substream, @@ -349,6 +349,7 @@ impl WebRtcConnection { .report_substream_open(self.peer, protocol.clone(), Direction::Inbound, substream) .await .map(|_| (substream_id, handle, permit)) + .map_err(Into::into) } /// Handle data received to an opening outbound channel. @@ -372,7 +373,7 @@ impl WebRtcConnection { data: Vec, mut dialer_state: DialerState, context: ChannelContext, - ) -> crate::Result> { + ) -> Result, SubstreamError> { tracing::trace!( target: LOG_TARGET, peer = ?self.peer, @@ -380,7 +381,12 @@ impl WebRtcConnection { "handle opening outbound substream", ); - let message = WebRtcMessage::decode(&data)?.payload.ok_or(Error::InvalidData)?; + let rtc_message = WebRtcMessage::decode(&data) + .map_err(|err| SubstreamError::NegotiationError(err.into()))?; + let message = rtc_message.payload.ok_or(SubstreamError::NegotiationError( + ParseError::InvalidData.into(), + ))?; + let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else { tracing::trace!( target: LOG_TARGET, diff --git a/src/transport/webrtc/util.rs b/src/transport/webrtc/util.rs index 951434b8..c9d4d141 100644 --- a/src/transport/webrtc/util.rs +++ b/src/transport/webrtc/util.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{codec::unsigned_varint::UnsignedVarint, error::Error, transport::webrtc::schema}; +use crate::{codec::unsigned_varint::UnsignedVarint, error::ParseError, transport::webrtc::schema}; use prost::Message; use tokio_util::codec::{Decoder, Encoder}; @@ -72,18 +72,21 @@ impl WebRtcMessage { } /// Decode payload into [`WebRtcMessage`]. - pub fn decode(payload: &[u8]) -> crate::Result { + pub fn decode(payload: &[u8]) -> Result { // TODO: set correct size let mut codec = UnsignedVarint::new(None); let mut data = bytes::BytesMut::from(payload); - let result = codec.decode(&mut data)?.ok_or(Error::InvalidData)?; + let result = codec + .decode(&mut data) + .map_err(|_| ParseError::InvalidData)? + .ok_or(ParseError::InvalidData)?; match schema::webrtc::Message::decode(result) { Ok(message) => Ok(Self { payload: message.message, flags: message.flag, }), - Err(_) => Err(Error::InvalidData), + Err(_) => Err(ParseError::InvalidData), } } } diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 19a3e14f..8c505607 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -24,7 +24,7 @@ use crate::{ ed25519::Keypair, noise::{self, NoiseSocket}, }, - error::Error, + error::{Error, NegotiationError, SubstreamError}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -93,7 +93,7 @@ enum ConnectionError { substream_id: Option, /// Error. - error: Error, + error: SubstreamError, }, } @@ -195,13 +195,14 @@ impl WebSocketConnection { stream: S, role: &Role, protocols: Vec<&str>, - ) -> crate::Result<(Negotiated, ProtocolName)> { + ) -> Result<(Negotiated, ProtocolName), NegotiationError> { tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols"); let (protocol, socket) = match role { - Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await?, - Role::Listener => listener_select_proto(stream, protocols).await?, - }; + Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await, + Role::Listener => listener_select_proto(stream, protocols).await, + } + .map_err(NegotiationError::MultistreamSelectError)?; tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated"); @@ -219,7 +220,7 @@ impl WebSocketConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?address, @@ -251,11 +252,13 @@ impl WebSocketConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, - ) -> crate::Result { + ) -> Result { let stream = MaybeTlsStream::Plain(stream); Self::negotiate_connection( - tokio_tungstenite::accept_async(stream).await?, + tokio_tungstenite::accept_async(stream) + .await + .map_err(NegotiationError::WebSocket)?, None, Role::Listener, address, @@ -279,7 +282,7 @@ impl WebSocketConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?connection_id, @@ -310,12 +313,11 @@ impl WebSocketConnection { if let Some(dialed_peer) = dialed_peer { if peer != dialed_peer { - return Err(Error::PeerIdMismatch(dialed_peer, peer)); + return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer)); } } let stream: NoiseSocket> = stream; - tracing::trace!(target: LOG_TARGET, "noise handshake done"); // negotiate `yamux` @@ -347,7 +349,7 @@ impl WebSocketConnection { permit: Permit, substream_id: SubstreamId, protocols: Vec, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?substream_id, @@ -379,7 +381,7 @@ impl WebSocketConnection { substream_id: SubstreamId, protocol: ProtocolName, fallback_names: Vec, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream"); let stream = match control.open_stream().await { @@ -394,7 +396,10 @@ impl WebSocketConnection { ?error, "failed to open substream" ); - return Err(Error::YamuxError(Direction::Outbound(substream_id), error)); + return Err(SubstreamError::YamuxError( + error, + Direction::Outbound(substream_id), + )); } }; @@ -441,7 +446,7 @@ impl WebSocketConnection { Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { protocol: None, substream_id: None, - error, + error: SubstreamError::NegotiationError(error), }), Err(_) => Err(ConnectionError::Timeout { protocol: None, @@ -481,7 +486,7 @@ impl WebSocketConnection { let (protocol, substream_id, error) = match error { ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, Error::Timeout) + (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) } ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { (protocol, substream_id, error) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 75360f31..f7999735 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -22,7 +22,7 @@ use crate::{ config::Role, - error::{AddressError, Error}, + error::{AddressError, Error, NegotiationError}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, WebSocketAddress}, manager::TransportHandle, @@ -33,7 +33,7 @@ use crate::{ Transport, TransportBuilder, TransportEvent, }, types::ConnectionId, - PeerId, + DialError, PeerId, }; use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; @@ -60,24 +60,6 @@ mod substream; pub mod config; -#[derive(Debug)] -pub(super) struct WebSocketError { - /// Error. - error: Error, - - /// Connection ID. - connection_id: Option, -} - -impl WebSocketError { - pub fn new(error: Error, connection_id: Option) -> Self { - Self { - error, - connection_id, - } - } -} - /// Logging target for the file. const LOG_TARGET: &str = "litep2p::websocket"; @@ -110,8 +92,9 @@ pub(crate) struct WebSocketTransport { pending_inbound_connections: HashMap, /// Pending connections. - pending_connections: - FuturesUnordered>>, + pending_connections: FuturesUnordered< + BoxFuture<'static, Result>, + >, /// Pending raw, unnegotiated connections. pending_raw_connections: FuturesUnordered< @@ -123,7 +106,7 @@ pub(crate) struct WebSocketTransport { Multiaddr, WebSocketStream>, ), - ConnectionId, + (ConnectionId, Vec<(Multiaddr, DialError)>), >, >, >, @@ -168,11 +151,11 @@ impl WebSocketTransport { max_write_buffer_size, ) .await - .map_err(|error| WebSocketError::new(error, None)) + .map_err(|error| (connection_id, error.into())) }) .await { - Err(_) => Err(WebSocketError::new(Error::Timeout, None)), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(result)) => Ok(result), } @@ -180,31 +163,25 @@ impl WebSocketTransport { } /// Convert `Multiaddr` into `url::Url` - fn multiaddr_into_url(address: Multiaddr) -> crate::Result<(Url, PeerId)> { + fn multiaddr_into_url(address: Multiaddr) -> Result<(Url, PeerId), AddressError> { let mut protocol_stack = address.iter(); - let dial_address = match protocol_stack - .next() - .ok_or_else(|| Error::TransportNotSupported(address.clone()))? - { + let dial_address = match protocol_stack.next().ok_or(AddressError::InvalidProtocol)? { Protocol::Ip4(address) => address.to_string(), Protocol::Ip6(address) => format!("[{address}]"), Protocol::Dns(address) | Protocol::Dns4(address) | Protocol::Dns6(address) => address.to_string(), - _ => return Err(Error::TransportNotSupported(address)), + _ => return Err(AddressError::InvalidProtocol), }; - let url = match protocol_stack - .next() - .ok_or_else(|| Error::TransportNotSupported(address.clone()))? - { + let url = match protocol_stack.next().ok_or(AddressError::InvalidProtocol)? { Protocol::Tcp(port) => match protocol_stack.next() { Some(Protocol::Ws(_)) => format!("ws://{dial_address}:{port}/"), Some(Protocol::Wss(_)) => format!("wss://{dial_address}:{port}/"), - _ => return Err(Error::TransportNotSupported(address.clone())), + _ => return Err(AddressError::InvalidProtocol), }, - _ => return Err(Error::TransportNotSupported(address)), + _ => return Err(AddressError::InvalidProtocol), }; let peer = match protocol_stack.next() { @@ -215,13 +192,15 @@ impl WebSocketTransport { ?protocol, "invalid protocol, expected `Protocol::Ws`/`Protocol::Wss`", ); - return Err(Error::AddressError(AddressError::PeerIdMissing)); + return Err(AddressError::PeerIdMissing); } }; tracing::trace!(target: LOG_TARGET, ?url, "parse address"); - url::Url::parse(&url).map(|url| (url, peer)).map_err(|_| Error::InvalidData) + url::Url::parse(&url) + .map(|url| (url, peer)) + .map_err(|_| AddressError::InvalidUrl) } /// Dial remote peer over `address`. @@ -230,14 +209,14 @@ impl WebSocketTransport { dial_addresses: DialAddresses, connection_open_timeout: Duration, nodelay: bool, - ) -> crate::Result<(Multiaddr, WebSocketStream>)> { + ) -> Result<(Multiaddr, WebSocketStream>), DialError> { let (url, _) = Self::multiaddr_into_url(address.clone())?; let (socket_address, _) = WebSocketAddress::multiaddr_to_socket_address(&address)?; let remote_address = match tokio::time::timeout(connection_open_timeout, socket_address.lookup_ip()).await { - Err(_) => return Err(Error::Timeout), - Ok(Err(error)) => return Err(error), + Err(_) => return Err(DialError::Timeout), + Ok(Err(error)) => return Err(error.into()), Ok(Ok(address)) => address, }; @@ -274,27 +253,26 @@ impl WebSocketTransport { Ok(()) => {} Err(error) if error.raw_os_error() == Some(libc::EINPROGRESS) => {} Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {} - Err(error) => return Err(Error::Other(error.to_string())), + Err(err) => return Err(DialError::from(err)), } - let stream = TcpStream::try_from(Into::::into(socket)) - .map_err(|error| Error::Other(error.to_string()))?; - stream.writable().await.map_err(|error| Error::Other(error.to_string()))?; - - if let Some(error) = - stream.take_error().map_err(|error| Error::Other(error.to_string()))? - { - return Err(Error::Other(error.to_string())); + let stream = TcpStream::try_from(Into::::into(socket))?; + stream.writable().await?; + if let Some(e) = stream.take_error()? { + return Err(DialError::from(e)); } Ok(( address, - tokio_tungstenite::client_async_tls(url, stream).await?.0, + tokio_tungstenite::client_async_tls(url, stream) + .await + .map_err(NegotiationError::WebSocket)? + .0, )) }; match tokio::time::timeout(connection_open_timeout, future).await { - Err(_) => Err(Error::Timeout), + Err(_) => Err(DialError::Timeout), Ok(Err(error)) => Err(error), Ok(Ok((address, stream))) => Ok((address, stream)), } @@ -366,7 +344,7 @@ impl Transport for WebSocketTransport { nodelay, ) .await - .map_err(|error| WebSocketError::new(error, Some(connection_id)))?; + .map_err(|error| (connection_id, error))?; WebSocketConnection::open_connection( connection_id, @@ -380,12 +358,12 @@ impl Transport for WebSocketTransport { max_write_buffer_size, ) .await - .map_err(|error| WebSocketError::new(error, Some(connection_id))) + .map_err(|error| (connection_id, error.into())) }; self.pending_connections.push(Box::pin(async move { match tokio::time::timeout(connection_open_timeout, future).await { - Err(_) => Err(WebSocketError::new(Error::Timeout, Some(connection_id))), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(result)) => Ok(result), } @@ -459,6 +437,7 @@ impl Transport for WebSocketTransport { connection_id: ConnectionId, addresses: Vec, ) -> crate::Result<()> { + let num_addresses = addresses.len(); let mut futures: FuturesUnordered<_> = addresses .into_iter() .map(|address| { @@ -468,30 +447,36 @@ impl Transport for WebSocketTransport { async move { WebSocketTransport::dial_peer( - address, + address.clone(), dial_addresses, connection_open_timeout, nodelay, ) .await + .map_err(|error| (address, error)) } }) .collect(); self.pending_raw_connections.push(Box::pin(async move { + let mut errors = Vec::with_capacity(num_addresses); + while let Some(result) = futures.next().await { match result { Ok((address, stream)) => return Ok((connection_id, address, stream)), - Err(error) => tracing::debug!( - target: LOG_TARGET, - ?connection_id, - ?error, - "failed to open connection", - ), + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error) + } } } - Err(connection_id) + Err((connection_id, errors)) })); Ok(()) @@ -536,11 +521,11 @@ impl Transport for WebSocketTransport { max_write_buffer_size, ) .await - .map_err(|error| WebSocketError::new(error, Some(connection_id))) + .map_err(|error| (connection_id, error.into())) }) .await { - Err(_) => Err(WebSocketError::new(Error::Timeout, Some(connection_id))), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(connection)) => Ok(connection), } @@ -599,9 +584,12 @@ impl Stream for WebSocketTransport { })); } } - Err(connection_id) => + Err((connection_id, errors)) => if !self.canceled.remove(&connection_id) { - return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id })); + return Poll::Ready(Some(TransportEvent::OpenFailure { + connection_id, + errors, + })); }, } } @@ -618,22 +606,17 @@ impl Stream for WebSocketTransport { endpoint, })); } - Err(error) => match error.connection_id { - Some(connection_id) => match self.pending_dials.remove(&connection_id) { - Some(address) => - return Poll::Ready(Some(TransportEvent::DialFailure { - connection_id, - address, - error: error.error, - })), - None => { - tracing::debug!(target: LOG_TARGET, ?error, "failed to establish connection") - } - }, - None => { - tracing::debug!(target: LOG_TARGET, ?error, "failed to establish connection") + Err((connection_id, error)) => { + if let Some(address) = self.pending_dials.remove(&connection_id) { + return Poll::Ready(Some(TransportEvent::DialFailure { + connection_id, + address, + error, + })); + } else { + tracing::debug!(target: LOG_TARGET, ?error, ?connection_id, "Pending inbound connection failed"); } - }, + } } } diff --git a/tests/connection/mod.rs b/tests/connection/mod.rs index 769a161f..8f037e62 100644 --- a/tests/connection/mod.rs +++ b/tests/connection/mod.rs @@ -21,7 +21,7 @@ use litep2p::{ config::ConfigBuilder, crypto::ed25519::Keypair, - error::Error, + error::{DialError, Error, NegotiationError}, protocol::libp2p::ping::{Config as PingConfig, PingEvent}, transport::tcp::config::Config as TcpConfig, Litep2p, Litep2pEvent, PeerId, @@ -366,7 +366,11 @@ async fn connection_timeout(transport: Transport, address: Multiaddr) { assert_eq!(dial_address, address); println!("{error:?}"); - assert!(std::matches!(error, Error::Timeout)); + match error { + DialError::Timeout => {} + DialError::NegotiationError(NegotiationError::Timeout) => {} + _ => panic!("unexpected error {error:?}"), + } } #[cfg(feature = "quic")]