Skip to content

Commit

Permalink
fix(l1): send our advertised tcp ports in discovery (#1785)
Browse files Browse the repository at this point in the history
**Motivation**

In discovery, we missed sending our local node TCP port when sending
`ping` and `pong` messages.

**Description**

This pr fixes it by sending our local node configured ports. 


Closes None
  • Loading branch information
MarcosNicolau authored Jan 23, 2025
1 parent 6914df1 commit e340bd9
Showing 1 changed file with 50 additions and 58 deletions.
108 changes: 50 additions & 58 deletions crates/networking/p2p/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ async fn discover_peers(
));

tracker.spawn(peers_revalidation(
udp_addr,
local_node,
udp_socket.clone(),
table.clone(),
signer.clone(),
REVALIDATION_INTERVAL_IN_SECONDS as u64,
));

discovery_startup(
udp_addr,
local_node,
udp_socket.clone(),
table.clone(),
signer.clone(),
Expand Down Expand Up @@ -189,16 +189,22 @@ async fn discover_peers_server(
debug!("Ignoring ping as it is expired.");
continue;
};
let node = Node {
ip: from.ip(),
udp_port: from.port(),
tcp_port: msg.from.tcp_port,
node_id: packet.get_node_id(),
};
let ping_hash = packet.get_hash();
pong(&udp_socket, from, ping_hash, &signer).await;
let node = {
pong(&udp_socket, node, ping_hash, &signer).await;
let peer = {
let table = table.lock().await;
table.get_by_node_id(packet.get_node_id()).cloned()
};
if let Some(peer) = node {
if let Some(peer) = peer {
// send a a ping to get an endpoint proof
if time_since_in_hs(peer.last_ping) >= PROOF_EXPIRATION_IN_HS as u64 {
let hash = ping(&udp_socket, udp_addr, from, &signer).await;
let hash = ping(&udp_socket, local_node, peer.node, &signer).await;
if let Some(hash) = hash {
table
.lock()
Expand All @@ -222,15 +228,9 @@ async fn discover_peers_server(
}
} else {
let mut table = table.lock().await;
let node = Node {
ip: from.ip(),
udp_port: from.port(),
tcp_port: 0,
node_id: packet.get_node_id(),
};
if let (Some(peer), true) = table.insert_node(node) {
// send a ping to get the endpoint proof from our end
let hash = ping(&udp_socket, udp_addr, from, &signer).await;
let hash = ping(&udp_socket, local_node, node, &signer).await;
table.update_peer_ping(peer.node.node_id, hash);
}
}
Expand Down Expand Up @@ -372,10 +372,8 @@ async fn discover_peers_server(
if let Some(nodes) = nodes_to_insert {
for node in nodes {
if let (Some(peer), true) = table.insert_node(node) {
let node_addr =
SocketAddr::new(peer.node.ip, peer.node.udp_port);
let ping_hash =
ping(&udp_socket, udp_addr, node_addr, &signer).await;
ping(&udp_socket, local_node, peer.node, &signer).await;
table.update_peer_ping(peer.node.node_id, ping_hash);
}
}
Expand Down Expand Up @@ -492,22 +490,23 @@ async fn discover_peers_server(
/// currently, since we are not storing nodes, the only way to have startup nodes is by providing
/// an array of bootnodes.
async fn discovery_startup(
udp_addr: SocketAddr,
local_node: Node,
udp_socket: Arc<UdpSocket>,
table: Arc<Mutex<KademliaTable>>,
signer: SigningKey,
bootnodes: Vec<BootNode>,
) {
for bootnode in bootnodes {
table.lock().await.insert_node(Node {
let node = Node {
ip: bootnode.socket_address.ip(),
udp_port: bootnode.socket_address.port(),
// TODO: udp port can differ from tcp port.
// see https://github.com/lambdaclass/ethrex/issues/905
tcp_port: bootnode.socket_address.port(),
node_id: bootnode.node_id,
});
let ping_hash = ping(&udp_socket, udp_addr, bootnode.socket_address, &signer).await;
};
table.lock().await.insert_node(node);
let ping_hash = ping(&udp_socket, local_node, node, &signer).await;
table
.lock()
.await
Expand All @@ -532,7 +531,7 @@ const PROOF_EXPIRATION_IN_HS: usize = 12;
///
/// See more https://github.com/ethereum/devp2p/blob/master/discv4.md#kademlia-table
async fn peers_revalidation(
udp_addr: SocketAddr,
local_node: Node,
udp_socket: Arc<UdpSocket>,
table: Arc<Mutex<KademliaTable>>,
signer: SigningKey,
Expand Down Expand Up @@ -566,13 +565,7 @@ async fn peers_revalidation(
if peer.liveness == 0 {
let new_peer = table.replace_peer(node_id);
if let Some(new_peer) = new_peer {
let ping_hash = ping(
&udp_socket,
udp_addr,
SocketAddr::new(new_peer.node.ip, new_peer.node.udp_port),
&signer,
)
.await;
let ping_hash = ping(&udp_socket, local_node, new_peer.node, &signer).await;
table.update_peer_ping(new_peer.node.node_id, ping_hash);
}
}
Expand All @@ -585,13 +578,7 @@ async fn peers_revalidation(
let peers = table.lock().await.get_least_recently_pinged_peers(3);
previously_pinged_peers = HashSet::default();
for peer in peers {
let ping_hash = ping(
&udp_socket,
udp_addr,
SocketAddr::new(peer.node.ip, peer.node.udp_port),
&signer,
)
.await;
let ping_hash = ping(&udp_socket, local_node, peer.node, &signer).await;
let mut table = table.lock().await;
table.update_peer_ping_with_revalidation(peer.node.node_id, ping_hash);
previously_pinged_peers.insert(peer.node.node_id);
Expand Down Expand Up @@ -792,8 +779,8 @@ fn peers_to_ask_push(peers_to_ask: &mut Vec<Node>, target: H512, node: Node) {
/// an optional hash corresponding to the message header hash to account if the send was successful
async fn ping(
socket: &UdpSocket,
local_addr: SocketAddr,
to_addr: SocketAddr,
local_node: Node,
node: Node,
signer: &SigningKey,
) -> Option<H256> {
let mut buf = Vec::new();
Expand All @@ -803,24 +790,26 @@ async fn ping(
.unwrap_or_default()
.as_secs();

// TODO: this should send our advertised TCP port
let from = Endpoint {
ip: local_addr.ip(),
udp_port: local_addr.port(),
tcp_port: 0,
ip: local_node.ip,
udp_port: local_node.udp_port,
tcp_port: local_node.tcp_port,
};
let to = Endpoint {
ip: to_addr.ip(),
udp_port: to_addr.port(),
tcp_port: 0,
ip: node.ip,
udp_port: node.udp_port,
tcp_port: node.tcp_port,
};

let ping =
discv4::Message::Ping(PingMessage::new(from, to, expiration).with_enr_seq(time_now_unix()));
ping.encode_with_header(&mut buf, signer);

// Send ping and log if error
match socket.send_to(&buf, to_addr).await {
match socket
.send_to(&buf, SocketAddr::new(to.ip, to.udp_port))
.await
{
Ok(bytes_sent) => {
// sanity check to make sure the ping was well sent
// though idk if this is actually needed or if it might break other stuff
Expand Down Expand Up @@ -877,7 +866,7 @@ async fn find_node_and_wait_for_response(
}
}

async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer: &SigningKey) {
async fn pong(socket: &UdpSocket, node: Node, ping_hash: H256, signer: &SigningKey) {
let mut buf = Vec::new();

let expiration: u64 = (SystemTime::now() + Duration::from_secs(20))
Expand All @@ -886,9 +875,9 @@ async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer:
.as_secs();

let to = Endpoint {
ip: to_addr.ip(),
udp_port: to_addr.port(),
tcp_port: 0,
ip: node.ip,
udp_port: node.udp_port,
tcp_port: node.tcp_port,
};
let pong: discv4::Message = discv4::Message::Pong(
PongMessage::new(to, ping_hash, expiration).with_enr_seq(time_now_unix()),
Expand All @@ -897,7 +886,10 @@ async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer:
pong.encode_with_header(&mut buf, signer);

// Send pong and log if error
if let Err(e) = socket.send_to(&buf, to_addr).await {
if let Err(e) = socket
.send_to(&buf, SocketAddr::new(node.ip, node.udp_port))
.await
{
error!("Unable to send pong: {e}")
}
}
Expand Down Expand Up @@ -1132,17 +1124,17 @@ mod tests {
async fn connect_servers(server_a: &mut MockServer, server_b: &mut MockServer) {
let ping_hash = ping(
&server_a.udp_socket,
server_a.addr,
server_b.addr,
server_a.local_node,
server_b.local_node,
&server_a.signer,
)
.await;
{
let mut table = server_a.table.lock().await;
table.insert_node(Node {
ip: server_b.addr.ip(),
udp_port: server_b.addr.port(),
tcp_port: 0,
ip: server_b.local_node.ip,
udp_port: server_b.local_node.udp_port,
tcp_port: server_b.local_node.tcp_port,
node_id: server_b.node_id,
});
table.update_peer_ping(server_b.node_id, ping_hash);
Expand All @@ -1169,7 +1161,7 @@ mod tests {

// start revalidation server
tokio::spawn(peers_revalidation(
server_b.addr,
server_b.local_node,
server_b.udp_socket.clone(),
server_b.table.clone(),
server_b.signer.clone(),
Expand Down Expand Up @@ -1400,8 +1392,8 @@ mod tests {
// and trigger a enr-request to server_b to update the record.
ping(
&server_b.udp_socket,
server_b.addr,
server_a.addr,
server_b.local_node,
server_a.local_node,
&server_b.signer,
)
.await;
Expand Down

0 comments on commit e340bd9

Please sign in to comment.