Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of outbound peer connections #2944

Merged
merged 16 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use serde::{de, Deserialize, Deserializer};

use zebra_chain::{parameters::Network, serialization::canonical_socket_addr};

use crate::BoxError;
use crate::{constants, BoxError};

#[cfg(test)]
mod tests;

/// The number of times Zebra will retry each initial peer's DNS resolution,
/// before checking if any other initial peers have returned addresses.
Expand Down Expand Up @@ -53,6 +56,8 @@ pub struct Config {

/// The initial target size for the peer set.
///
/// Also used to limit the number of inbound and outbound connections made by Zebra.
///
/// If you have a slow network connection, and Zebra is having trouble
/// syncing, try reducing the peer set size. You can also reduce the peer
/// set size to reduce Zebra's bandwidth usage.
Expand All @@ -72,6 +77,38 @@ pub struct Config {
}

impl Config {
/// The maximum number of outbound connections that Zebra will open at the same time.
/// When this limit is reached, Zebra stops opening outbound connections.
///
/// # Security
///
/// This is larger than the inbound connection limit,
/// so Zebra is more likely to be connected to peers that it has selected.
pub fn peerset_outbound_connection_limit(&self) -> usize {
let inbound_limit = self.peerset_inbound_connection_limit();

inbound_limit + inbound_limit / constants::OUTBOUND_PEER_BIAS_DENOMINATOR
}

/// The maximum number of inbound connections that Zebra will accept at the same time.
/// When this limit is reached, Zebra drops new inbound connections without handshaking on them.
pub fn peerset_inbound_connection_limit(&self) -> usize {
self.peerset_initial_target_size
}

/// The maximum number of inbound and outbound connections that Zebra will have at the same time.
pub fn peerset_total_connection_limit(&self) -> usize {
self.peerset_outbound_connection_limit() + self.peerset_inbound_connection_limit()
}

/// Get the initial seed peers based on the configured network.
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
match self.network {
Network::Mainnet => Config::resolve_peers(&self.initial_mainnet_peers).await,
Network::Testnet => Config::resolve_peers(&self.initial_testnet_peers).await,
}
}

/// Concurrently resolves `peers` into zero or more IP addresses, with a
/// timeout of a few seconds on each DNS request.
///
Expand Down Expand Up @@ -115,14 +152,6 @@ impl Config {
}
}

/// Get the initial seed peers based on the configured network.
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
match self.network {
Network::Mainnet => Config::resolve_peers(&self.initial_mainnet_peers).await,
Network::Testnet => Config::resolve_peers(&self.initial_testnet_peers).await,
}
}

/// Resolves `host` into zero or more IP addresses, retrying up to
/// `max_retries` times.
///
Expand Down Expand Up @@ -265,6 +294,7 @@ impl<'de> Deserialize<'de> for Config {
}

let config = DConfig::deserialize(deserializer)?;

// TODO: perform listener DNS lookups asynchronously with a timeout (#1631)
let listen_addr = match config.listen_addr.parse::<SocketAddr>() {
Ok(socket) => Ok(socket),
Expand All @@ -287,6 +317,3 @@ impl<'de> Deserialize<'de> for Config {
})
}
}

#[cfg(test)]
mod tests;
18 changes: 18 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ use zebra_chain::{
serialization::Duration32,
};

/// The fractional bias towards outbound peers in the peer set,
/// if connection limits have been reached.
///
/// Inbound and outbound connections are limited based on
/// [`Config.peerset_initial_target_size`].
///
/// The outbound limit is larger than the inbound limit by:
/// `Config.peerset_initial_target_size / OUTBOUND_PEER_BIAS_DENOMINATOR`.
///
/// # Security
///
/// This bias helps make sure that Zebra is connected to a majority of peers
/// that it has chosen from its [`AddressBook`].
///
/// Inbound peer connections are initiated by the remote peer,
/// so inbound peer selection is not controlled by the local node.
pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2;

/// The buffer size for the peer set.
///
/// This should be greater than 1 to avoid sender contention, but also reasonably
Expand Down
3 changes: 1 addition & 2 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ mod error;
mod handshake;

use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
24 changes: 16 additions & 8 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,29 @@ use super::{ErrorSlot, PeerError, SharedPeerError};

/// The "client" duplex half of a peer connection.
pub struct Client {
// Used to shut down the corresponding heartbeat.
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
/// Used to shut down the corresponding heartbeat.
/// This is always Some except when we take it on drop.
pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,

/// Used to send [`Request`]s to the remote peer.
pub(crate) server_tx: mpsc::Sender<ClientRequest>,

/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
pub(crate) error_slot: ErrorSlot,
}

