Skip to content

Commit

Permalink
manager: Fix connection limits tracking of rejected connections (#286)
Browse files Browse the repository at this point in the history
This PR ensures that connection IDs are properly tracked to enforce
connection limits.

After ~1/2 days the substrate litep2p running node is starting to reject
multiple pending socket connections.

This is related to the fact that accepting of established connection is
a two-part process:
- step 1. Can we accept the connection based on number of connections?
- step 2. The transport manager transitions the peer state
- step 3. If the transition succeeds, the connection is tracked
(inserted in a hashmap).

If step 2 fails, we were previously leaking the connection ID. The
connection ID is also counting towards the maximum number of connections
the node can sustain.

This PR effectively modifies the connection limits API,
`on_connection_established` is split into two methods:
  - `can_accept_connection()`
  - `accept_established_connection()`

This fixes a subtle memory leak bounded at 10000 Connection IDs.
However, even more important, this fixes connection stability for
long-running nodes.

Discovered during the investigation of:
- #282

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Nov 13, 2024
1 parent e7cb35e commit 40ea4ea
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
42 changes: 29 additions & 13 deletions src/transport/manager/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ impl ConnectionLimits {
}

/// Called when a new connection is established.
pub fn on_connection_established(
///
/// Returns an error if the connection cannot be accepted due to connection limits.
pub fn can_accept_connection(
&mut self,
connection_id: ConnectionId,
is_listener: bool,
) -> Result<(), ConnectionLimitsError> {
// Check connection limits.
Expand All @@ -131,16 +132,27 @@ impl ConnectionLimits {
}
}

// Keep track of the connection.
Ok(())
}

/// Accept an established connection.
///
/// # Note
///
/// This method should be called after the `Self::can_accept_connection` method
/// to ensure that the connection can be accepted.
pub fn accept_established_connection(
&mut self,
connection_id: ConnectionId,
is_listener: bool,
) {
if is_listener {
if self.config.max_incoming_connections.is_some() {
self.incoming_connections.insert(connection_id);
}
} else if self.config.max_outgoing_connections.is_some() {
self.outgoing_connections.insert(connection_id);
}

Ok(())
}

/// Called when a connection is closed.
Expand All @@ -167,35 +179,39 @@ mod tests {
let connection_id_out_1 = ConnectionId::random();
let connection_id_out_2 = ConnectionId::random();
let connection_id_in_3 = ConnectionId::random();
let connection_id_out_3 = ConnectionId::random();

// Establish incoming connection.
assert!(limits.on_connection_established(connection_id_in_1, true).is_ok());
assert!(limits.can_accept_connection(true).is_ok());
limits.accept_established_connection(connection_id_in_1, true);
assert_eq!(limits.incoming_connections.len(), 1);

assert!(limits.on_connection_established(connection_id_in_2, true).is_ok());
assert!(limits.can_accept_connection(true).is_ok());
limits.accept_established_connection(connection_id_in_2, true);
assert_eq!(limits.incoming_connections.len(), 2);

assert!(limits.on_connection_established(connection_id_in_3, true).is_ok());
assert!(limits.can_accept_connection(true).is_ok());
limits.accept_established_connection(connection_id_in_3, true);
assert_eq!(limits.incoming_connections.len(), 3);

assert_eq!(
limits.on_connection_established(ConnectionId::random(), true).unwrap_err(),
limits.can_accept_connection(true).unwrap_err(),
ConnectionLimitsError::MaxIncomingConnectionsExceeded
);
assert_eq!(limits.incoming_connections.len(), 3);

// Establish outgoing connection.
assert!(limits.on_connection_established(connection_id_out_1, false).is_ok());
assert!(limits.can_accept_connection(false).is_ok());
limits.accept_established_connection(connection_id_out_1, false);
assert_eq!(limits.incoming_connections.len(), 3);
assert_eq!(limits.outgoing_connections.len(), 1);

assert!(limits.on_connection_established(connection_id_out_2, false).is_ok());
assert!(limits.can_accept_connection(false).is_ok());
limits.accept_established_connection(connection_id_out_2, false);
assert_eq!(limits.incoming_connections.len(), 3);
assert_eq!(limits.outgoing_connections.len(), 2);

assert_eq!(
limits.on_connection_established(connection_id_out_3, false).unwrap_err(),
limits.can_accept_connection(false).unwrap_err(),
ConnectionLimitsError::MaxOutgoingConnectionsExceeded
);

Expand Down
8 changes: 4 additions & 4 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,7 @@ impl TransportManager {
};

// Reject the connection if exceeded limits.
if let Err(error) = self
.connection_limits
.on_connection_established(endpoint.connection_id(), endpoint.is_listener())
{
if let Err(error) = self.connection_limits.can_accept_connection(endpoint.is_listener()) {
tracing::debug!(
target: LOG_TARGET,
?peer,
Expand Down Expand Up @@ -806,6 +803,9 @@ impl TransportManager {
);

if connection_accepted {
self.connection_limits
.accept_established_connection(endpoint.connection_id(), endpoint.is_listener());

// Cancel all pending dials if the connection was established.
if let PeerState::Opening {
connection_id,
Expand Down

0 comments on commit 40ea4ea

Please sign in to comment.