Skip to content

Commit

Permalink
Limit the number of outbound peer connections (#2944)
Browse files Browse the repository at this point in the history
* Limit the number of outbound connections in the crawler

* Make zebra-network channel bounds depend on config.peerset_initial_target_size

* Bias Zebra towards outbound connections

And turn connection limits into `Config` methods.

* Downgrade some connection logs to debug

* Remove verbose or outdated fields in tracing logs

* Clarify connection limits

Includes:
- `fastmod OUTBOUND_PEER_BIAS_FRACTION OUTBOUND_PEER_BIAS_DENOMINATOR zebra*`
- clarify connection limit documentation

* Clarify inventory channel capacity

* Add zebra_network::initialize tests with limited numbers of peers

* Avoid cooperative async task starvation in the peer crawler and listener

If we don't yield in these loops, they can run for a long time before
tokio forces them to yield.

* Test the crawler with small connection limits

And use the multi-threaded runtime to avoid long hangs.

* Stop using the multi-threaded executor in tests where it's not needed

* Avoid starvation for every connection

Adds yields after inbound successes and initial peer connections.

* Add a crawler peer connection success test

* Add outbound connection limit tests

* Improve outbound tests
  • Loading branch information
teor2345 authored Oct 27, 2021
1 parent 46fb33a commit 3e03d48
Show file tree
Hide file tree
Showing 8 changed files with 783 additions and 51 deletions.
51 changes: 39 additions & 12 deletions zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use serde::{de, Deserialize, Deserializer};

use zebra_chain::{parameters::Network, serialization::canonical_socket_addr};

use crate::BoxError;
use crate::{constants, BoxError};

#[cfg(test)]
mod tests;

/// The number of times Zebra will retry each initial peer's DNS resolution,
/// before checking if any other initial peers have returned addresses.
Expand Down Expand Up @@ -53,6 +56,8 @@ pub struct Config {

/// The initial target size for the peer set.
///
/// Also used to limit the number of inbound and outbound connections made by Zebra.
///
/// If you have a slow network connection, and Zebra is having trouble
/// syncing, try reducing the peer set size. You can also reduce the peer
/// set size to reduce Zebra's bandwidth usage.
Expand All @@ -72,6 +77,38 @@ pub struct Config {
}

impl Config {
/// The maximum number of outbound connections that Zebra will open at the same time.
/// When this limit is reached, Zebra stops opening outbound connections.
///
/// The limit is the inbound connection limit, plus the inbound limit divided by
/// `constants::OUTBOUND_PEER_BIAS_DENOMINATOR` (using integer division).
///
/// The addition saturates at `usize::MAX`, so an extremely large configured
/// peer set size can never wrap around to a small outbound limit.
///
/// # Security
///
/// This is larger than the inbound connection limit
/// (or equal to it, when the integer division rounds the bias down to zero),
/// so Zebra is more likely to be connected to peers that it has selected.
pub fn peerset_outbound_connection_limit(&self) -> usize {
    let inbound_limit = self.peerset_inbound_connection_limit();
    let outbound_bias = inbound_limit / constants::OUTBOUND_PEER_BIAS_DENOMINATOR;

    // Saturate instead of overflowing: a wrapped sum could be smaller than the
    // inbound limit, which would invert the outbound bias documented above.
    inbound_limit.saturating_add(outbound_bias)
}

/// The maximum number of inbound connections that Zebra will accept at the same time.
/// When this limit is reached, Zebra drops new inbound connections without handshaking on them.
pub fn peerset_inbound_connection_limit(&self) -> usize {
self.peerset_initial_target_size
}

/// The maximum number of inbound and outbound connections that Zebra will have at the same time.
///
/// This is the sum of the outbound and inbound connection limits.
/// The sum saturates at `usize::MAX` instead of overflowing,
/// so an extremely large configured peer set size can never wrap around to a small total.
pub fn peerset_total_connection_limit(&self) -> usize {
    let outbound_limit = self.peerset_outbound_connection_limit();
    let inbound_limit = self.peerset_inbound_connection_limit();

    outbound_limit.saturating_add(inbound_limit)
}

/// Resolve and return the initial seed peers for the configured network.
///
/// Selects the mainnet or testnet seed list based on `self.network`,
/// then resolves that list using `Config::resolve_peers`.
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
    // Pick the seed list first, so there is a single resolution call site.
    let seed_peers = match self.network {
        Network::Mainnet => &self.initial_mainnet_peers,
        Network::Testnet => &self.initial_testnet_peers,
    };

    Config::resolve_peers(seed_peers).await
}

/// Concurrently resolves `peers` into zero or more IP addresses, with a
/// timeout of a few seconds on each DNS request.
///
Expand Down Expand Up @@ -115,14 +152,6 @@ impl Config {
}
}

/// Resolve and return the initial seed peers for the configured network.
///
/// Selects the mainnet or testnet seed list based on `self.network`,
/// then resolves that list using `Config::resolve_peers`.
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
    // Pick the seed list first, so there is a single resolution call site.
    let seed_peers = match self.network {
        Network::Mainnet => &self.initial_mainnet_peers,
        Network::Testnet => &self.initial_testnet_peers,
    };

    Config::resolve_peers(seed_peers).await
}

