Skip to content

Commit

Permalink
Limit the number of inbound peer connections (#2961)
Browse files Browse the repository at this point in the history
* Limit open inbound connections based on the config

* Log inbound connection errors at debug level

* Test inbound connection limits

* Use clone directly in function call argument lists

* Remove an outdated comment

* Update tests to use an unbounded channel rather than mem::forget

And rename some variables.

* Use a lower limit in a slow test and require that it is exceeded
  • Loading branch information
teor2345 authored Oct 28, 2021
1 parent 8d01750 commit f26a60b
Show file tree
Hide file tree
Showing 3 changed files with 738 additions and 79 deletions.
71 changes: 43 additions & 28 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ where

// Connect the rx end to a PeerSet, wrapping new peers in load instruments.
let peer_set = PeerSet::new(
&config,
PeakEwmaDiscover::new(
// Discover interprets an error as stream termination,
// so discard any errored connections...
Expand All @@ -162,19 +163,20 @@ where
// Connect peerset_tx to the 3 peer sources:
//
// 1. Incoming peer connections, via a listener.
let listen_guard = tokio::spawn(
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
.instrument(Span::current()),
let listen_fut = accept_inbound_connections(
config.clone(),
tcp_listener,
listen_handshaker,
peerset_tx.clone(),
);
let listen_guard = tokio::spawn(listen_fut.instrument(Span::current()));

// 2. Initial peers, specified in the config.
let initial_peers_fut = {
let config = config.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
add_initial_peers(config, outbound_connector, peerset_tx)
};

let initial_peers_fut = add_initial_peers(
config.clone(),
outbound_connector.clone(),
peerset_tx.clone(),
);
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));

// 3. Outgoing peers we connect to in response to load.
Expand Down Expand Up @@ -202,18 +204,15 @@ where
let _ = demand_tx.try_send(MorePeers);
}

let crawl_fut = {
let config = config.clone();
crawl_and_dial(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
)
};
let crawl_fut = crawl_and_dial(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
);
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));

handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
Expand Down Expand Up @@ -434,8 +433,11 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
///
/// Limit the number of active inbound connections based on `config`.
#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
config: Config,
listener: TcpListener,
mut handshaker: S,
peerset_tx: mpsc::Sender<PeerChange>,
Expand All @@ -447,8 +449,19 @@ where
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();

loop {
if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
let inbound_result = listener.accept().await;
if let Ok((tcp_stream, addr)) = inbound_result {
if active_inbound_connections.update_count()
>= config.peerset_inbound_connection_limit()
{
// Too many open inbound connections or pending handshakes already.
// Close the connection.
std::mem::drop(tcp_stream);
continue;
}

// The peer already opened a connection to us.
// So we want to increment the connection count as soon as possible.
let connection_tracker = active_inbound_connections.track_connection();
debug!(
inbound_connections = ?active_inbound_connections.update_count(),
Expand Down Expand Up @@ -492,6 +505,8 @@ where
// Zebra can't control how many queued connections are waiting,
// but most OSes also limit the number of queued inbound connections on a listener port.
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
} else {
debug!(?inbound_result, "error accepting inbound connection");
}

// Security: Let other tasks run after each connection is processed.
Expand Down Expand Up @@ -536,8 +551,8 @@ enum CrawlerAction {
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// Uses `active_outbound_connections` to track the number of active outbound connections
/// in both the initial peers and crawler.
/// Uses `active_outbound_connections` to limit the number of active outbound connections
/// across both the initial peers and crawler. The limit is based on `config`.
#[instrument(skip(
config,
demand_tx,
Expand Down Expand Up @@ -606,7 +621,7 @@ where
// turn the demand into an action, based on the crawler's current state
_ = demand_rx.next() => {
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
// Too many open connections or pending handshakes already
// Too many open outbound connections or pending handshakes already
DemandDrop
} else if let Some(candidate) = candidates.next().await {
// candidates.next has a short delay, and briefly holds the address
Expand Down
Loading

0 comments on commit f26a60b

Please sign in to comment.