Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

devp2p: Move UDP socket handling from Discovery to Host. #8790

Merged
merged 6 commits into from
Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 33 additions & 89 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
use ethcore_bytes::Bytes;
use std::net::SocketAddr;
use std::collections::{HashSet, HashMap, VecDeque};
use std::mem;
use std::default::Default;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use mio::*;
use mio::deprecated::{Handler, EventLoop};
use mio::udp::*;
use hash::keccak;
use ethereum_types::{H256, H520};
use rlp::{Rlp, RlpStream, encode_list};
use node_table::*;
use network::{Error, ErrorKind};
use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover};
use network::IpFilter;

Expand All @@ -39,7 +34,7 @@ const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademl
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
const MAX_DATAGRAM_SIZE: usize = 1280;
pub const MAX_DATAGRAM_SIZE: usize = 1280;

const PACKET_PING: u8 = 1;
const PACKET_PONG: u8 = 2;
Expand Down Expand Up @@ -79,23 +74,21 @@ impl NodeBucket {
}
}

struct Datagramm {
payload: Bytes,
address: SocketAddr,
pub struct Datagram {
pub payload: Bytes,
pub address: SocketAddr,
}

pub struct Discovery {
id: NodeId,
id_hash: H256,
secret: Secret,
public_endpoint: NodeEndpoint,
udp_socket: UdpSocket,
token: StreamToken,
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
send_queue: VecDeque<Datagramm>,
send_queue: VecDeque<Datagram>,
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
Expand All @@ -107,19 +100,16 @@ pub struct TableUpdates {
}

impl Discovery {
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, ip_filter: IpFilter) -> Discovery {
let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket");
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
Discovery {
id: key.public().clone(),
id_hash: keccak(key.public()),
secret: key.secret().clone(),
public_endpoint: public,
token: token,
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
udp_socket: socket,
send_queue: VecDeque::new(),
check_timestamps: true,
adding_nodes: Vec::new(),
Expand Down Expand Up @@ -352,53 +342,12 @@ impl Discovery {
ret
}

pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
while let Some(data) = self.send_queue.pop_front() {
match self.udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
Ok(Some(_)) => {
warn!("UDP sent incomplete datagramm");
},
Ok(None) => {
self.send_queue.push_front(data);
return;
}
Err(e) => {
debug!("UDP send error: {:?}, address: {:?}", e, &data.address);
return;
}
}
}
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}

fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
self.send_queue.push_back(Datagramm { payload: payload, address: address });
}

pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone {
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
let writable = !self.send_queue.is_empty();
let res = match self.udp_socket.recv_from(&mut buf) {
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
debug!("Error processing UDP packet: {:?}", e);
None
}),
Ok(_) => None,
Err(e) => {
debug!("Error reading UPD socket: {:?}", e);
None
}
};
let new_writable = !self.send_queue.is_empty();
if writable != new_writable {
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}
res
self.send_queue.push_back(Datagram { payload: payload, address: address });
}

fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {

pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
// validate packet
if packet.len() < 32 + 65 + 4 + 1 {
return Err(ErrorKind::BadProtocol.into());
Expand Down Expand Up @@ -571,19 +520,16 @@ impl Discovery {
self.start();
}

pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
Ok(())
pub fn any_sends_queued(&self) -> bool {
!self.send_queue.is_empty()
}

pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
let registration = if !self.send_queue.is_empty() {
Ready::readable() | Ready::writable()
} else {
Ready::readable()
};
event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket");
Ok(())
pub fn dequeue_send(&mut self) -> Option<Datagram> {
self.send_queue.pop_front()
}

pub fn requeue_send(&mut self, datagram: Datagram) {
self.send_queue.push_front(datagram)
}
}

Expand Down Expand Up @@ -620,8 +566,8 @@ mod tests {
let key2 = Random.generate().unwrap();
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ep: NodeEntrypoint is a copy type so no need to explictly call clone

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NodeEntrypoint is not Copy, just Clone.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I got it wrong sorry!

let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());

let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
Expand All @@ -632,16 +578,14 @@ mod tests {
discovery2.refresh();

for _ in 0 .. 10 {
while !discovery1.send_queue.is_empty() {
let datagramm = discovery1.send_queue.pop_front().unwrap();
if datagramm.address == ep2.address {
discovery2.on_packet(&datagramm.payload, ep1.address.clone()).ok();
while let Some(datagram) = discovery1.dequeue_send() {
if datagram.address == ep2.address {
discovery2.on_packet(&datagram.payload, ep1.address.clone()).ok();
}
}
while !discovery2.send_queue.is_empty() {
let datagramm = discovery2.send_queue.pop_front().unwrap();
if datagramm.address == ep1.address {
discovery1.on_packet(&datagramm.payload, ep2.address.clone()).ok();
while let Some(datagram) = discovery2.dequeue_send() {
if datagram.address == ep1.address {
discovery1.on_packet(&datagram.payload, ep2.address.clone()).ok();
}
}
discovery2.round();
Expand All @@ -653,7 +597,7 @@ mod tests {
fn removes_expired() {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
for _ in 0..1200 {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
}
Expand All @@ -668,7 +612,7 @@ mod tests {

let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());

for _ in 0..(16 + 10) {
discovery.node_buckets[0].nodes.push_back(BucketEntry {
Expand Down Expand Up @@ -728,7 +672,7 @@ mod tests {
let key = Secret::from_str(secret_hex)
.and_then(|secret| KeyPair::from_secret(secret))
.unwrap();
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());

node_entries.iter().for_each(|entry| discovery.update_node(entry.clone()));

Expand Down Expand Up @@ -773,7 +717,7 @@ mod tests {
fn packets() {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
discovery.check_timestamps = false;
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();

Expand Down Expand Up @@ -840,13 +784,13 @@ mod tests {
let key2 = Random.generate().unwrap();
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 };
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());

discovery1.ping(&ep2);
let ping_data = discovery1.send_queue.pop_front().unwrap();
let ping_data = discovery1.dequeue_send().unwrap();
discovery2.on_packet(&ping_data.payload, ep1.address.clone()).ok();
let pong_data = discovery2.send_queue.pop_front().unwrap();
let pong_data = discovery2.dequeue_send().unwrap();
let data = &pong_data.payload[(32 + 65)..];
let rlp = Rlp::new(&data[1..]);
assert_eq!(ping_data.payload[0..32], rlp.val_at::<Vec<u8>>(1).unwrap()[..])
Expand Down
Loading