diff --git a/Cargo.lock b/Cargo.lock index 6e1f6bbc9cc..5eef2e27689 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -971,6 +971,7 @@ dependencies = [ "keccak-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/util/network-devp2p/Cargo.toml b/util/network-devp2p/Cargo.toml index 5f1a5d33978..8bf402e206e 100644 --- a/util/network-devp2p/Cargo.toml +++ b/util/network-devp2p/Cargo.toml @@ -34,6 +34,7 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" error-chain = { version = "0.12", default-features = false } +lru-cache = "0.1" [dev-dependencies] env_logger = "0.5" diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index f6eaf494b99..7bf8dc62e5e 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, VecDeque}; use std::collections::hash_map::Entry; use std::default::Default; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use lru_cache::LruCache; use hash::keccak; use ethereum_types::{H256, H520}; use rlp::{Rlp, RlpStream}; @@ -55,6 +56,8 @@ const REQUEST_BACKOFF: [Duration; 4] = [ const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60); +const OBSERVED_NODES_MAX_SIZE: usize = 10_000; + #[derive(Clone, Debug)] pub struct NodeEntry { pub id: NodeId, @@ -95,7 +98,27 @@ struct FindNodeRequest { #[derive(Clone, Copy)] enum PingReason { Default, - FromDiscoveryRequest(NodeId) + FromDiscoveryRequest(NodeId, NodeValidity), +} + +#[derive(Clone, Copy, PartialEq)] +enum NodeCategory { + Bucket, + Observed +} + +#[derive(Clone, Copy, PartialEq)] +enum NodeValidity { + Ourselves, + ValidNode(NodeCategory), + ExpiredNode(NodeCategory), + UnknownNode +} + +#[derive(Debug)] +enum BucketError { + Ourselves, + NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize}, } struct PingRequest { @@ -145,6 +168,12 @@ pub struct Discovery<'a> { discovery_id: NodeId, discovery_nodes: HashSet, node_buckets: Vec, + + // Sometimes we don't want to add nodes to the NodeTable, but still want to + // keep track of them to avoid excessive pinging (happens when an unknown node sends + // a discovery request to us -- the node might be on a different net). + other_observed_nodes: LruCache, + in_flight_pings: HashMap, in_flight_find_nodes: HashMap, send_queue: VecDeque, @@ -171,6 +200,7 @@ impl<'a> Discovery<'a> { discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), + other_observed_nodes: LruCache::new(OBSERVED_NODES_MAX_SIZE), in_flight_pings: HashMap::new(), in_flight_find_nodes: HashMap::new(), send_queue: VecDeque::new(), @@ -200,41 +230,53 @@ impl<'a> Discovery<'a> { } } - fn update_node(&mut self, e: NodeEntry) -> Option { - trace!(target: "discovery", "Inserting {:?}", &e); + fn update_bucket_record(&mut self, e: NodeEntry) -> Result<(), BucketError> { let id_hash = keccak(e.id); let dist = match Discovery::distance(&self.id_hash, &id_hash) { Some(dist) => dist, None => { debug!(target: "discovery", "Attempted to update own entry: {:?}", e); - return None; + return Err(BucketError::Ourselves); } }; + let bucket = &mut self.node_buckets[dist]; + bucket.nodes.iter_mut().find(|n| n.address.id == e.id) + .map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| { + entry.address = e; + entry.last_seen = Instant::now(); + entry.backoff_until = Instant::now(); + entry.fail_count = 0; + Ok(()) + }) + } - let mut added_map = HashMap::new(); - let ping = { - let bucket = &mut self.node_buckets[dist]; - let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) { - node.address = e.clone(); - node.last_seen = Instant::now(); - node.backoff_until = Instant::now(); - node.fail_count = 0; - true - } else { false }; + fn update_node(&mut self, e: NodeEntry) -> Option { + trace!(target: "discovery", "Inserting {:?}", &e); + + match self.update_bucket_record(e) { + Ok(()) => None, + Err(BucketError::Ourselves) => None, + Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance)) + }.map(|(node_entry, bucket_distance)| { + trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance); - if !updated { - added_map.insert(e.id, e.clone()); - bucket.nodes.push_front(BucketEntry::new(e)); + let mut added = HashMap::with_capacity(1); + added.insert(node_entry.id, node_entry.clone()); + let node_to_ping = { + let bucket = &mut self.node_buckets[bucket_distance]; + bucket.nodes.push_front(BucketEntry::new(node_entry)); if bucket.nodes.len() > BUCKET_SIZE { select_bucket_ping(bucket.nodes.iter()) - } else { None } - } else { None } - }; - if let Some(node) = ping { - self.try_ping(node, PingReason::Default); - } - Some(TableUpdates { added: added_map, removed: HashSet::new() }) + } else { + None + } + }; + if let Some(node) = node_to_ping { + self.try_ping(node, PingReason::Default); + }; + TableUpdates{added, removed: HashSet::new()} + }) } /// Starts the discovery process at round 0 @@ -541,10 +583,28 @@ impl<'a> Discovery<'a> { }; if let Some((node, ping_reason)) = expected_node { - if let PingReason::FromDiscoveryRequest(target) = ping_reason { + if let PingReason::FromDiscoveryRequest(target, validity) = ping_reason { self.respond_with_discovery(target, &node)?; + // kirushik: I would prefer to probe the network id of the remote node here, and add it to the nodes list if it's on "our" net -- + // but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea. + // So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging + match validity { + NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => { + trace!(target: "discovery", "Updating node {:?} in our Kad buckets", &node); + self.update_bucket_record(node).unwrap_or_else(|error| { + debug!(target: "discovery", "Error occured when processing ping from a bucket node: {:?}", &error); + }); + }, + NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) | NodeValidity::ValidNode(NodeCategory::Observed)=> { + trace!(target: "discovery", "Updating node {:?} in the list of other_observed_nodes", &node); + self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now())); + }, + NodeValidity::Ourselves => (), + } + Ok(None) + } else { + Ok(self.update_node(node)) } - Ok(self.update_node(node)) } else { debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from); Ok(None) @@ -565,31 +625,41 @@ impl<'a> Discovery<'a> { } }; - if self.is_a_valid_known_node(&node) { - self.respond_with_discovery(target, &node)?; - } else { + match self.check_validity(&node) { + NodeValidity::Ourselves => (), // It makes no sense to respond to the discovery request from ourselves + NodeValidity::ValidNode(_) => self.respond_with_discovery(target, &node)?, // Make sure the request source is actually there and responds to pings before actually responding - self.try_ping(node, PingReason::FromDiscoveryRequest(target)); + invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(target, invalidity_reason)) } Ok(None) } - fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool { + fn check_validity(&mut self, node: &NodeEntry) -> NodeValidity { let id_hash = keccak(node.id); let dist = match Discovery::distance(&self.id_hash, &id_hash) { Some(dist) => dist, None => { debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node); - return false; + return NodeValidity::Ourselves; } }; let bucket = &self.node_buckets[dist]; if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) { debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node); - (known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT) + match ((known_node.address.endpoint == node.endpoint), (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)) { + (true, true) => NodeValidity::ValidNode(NodeCategory::Bucket), + (true, false) => NodeValidity::ExpiredNode(NodeCategory::Bucket), + _ => NodeValidity::UnknownNode + } } else { - false + self.other_observed_nodes.get_mut(&node.id).map_or(NodeValidity::UnknownNode, |(endpoint, observed_at)| { + match ((node.endpoint==*endpoint), (observed_at.elapsed() < NODE_LAST_SEEN_TIMEOUT)) { + (true, true) => NodeValidity::ValidNode(NodeCategory::Observed), + (true, false) => NodeValidity::ExpiredNode(NodeCategory::Observed), + _ => NodeValidity::UnknownNode + } + }) } } diff --git a/util/network-devp2p/src/lib.rs b/util/network-devp2p/src/lib.rs index 531c6ee506f..6082049e890 100644 --- a/util/network-devp2p/src/lib.rs +++ b/util/network-devp2p/src/lib.rs @@ -84,6 +84,7 @@ extern crate keccak_hash as hash; extern crate serde; extern crate serde_json; extern crate parity_snappy as snappy; +extern crate lru_cache; #[macro_use] extern crate error_chain;