Skip to content

Commit

Permalink
tcp/websocket/quic: Fix cancel memory leak (#272)
Browse files Browse the repository at this point in the history
This fixes a bug in the TCP and Websocket transports that was leaking
memory for:
- `canceled: HashSet<ConnectionId>` - leak since the beginning of
litep2p
- `cancel_futures: HashMap<ConnectionId, AbortHandle>` added in unmerged
#255


The memory leak is happening in the following scenarios:
- T0: transport manager: dials K (parallelism factor = 8) addresses on
TCP and WebSocket on ConnectionId=1
- T1: TCP: establishes a connection with the peer ConnectionId=1
- T2: WebSocket: establishes a connection with the peer ConnectionId=1
- T3: transport manager: receives TCP establishment event and cancels
`WebSocket` dials
 
The issue happens when T2 finishes before T3.
In this situation, the WebSocket transport no longer has a future with a
corresponding ConnectionId=1.
The canceling method simply inserts ConnectionId=1 into a hashset This
leads to the hashset growing over time, without a way to clean-up stale
connection IDs.


The fix relies on the changes added in
#255:
- `cancel_futures` maps a connection ID to an abort handle
- the `cancel_futures` is guaranteed to contain a connection ID that
corresponds to an unfinished `pending_raw_connections` future
- the cancel method just aborts the in-flight future, if it exists
- state of the `cancel_futures` is done when polling
`pending_raw_connections`

### Testing Done

I used a custom-patched version of litep2p to log the number of pending
dials. After a few hours, the pending dials for both TCP and WebSocket
connections stabilized at just a few. (Same as
#271).

```
2024-10-22 17:37:56.252  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=1 pending_inbound_connections=0 pending_connections=1 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0
2024-10-22 17:38:26.252  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=1 opened_raw=0 cancel_futures=1 pending_open=0
2024-10-22 17:38:56.253  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0
2024-10-22 17:39:26.253  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0
2024-10-22 17:39:56.252  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0
2024-10-22 17:40:26.252  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=1 opened_raw=0 cancel_futures=1 pending_open=0
2024-10-22 17:40:56.252  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0
2024-10-22 17:41:26.252  INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0
```


Build on: #255

Closes: #270

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
  • Loading branch information
lexnv and dmitry-markin authored Oct 30, 2024
1 parent d07c455 commit c0fef8d
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 42 deletions.
60 changes: 46 additions & 14 deletions src/transport/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use multiaddr::{Multiaddr, Protocol};
use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout};

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -120,9 +120,9 @@ pub(crate) struct QuicTransport {
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (NegotiatedConnection, Multiaddr)>,

/// Canceled raw connections.
canceled: HashSet<ConnectionId>,

/// Cancel raw connections futures.
///
/// This is cancelling `Self::pending_raw_connections`.
cancel_futures: HashMap<ConnectionId, AbortHandle>,
}

Expand Down Expand Up @@ -235,7 +235,6 @@ impl TransportBuilder for QuicTransport {
context,
config,
listener,
canceled: HashSet::new(),
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand Down Expand Up @@ -477,8 +476,11 @@ impl Transport for QuicTransport {

/// Cancel opening connections.
fn cancel(&mut self, connection_id: ConnectionId) {
self.canceled.insert(connection_id);
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
// Cancel the future if it exists.
// State clean-up happens inside the `poll_next`.
if let Some(handle) = self.cancel_futures.get(&connection_id) {
handle.abort();
}
}
}

Expand Down Expand Up @@ -510,27 +512,57 @@ impl Stream for QuicTransport {
connection_id,
address,
stream,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?address,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
self.opened_raw.insert(connection_id, (stream, address.clone()));

return Poll::Ready(Some(TransportEvent::ConnectionOpened {
connection_id,
address,
}));
},
}
}

RawConnectionResult::Failed {
connection_id,
errors,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?errors,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
return Poll::Ready(Some(TransportEvent::OpenFailure {
connection_id,
errors,
}));
},
}
}

