From e50661c1a4073ca5a8f96a3d8d1fda656fb38178 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 17 Jan 2024 16:32:44 +1100 Subject: [PATCH 1/7] Simplify the code, name changes group handler API --- Cargo.toml | 2 - src/error.rs | 11 ++ src/handler/mod.rs | 147 ++++++++++-------- .../{nat_hole_punch/utils.rs => nat.rs} | 14 +- src/handler/nat_hole_punch/error.rs | 14 -- src/handler/nat_hole_punch/mod.rs | 50 ------ src/handler/tests.rs | 18 +-- src/service.rs | 63 ++++---- 8 files changed, 138 insertions(+), 181 deletions(-) rename src/handler/{nat_hole_punch/utils.rs => nat.rs} (94%) delete mode 100644 src/handler/nat_hole_punch/error.rs delete mode 100644 src/handler/nat_hole_punch/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 4d911c3b4..351c7d62d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,9 +38,7 @@ lru = {version = "0.7.1", default-features = false } hashlink = "0.7.0" delay_map = "0.3.0" more-asserts = "0.2.2" -thiserror = "1.0.40" derive_more = { version = "0.99.17", default-features = false, features = ["from", "display", "deref", "deref_mut"] } -async-trait = "0.1.74" [dev-dependencies] rand_07 = { package = "rand", version = "0.7" } diff --git a/src/error.rs b/src/error.rs index c6323dffb..3afd77502 100644 --- a/src/error.rs +++ b/src/error.rs @@ -51,6 +51,17 @@ pub enum Discv5Error { Io(std::io::Error), } +/// An error occurred whilst attempting to hole punch NAT. +#[derive(Debug)] +pub enum NatError { + /// Initiator error. + Initiator(Discv5Error), + /// Relayer error. + Relay(Discv5Error), + /// Target error. + Target(Discv5Error), +} + macro_rules! impl_from_variant { ($(<$($generic: ident,)+>)*, $from_type: ty, $to_type: ty, $variant: path) => { impl$(<$($generic,)+>)* From<$from_type> for $to_type { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 0d108ea25..3618ebf06 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -29,7 +29,7 @@ use crate::{ config::Discv5Config, discv5::PERMIT_BAN_LIST, - error::{Discv5Error, RequestError}, + error::{Discv5Error, RequestError, NatError}, packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind, ProtocolIdentity}, rpc::{ Message, Payload, RelayInitNotification, RelayMsgNotification, Request, RequestBody, @@ -59,7 +59,7 @@ use tracing::{debug, error, trace, warn}; mod active_requests; mod crypto; -mod nat_hole_punch; +mod nat; mod request_call; mod session; mod tests; @@ -69,7 +69,7 @@ pub use crate::node_info::{NodeAddress, NodeContact}; use crate::{lru_time_cache::LruTimeCache, socket::ListenConfig}; use active_requests::ActiveRequests; -use nat_hole_punch::{Error as NatError, HolePunchNat, NatUtils}; +use nat::Nat; use request_call::RequestCall; use session::Session; @@ -85,7 +85,6 @@ const ONE_TIME_SESSION_CACHE_CAPACITY: usize = 100; /// Messages sent from the application layer to `Handler`. #[derive(Debug, Clone, PartialEq, Eq)] -#[allow(clippy::large_enum_variant)] pub enum HandlerIn { /// A Request to send to a `NodeContact` has been received from the application layer. A /// `NodeContact` is an abstract type that allows for either an ENR to be sent or a `Raw` type @@ -108,15 +107,9 @@ pub enum HandlerIn { /// response back to the `NodeAddress` from which the request was received. Response(NodeAddress, Box), - /// A Random packet has been received and we have requested the application layer to inform - /// us what the highest known ENR is for this node. - /// The `WhoAreYouRef` is sent out in the `HandlerOut::WhoAreYou` event and should - /// be returned here to submit the application's response. - WhoAreYou(WhoAreYouRef, Option), - - /// A response to a [`HandlerOut::FindHolePunchEnr`]. Returns the ENR and the - /// [`RelayInitNotification`] from [`HandlerOut::FindHolePunchEnr`]. - HolePunchEnr(Enr, RelayInitNotification), + /// The application layer is responding with an ENR to a `RequestEnr` request. This function + /// returns the requested data and optionally and ENR if one is found. + EnrResponse(Option, EnrRequestData), /// Observed socket has been update. The old socket and the current socket. SocketUpdate(Option, SocketAddr), @@ -138,19 +131,15 @@ pub enum HandlerOut { /// A Response has been received from a node on the network. Response(NodeAddress, Box), - /// An unknown source has requested information from us. Return the reference with the known - /// ENR of this node (if known). See the `HandlerIn::WhoAreYou` variant. - WhoAreYou(WhoAreYouRef), + /// We need to request the ENR of a specific node. This could be due to an unknown ENR or a + /// hole punch request. + RequestEnr(EnrRequestData), /// An RPC request failed. /// /// This returns the request ID and an error indicating why the request failed. RequestFailed(RequestId, RequestError), - /// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the - /// [`RelayMsgNotification`] we intend to send to it. - FindHolePunchEnr(RelayInitNotification), - /// Triggers a ping to all peers, outside of the regular ping interval. Needed to trigger /// renewed session establishment after updating the local ENR from unreachable to reachable /// and clearing all sessions. Only this way does the local node have a chance to make it into @@ -170,6 +159,19 @@ pub enum ConnectionDirection { Outgoing, } +/// The kind of request data being sent to the service. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EnrRequestData { + /// A Random packet has been received and request the application layer to inform + /// us what the highest known ENR is for this node. + /// The `WhoAreYouRef` is sent out in the `HandlerOut::WhoAreYou` event and should + /// be returned here to submit the application's response. + WhoAreYou(WhoAreYouRef), + /// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the + /// [`RelayMsgNotification`] we intend to send to it. + Nat(RelayInitNotification), +} + /// A reference for the application layer to send back when the handler requests any known /// ENR for the NodeContact. #[derive(Debug, Clone, PartialEq, Eq)] @@ -233,8 +235,8 @@ pub struct Handler { socket: Socket, /// Exit channel to shutdown the handler. exit: oneshot::Receiver<()>, - /// Types necessary to plug in nat hole punching. - nat_utils: NatUtils, + /// Struct to handle nat hole punching logic. + nat: Nat, } type HandlerReturn = ( @@ -319,7 +321,7 @@ impl Handler { let sessions = LruTimeCache::new(session_timeout, Some(session_cache_capacity)); - let nat_utils = NatUtils::new( + let nat = Nat::new( &listen_sockets, &enr.read(), ip_mode, @@ -350,7 +352,7 @@ impl Handler { service_send, listen_sockets, socket, - nat_utils, + nat, exit, }; debug!("Handler Starting"); @@ -378,15 +380,19 @@ impl Handler { } } HandlerIn::Response(dst, response) => self.send_response::

