From 1e6fe51e37913e3fae5efd72e1c9f7f9ce8f5559 Mon Sep 17 00:00:00 2001 From: Jim Posen <jim.posen@gmail.com> Date: Sun, 3 Jun 2018 00:39:23 -0700 Subject: [PATCH 1/6] devp2p: Fix bug with potentially incorrect UDP registration. This works right now because the Host handler happens to be the first one registered on the IoService. --- util/network-devp2p/src/discovery.rs | 8 ++++---- util/network-devp2p/src/host.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 8e8a3d6cc69..506bab74612 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -571,18 +571,18 @@ impl Discovery { self.start(); } - pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> { - event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket"); + pub fn register_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> { + event_loop.register(&self.udp_socket, reg, Ready::all(), PollOpt::edge()).expect("Error registering UDP socket"); Ok(()) } - pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> { + pub fn update_registration<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> { let registration = if !self.send_queue.is_empty() { Ready::readable() | Ready::writable() } else { Ready::readable() }; - event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket"); + event_loop.reregister(&self.udp_socket, reg, registration, PollOpt::edge()).expect("Error reregistering UDP socket"); Ok(()) } } diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 6d28a838c27..4e9e42c2598 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -1055,7 +1055,7 @@ impl IoHandler<NetworkIoMessage> for Host { session.lock().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"), + DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(reg, event_loop).ok()).expect("Error registering discovery socket"), TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } @@ -1086,7 +1086,7 @@ impl IoHandler<NetworkIoMessage> for Host { connection.lock().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"), + DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(reg, event_loop).ok()).expect("Error reregistering discovery socket"), TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } From af758149a1ee5edd665ffbd416d04e222e889da5 Mon Sep 17 00:00:00 2001 From: Jim Posen <jim.posen@gmail.com> Date: Sun, 3 Jun 2018 00:31:48 -0700 Subject: [PATCH 2/6] devp2p: Move UDP socket handling from Discovery to Host. --- util/network-devp2p/src/discovery.rs | 96 ++++--------------------- util/network-devp2p/src/host.rs | 104 +++++++++++++++++++++++---- 2 files changed, 104 insertions(+), 96 deletions(-) diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 506bab74612..3491159d169 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -17,18 +17,13 @@ use ethcore_bytes::Bytes; use std::net::SocketAddr; use std::collections::{HashSet, HashMap, VecDeque}; -use std::mem; use std::default::Default; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use mio::*; -use mio::deprecated::{Handler, EventLoop}; -use mio::udp::*; use hash::keccak; use ethereum_types::{H256, H520}; use rlp::{Rlp, RlpStream, encode_list}; use node_table::*; use network::{Error, ErrorKind}; -use io::{StreamToken, IoContext}; use ethkey::{Secret, KeyPair, sign, recover}; use network::IpFilter; @@ -39,7 +34,7 @@ const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademl const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover) const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. -const MAX_DATAGRAM_SIZE: usize = 1280; +pub const MAX_DATAGRAM_SIZE: usize = 1280; const PACKET_PING: u8 = 1; const PACKET_PONG: u8 = 2; @@ -79,9 +74,9 @@ impl NodeBucket { } } -struct Datagramm { - payload: Bytes, - address: SocketAddr, +pub struct Datagramm { + pub payload: Bytes, + pub address: SocketAddr, } pub struct Discovery { @@ -89,13 +84,11 @@ pub struct Discovery { id_hash: H256, secret: Secret, public_endpoint: NodeEndpoint, - udp_socket: UdpSocket, - token: StreamToken, discovery_round: u16, discovery_id: NodeId, discovery_nodes: HashSet<NodeId>, node_buckets: Vec<NodeBucket>, - send_queue: VecDeque<Datagramm>, + pub send_queue: VecDeque<Datagramm>, check_timestamps: bool, adding_nodes: Vec<NodeEntry>, ip_filter: IpFilter, @@ -107,19 +100,16 @@ pub struct TableUpdates { } impl Discovery { - pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, ip_filter: IpFilter) -> Discovery { - let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket"); + pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery { Discovery { id: key.public().clone(), id_hash: keccak(key.public()), secret: key.secret().clone(), public_endpoint: public, - token: token, discovery_round: 0, discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), - udp_socket: socket, send_queue: VecDeque::new(), check_timestamps: true, adding_nodes: Vec::new(), @@ -352,53 +342,12 @@ impl Discovery { ret } - pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone { - while let Some(data) = self.send_queue.pop_front() { - match self.udp_socket.send_to(&data.payload, &data.address) { - Ok(Some(size)) if size == data.payload.len() => { - }, - Ok(Some(_)) => { - warn!("UDP sent incomplete datagramm"); - }, - Ok(None) => { - self.send_queue.push_front(data); - return; - } - Err(e) => { - debug!("UDP send error: {:?}, address: {:?}", e, &data.address); - return; - } - } - } - io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); - } - fn send_to(&mut self, payload: Bytes, address: SocketAddr) { self.send_queue.push_back(Datagramm { payload: payload, address: address }); } - pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone { - let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() }; - let writable = !self.send_queue.is_empty(); - let res = match self.udp_socket.recv_from(&mut buf) { - Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| { - debug!("Error processing UDP packet: {:?}", e); - None - }), - Ok(_) => None, - Err(e) => { - debug!("Error reading UPD socket: {:?}", e); - None - } - }; - let new_writable = !self.send_queue.is_empty(); - if writable != new_writable { - io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); - } - res - } - fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> { + pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> { // validate packet if packet.len() < 32 + 65 + 4 + 1 { return Err(ErrorKind::BadProtocol.into()); @@ -570,21 +519,6 @@ impl Discovery { pub fn refresh(&mut self) { self.start(); } - - pub fn register_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> { - event_loop.register(&self.udp_socket, reg, Ready::all(), PollOpt::edge()).expect("Error registering UDP socket"); - Ok(()) - } - - pub fn update_registration<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> { - let registration = if !self.send_queue.is_empty() { - Ready::readable() | Ready::writable() - } else { - Ready::readable() - }; - event_loop.reregister(&self.udp_socket, reg, registration, PollOpt::edge()).expect("Error reregistering UDP socket"); - Ok(()) - } } #[cfg(test)] @@ -620,8 +554,8 @@ mod tests { let key2 = Random.generate().unwrap(); let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 }; let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 }; - let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default()); - let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default()); + let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default()); + let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default()); let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap(); let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap(); @@ -653,7 +587,7 @@ mod tests { fn removes_expired() { let key = Random.generate().unwrap(); let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 }; - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); for _ in 0..1200 { discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); } @@ -668,7 +602,7 @@ mod tests { let key = Random.generate().unwrap(); let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 }; - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); for _ in 0..(16 + 10) { discovery.node_buckets[0].nodes.push_back(BucketEntry { @@ -728,7 +662,7 @@ mod tests { let key = Secret::from_str(secret_hex) .and_then(|secret| KeyPair::from_secret(secret)) .unwrap(); - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); node_entries.iter().for_each(|entry| discovery.update_node(entry.clone())); @@ -773,7 +707,7 @@ mod tests { fn packets() { let key = Random.generate().unwrap(); let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 }; - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); discovery.check_timestamps = false; let from = SocketAddr::from_str("99.99.99.99:40445").unwrap(); @@ -840,8 +774,8 @@ mod tests { let key2 = Random.generate().unwrap(); let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 }; let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 }; - let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default()); - let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default()); + let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default()); + let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default()); discovery1.ping(&ep2); let ping_data = discovery1.send_queue.pop_front().unwrap(); diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 4e9e42c2598..b8837e68652 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -24,12 +24,14 @@ use std::cmp::{min, max}; use std::path::{Path, PathBuf}; use std::io::{Read, Write, self}; use std::fs; +use std::mem; use std::time::Duration; use ethkey::{KeyPair, Secret, Random, Generator}; use hash::keccak; use mio::*; use mio::deprecated::{EventLoop}; use mio::tcp::*; +use mio::udp::*; use ethereum_types::H256; use rlp::{RlpStream, Encodable}; @@ -40,7 +42,7 @@ use node_table::*; use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId}; use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait}; use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler}; -use discovery::{Discovery, TableUpdates, NodeEntry}; +use discovery::{Discovery, TableUpdates, NodeEntry, MAX_DATAGRAM_SIZE}; use ip_utils::{map_external_address, select_public_address}; use path::restrict_permissions_owner; use parking_lot::{Mutex, RwLock}; @@ -239,6 +241,7 @@ struct ProtocolTimer { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host { pub info: RwLock<HostInfo>, + udp_socket: Mutex<Option<UdpSocket>>, tcp_listener: Mutex<TcpListener>, sessions: Arc<RwLock<Slab<SharedSession>>>, discovery: Mutex<Option<Discovery>>, @@ -295,6 +298,7 @@ impl Host { local_endpoint: local_endpoint, }), discovery: Mutex::new(None), + udp_socket: Mutex::new(None), tcp_listener: Mutex::new(tcp_listener), sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), nodes: RwLock::new(NodeTable::new(path)), @@ -458,13 +462,16 @@ impl Host { let discovery = { let info = self.info.read(); if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { - let mut udp_addr = local_endpoint.address.clone(); - udp_addr.set_port(local_endpoint.udp_port); - Some(Discovery::new(&info.keys, udp_addr, public_endpoint, DISCOVERY, allow_ips)) + Some(Discovery::new(&info.keys, public_endpoint, allow_ips)) } else { None } }; if let Some(mut discovery) = discovery { + let mut udp_addr = local_endpoint.address; + udp_addr.set_port(local_endpoint.udp_port); + let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket"); + *self.udp_socket.lock() = Some(socket); + discovery.init_node_list(self.nodes.read().entries()); discovery.add_node_list(self.nodes.read().entries()); *self.discovery.lock() = Some(discovery); @@ -819,6 +826,63 @@ impl Host { } } + fn discovery_readable(&self, io: &IoContext<NetworkIoMessage>) { + let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { + (Some(udp_socket), Some(discovery)) => { + let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() }; + let writable = !discovery.send_queue.is_empty(); + let res = match udp_socket.recv_from(&mut buf) { + Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| { + debug!("Error processing UDP packet: {:?}", e); + None + }), + Ok(_) => None, + Err(e) => { + debug!("Error reading UPD socket: {:?}", e); + None + } + }; + let new_writable = !discovery.send_queue.is_empty(); + if writable != new_writable { + io.update_registration(DISCOVERY) + .unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); + } + res + }, + _ => None, + }; + if let Some(node_changes) = node_changes { + self.update_nodes(io, node_changes); + } + } + + fn discovery_writable(&self, io: &IoContext<NetworkIoMessage>) { + match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { + (Some(udp_socket), Some(discovery)) => { + while let Some(data) = discovery.send_queue.pop_front() { + match udp_socket.send_to(&data.payload, &data.address) { + Ok(Some(size)) if size == data.payload.len() => { + }, + Ok(Some(_)) => { + warn!("UDP sent incomplete datagramm"); + }, + Ok(None) => { + discovery.send_queue.push_front(data); + return; + } + Err(e) => { + debug!("UDP send error: {:?}, address: {:?}", e, &data.address); + return; + } + } + } + io.update_registration(DISCOVERY) + .unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); + }, + _ => (), + } + } + fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) { trace!(target: "network", "Connection timeout: {}", token); self.kill_connection(token, io, true) @@ -920,12 +984,7 @@ impl IoHandler<NetworkIoMessage> for Host { } match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), - DISCOVERY => { - let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) }; - if let Some(node_changes) = node_changes { - self.update_nodes(io, node_changes); - } - }, + DISCOVERY => self.discovery_readable(io), TCP_ACCEPT => self.accept(io), _ => panic!("Received unknown readable token"), } @@ -937,9 +996,7 @@ impl IoHandler<NetworkIoMessage> for Host { } match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), - DISCOVERY => { - self.discovery.lock().as_mut().map(|d| d.writable(io)); - } + DISCOVERY => self.discovery_writable(io), _ => panic!("Received unknown writable token"), } } @@ -1055,7 +1112,13 @@ impl IoHandler<NetworkIoMessage> for Host { session.lock().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(reg, event_loop).ok()).expect("Error registering discovery socket"), + DISCOVERY => match self.udp_socket.lock().as_ref() { + Some(udp_socket) => { + event_loop.register(udp_socket, reg, Ready::all(), PollOpt::edge()) + .expect("Error registering UDP socket"); + }, + _ => panic!("Error registering discovery socket"), + } TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } @@ -1086,7 +1149,18 @@ impl IoHandler<NetworkIoMessage> for Host { connection.lock().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(reg, event_loop).ok()).expect("Error reregistering discovery socket"), + DISCOVERY => match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_ref()) { + (Some(udp_socket), Some(discovery)) => { + let registration = if !discovery.send_queue.is_empty() { + Ready::readable() | Ready::writable() + } else { + Ready::readable() + }; + event_loop.reregister(udp_socket, reg, registration, PollOpt::edge()) + .expect("Error reregistering UDP socket"); + }, + _ => panic!("Error reregistering discovery socket"), + } TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } From 46fe32bc2d8badfac97e2057a38d07dc58c2a2ab Mon Sep 17 00:00:00 2001 From: Jim Posen <jim.posen@gmail.com> Date: Wed, 6 Jun 2018 12:06:14 -0700 Subject: [PATCH 3/6] devp2p: Use 0-initialized memory buffer instead of unsafe. --- util/network-devp2p/src/host.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index b8837e68652..78a3d683af1 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -24,7 +24,6 @@ use std::cmp::{min, max}; use std::path::{Path, PathBuf}; use std::io::{Read, Write, self}; use std::fs; -use std::mem; use std::time::Duration; use ethkey::{KeyPair, Secret, Random, Generator}; use hash::keccak; @@ -829,7 +828,7 @@ impl Host { fn discovery_readable(&self, io: &IoContext<NetworkIoMessage>) { let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { (Some(udp_socket), Some(discovery)) => { - let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() }; + let mut buf = [0u8; MAX_DATAGRAM_SIZE]; let writable = !discovery.send_queue.is_empty(); let res = match udp_socket.recv_from(&mut buf) { Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| { From e9c983ea7729221f399a4c88a7c8c9f4063708ac Mon Sep 17 00:00:00 2001 From: Jim Posen <jim.posen@gmail.com> Date: Wed, 6 Jun 2018 12:23:06 -0700 Subject: [PATCH 4/6] Remove send_queue field from public interface of Discovery. --- util/network-devp2p/src/discovery.rs | 24 +++++++++++++++++------- util/network-devp2p/src/host.rs | 10 +++++----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 3491159d169..62c2e8f0abe 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -88,7 +88,7 @@ pub struct Discovery { discovery_id: NodeId, discovery_nodes: HashSet<NodeId>, node_buckets: Vec<NodeBucket>, - pub send_queue: VecDeque<Datagramm>, + send_queue: VecDeque<Datagramm>, check_timestamps: bool, adding_nodes: Vec<NodeEntry>, ip_filter: IpFilter, @@ -519,6 +519,18 @@ impl Discovery { pub fn refresh(&mut self) { self.start(); } + + pub fn any_sends_queued(&self) -> bool { + !self.send_queue.is_empty() + } + + pub fn dequeue_send(&mut self) -> Option<Datagramm> { + self.send_queue.pop_front() + } + + pub fn requeue_send(&mut self, datagramm: Datagramm) { + self.send_queue.push_front(datagramm) + } } #[cfg(test)] @@ -566,14 +578,12 @@ mod tests { discovery2.refresh(); for _ in 0 .. 10 { - while !discovery1.send_queue.is_empty() { - let datagramm = discovery1.send_queue.pop_front().unwrap(); + while let Some(datagramm) = discovery1.dequeue_send() { if datagramm.address == ep2.address { discovery2.on_packet(&datagramm.payload, ep1.address.clone()).ok(); } } - while !discovery2.send_queue.is_empty() { - let datagramm = discovery2.send_queue.pop_front().unwrap(); + while let Some(datagramm) = discovery2.dequeue_send() { if datagramm.address == ep1.address { discovery1.on_packet(&datagramm.payload, ep2.address.clone()).ok(); } @@ -778,9 +788,9 @@ mod tests { let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default()); discovery1.ping(&ep2); - let ping_data = discovery1.send_queue.pop_front().unwrap(); + let ping_data = discovery1.dequeue_send().unwrap(); discovery2.on_packet(&ping_data.payload, ep1.address.clone()).ok(); - let pong_data = discovery2.send_queue.pop_front().unwrap(); + let pong_data = discovery2.dequeue_send().unwrap(); let data = &pong_data.payload[(32 + 65)..]; let rlp = Rlp::new(&data[1..]); assert_eq!(ping_data.payload[0..32], rlp.val_at::<Vec<u8>>(1).unwrap()[..]) diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 78a3d683af1..953b75e2b36 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -829,7 +829,7 @@ impl Host { let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { (Some(udp_socket), Some(discovery)) => { let mut buf = [0u8; MAX_DATAGRAM_SIZE]; - let writable = !discovery.send_queue.is_empty(); + let writable = discovery.any_sends_queued(); let res = match udp_socket.recv_from(&mut buf) { Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| { debug!("Error processing UDP packet: {:?}", e); @@ -841,7 +841,7 @@ impl Host { None } }; - let new_writable = !discovery.send_queue.is_empty(); + let new_writable = discovery.any_sends_queued(); if writable != new_writable { io.update_registration(DISCOVERY) .unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); @@ -858,7 +858,7 @@ impl Host { fn discovery_writable(&self, io: &IoContext<NetworkIoMessage>) { match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { (Some(udp_socket), Some(discovery)) => { - while let Some(data) = discovery.send_queue.pop_front() { + while let Some(data) = discovery.dequeue_send() { match udp_socket.send_to(&data.payload, &data.address) { Ok(Some(size)) if size == data.payload.len() => { }, @@ -866,7 +866,7 @@ impl Host { warn!("UDP sent incomplete datagramm"); }, Ok(None) => { - discovery.send_queue.push_front(data); + discovery.requeue_send(data); return; } Err(e) => { @@ -1150,7 +1150,7 @@ impl IoHandler<NetworkIoMessage> for Host { } DISCOVERY => match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_ref()) { (Some(udp_socket), Some(discovery)) => { - let registration = if !discovery.send_queue.is_empty() { + let registration = if discovery.any_sends_queued() { Ready::readable() | Ready::writable() } else { Ready::readable() From d46948002c9f3e95c9af3b8cbb635948dcc2b5c6 Mon Sep 17 00:00:00 2001 From: Jim Posen <jim.posen@gmail.com> Date: Thu, 7 Jun 2018 11:00:25 -0700 Subject: [PATCH 5/6] Rename Datagramm to Datagram. sed -i 's/Datagramm/Datagram/g' util/network-devp2p/src/discovery.rs util/network-devp2p/src/host.rs sed -i 's/datagramm/datagram/g' util/network-devp2p/src/discovery.rs util/network-devp2p/src/host.rs --- util/network-devp2p/src/discovery.rs | 24 ++++++++++++------------ util/network-devp2p/src/host.rs | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 62c2e8f0abe..b7cce2832f0 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -74,7 +74,7 @@ impl NodeBucket { } } -pub struct Datagramm { +pub struct Datagram { pub payload: Bytes, pub address: SocketAddr, } @@ -88,7 +88,7 @@ pub struct Discovery { discovery_id: NodeId, discovery_nodes: HashSet<NodeId>, node_buckets: Vec<NodeBucket>, - send_queue: VecDeque<Datagramm>, + send_queue: VecDeque<Datagram>, check_timestamps: bool, adding_nodes: Vec<NodeEntry>, ip_filter: IpFilter, @@ -343,7 +343,7 @@ impl Discovery { } fn send_to(&mut self, payload: Bytes, address: SocketAddr) { - self.send_queue.push_back(Datagramm { payload: payload, address: address }); + self.send_queue.push_back(Datagram { payload: payload, address: address }); } @@ -524,12 +524,12 @@ impl Discovery { !self.send_queue.is_empty() } - pub fn dequeue_send(&mut self) -> Option<Datagramm> { + pub fn dequeue_send(&mut self) -> Option<Datagram> { self.send_queue.pop_front() } - pub fn requeue_send(&mut self, datagramm: Datagramm) { - self.send_queue.push_front(datagramm) + pub fn requeue_send(&mut self, datagram: Datagram) { + self.send_queue.push_front(datagram) } } @@ -578,14 +578,14 @@ mod tests { discovery2.refresh(); for _ in 0 .. 10 { - while let Some(datagramm) = discovery1.dequeue_send() { - if datagramm.address == ep2.address { - discovery2.on_packet(&datagramm.payload, ep1.address.clone()).ok(); + while let Some(datagram) = discovery1.dequeue_send() { + if datagram.address == ep2.address { + discovery2.on_packet(&datagram.payload, ep1.address.clone()).ok(); } } - while let Some(datagramm) = discovery2.dequeue_send() { - if datagramm.address == ep1.address { - discovery1.on_packet(&datagramm.payload, ep2.address.clone()).ok(); + while let Some(datagram) = discovery2.dequeue_send() { + if datagram.address == ep1.address { + discovery1.on_packet(&datagram.payload, ep2.address.clone()).ok(); } } discovery2.round(); diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 953b75e2b36..8f291f1772a 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -863,7 +863,7 @@ impl Host { Ok(Some(size)) if size == data.payload.len() => { }, Ok(Some(_)) => { - warn!("UDP sent incomplete datagramm"); + warn!("UDP sent incomplete datagram"); }, Ok(None) => { discovery.requeue_send(data); From deb52535720843ceea1abdb6d1e498987bd7b5ee Mon Sep 17 00:00:00 2001 From: Jim Posen <jim.posen@gmail.com> Date: Thu, 7 Jun 2018 11:10:05 -0700 Subject: [PATCH 6/6] Include target in log statements. --- util/network-devp2p/src/host.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 8f291f1772a..0fbd64b4209 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -832,19 +832,21 @@ impl Host { let writable = discovery.any_sends_queued(); let res = match udp_socket.recv_from(&mut buf) { Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| { - debug!("Error processing UDP packet: {:?}", e); + debug!(target: "network", "Error processing UDP packet: {:?}", e); None }), Ok(_) => None, Err(e) => { - debug!("Error reading UPD socket: {:?}", e); + debug!(target: "network", "Error reading UPD socket: {:?}", e); None } }; let new_writable = discovery.any_sends_queued(); if writable != new_writable { io.update_registration(DISCOVERY) - .unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); + .unwrap_or_else(|e| { + debug!(target: "network" ,"Error updating discovery registration: {:?}", e) + }); } res }, @@ -863,20 +865,22 @@ impl Host { Ok(Some(size)) if size == data.payload.len() => { }, Ok(Some(_)) => { - warn!("UDP sent incomplete datagram"); + warn!(target: "network", "UDP sent incomplete datagram"); }, Ok(None) => { discovery.requeue_send(data); return; } Err(e) => { - debug!("UDP send error: {:?}, address: {:?}", e, &data.address); + debug!(target: "network", "UDP send error: {:?}, address: {:?}", e, &data.address); return; } } } io.update_registration(DISCOVERY) - .unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); + .unwrap_or_else(|e| { + debug!(target: "network", "Error updating discovery registration: {:?}", e) + }); }, _ => (), }