/// Resolves `host` into zero or more IP addresses, retrying up to
/// `max_retries` times.
///
Expand Down Expand Up @@ -265,6 +294,7 @@ impl<'de> Deserialize<'de> for Config {
}

let config = DConfig::deserialize(deserializer)?;

// TODO: perform listener DNS lookups asynchronously with a timeout (#1631)
let listen_addr = match config.listen_addr.parse::<SocketAddr>() {
Ok(socket) => Ok(socket),
Expand All @@ -287,6 +317,3 @@ impl<'de> Deserialize<'de> for Config {
})
}
}

#[cfg(test)]
mod tests;
18 changes: 18 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ use zebra_chain::{
serialization::Duration32,
};

/// The denominator of the fractional bias towards outbound peers in the peer set,
/// if connection limits have been reached.
///
/// Inbound and outbound connections are limited based on
/// `Config::peerset_initial_target_size`.
///
/// The outbound limit is larger than the inbound limit by:
/// `Config::peerset_initial_target_size / OUTBOUND_PEER_BIAS_DENOMINATOR`
/// (using integer division, so the bias is zero for very small peer set sizes).
///
/// For example, with a denominator of `2` and a target size of `50`,
/// the inbound limit is `50` and the outbound limit is `50 + 50 / 2 = 75`.
///
/// This value is used as a divisor, so it must never be zero.
///
/// # Security
///
/// This bias helps make sure that Zebra is connected to a majority of peers
/// that it has chosen from its [`AddressBook`].
///
/// Inbound peer connections are initiated by the remote peer,
/// so inbound peer selection is not controlled by the local node.
pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2;

/// The buffer size for the peer set.
///
/// This should be greater than 1 to avoid sender contention, but also reasonably
Expand Down
3 changes: 1 addition & 2 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ mod error;
mod handshake;

use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
24 changes: 16 additions & 8 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,29 @@ use super::{ErrorSlot, PeerError, SharedPeerError};

/// The "client" duplex half of a peer connection.
pub struct Client {
// Used to shut down the corresponding heartbeat.
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
/// Used to shut down the corresponding heartbeat.
/// This is always Some except when we take it on drop.
pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,

/// Used to send [`Request`]s to the remote peer.
pub(crate) server_tx: mpsc::Sender<ClientRequest>,

/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
pub(crate) error_slot: ErrorSlot,
}

/// A message from the `peer::Client` to the `peer::Server`.
#[derive(Debug)]
pub(super) struct ClientRequest {
/// The actual request.
pub(crate) struct ClientRequest {
/// The actual network request for the peer.
pub request: Request,
/// The return message channel, included because `peer::Client::call` returns a

/// The response [`Message`] channel, included because `peer::Client::call` returns a
/// future that may be moved around before it resolves.
pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,

/// The tracing context for the request, so that work the connection task does
/// processing messages in the context of this request will have correct context.
pub span: tracing::Span,
Expand Down
2 changes: 2 additions & 0 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ pub struct Connection<S, Tx> {
pub(super) client_rx: ClientRequestReceiver,

/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
pub(super) error_slot: ErrorSlot,

/// A channel for sending requests to the connected peer.
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub enum PeerError {
/// mutex should be held for as short a time as possible. This avoids blocking
/// the async task thread on acquiring the mutex.
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
pub struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);

impl std::fmt::Debug for ErrorSlot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
90 changes: 69 additions & 21 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
///
/// # Panics
///
/// If `config.peerset_initial_target_size` is zero.
/// (zebra-network expects to be able to connect to at least one peer.)
pub async fn init<S, C>(
config: Config,
inbound_service: S,
Expand All @@ -76,10 +81,25 @@ where
S::Future: Send + 'static,
C: ChainTip + Clone + Send + 'static,
{
// If we want Zebra to operate with no network,
// we should implement a `zebrad` command that doesn't use `zebra-network`.
assert!(
config.peerset_initial_target_size > 0,
"Zebra must be allowed to connect to at least one peer"
);

let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;

let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
let (inv_sender, inv_receiver) = broadcast::channel(100);

// Create a broadcast channel for peer inventory advertisements.
// If it reaches capacity, this channel drops older inventory advertisements.
//
// When Zebra is at the chain tip with an up-to-date mempool,
// we expect to have at most 1 new transaction per connected peer,
// and 1-2 new blocks across the entire network.
// (The block syncer and mempool crawler handle bulk fetches of blocks and transactions.)
let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());

// Construct services that handle inbound handshakes and perform outbound
// handshakes. These use the same handshake service internally to detect
Expand All @@ -106,10 +126,14 @@ where
)
};

// Create an mpsc channel for peer changes, with a generous buffer.
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling.
let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
// Create an mpsc channel for peer changes,
// based on the maximum number of inbound and outbound peers.
let (peerset_tx, peerset_rx) =
mpsc::channel::<PeerChange>(config.peerset_total_connection_limit());
// Create an mpsc channel for peerset demand signaling,
// based on the maximum number of outbound peers.
let (mut demand_tx, demand_rx) =
mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());

// Create a oneshot to send background task JoinHandles to the peer set
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -174,18 +198,19 @@ where
let _ = demand_tx.try_send(MorePeers);
}