(dst, *response).await, - HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge::

(wru_ref, enr).await, - HandlerIn::HolePunchEnr(tgt_enr, relay_init) => { + HandlerIn::EnrResponse(enr, EnrRequestData::WhoAreYou(wru_ref)) => self.send_challenge::

(wru_ref, enr).await, + HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiator)) => { // Assemble the notification for the target - let (inr_enr, _tgt, timed_out_nonce) = relay_init.into(); - let relay_msg_notif = RelayMsgNotification::new(inr_enr, timed_out_nonce); - if let Err(e) = self.send_relay_msg_notif::

(tgt_enr, relay_msg_notif).await { - warn!("Failed to relay. Error: {}", e); + let (initiator_enr, _target, timed_out_nonce) = relay_initiator.into(); + let relay_msg_notification = RelayMsgNotification::new(initiator_enr, timed_out_nonce); + if let Err(e) = self.send_relay_msg_notification::

(target_enr, relay_msg_notification).await { + warn!("Failed to relay. Error: {:?}", e); } } + HandlerIn::EnrResponse(_,_) => {} // This handles the case that No ENR was + // found for a target relayer. This + // message never gets sent, so it is + // ignored. HandlerIn::SocketUpdate(old_socket, socket) => { let ip = socket.ip(); let port = socket.port(); @@ -408,7 +414,7 @@ impl Handler { warn!("Failed to inform that request failed {}", e); } } - self.nat_utils.set_is_behind_nat(&self.listen_sockets, Some(ip), Some(port)); + self.nat.set_is_behind_nat(&self.listen_sockets, Some(ip), Some(port)); } } } @@ -423,14 +429,14 @@ impl Handler { // challenge. We process them here self.send_next_request::

(node_address).await; } - Some(Ok(peer_socket)) = self.nat_utils.hole_punch_tracker.next() => { - if self.nat_utils.is_behind_nat == Some(false) { + Some(Ok(peer_socket)) = self.nat.hole_punch_tracker.next() => { + if self.nat.is_behind_nat == Some(false) { // Until ip voting is done and an observed public address is finalised, all nodes act as // if they are behind a NAT. return; } if let Err(e) = self.on_hole_punch_expired(peer_socket).await { - warn!("Failed to keep hole punched for peer, error: {}", e); + warn!("Failed to keep hole punched for peer, error: {:?}", e); } } _ = banned_nodes_check.tick() => self.unban_nodes_check(), // Unban nodes that are past the timeout @@ -539,7 +545,7 @@ impl Handler { if request_call.retries() >= self.request_retries { trace!("Request timed out with {}", node_address); if let Some(relay) = self - .nat_utils + .nat .new_peer_latest_relay_cache .pop(&node_address.node_id) { @@ -836,7 +842,7 @@ impl Handler { ConnectionDirection::Incoming }; - enr_not_reachable = NatUtils::is_enr_reachable(&enr); + enr_not_reachable = Nat::is_enr_reachable(&enr); // We already know the ENR. Send the handshake response packet trace!("Sending Authentication response to node: {}", node_address); @@ -928,11 +934,11 @@ impl Handler { // Keep count of the unreachable Sessions we are tracking // Peer is reachable - let enr_not_reachable = !NatUtils::is_enr_reachable(&most_recent_enr); + let enr_not_reachable = !Nat::is_enr_reachable(&most_recent_enr); // Decide whether to establish this connection based on our appetite for unreachable if enr_not_reachable - && Some(self.sessions.tagged()) >= self.nat_utils.unreachable_enr_limit + && Some(self.sessions.tagged()) >= self.nat.unreachable_enr_limit { debug!("Reached limit of unreachable ENR sessions. Avoiding a new connection. Limit: {}", self.sessions.tagged()); return; @@ -962,7 +968,7 @@ impl Handler { ) .await; self.new_session(node_address.clone(), session, enr_not_reachable); - self.nat_utils + self.nat .new_peer_latest_relay_cache .pop(&node_address.node_id); self.handle_message::

( @@ -1137,14 +1143,14 @@ impl Handler { warn!("peer {node_address} tried to initiate hole punch attempt for another node {initiator_node_id}, banning peer {node_address}"); self.fail_session(&node_address, RequestError::MaliciousRelayInit, true) .await; - let ban_timeout = self.nat_utils.ban_duration.map(|v| Instant::now() + v); + let ban_timeout = self.nat.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - } else if let Err(e) = self.on_relay_init(notification).await { - warn!("failed handling notification to relay for {node_address}, {e}"); + } else if let Err(e) = self.on_relay_initiator(notification).await { + warn!("failed handling notification to relay for {node_address}, {:?}", e); } } Message::RelayMsgNotification(notification) => { - match self.nat_utils.is_behind_nat { + match self.nat.is_behind_nat { Some(false) => { // inr may not be malicious and initiated a hole punch attempt when // a request to this node timed out for another reason @@ -1152,7 +1158,7 @@ impl Handler { } _ => { if let Err(e) = self.on_relay_msg::

(notification).await { - warn!("failed handling notification relayed from {node_address}, {e}"); + warn!("failed handling notification relayed from {node_address}, {:?}", e); } } } @@ -1205,7 +1211,7 @@ impl Handler { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); if let Err(e) = self .service_send - .send(HandlerOut::WhoAreYou(whoareyou_ref)) + .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref))) .await { warn!("Failed to send WhoAreYou to the service {}", e) @@ -1251,7 +1257,7 @@ impl Handler { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); if let Err(e) = self .service_send - .send(HandlerOut::WhoAreYou(whoareyou_ref)) + .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref))) .await { warn!( @@ -1333,14 +1339,14 @@ impl Handler { // extra responses if let ResponseBody::Nodes { total, ref nodes } = response.body { for node in nodes { - if let Some(socket_addr) = self.nat_utils.ip_mode.get_contactable_addr(node) { + if let Some(socket_addr) = self.nat.ip_mode.get_contactable_addr(node) { let node_id = node.node_id(); let new_peer_node_address = NodeAddress { socket_addr, node_id, }; if self.sessions.peek(&new_peer_node_address).is_none() { - self.nat_utils + self.nat .new_peer_latest_relay_cache .put(node_id, node_address.clone()); } @@ -1472,7 +1478,7 @@ impl Handler { } } let node_address = request_call.contact().node_address(); - self.nat_utils + self.nat .new_peer_latest_relay_cache .pop(&node_address.node_id); self.fail_session(&node_address, error, remove_session) @@ -1492,7 +1498,7 @@ impl Handler { .active_sessions .store(self.sessions.len(), Ordering::Relaxed); // stop keeping hole punched for peer - self.nat_utils.untrack(&node_address.socket_addr); + self.nat.untrack(&node_address.socket_addr); } if let Some(to_remove) = self.pending_requests.remove(node_address) { for PendingRequest { request_id, .. } in to_remove { @@ -1529,7 +1535,7 @@ impl Handler { if let Err(e) = self.socket.send.send(packet).await { warn!("Failed to send outbound packet {}", e) } - self.nat_utils.track(dst); + self.nat.track(dst); } /// Check if any banned nodes have served their time and unban them. @@ -1580,8 +1586,12 @@ fn most_recent_enr(first: Option, second: Option) -> Result { } } -#[async_trait::async_trait] -impl HolePunchNat for Handler { + +// NAT-related functions +impl Handler { + /// A request times out. Should trigger the initiation of a hole punch attempt, given a + /// transitive route to the target exists. Sends a RELAYINIT notification to the given + /// relay. async fn on_request_time_out( &mut self, relay: NodeAddress, @@ -1624,11 +1634,13 @@ impl HolePunchNat for Handler { Ok(()) } - async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), NatError> { + /// A RelayInit notification is received over discv5 indicating this node is the relay. Should + /// trigger sending a RelayMsg to the target. + async fn on_relay_initiator(&mut self, relay_initiator: RelayInitNotification) -> Result<(), NatError> { // Check for target peer in our kbuckets otherwise drop notification. if let Err(e) = self .service_send - .send(HandlerOut::FindHolePunchEnr(relay_init)) + .send(HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiator))) .await { return Err(NatError::Relay(e.into())); @@ -1636,13 +1648,15 @@ impl HolePunchNat for Handler { Ok(()) } + /// A RelayMsg notification is received over discv5 indicating this node is the target. Should + /// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg. async fn on_relay_msg( &mut self, relay_msg: RelayMsgNotification, ) -> Result<(), NatError> { let (inr_enr, timed_out_msg_nonce) = relay_msg.into(); let initiator_node_address = - match NodeContact::try_from_enr(inr_enr, self.nat_utils.ip_mode) { + match NodeContact::try_from_enr(inr_enr, self.nat.ip_mode) { Ok(contact) => contact.node_address(), Err(e) => return Err(NatError::Target(e.into())), }; @@ -1670,31 +1684,32 @@ impl HolePunchNat for Handler { Ok(()) } - async fn send_relay_msg_notif( + /// Send a RELAYMSG notification. + async fn send_relay_msg_notification( &mut self, - tgt_enr: Enr, - relay_msg_notif: RelayMsgNotification, + target_enr: Enr, + relay_msg_notification: RelayMsgNotification, ) -> Result<(), NatError> { - let tgt_node_address = match NodeContact::try_from_enr(tgt_enr, self.nat_utils.ip_mode) { + let target_node_address = match NodeContact::try_from_enr(target_enr, self.nat.ip_mode) { Ok(contact) => contact.node_address(), Err(e) => return Err(NatError::Relay(e.into())), }; - if let Some(session) = self.sessions.get_mut(&tgt_node_address) { + if let Some(session) = self.sessions.get_mut(&target_node_address) { trace!( - "Sending notif to target {}. relay msg: {}", - tgt_node_address.node_id, - relay_msg_notif, + "Sending notification to target {}. relay msg: {}", + target_node_address.node_id, + relay_msg_notification, ); // Encrypt the notification and send let packet = match session - .encrypt_session_message::

(self.node_id, &relay_msg_notif.encode()) + .encrypt_session_message::

(self.node_id, &relay_msg_notification.encode()) { Ok(packet) => packet, Err(e) => { return Err(NatError::Relay(e)); } }; - self.send(tgt_node_address, packet).await; + self.send(target_node_address, packet).await; Ok(()) } else { // Either the session is being established or has expired. We simply drop the diff --git a/src/handler/nat_hole_punch/utils.rs b/src/handler/nat.rs similarity index 94% rename from src/handler/nat_hole_punch/utils.rs rename to src/handler/nat.rs index 5674fb238..b9e859f65 100644 --- a/src/handler/nat_hole_punch/utils.rs +++ b/src/handler/nat.rs @@ -19,7 +19,7 @@ pub const PORT_BIND_TRIES: usize = 4; pub const USER_AND_DYNAMIC_PORTS: RangeInclusive = 1025..=u16::MAX; /// Aggregates types necessary to implement nat hole punching for [`crate::handler::Handler`]. -pub struct NatUtils { +pub struct Nat { /// Ip mode as set in config. pub ip_mode: IpMode, /// This node has been observed to be behind a NAT. @@ -42,7 +42,7 @@ pub struct NatUtils { pub unreachable_enr_limit: Option, } -impl NatUtils { +impl Nat { pub fn new( listen_sockets: &[SocketAddr], local_enr: &Enr, @@ -52,7 +52,7 @@ impl NatUtils { session_cache_capacity: usize, unreachable_enr_limit: Option, ) -> Self { - let mut nat_hole_puncher = NatUtils { + let mut nat = Nat { ip_mode, is_behind_nat: None, new_peer_latest_relay_cache: LruCache::new(session_cache_capacity), @@ -70,17 +70,17 @@ impl NatUtils { local_enr.udp6(), ) { (Some(ip), port, _, _) => { - nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip.into()), port); + nat.set_is_behind_nat(listen_sockets, Some(ip.into()), port); } (_, _, Some(ip6), port) => { - nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip6.into()), port); + nat.set_is_behind_nat(listen_sockets, Some(ip6.into()), port); } (None, Some(port), _, _) | (_, _, None, Some(port)) => { - nat_hole_puncher.set_is_behind_nat(listen_sockets, None, Some(port)); + nat.set_is_behind_nat(listen_sockets, None, Some(port)); } (None, None, None, None) => {} } - nat_hole_puncher + nat } pub fn track(&mut self, peer_socket: SocketAddr) { diff --git a/src/handler/nat_hole_punch/error.rs b/src/handler/nat_hole_punch/error.rs deleted file mode 100644 index 58d48b522..000000000 --- a/src/handler/nat_hole_punch/error.rs +++ /dev/null @@ -1,14 +0,0 @@ -use thiserror::Error; - -use crate::Discv5Error; - -/// An error occurred whilst attempting to hole punch NAT. -#[derive(Debug, Error)] -pub enum Error { - #[error("NAT error, failed as initiator of a hole punch attempt, {0}")] - Initiator(Discv5Error), - #[error("NAT error, failed as relay of a hole punch attempt, {0}")] - Relay(Discv5Error), - #[error("NAT error, failed as target of a hole punch attempt, {0}")] - Target(Discv5Error), -} diff --git a/src/handler/nat_hole_punch/mod.rs b/src/handler/nat_hole_punch/mod.rs deleted file mode 100644 index e5b03a6fb..000000000 --- a/src/handler/nat_hole_punch/mod.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::net::SocketAddr; - -use crate::{ - node_info::NodeAddress, - packet::MessageNonce, - rpc::{RelayInitNotification, RelayMsgNotification}, - Enr, ProtocolIdentity, -}; - -mod error; -mod utils; - -pub use error::Error; -pub use utils::NatUtils; - -#[async_trait::async_trait] -pub trait HolePunchNat { - /// A request times out. Should trigger the initiation of a hole punch attempt, given a - /// transitive route to the target exists. Sends a RELAYINIT notification to the given - /// relay. - async fn on_request_time_out( - &mut self, - relay: NodeAddress, - local_enr: Enr, // initiator-enr - timed_out_nonce: MessageNonce, - target_node_address: NodeAddress, - ) -> Result<(), Error>; - - /// A RelayInit notification is received over discv5 indicating this node is the relay. Should - /// trigger sending a RelayMsg to the target. - async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), Error>; - - /// A RelayMsg notification is received over discv5 indicating this node is the target. Should - /// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg. - async fn on_relay_msg( - &mut self, - relay_msg: RelayMsgNotification, - ) -> Result<(), Error>; - - /// Send a RELAYMSG notification. - async fn send_relay_msg_notif( - &mut self, - tgt_enr: Enr, - relay_msg_notif: RelayMsgNotification, - ) -> Result<(), Error>; - - /// A hole punched for a peer closes. Should trigger an empty packet to be sent to the - /// peer to keep it open. - async fn on_hole_punch_expired(&mut self, peer: SocketAddr) -> Result<(), Error>; -} diff --git a/src/handler/tests.rs b/src/handler/tests.rs index b82099010..75f5f7203 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -75,7 +75,7 @@ async fn build_handler_with_listen_config( let (service_send, handler_recv) = mpsc::channel(50); let (exit_tx, exit) = oneshot::channel(); - let nat_utils = NatUtils::new( + let nat = Nat::new( &listen_sockets, &enr, config.listen_config.ip_mode(), @@ -107,7 +107,7 @@ async fn build_handler_with_listen_config( service_send, listen_sockets, socket, - nat_utils, + nat, exit, }, MockService { @@ -191,9 +191,9 @@ async fn simple_session_message() { loop { if let Some(message) = receiver_recv.recv().await { match message { - HandlerOut::WhoAreYou(wru_ref) => { + HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref)) => { let _ = - recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone()))); + recv_send.send(HandlerIn::EnrResponse(Some(sender_enr.clone()), EnrRequestData::WhoAreYou(wru_ref))); } HandlerOut::Request(_, request) => { assert_eq!(request, send_message); @@ -307,8 +307,8 @@ async fn multiple_messages() { let receiver = async move { loop { match receiver_handler.recv().await { - Some(HandlerOut::WhoAreYou(wru_ref)) => { - let _ = recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone()))); + Some(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref))) => { + let _ = recv_send.send(HandlerIn::EnrResponse(Some(sender_enr.clone()), EnrRequestData::WhoAreYou(wru_ref))); } Some(HandlerOut::Request(addr, request)) => { assert_eq!(request, recv_send_message); @@ -551,8 +551,8 @@ async fn nat_hole_punch_relay() { let mock_service_handle = tokio::spawn(async move { let service_msg = rx.recv().await.expect("should receive service message"); match service_msg { - HandlerOut::FindHolePunchEnr(relay_init) => tx - .send(HandlerIn::HolePunchEnr(tgt_enr_clone, relay_init)) + HandlerOut::RequestEnr(EnrRequestData::Nat(relay_init)) => tx + .send(HandlerIn::EnrResponse(Some(tgt_enr_clone), EnrRequestData::Nat(relay_init))) .expect("should send message to handler"), _ => panic!("service message should be 'find hole punch enr'"), } @@ -641,7 +641,7 @@ async fn nat_hole_punch_target() { build_handler_with_listen_config::(listen_config).await; let tgt_addr = handler.enr.read().udp4_socket().unwrap().into(); let tgt_node_id = handler.enr.read().node_id(); - handler.nat_utils.is_behind_nat = Some(true); + handler.nat.is_behind_nat = Some(true); // Relay let relay_enr = { diff --git a/src/service.rs b/src/service.rs index 783d175ca..3e3ecc1ac 100644 --- a/src/service.rs +++ b/src/service.rs @@ -19,7 +19,7 @@ use self::{ }; use crate::{ error::{RequestError, ResponseError}, - handler::{Handler, HandlerIn, HandlerOut}, + handler::{Handler, HandlerIn, HandlerOut, EnrRequestData}, kbucket::{ self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, NodeStatus, UpdateResult, MAX_NODES_PER_BUCKET, @@ -387,71 +387,68 @@ impl Service { HandlerOut::Response(node_address, response) => { self.handle_rpc_response(node_address, *response); } - HandlerOut::WhoAreYou(whoareyou_ref) => { + HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref)) => { // check what our latest known ENR is for this node. if let Some(known_enr) = self.find_enr(&whoareyou_ref.0.node_id) { - if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, Some(known_enr))) { + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(known_enr), EnrRequestData::WhoAreYou(whoareyou_ref))) { warn!("Failed to send whoareyou {}", e); }; } else { // do not know of this peer debug!("NodeId unknown, requesting ENR. {}", whoareyou_ref.0); - if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, None)) { + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(None, EnrRequestData::WhoAreYou(whoareyou_ref))) { warn!("Failed to send who are you to unknown enr peer {}", e); } } } - HandlerOut::RequestFailed(request_id, error) => { - if let RequestError::Timeout = error { - debug!("RPC Request timed out. id: {}", request_id); - } else { - warn!("RPC Request failed: id: {}, error {:?}", request_id, error); - } - self.rpc_failure(request_id, error); - } - HandlerOut::FindHolePunchEnr(relay_init) => { - // update initiator's enr if it's in kbuckets - let inr_enr = relay_init.initiator_enr(); - let inr_key = kbucket::Key::from(inr_enr.node_id()); - match self.kbuckets.write().entry(&inr_key) { + HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiator)) => { + // Update initiator's Enr if it's in kbuckets + let initiator_enr = relay_initiator.initiator_enr(); + let initiator_key = kbucket::Key::from(initiator_enr.node_id()); + match self.kbuckets.write().entry(&initiator_key) { kbucket::Entry::Present(ref mut entry, _) => { let enr = entry.value_mut(); - if enr.seq() < inr_enr.seq() { - *enr = inr_enr.clone(); + if enr.seq() < initiator_enr.seq() { + *enr = initiator_enr.clone(); } } kbucket::Entry::Pending(ref mut entry, _) => { let enr = entry.value_mut(); - if enr.seq() < inr_enr.seq() { - *enr = inr_enr.clone(); + if enr.seq() < initiator_enr.seq() { + *enr = initiator_enr.clone(); } } _ => () } // check if we know the target node id in our routing table, otherwise // drop relay attempt. - let tgt_node_id = relay_init.target_node_id(); - let tgt_key = kbucket::Key::from(tgt_node_id); - if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&tgt_key) { - let tgt_enr = entry.value().clone(); - if let Err(e) = self.handler_send.send(HandlerIn::HolePunchEnr(tgt_enr, relay_init)) { + let target_node_id = relay_initiator.target_node_id(); + let target_key = kbucket::Key::from(target_node_id); + if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&target_key) { + let target_enr = entry.value().clone(); + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiator))) { warn!( "Failed to send target enr to relay process, error: {e}" ); } } else { - // todo(emhane): ban peers that ask us to relay to a peer we very - // unlikely could have sent to them in a NODES response. - let inr_node_id = relay_init.initiator_enr().node_id(); - + let initiator_node_id = relay_initiator.initiator_enr().node_id(); warn!( - inr_node_id=%inr_node_id, - tgt_node_id=%tgt_node_id, + initiator_node_id=%initiator_node_id, + target_node_id=%target_node_id, "Peer requested relaying to a peer not in k-buckets" ); } + }, + HandlerOut::PingAllPeers => self.ping_connected_peers(), + HandlerOut::RequestFailed(request_id, error) => { + if let RequestError::Timeout = error { + debug!("RPC Request timed out. id: {}", request_id); + } else { + warn!("RPC Request failed: id: {}, error {:?}", request_id, error); + } + self.rpc_failure(request_id, error); } - HandlerOut::PingAllPeers => self.ping_connected_peers() } } event = Service::bucket_maintenance_poll(&self.kbuckets) => { From 18e44f2fd3e7186fac783a97a784c7341e5adf02 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 17 Jan 2024 16:39:06 +1100 Subject: [PATCH 2/7] Old mate fmt --- src/handler/mod.rs | 39 ++++++++++++++++++++++++--------------- src/handler/tests.rs | 16 ++++++++++++---- src/service.rs | 2 +- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 3618ebf06..962f6d9e5 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -29,7 +29,7 @@ use crate::{ config::Discv5Config, discv5::PERMIT_BAN_LIST, - error::{Discv5Error, RequestError, NatError}, + error::{Discv5Error, NatError, RequestError}, packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind, ProtocolIdentity}, rpc::{ Message, Payload, RelayInitNotification, RelayMsgNotification, Request, RequestBody, @@ -937,9 +937,7 @@ impl Handler { let enr_not_reachable = !Nat::is_enr_reachable(&most_recent_enr); // Decide whether to establish this connection based on our appetite for unreachable - if enr_not_reachable - && Some(self.sessions.tagged()) >= self.nat.unreachable_enr_limit - { + if enr_not_reachable && Some(self.sessions.tagged()) >= self.nat.unreachable_enr_limit { debug!("Reached limit of unreachable ENR sessions. Avoiding a new connection. Limit: {}", self.sessions.tagged()); return; } @@ -1146,7 +1144,10 @@ impl Handler { let ban_timeout = self.nat.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); } else if let Err(e) = self.on_relay_initiator(notification).await { - warn!("failed handling notification to relay for {node_address}, {:?}", e); + warn!( + "failed handling notification to relay for {node_address}, {:?}", + e + ); } } Message::RelayMsgNotification(notification) => { @@ -1158,7 +1159,10 @@ impl Handler { } _ => { if let Err(e) = self.on_relay_msg::

(notification).await { - warn!("failed handling notification relayed from {node_address}, {:?}", e); + warn!( + "failed handling notification relayed from {node_address}, {:?}", + e + ); } } } @@ -1211,7 +1215,9 @@ impl Handler { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); if let Err(e) = self .service_send - .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref))) + .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou( + whoareyou_ref, + ))) .await { warn!("Failed to send WhoAreYou to the service {}", e) @@ -1257,7 +1263,9 @@ impl Handler { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); if let Err(e) = self .service_send - .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref))) + .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou( + whoareyou_ref, + ))) .await { warn!( @@ -1586,7 +1594,6 @@ fn most_recent_enr(first: Option, second: Option) -> Result { } } - // NAT-related functions impl Handler { /// A request times out. Should trigger the initiation of a hole punch attempt, given a @@ -1636,7 +1643,10 @@ impl Handler { /// A RelayInit notification is received over discv5 indicating this node is the relay. Should /// trigger sending a RelayMsg to the target. - async fn on_relay_initiator(&mut self, relay_initiator: RelayInitNotification) -> Result<(), NatError> { + async fn on_relay_initiator( + &mut self, + relay_initiator: RelayInitNotification, + ) -> Result<(), NatError> { // Check for target peer in our kbuckets otherwise drop notification. if let Err(e) = self .service_send @@ -1655,11 +1665,10 @@ impl Handler { relay_msg: RelayMsgNotification, ) -> Result<(), NatError> { let (inr_enr, timed_out_msg_nonce) = relay_msg.into(); - let initiator_node_address = - match NodeContact::try_from_enr(inr_enr, self.nat.ip_mode) { - Ok(contact) => contact.node_address(), - Err(e) => return Err(NatError::Target(e.into())), - }; + let initiator_node_address = match NodeContact::try_from_enr(inr_enr, self.nat.ip_mode) { + Ok(contact) => contact.node_address(), + Err(e) => return Err(NatError::Target(e.into())), + }; // A session may already have been established. if self.sessions.get(&initiator_node_address).is_some() { diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 75f5f7203..c2358c394 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -192,8 +192,10 @@ async fn simple_session_message() { if let Some(message) = receiver_recv.recv().await { match message { HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref)) => { - let _ = - recv_send.send(HandlerIn::EnrResponse(Some(sender_enr.clone()), EnrRequestData::WhoAreYou(wru_ref))); + let _ = recv_send.send(HandlerIn::EnrResponse( + Some(sender_enr.clone()), + EnrRequestData::WhoAreYou(wru_ref), + )); } HandlerOut::Request(_, request) => { assert_eq!(request, send_message); @@ -308,7 +310,10 @@ async fn multiple_messages() { loop { match receiver_handler.recv().await { Some(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref))) => { - let _ = recv_send.send(HandlerIn::EnrResponse(Some(sender_enr.clone()), EnrRequestData::WhoAreYou(wru_ref))); + let _ = recv_send.send(HandlerIn::EnrResponse( + Some(sender_enr.clone()), + EnrRequestData::WhoAreYou(wru_ref), + )); } Some(HandlerOut::Request(addr, request)) => { assert_eq!(request, recv_send_message); @@ -552,7 +557,10 @@ async fn nat_hole_punch_relay() { let service_msg = rx.recv().await.expect("should receive service message"); match service_msg { HandlerOut::RequestEnr(EnrRequestData::Nat(relay_init)) => tx - .send(HandlerIn::EnrResponse(Some(tgt_enr_clone), EnrRequestData::Nat(relay_init))) + .send(HandlerIn::EnrResponse( + Some(tgt_enr_clone), + EnrRequestData::Nat(relay_init), + )) .expect("should send message to handler"), _ => panic!("service message should be 'find hole punch enr'"), } diff --git a/src/service.rs b/src/service.rs index 3e3ecc1ac..8d7cc2eb7 100644 --- a/src/service.rs +++ b/src/service.rs @@ -19,7 +19,7 @@ use self::{ }; use crate::{ error::{RequestError, ResponseError}, - handler::{Handler, HandlerIn, HandlerOut, EnrRequestData}, + handler::{EnrRequestData, Handler, HandlerIn, HandlerOut}, kbucket::{ self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, NodeStatus, UpdateResult, MAX_NODES_PER_BUCKET, From b42e6776bd8ea6465b890dd10940d7a1f985a65d Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 18 Jan 2024 11:26:07 +1100 Subject: [PATCH 3/7] Update src/handler/mod.rs Co-authored-by: Emilia Hane --- src/handler/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 194961c51..9a4b44a44 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1650,7 +1650,7 @@ impl Handler { /// A RelayInit notification is received over discv5 indicating this node is the relay. Should /// trigger sending a RelayMsg to the target. - async fn on_relay_initiator( + async fn on_relay_initiation( &mut self, relay_initiator: RelayInitNotification, ) -> Result<(), NatError> { From bf4723db2ba9951b875796c8308d109f6a6e0347 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 18 Jan 2024 11:26:13 +1100 Subject: [PATCH 4/7] Update src/service.rs Co-authored-by: Emilia Hane --- src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service.rs b/src/service.rs index 8d7cc2eb7..75257c0a6 100644 --- a/src/service.rs +++ b/src/service.rs @@ -401,7 +401,7 @@ impl Service { } } } - HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiator)) => { + HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiation)) => { // Update initiator's Enr if it's in kbuckets let initiator_enr = relay_initiator.initiator_enr(); let initiator_key = kbucket::Key::from(initiator_enr.node_id()); From 942661c01567d0d591060c304949693c87986684 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 18 Jan 2024 11:26:19 +1100 Subject: [PATCH 5/7] Update src/handler/mod.rs Co-authored-by: Emilia Hane --- src/handler/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 9a4b44a44..740606d45 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -381,7 +381,7 @@ impl Handler { } HandlerIn::Response(dst, response) => self.send_response::

(dst, *response).await, HandlerIn::EnrResponse(enr, EnrRequestData::WhoAreYou(wru_ref)) => self.send_challenge::

(wru_ref, enr).await, - HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiator)) => { + HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiation)) => { // Assemble the notification for the target let (initiator_enr, _target, timed_out_nonce) = relay_initiator.into(); let relay_msg_notification = RelayMsgNotification::new(initiator_enr, timed_out_nonce); From 08cd2b6dadcbe5ac93df8f2ef631ecfc440ab33f Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 18 Jan 2024 11:47:19 +1100 Subject: [PATCH 6/7] Complete the renaming --- src/handler/mod.rs | 8 ++++---- src/service.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 740606d45..1f695fbf3 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -383,7 +383,7 @@ impl Handler { HandlerIn::EnrResponse(enr, EnrRequestData::WhoAreYou(wru_ref)) => self.send_challenge::

(wru_ref, enr).await, HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiation)) => { // Assemble the notification for the target - let (initiator_enr, _target, timed_out_nonce) = relay_initiator.into(); + let (initiator_enr, _target, timed_out_nonce) = relay_initiation.into(); let relay_msg_notification = RelayMsgNotification::new(initiator_enr, timed_out_nonce); if let Err(e) = self.send_relay_msg_notification::

(target_enr, relay_msg_notification).await { warn!("Failed to relay. Error: {:?}", e); @@ -1148,7 +1148,7 @@ impl Handler { .await; let ban_timeout = self.nat.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - } else if let Err(e) = self.on_relay_initiator(notification).await { + } else if let Err(e) = self.on_relay_initiation(notification).await { warn!( "failed handling notification to relay for {node_address}, {:?}", e @@ -1652,12 +1652,12 @@ impl Handler { /// trigger sending a RelayMsg to the target. async fn on_relay_initiation( &mut self, - relay_initiator: RelayInitNotification, + relay_initiation: RelayInitNotification, ) -> Result<(), NatError> { // Check for target peer in our kbuckets otherwise drop notification. if let Err(e) = self .service_send - .send(HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiator))) + .send(HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiation))) .await { return Err(NatError::Relay(e.into())); diff --git a/src/service.rs b/src/service.rs index 75257c0a6..f55089010 100644 --- a/src/service.rs +++ b/src/service.rs @@ -403,7 +403,7 @@ impl Service { } HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiation)) => { // Update initiator's Enr if it's in kbuckets - let initiator_enr = relay_initiator.initiator_enr(); + let initiator_enr = relay_initiation.initiator_enr(); let initiator_key = kbucket::Key::from(initiator_enr.node_id()); match self.kbuckets.write().entry(&initiator_key) { kbucket::Entry::Present(ref mut entry, _) => { @@ -422,17 +422,17 @@ impl Service { } // check if we know the target node id in our routing table, otherwise // drop relay attempt. - let target_node_id = relay_initiator.target_node_id(); + let target_node_id = relay_initiation.target_node_id(); let target_key = kbucket::Key::from(target_node_id); if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&target_key) { let target_enr = entry.value().clone(); - if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiator))) { + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiation))) { warn!( "Failed to send target enr to relay process, error: {e}" ); } } else { - let initiator_node_id = relay_initiator.initiator_enr().node_id(); + let initiator_node_id = relay_initiation.initiator_enr().node_id(); warn!( initiator_node_id=%initiator_node_id, target_node_id=%target_node_id, From fb54740f1f3ad4e431b8c9332fa40683b289dde6 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 18 Jan 2024 11:47:50 +1100 Subject: [PATCH 7/7] Fmt --- src/handler/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 1f695fbf3..74a54f348 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1657,7 +1657,9 @@ impl Handler { // Check for target peer in our kbuckets otherwise drop notification. if let Err(e) = self .service_send - .send(HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiation))) + .send(HandlerOut::RequestEnr(EnrRequestData::Nat( + relay_initiation, + ))) .await { return Err(NatError::Relay(e.into()));