diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index d69e1603..0cf5e255 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -43,7 +43,7 @@ use multiaddr::{Multiaddr, Protocol}; use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout}; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, sync::Arc, @@ -120,9 +120,9 @@ pub(crate) struct QuicTransport { /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, - /// Canceled raw connections. - canceled: HashSet, - + /// Cancel raw connections futures. + /// + /// This is cancelling `Self::pending_raw_connections`. cancel_futures: HashMap, } @@ -235,7 +235,6 @@ impl TransportBuilder for QuicTransport { context, config, listener, - canceled: HashSet::new(), opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), @@ -477,8 +476,11 @@ impl Transport for QuicTransport { /// Cancel opening connections. fn cancel(&mut self, connection_id: ConnectionId) { - self.canceled.insert(connection_id); - self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); + // Cancel the future if it exists. + // State clean-up happens inside the `poll_next`. + if let Some(handle) = self.cancel_futures.get(&connection_id) { + handle.abort(); + } } } @@ -510,27 +512,57 @@ impl Stream for QuicTransport { connection_id, address, stream, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?address, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { self.opened_raw.insert(connection_id, (stream, address.clone())); return Poll::Ready(Some(TransportEvent::ConnectionOpened { connection_id, address, })); - }, + } + } + RawConnectionResult::Failed { connection_id, errors, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?errors, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); - }, + } + } + RawConnectionResult::Canceled { connection_id } => { - self.canceled.remove(&connection_id); + if self.cancel_futures.remove(&connection_id).is_none() { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + "raw cancelled connection without a cancel handle", + ); + } } } } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 4ef52104..ff8c1655 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -46,7 +46,7 @@ use socket2::{Domain, Socket, Type}; use tokio::net::TcpStream; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, net::SocketAddr, pin::Pin, task::{Context, Poll}, @@ -121,9 +121,9 @@ pub(crate) struct TcpTransport { /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, - /// Canceled raw connections. - canceled: HashSet, - + /// Cancel raw connections futures. + /// + /// This is cancelling `Self::pending_raw_connections`. cancel_futures: HashMap, /// Connections which have been opened and negotiated but are being validated by the @@ -291,7 +291,6 @@ impl TransportBuilder for TcpTransport { config, context, dial_addresses, - canceled: HashSet::new(), opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), @@ -516,8 +515,11 @@ impl Transport for TcpTransport { } fn cancel(&mut self, connection_id: ConnectionId) { - self.canceled.insert(connection_id); - self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); + // Cancel the future if it exists. + // State clean-up happens inside the `poll_next`. + if let Some(handle) = self.cancel_futures.get(&connection_id) { + handle.abort(); + } } } @@ -560,27 +562,56 @@ impl Stream for TcpTransport { connection_id, address, stream, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?address, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { self.opened_raw.insert(connection_id, (stream, address.clone())); return Poll::Ready(Some(TransportEvent::ConnectionOpened { connection_id, address, })); - }, + } + } + RawConnectionResult::Failed { connection_id, errors, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?errors, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); - }, + } + } RawConnectionResult::Canceled { connection_id } => { - self.canceled.remove(&connection_id); + if self.cancel_futures.remove(&connection_id).is_none() { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + "raw cancelled connection without a cancel handle", + ); + } } } } diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index bcf37002..d7b374a9 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -50,7 +50,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use url::Url; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, pin::Pin, task::{Context, Poll}, time::Duration, @@ -125,9 +125,9 @@ pub(crate) struct WebSocketTransport { /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap>, Multiaddr)>, - /// Canceled raw connections. - canceled: HashSet, - + /// Cancel raw connections futures. + /// + /// This is cancelling `Self::pending_raw_connections`. cancel_futures: HashMap, /// Negotiated connections waiting validation. @@ -321,7 +321,6 @@ impl TransportBuilder for WebSocketTransport { config, context, dial_addresses, - canceled: HashSet::new(), opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), @@ -562,8 +561,11 @@ impl Transport for WebSocketTransport { } fn cancel(&mut self, connection_id: ConnectionId) { - self.canceled.insert(connection_id); - self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); + // Cancel the future if it exists. + // State clean-up happens inside the `poll_next`. + if let Some(handle) = self.cancel_futures.get(&connection_id) { + handle.abort(); + } } } @@ -600,27 +602,56 @@ impl Stream for WebSocketTransport { connection_id, address, stream, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?address, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { self.opened_raw.insert(connection_id, (stream, address.clone())); return Poll::Ready(Some(TransportEvent::ConnectionOpened { connection_id, address, })); - }, + } + } + RawConnectionResult::Failed { connection_id, errors, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?errors, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); - }, + } + } RawConnectionResult::Canceled { connection_id } => { - self.canceled.remove(&connection_id); + if self.cancel_futures.remove(&connection_id).is_none() { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + "raw cancelled connection without a cancel handle", + ); + } } } }