Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Don't add discovery initiators to the node table #10305

Merged
merged 8 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions util/network-devp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
error-chain = { version = "0.12", default-features = false }
lru-cache = "0.1"

[dev-dependencies]
env_logger = "0.5"
Expand Down
63 changes: 52 additions & 11 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, VecDeque};
use std::collections::hash_map::Entry;
use std::default::Default;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use lru_cache::LruCache;
use hash::keccak;
use ethereum_types::{H256, H520};
use rlp::{Rlp, RlpStream};
Expand Down Expand Up @@ -55,6 +56,8 @@ const REQUEST_BACKOFF: [Duration; 4] = [

const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60);

const OBSERVED_NODES_MAX_SIZE: usize = 10000;
kirushik marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Clone, Debug)]
pub struct NodeEntry {
pub id: NodeId,
Expand Down Expand Up @@ -95,7 +98,21 @@ struct FindNodeRequest {
#[derive(Clone, Copy)]
enum PingReason {
Default,
FromDiscoveryRequest(NodeId)
FromDiscoveryRequest(NodeId, NodeValidity)
kirushik marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone, Copy, PartialEq)]
enum NodeCategory {
Bucket,
Observed
}

#[derive(Clone, Copy, PartialEq)]
enum NodeValidity {
Ourselves,
ValidNode(NodeCategory),
ExpiredNode(NodeCategory),
UnknownNode
}

struct PingRequest {
Expand Down Expand Up @@ -145,6 +162,8 @@ pub struct Discovery<'a> {
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
other_observed_nodes: LruCache<NodeId, (NodeEndpoint, Instant)>, // Sometimes we don't want to add nodes to the NodeTable, but still want to
kirushik marked this conversation as resolved.
Show resolved Hide resolved
// keep track of them to avoid excessive pinging (happens when an unknown node sends a discovery request to us -- the node might be on a different net).
in_flight_pings: HashMap<NodeId, PingRequest>,
in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>,
send_queue: VecDeque<Datagram>,
Expand All @@ -171,6 +190,7 @@ impl<'a> Discovery<'a> {
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
other_observed_nodes: LruCache::new(OBSERVED_NODES_MAX_SIZE),
in_flight_pings: HashMap::new(),
in_flight_find_nodes: HashMap::new(),
send_queue: VecDeque::new(),
Expand Down Expand Up @@ -541,10 +561,22 @@ impl<'a> Discovery<'a> {
};

if let Some((node, ping_reason)) = expected_node {
if let PingReason::FromDiscoveryRequest(target) = ping_reason {
if let PingReason::FromDiscoveryRequest(target, validity) = ping_reason {
self.respond_with_discovery(target, &node)?;
// kirushik: I would prefer to probe the network id of the remote node here, and add it to the nodes list if it's on "our" net --
// but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea.
// So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging
match validity {
NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) => {
kirushik marked this conversation as resolved.
Show resolved Hide resolved
self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now()));
Ok(None)
},
NodeValidity::ExpiredNode(NodeCategory::Bucket) => Ok(self.update_node(node)),
kirushik marked this conversation as resolved.
Show resolved Hide resolved
_ => Ok(None)
}
} else {
Ok(self.update_node(node))
}
Ok(self.update_node(node))
} else {
debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from);
Ok(None)
Expand All @@ -565,31 +597,40 @@ impl<'a> Discovery<'a> {
}
};

if self.is_a_valid_known_node(&node) {
self.respond_with_discovery(target, &node)?;
} else {
match self.check_validity(&node) {
NodeValidity::ValidNode(_) => self.respond_with_discovery(target, &node)?,
// Make sure the request source is actually there and responds to pings before actually responding
self.try_ping(node, PingReason::FromDiscoveryRequest(target));
invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(target, invalidity_reason))
kirushik marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(None)
}

fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool {
fn check_validity(&mut self, node: &NodeEntry) -> NodeValidity {
let id_hash = keccak(node.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist,
kirushik marked this conversation as resolved.
Show resolved Hide resolved
None => {
debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node);
return false;
return NodeValidity::Ourselves;
}
};

let bucket = &self.node_buckets[dist];
if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) {
debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node);
(known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)
match ((known_node.address.endpoint == node.endpoint), (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
(true, true) => NodeValidity::ValidNode(NodeCategory::Bucket),
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Bucket),
_ => NodeValidity::UnknownNode
}
} else {
false
self.other_observed_nodes.get_mut(&node.id).map_or(NodeValidity::UnknownNode, |(endpoint, observed_at)| {
match ((node.endpoint==*endpoint), (observed_at.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
(true, true) => NodeValidity::ValidNode(NodeCategory::Observed),
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Observed),
_ => NodeValidity::UnknownNode
}
})
}
}

Expand Down
1 change: 1 addition & 0 deletions util/network-devp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ extern crate keccak_hash as hash;
extern crate serde;
extern crate serde_json;
extern crate parity_snappy as snappy;
extern crate lru_cache;

#[macro_use]
extern crate error_chain;
Expand Down