let crawl_guard = tokio::spawn(
let crawl_fut = {
let config = config.clone();
crawl_and_dial(
config.crawl_new_peer_interval,
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
)
.instrument(Span::current()),
);
};
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));

handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();

Expand Down Expand Up @@ -294,6 +319,11 @@ where
peerset_tx
.send(handshake_result.map_err(|(_addr, e)| e))
.await?;

// Security: Let other tasks run after each connection is processed.
//
// Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
tokio::task::yield_now().await;
}

let outbound_connections = active_outbound_connections.update_count();
Expand Down Expand Up @@ -406,7 +436,7 @@ where
if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
let connection_tracker = active_inbound_connections.track_connection();
info!(
debug!(
inbound_connections = ?active_inbound_connections.update_count(),
"handshaking on an open inbound peer connection"
);
Expand Down Expand Up @@ -449,6 +479,11 @@ where
// but most OSes also limit the number of queued inbound connections on a listener port.
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
}

// Security: Let other tasks run after each connection is processed.
//
// Avoids remote peers starving other Zebra tasks using inbound connection successes or errors.
tokio::task::yield_now().await;
}
}

Expand Down Expand Up @@ -476,9 +511,9 @@ enum CrawlerAction {
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `peerset_tx` channel.
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
/// one new peer using `outbound_connector`.
/// Crawl for new peers every `config.crawl_new_peer_interval`.
/// Also crawl whenever there is demand, but no new peers in `candidates`.
/// After crawling, try to connect to one new peer using `outbound_connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
Expand All @@ -487,11 +522,19 @@ enum CrawlerAction {
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// Uses `active_outbound_connections` to track active outbound connections
/// Uses `active_outbound_connections` to track the number of active outbound connections
/// in both the initial peers and crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
#[instrument(skip(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
))]
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
config: Config,
mut demand_tx: mpsc::Sender<MorePeers>,
mut demand_rx: mpsc::Receiver<MorePeers>,
mut candidates: CandidateSet<S>,
Expand Down Expand Up @@ -530,7 +573,7 @@ where
handshakes.push(future::pending().boxed());

let mut crawl_timer =
tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
tokio::time::interval(config.crawl_new_peer_interval).map(|tick| TimerCrawl { tick });

loop {
metrics::gauge!(
Expand All @@ -548,8 +591,8 @@ where
next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
// turn the demand into an action, based on the crawler's current state
_ = demand_rx.next() => {
if handshakes.len() > 50 {
// Too many pending handshakes already
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
// Too many open connections or pending handshakes already
DemandDrop
} else if let Some(candidate) = candidates.next().await {
// candidates.next has a short delay, and briefly holds the address
Expand All @@ -566,13 +609,13 @@ where
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
trace!("too many open connections or in-flight handshakes, dropping demand signal");
continue;
}
DemandHandshake { candidate } => {
// Increment the connection count before we spawn the connection.
let outbound_connection_tracker = active_outbound_connections.track_connection();
info!(
debug!(
outbound_connections = ?active_outbound_connections.update_count(),
"opening an outbound peer connection"
);
Expand Down Expand Up @@ -635,6 +678,11 @@ where
let _ = demand_tx.try_send(MorePeers);
}
}

// Security: Let other tasks run after each crawler action is processed.
//
// Avoids remote peers starving other Zebra tasks using outbound connection errors.
tokio::task::yield_now().await;
}
}

Expand Down
Loading

0 comments on commit 3e03d48

Please sign in to comment.