Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use ethrex_blockchain::{
};
use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body};
use ethrex_p2p::{
discv4::peer_table::TARGET_PEERS, sync::SyncMode, tx_broadcaster::BROADCAST_INTERVAL_MS,
discv4::{peer_table::TARGET_PEERS, server::INITIAL_LOOKUP_INTERVAL_MS},
sync::SyncMode,
tx_broadcaster::BROADCAST_INTERVAL_MS,
types::Node,
};
use ethrex_rlp::encode::RLPEncode;
Expand Down Expand Up @@ -239,13 +241,21 @@ pub struct Options {
)]
pub tx_broadcasting_time_interval: u64,
#[arg(
long = "target.peers",
long = "p2p.target-peers",
default_value_t = TARGET_PEERS,
value_name = "MAX_PEERS",
help = "Max amount of connected peers.",
help_heading = "P2P options"
)]
pub target_peers: usize,
#[arg(
long = "p2p.lookup-interval",
default_value_t = INITIAL_LOOKUP_INTERVAL_MS,
value_name = "INITIAL_LOOKUP_INTERVAL",
help = "Initial Lookup Time Interval (ms) to trigger each Discovery lookup message and RLPx connection attempt.",
help_heading = "P2P options"
)]
pub lookup_interval: f64,
#[arg(
long = "builder.extra-data",
default_value = get_minimal_client_version(),
Expand Down Expand Up @@ -333,6 +343,7 @@ impl Default for Options {
mempool_max_size: Default::default(),
tx_broadcasting_time_interval: Default::default(),
target_peers: Default::default(),
lookup_interval: Default::default(),
extra_data: get_minimal_client_version(),
gas_limit: DEFAULT_BUILDER_GAS_CEIL,
}
Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ pub async fn init_l1(
get_client_version(),
None,
opts.tx_broadcasting_time_interval,
opts.lookup_interval,
)
.await
.expect("P2P context could not be created");
Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub async fn init_l2(
),
}),
opts.node_opts.tx_broadcasting_time_interval,
opts.node_opts.lookup_interval,
)
.await
.expect("P2P context could not be created");
Expand Down
13 changes: 13 additions & 0 deletions crates/networking/p2p/discv4/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ impl PeerTable {
}
}

/// Return rate of target peers completion
pub async fn target_peers_completion(&mut self) -> Result<f64, PeerTableError> {
match self.handle.call(CallMessage::TargetPeersCompletion).await? {
OutMessage::TargetCompletion(result) => Ok(result),
_ => unreachable!(),
}
}

