Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of inbound peer connections #2961

Merged
merged 9 commits into from
Oct 28, 2021
71 changes: 43 additions & 28 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ where

// Connect the rx end to a PeerSet, wrapping new peers in load instruments.
let peer_set = PeerSet::new(
&config,
PeakEwmaDiscover::new(
// Discover interprets an error as stream termination,
// so discard any errored connections...
Expand All @@ -162,19 +163,20 @@ where
// Connect peerset_tx to the 3 peer sources:
//
// 1. Incoming peer connections, via a listener.
let listen_guard = tokio::spawn(
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
.instrument(Span::current()),
let listen_fut = accept_inbound_connections(
config.clone(),
tcp_listener,
listen_handshaker,
peerset_tx.clone(),
);
let listen_guard = tokio::spawn(listen_fut.instrument(Span::current()));

// 2. Initial peers, specified in the config.
let initial_peers_fut = {
let config = config.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
add_initial_peers(config, outbound_connector, peerset_tx)
};

let initial_peers_fut = add_initial_peers(
config.clone(),
outbound_connector.clone(),
peerset_tx.clone(),
);
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));

// 3. Outgoing peers we connect to in response to load.
Expand Down Expand Up @@ -202,18 +204,15 @@ where
let _ = demand_tx.try_send(MorePeers);
}

let crawl_fut = {
let config = config.clone();
crawl_and_dial(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
)
};
let crawl_fut = crawl_and_dial(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
);
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));

handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
Expand Down Expand Up @@ -434,8 +433,11 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
///
/// Limit the number of active inbound connections based on `config`.
#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
config: Config,
listener: TcpListener,
mut handshaker: S,
peerset_tx: mpsc::Sender<PeerChange>,
Expand All @@ -447,8 +449,19 @@ where
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();

loop {
if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
let inbound_result = listener.accept().await;
if let Ok((tcp_stream, addr)) = inbound_result {
if active_inbound_connections.update_count()
>= config.peerset_inbound_connection_limit()
{
// Too many open inbound connections or pending handshakes already.
// Close the connection.
std::mem::drop(tcp_stream);
continue;
}

// The peer already opened a connection to us.
// So we want to increment the connection count as soon as possible.
let connection_tracker = active_inbound_connections.track_connection();
debug!(
inbound_connections = ?active_inbound_connections.update_count(),
Expand Down Expand Up @@ -492,6 +505,8 @@ where
// Zebra can't control how many queued connections are waiting,
// but most OSes also limit the number of queued inbound connections on a listener port.
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
} else {
debug!(?inbound_result, "error accepting inbound connection");
}

// Security: Let other tasks run after each connection is processed.
Expand Down Expand Up @@ -536,8 +551,8 @@ enum CrawlerAction {
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// Uses `active_outbound_connections` to track the number of active outbound connections
/// in both the initial peers and crawler.
/// Uses `active_outbound_connections` to limit the number of active outbound connections
/// across both the initial peers and crawler. The limit is based on `config`.
#[instrument(skip(
config,
demand_tx,
Expand Down Expand Up @@ -606,7 +621,7 @@ where
// turn the demand into an action, based on the crawler's current state
_ = demand_rx.next() => {
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
// Too many open connections or pending handshakes already
// Too many open outbound connections or pending handshakes already
DemandDrop
} else if let Some(candidate) = candidates.next().await {
// candidates.next has a short delay, and briefly holds the address
Expand Down
Loading