RawConnectionResult::Canceled { connection_id } => {
self.canceled.remove(&connection_id);
if self.cancel_futures.remove(&connection_id).is_none() {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
"raw cancelled connection without a cancel handle",
);
}
}
}
}
Expand Down
59 changes: 45 additions & 14 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use socket2::{Domain, Socket, Type};
use tokio::net::TcpStream;

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -121,9 +121,9 @@ pub(crate) struct TcpTransport {
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (TcpStream, Multiaddr)>,

/// Canceled raw connections.
canceled: HashSet<ConnectionId>,

/// Cancel raw connections futures.
///
/// This is cancelling `Self::pending_raw_connections`.
cancel_futures: HashMap<ConnectionId, AbortHandle>,

/// Connections which have been opened and negotiated but are being validated by the
Expand Down Expand Up @@ -291,7 +291,6 @@ impl TransportBuilder for TcpTransport {
config,
context,
dial_addresses,
canceled: HashSet::new(),
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand Down Expand Up @@ -516,8 +515,11 @@ impl Transport for TcpTransport {
}

fn cancel(&mut self, connection_id: ConnectionId) {
self.canceled.insert(connection_id);
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
// Cancel the future if it exists.
// State clean-up happens inside the `poll_next`.
if let Some(handle) = self.cancel_futures.get(&connection_id) {
handle.abort();
}
}
}

Expand Down Expand Up @@ -560,27 +562,56 @@ impl Stream for TcpTransport {
connection_id,
address,
stream,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?address,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
self.opened_raw.insert(connection_id, (stream, address.clone()));

return Poll::Ready(Some(TransportEvent::ConnectionOpened {
connection_id,
address,
}));
},
}
}

RawConnectionResult::Failed {
connection_id,
errors,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?errors,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
return Poll::Ready(Some(TransportEvent::OpenFailure {
connection_id,
errors,
}));
},
}
}
RawConnectionResult::Canceled { connection_id } => {
self.canceled.remove(&connection_id);
if self.cancel_futures.remove(&connection_id).is_none() {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
"raw cancelled connection without a cancel handle",
);
}
}
}
}
Expand Down
59 changes: 45 additions & 14 deletions src/transport/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Duration,
Expand Down Expand Up @@ -125,9 +125,9 @@ pub(crate) struct WebSocketTransport {
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (WebSocketStream<MaybeTlsStream<TcpStream>>, Multiaddr)>,

/// Canceled raw connections.
canceled: HashSet<ConnectionId>,

/// Cancel raw connections futures.
///
/// This is cancelling `Self::pending_raw_connections`.
cancel_futures: HashMap<ConnectionId, AbortHandle>,

/// Negotiated connections waiting validation.
Expand Down Expand Up @@ -321,7 +321,6 @@ impl TransportBuilder for WebSocketTransport {
config,
context,
dial_addresses,
canceled: HashSet::new(),
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand Down Expand Up @@ -562,8 +561,11 @@ impl Transport for WebSocketTransport {
}

fn cancel(&mut self, connection_id: ConnectionId) {
self.canceled.insert(connection_id);
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
// Cancel the future if it exists.
// State clean-up happens inside the `poll_next`.
if let Some(handle) = self.cancel_futures.get(&connection_id) {
handle.abort();
}
}
}

Expand Down Expand Up @@ -600,27 +602,56 @@ impl Stream for WebSocketTransport {
connection_id,
address,
stream,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?address,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
self.opened_raw.insert(connection_id, (stream, address.clone()));

return Poll::Ready(Some(TransportEvent::ConnectionOpened {
connection_id,
address,
}));
},
}
}

RawConnectionResult::Failed {
connection_id,
errors,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?errors,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
return Poll::Ready(Some(TransportEvent::OpenFailure {
connection_id,
errors,
}));
},
}
}
RawConnectionResult::Canceled { connection_id } => {
self.canceled.remove(&connection_id);
if self.cancel_futures.remove(&connection_id).is_none() {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
"raw cancelled connection without a cancel handle",
);
}
}
}
}
Expand Down

0 comments on commit c0fef8d

Please sign in to comment.