/// Provide a contact to initiate a connection
pub async fn get_contact_to_initiate(&mut self) -> Result<Option<Contact>, PeerTableError> {
match self.handle.call(CallMessage::GetContactToInitiate).await? {
Expand Down Expand Up @@ -931,6 +939,7 @@ enum CallMessage {
PeerCountByCapabilities { capabilities: Vec<Capability> },
TargetReached,
TargetPeersReached,
TargetPeersCompletion,
GetContactToInitiate,
GetContactForLookup,
GetContactForEnrLookup,
Expand Down Expand Up @@ -961,6 +970,7 @@ pub enum OutMessage {
PeerConnection(Vec<(H256, PeerConnection)>),
Contacts(Vec<Contact>),
TargetReached(bool),
TargetCompletion(f64),
IsNew(bool),
Nodes(Vec<Node>),
Contact(Box<Contact>),
Expand Down Expand Up @@ -1009,6 +1019,9 @@ impl GenServer for PeerTableServer {
CallMessage::TargetPeersReached => CallResponse::Reply(Self::OutMsg::TargetReached(
self.peers.len() >= self.target_peers,
)),
CallMessage::TargetPeersCompletion => CallResponse::Reply(
Self::OutMsg::TargetCompletion(self.peers.len() as f64 / self.target_peers as f64),
),
CallMessage::GetContactToInitiate => CallResponse::Reply(
self.get_contact_to_initiate()
.map(Box::new)
Expand Down
39 changes: 30 additions & 9 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12
/// The initial interval between peer lookups, until the number of peers reaches
/// [target_peers](DiscoverySideCarState::target_peers), or the number of
/// contacts reaches [target_contacts](DiscoverySideCarState::target_contacts).
pub const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_millis(100); // 10 per second
pub const LOOKUP_INTERVAL: Duration = Duration::from_millis(600); // 100 per minute
pub const INITIAL_LOOKUP_INTERVAL_MS: f64 = 100.0; // 10 per second
pub const LOOKUP_INTERVAL_MS: f64 = 600.0; // 100 per minute
const CHANGE_FIND_NODE_MESSAGE_INTERVAL: Duration = Duration::from_secs(5);
const PRUNE_INTERVAL: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -90,6 +90,7 @@ pub struct DiscoveryServer {
/// The last `FindNode` message sent, cached due to message
/// signatures being expensive.
find_node_message: BytesMut,
initial_lookup_interval: f64,
}

impl DiscoveryServer {
Expand All @@ -100,6 +101,7 @@ impl DiscoveryServer {
udp_socket: Arc<UdpSocket>,
mut peer_table: PeerTable,
bootnodes: Vec<Node>,
initial_lookup_interval: f64,
) -> Result<(), DiscoveryServerError> {
info!("Starting Discovery Server");

Expand All @@ -119,6 +121,7 @@ impl DiscoveryServer {
store: storage.clone(),
peer_table: peer_table.clone(),
find_node_message: Self::random_message(&signer),
initial_lookup_interval,
};

info!(count = bootnodes.len(), "Adding bootnodes");
Expand Down Expand Up @@ -274,12 +277,16 @@ impl DiscoveryServer {
}

async fn get_lookup_interval(&mut self) -> Duration {
if !self.peer_table.target_reached().await.unwrap_or(false) {
INITIAL_LOOKUP_INTERVAL
} else {
trace!("Reached target number of peers or contacts. Using longer lookup interval.");
LOOKUP_INTERVAL
}
let peer_completion = self
.peer_table
.target_peers_completion()
.await
.unwrap_or_default();
lookup_interval_function(
peer_completion,
self.initial_lookup_interval,
LOOKUP_INTERVAL_MS,
)
}

async fn enr_lookup(&mut self) -> Result<(), DiscoveryServerError> {
Expand Down Expand Up @@ -346,7 +353,7 @@ impl DiscoveryServer {

self.send(msg, node.udp_addr()).await?;

debug!(sent = "Neighbors", to = %format!("{:#x}", node.public_key));
trace!(sent = "Neighbors", to = %format!("{:#x}", node.public_key));

Ok(())
}
Expand Down Expand Up @@ -790,5 +797,19 @@ impl Discv4Message {
}
}

pub fn lookup_interval_function(progress: f64, lower_limit: f64, upper_limit: f64) -> Duration {
// Smooth progression curve
// See https://easings.net/#easeInOutCubic
let ease_in_out_cubic = if progress < 0.5 {
4.0 * progress.powf(3.0)
} else {
1.0 - ((-2.0 * progress + 2.0).powf(3.0)) / 2.0
};
Duration::from_micros(
// Use `progress` here instead of `ease_in_out_cubic` for a linear function.
(1000f64 * (ease_in_out_cubic * (upper_limit - lower_limit) + lower_limit)).round() as u64,
)
}

// TODO: Reimplement tests removed during snap sync refactor
// https://github.com/lambdaclass/ethrex/issues/4423
4 changes: 4 additions & 0 deletions crates/networking/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct P2PContext {
#[cfg(feature = "l2")]
pub based_context: Option<P2PBasedContext>,
pub tx_broadcaster: GenServerHandle<TxBroadcaster>,
pub initial_lookup_interval: f64,
}

impl P2PContext {
Expand All @@ -60,6 +61,7 @@ impl P2PContext {
client_version: String,
based_context: Option<P2PBasedContext>,
tx_broadcasting_time_interval: u64,
lookup_interval: f64,
) -> Result<Self, NetworkError> {
let (channel_broadcast_send_end, _) = tokio::sync::broadcast::channel::<(
tokio::task::Id,
Expand Down Expand Up @@ -91,6 +93,7 @@ impl P2PContext {
#[cfg(feature = "l2")]
based_context,
tx_broadcaster,
initial_lookup_interval: lookup_interval,
})
}
}
Expand All @@ -117,6 +120,7 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec<Node>) -> Result<
udp_socket.clone(),
context.table.clone(),
bootnodes,
context.initial_lookup_interval,
)
.await
.inspect_err(|e| {
Expand Down
42 changes: 15 additions & 27 deletions crates/networking/p2p/rlpx/initiator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::discv4::server::lookup_interval_function;
use crate::types::Node;
use crate::{
discv4::{
peer_table::PeerTableError,
server::{INITIAL_LOOKUP_INTERVAL, LOOKUP_INTERVAL},
},
discv4::{peer_table::PeerTableError, server::LOOKUP_INTERVAL_MS},
metrics::METRICS,
network::P2PContext,
rlpx::connection::server::PeerConnection,
Expand All @@ -24,25 +22,11 @@ pub enum RLPxInitiatorError {
#[derive(Debug, Clone)]
pub struct RLPxInitiator {
context: P2PContext,

/// The initial interval between peer lookups, until the number of peers
/// reaches [target_peers](RLPxInitiatorState::target_peers).
initial_lookup_interval: Duration,
lookup_interval: Duration,

/// The target number of RLPx connections to reach.
target_peers: u64,
}

impl RLPxInitiator {
pub fn new(context: P2PContext) -> Self {
Self {
context,
// We use the same lookup intervals as Discovery to try to get both process to check at the same rate
initial_lookup_interval: INITIAL_LOOKUP_INTERVAL,
lookup_interval: LOOKUP_INTERVAL,
target_peers: 50,
}
Self { context }
}

pub async fn spawn(context: P2PContext) -> GenServerHandle<RLPxInitiator> {
Expand All @@ -65,15 +49,19 @@ impl RLPxInitiator {
Ok(())
}

// We use the same lookup intervals as Discovery to try to get both process to check at the same rate
async fn get_lookup_interval(&mut self) -> Duration {
let num_peers = self.context.table.peer_count().await.unwrap_or(0) as u64;

if num_peers < self.target_peers {
self.initial_lookup_interval
} else {
debug!("Reached target number of peers. Using longer lookup interval.");
self.lookup_interval
}
let peer_completion = self
.context
.table
.target_peers_completion()
.await
.unwrap_or_default();
lookup_interval_function(
peer_completion,
self.context.initial_lookup_interval,
LOOKUP_INTERVAL_MS,
)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/networking/rpc/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ pub async fn dummy_p2p_context(peer_table: PeerTable) -> P2PContext {
"".to_string(),
None,
1000,
100.0,
)
.await
.unwrap()
Expand Down
13 changes: 9 additions & 4 deletions docs/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,15 @@ P2P options:
[default: 1000]
--target.peers <MAX_PEERS>
Max amount of connected peers.
[default: 100]
--p2p.target-peers <MAX_PEERS>
Max amount of connected peers.
[default: 100]
--p2p.lookup-interval <INITIAL_LOOKUP_INTERVAL>
Initial Lookup Time Interval (ms) to trigger each Discovery lookup message and RLPx connection attempt.
[default: 100]
RPC options:
--http.addr <ADDRESS>
Expand Down