Skip to content

Commit

Permalink
Limit the number of initial peers (#2913)
Browse files Browse the repository at this point in the history
* limit the number of initial peers

* Move more code out of zebra_network::initialize

* Always limit the number of initial peers in the Config

This way, we can never get the unused peers out.

* Revert "Always limit the number of initial peers in the Config"

This reverts commit 81ede59.

Actually, this doesn't work, because we want those extra peers.

* Minor tweaks

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
Co-authored-by: teor <teor@riseup.net>
  • Loading branch information
3 people authored Oct 21, 2021
1 parent 4cdd12e commit 2de93bb
Showing 1 changed file with 36 additions and 12 deletions.
48 changes: 36 additions & 12 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).

use std::{net::SocketAddr, sync::Arc};
use std::{collections::HashSet, net::SocketAddr, sync::Arc};

use futures::{
channel::mpsc,
Expand All @@ -12,6 +12,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
};
use rand::seq::SliceRandom;
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
Expand Down Expand Up @@ -141,11 +142,7 @@ where
let config = config.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
async move {
let initial_peers = config.initial_peers().await;
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
}
.boxed()
async move { add_initial_peers(&config, outbound_connector, peerset_tx).await }.boxed()
};

let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
Expand Down Expand Up @@ -192,11 +189,11 @@ where
(peer_set, address_book)
}

/// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `peerset_tx`.
#[instrument(skip(initial_peers, outbound_connector, peerset_tx))]
/// Use the provided `outbound_connector` to connect to the configured initial peers,
/// then send the resulting peer connections over `peerset_tx`.
#[instrument(skip(config, outbound_connector, peerset_tx))]
async fn add_initial_peers<S>(
initial_peers: std::collections::HashSet<SocketAddr>,
config: &Config,
outbound_connector: S,
mut peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<ActiveConnectionCounter, BoxError>
Expand All @@ -208,14 +205,15 @@ where
> + Clone,
S::Future: Send + 'static,
{
let initial_peer_count = initial_peers.len();
let initial_peers = limit_initial_peers(config).await;

let mut handshake_success_total: usize = 0;
let mut handshake_error_total: usize = 0;

let mut active_outbound_connections = ActiveConnectionCounter::new_counter();

info!(
?initial_peer_count,
initial_peer_count = ?initial_peers.len(),
?initial_peers,
"connecting to initial peer set"
);
Expand Down Expand Up @@ -286,6 +284,32 @@ where
Ok(active_outbound_connections)
}

/// Limit the number of `initial_peers` addresses entries to the configured
/// `peerset_initial_target_size`.
///
/// The result is randomly chosen entries from the provided set of addresses.
async fn limit_initial_peers(config: &Config) -> HashSet<SocketAddr> {
let initial_peers = config.initial_peers().await;
let initial_peer_count = initial_peers.len();

// Limit the number of initial peers to `config.peerset_initial_target_size`
if initial_peer_count > config.peerset_initial_target_size {
info!(
"Limiting the initial peers list from {} to {}",
initial_peer_count, config.peerset_initial_target_size
);
}

let initial_peers_vect: Vec<SocketAddr> = initial_peers.iter().copied().collect();

// TODO: add unused peers to the AddressBook (#2931)
// https://docs.rs/rand/0.8.4/rand/seq/trait.SliceRandom.html#tymethod.partial_shuffle
initial_peers_vect
.choose_multiple(&mut rand::thread_rng(), config.peerset_initial_target_size)
.copied()
.collect()
}

/// Open a peer connection listener on `config.listen_addr`,
/// returning the opened [`TcpListener`], and the address it is bound to.
///
Expand Down

0 comments on commit 2de93bb

Please sign in to comment.