/// A message from the `peer::Client` to the `peer::Server`.
#[derive(Debug)]
pub(super) struct ClientRequest {
/// The actual request.
pub(crate) struct ClientRequest {
/// The actual network request for the peer.
pub request: Request,
/// The return message channel, included because `peer::Client::call` returns a

/// The response [`Message`] channel, included because `peer::Client::call` returns a
/// future that may be moved around before it resolves.
pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,

/// The tracing context for the request, so that work the connection task does
/// processing messages in the context of this request will have correct context.
pub span: tracing::Span,
Expand Down
2 changes: 2 additions & 0 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ pub struct Connection<S, Tx> {
pub(super) client_rx: ClientRequestReceiver,

/// A slot for an error shared between the Connection and the Client that uses it.
//
/// `None` unless the connection or client have errored.
pub(super) error_slot: ErrorSlot,

/// A channel for sending requests to the connected peer.
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub enum PeerError {
/// mutex should be held for as short a time as possible. This avoids blocking
/// the async task thread on acquiring the mutex.
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
pub struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);

impl std::fmt::Debug for ErrorSlot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
90 changes: 69 additions & 21 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
///
/// # Panics
///
/// If `config.config.peerset_initial_target_size` is zero.
/// (zebra-network expects to be able to connect to at least one peer.)
pub async fn init<S, C>(
config: Config,
inbound_service: S,
Expand All @@ -76,10 +81,25 @@ where
S::Future: Send + 'static,
C: ChainTip + Clone + Send + 'static,
{
// If we want Zebra to operate with no network,
// we should implement a `zebrad` command that doesn't use `zebra-network`.
assert!(
config.peerset_initial_target_size > 0,
"Zebra must be allowed to connect to at least one peer"
);

let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;

let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
let (inv_sender, inv_receiver) = broadcast::channel(100);

// Create a broadcast channel for peer inventory advertisements.
// If it reaches capacity, this channel drops older inventory advertisements.
//
// When Zebra is at the chain tip with an up-to-date mempool,
// we expect to have at most 1 new transaction per connected peer,
// and 1-2 new blocks across the entire network.
// (The block syncer and mempool crawler handle bulk fetches of blocks and transactions.)
let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());
jvff marked this conversation as resolved.
Show resolved Hide resolved

// Construct services that handle inbound handshakes and perform outbound
// handshakes. These use the same handshake service internally to detect
Expand All @@ -106,10 +126,14 @@ where
)
};

// Create an mpsc channel for peer changes, with a generous buffer.
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling.
let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
// Create an mpsc channel for peer changes,
// based on the maximum number of inbound and outbound peers.
let (peerset_tx, peerset_rx) =
mpsc::channel::<PeerChange>(config.peerset_total_connection_limit());
// Create an mpsc channel for peerset demand signaling,
// based on the maximum number of outbound peers.
let (mut demand_tx, demand_rx) =
mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());

// Create a oneshot to send background task JoinHandles to the peer set
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -174,18 +198,19 @@ where
let _ = demand_tx.try_send(MorePeers);
}

let crawl_guard = tokio::spawn(
let crawl_fut = {
let config = config.clone();
crawl_and_dial(
config.crawl_new_peer_interval,
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
)
.instrument(Span::current()),
);
};
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));

handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();

Expand Down Expand Up @@ -294,6 +319,11 @@ where
peerset_tx
.send(handshake_result.map_err(|(_addr, e)| e))
.await?;

// Security: Let other tasks run after each connection is processed.
//
// Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
tokio::task::yield_now().await;
}

let outbound_connections = active_outbound_connections.update_count();
Expand Down Expand Up @@ -406,7 +436,7 @@ where
if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
let connection_tracker = active_inbound_connections.track_connection();
info!(
debug!(
inbound_connections = ?active_inbound_connections.update_count(),
"handshaking on an open inbound peer connection"
);
Expand Down Expand Up @@ -449,6 +479,11 @@ where
// but most OSes also limit the number of queued inbound connections on a listener port.
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
}

// Security: Let other tasks run after each connection is processed.
//
// Avoids remote peers starving other Zebra tasks using inbound connection successes or errors.
tokio::task::yield_now().await;
}
}

Expand Down Expand Up @@ -476,9 +511,9 @@ enum CrawlerAction {
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `peerset_tx` channel.
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
/// one new peer using `outbound_connector`.
/// Crawl for new peers every `config.crawl_new_peer_interval`.
/// Also crawl whenever there is demand, but no new peers in `candidates`.
/// After crawling, try to connect to one new peer using `outbound_connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
Expand All @@ -487,11 +522,19 @@ enum CrawlerAction {
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// Uses `active_outbound_connections` to track active outbound connections
/// Uses `active_outbound_connections` to track the number of active outbound connections
/// in both the initial peers and crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
#[instrument(skip(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
))]
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
config: Config,
mut demand_tx: mpsc::Sender<MorePeers>,
mut demand_rx: mpsc::Receiver<MorePeers>,
mut candidates: CandidateSet<S>,
Expand Down Expand Up @@ -530,7 +573,7 @@ where
handshakes.push(future::pending().boxed());

let mut crawl_timer =
tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
tokio::time::interval(config.crawl_new_peer_interval).map(|tick| TimerCrawl { tick });

loop {
metrics::gauge!(
Expand All @@ -548,8 +591,8 @@ where
next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
// turn the demand into an action, based on the crawler's current state
_ = demand_rx.next() => {
if handshakes.len() > 50 {
// Too many pending handshakes already
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
// Too many open connections or pending handshakes already
DemandDrop
} else if let Some(candidate) = candidates.next().await {
// candidates.next has a short delay, and briefly holds the address
Expand All @@ -566,13 +609,13 @@ where
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
trace!("too many open connections or in-flight handshakes, dropping demand signal");
continue;
}
DemandHandshake { candidate } => {
// Increment the connection count before we spawn the connection.
let outbound_connection_tracker = active_outbound_connections.track_connection();
info!(
debug!(
outbound_connections = ?active_outbound_connections.update_count(),
"opening an outbound peer connection"
);
Expand Down Expand Up @@ -635,6 +678,11 @@ where
let _ = demand_tx.try_send(MorePeers);
}
}

// Security: Let other tasks run after each crawler action is processed.
//
// Avoids remote peers starving other Zebra tasks using outbound connection errors.
tokio::task::yield_now().await;
}
}

Expand Down
Loading