Skip to content

Commit

Permalink
Add parity-kademlia
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Feb 28, 2019
1 parent 56a027c commit 6b5321b
Show file tree
Hide file tree
Showing 18 changed files with 2,294 additions and 174 deletions.
2 changes: 1 addition & 1 deletion muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ futures = "0.1"
libp2p-core = { version = "0.4.0", path = "../../core" }
log = "0.4"
tokio-io = "0.1"
yamux = "0.1.1"
yamux = "0.1"
11 changes: 9 additions & 2 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@ categories = ["network-programming", "asynchronous"]
arrayvec = "0.4.7"
bs58 = "0.2.0"
bigint = "4.2"
byteorder = "1.3.1"
bytes = "0.4"
fnv = "1.0"
futures = "0.1"
libp2p-identify = { version = "0.4.0", path = "../../protocols/identify" }
libp2p-ping = { version = "0.4.0", path = "../../protocols/ping" }
lazy_static = "1.2"
libp2p-core = { version = "0.4.0", path = "../../core" }
log = "0.4"
multiaddr = { package = "parity-multiaddr", version = "0.2.0", path = "../../misc/multiaddr" }
multihash = { package = "parity-multihash", version = "0.1.0", path = "../../misc/multihash" }
parking_lot = "0.7"
protobuf = "2.3"
rand = "0.6.0"
schnorrkel = "0.0.0"
serde = "1.0"
serde_cbor = "0.9"
serde_derive = "1.0"
smallvec = "0.6"
tokio-codec = "0.1"
tokio-io = "0.1"
Expand All @@ -33,5 +37,8 @@ unsigned-varint = { version = "0.2.1", features = ["codec"] }
void = "1.0"

[dev-dependencies]
libp2p-mplex = { version = "0.4.0", path = "../../muxers/mplex" }
libp2p-secio = { version = "0.4.0", path = "../secio" }
libp2p-tcp = { version = "0.4.0", path = "../../transports/tcp" }
libp2p-yamux = { version = "0.4.0", path = "../../muxers/yamux" }
tokio = "0.1"
25 changes: 24 additions & 1 deletion protocols/kad/src/addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ impl Addresses {
})
}

/// Returns true if the list of addresses is empty, or contains only expired addresses.
pub fn is_empty(&self) -> bool {
if self.addrs.is_empty() {
return true;
}

self.iter().next().is_some()
}

/// If true, we are connected to all the addresses returned by `iter()`.
///
/// Returns false if the list of addresses is empty.
Expand Down Expand Up @@ -93,11 +102,25 @@ impl Addresses {
self.addrs.remove(pos);
}

/// Marks all the addresses as disconnected.
pub fn set_all_disconnected(&mut self) {
for addr in self.addrs.iter_mut() {
if addr.1.is_none() {
addr.1 = Some(Instant::now() + self.expiration);
}
}
}

/// Removes the given address from the list. Typically called if an address is determined to
/// be invalid or unreachable.
pub fn remove_addr(&mut self, addr: &Multiaddr) {
///
/// Returns `true` if an address has been effectively removed.
pub fn remove_addr(&mut self, addr: &Multiaddr) -> bool {
if let Some(pos) = self.addrs.iter().position(|(a, _)| a == addr) {
self.addrs.remove(pos);
true
} else {
false
}
}

Expand Down
95 changes: 74 additions & 21 deletions protocols/kad/src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct KBucketsTable<TPeerId, TVal> {
#[derive(Debug, Clone)]
struct KBucket<TPeerId, TVal> {
/// Nodes are always ordered from oldest to newest. The nodes we are connected to are always
/// all on top of the nodes we are not connected to.
/// all on top (ie. have higher indices) of the nodes we are not connected to.
nodes: ArrayVec<[Node<TPeerId, TVal>; MAX_NODES_PER_BUCKET]>,

/// Index in `nodes` over which all nodes are connected. Must always be <= to the length
Expand Down Expand Up @@ -96,7 +96,10 @@ impl<TPeerId, TVal> KBucket<TPeerId, TVal> {
}

/// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> + Clone {
///
/// If `TOther` is not the same as `Self`, it represents an entry already in the k-buckets that
/// `Self` can compare against.
pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> {
/// Computes the XOR of this value and another one. The lower the closer.
fn distance_with(&self, other: &TOther) -> u32;

Expand All @@ -110,7 +113,7 @@ pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> + Clone {
impl KBucketsPeerId for PeerId {
#[inline]
fn distance_with(&self, other: &Self) -> u32 {
Multihash::distance_with(self.as_ref(), other.as_ref())
<Multihash as KBucketsPeerId<Multihash>>::distance_with(self.as_ref(), other.as_ref())
}

#[inline]
Expand All @@ -119,15 +122,15 @@ impl KBucketsPeerId for PeerId {
}
}

impl KBucketsPeerId<Multihash> for PeerId {
impl KBucketsPeerId<PeerId> for Multihash {
#[inline]
fn distance_with(&self, other: &Multihash) -> u32 {
Multihash::distance_with(self.as_ref(), other)
fn distance_with(&self, other: &PeerId) -> u32 {
<Multihash as KBucketsPeerId<Multihash>>::distance_with(self, other.as_ref())
}

#[inline]
fn max_distance() -> usize {
<Multihash as KBucketsPeerId>::max_distance()
<PeerId as KBucketsPeerId>::max_distance()
}
}

Expand All @@ -148,9 +151,40 @@ impl KBucketsPeerId for Multihash {
}
}

impl<A, B> KBucketsPeerId for (A, B)
where
A: KBucketsPeerId + PartialEq,
B: KBucketsPeerId + PartialEq,
{
#[inline]
fn distance_with(&self, other: &(A, B)) -> u32 {
A::distance_with(&self.0, &other.0) + B::distance_with(&self.1, &other.1)
}

#[inline]
fn max_distance() -> usize {
<A as KBucketsPeerId<A>>::max_distance() + <B as KBucketsPeerId<B>>::max_distance()
}
}

impl<'a, T> KBucketsPeerId for &'a T
where
T: KBucketsPeerId,
{
#[inline]
fn distance_with(&self, other: &&'a T) -> u32 {
T::distance_with(*self, *other)
}

#[inline]
fn max_distance() -> usize {
<T as KBucketsPeerId>::max_distance()
}
}

impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
where
TPeerId: KBucketsPeerId,
TPeerId: KBucketsPeerId + Clone,
{
/// Builds a new routing table.
pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self {
Expand All @@ -168,9 +202,9 @@ where
}
}

// Returns the id of the bucket that should contain the peer with the given ID.
//
// Returns `None` if out of range, which happens if `id` is the same as the local peer id.
/// Returns the id of the bucket that should contain the peer with the given ID.
///
/// Returns `None` if out of range, which happens if `id` is the same as the local peer id.
#[inline]
fn bucket_num(&self, id: &TPeerId) -> Option<usize> {
(self.my_id.distance_with(id) as usize).checked_sub(1)
Expand Down Expand Up @@ -354,11 +388,32 @@ where
}
}

/// Removes an entry from the k-buckets. You shouldn't generally use this, .
pub fn remove(&mut self, id: &TPeerId) {
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return,
};

if table.pending_node.as_ref().map(|n| &n.0.id) == Some(id) {
table.pending_node = None;
return;
}

let pos = match table.nodes.iter().position(|elem| elem.id == *id) {
Some(pos) => pos,
None => return,
};

if pos < table.first_connected_pos {
// `first_connected_pos` can't be 0, otherwise `pos` can't be inferior.
debug_assert_ne!(table.first_connected_pos, 0);
table.first_connected_pos -= 1;
}
}

/// Finds the `num` nodes closest to `id`, ordered by distance.
pub fn find_closest<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
where
TPeerId: Clone + KBucketsPeerId<TOther>,
{
pub fn find_closest(&mut self, id: &impl KBucketsPeerId<TPeerId>) -> VecIntoIter<TPeerId> {
// TODO: optimize
let mut out = Vec::new();
for table in self.tables.iter_mut() {
Expand All @@ -370,20 +425,18 @@ where
out.push(node.id.clone());
}
}
out.sort_by(|a, b| b.distance_with(id).cmp(&a.distance_with(id)));
out.sort_by(|a, b| id.distance_with(a).cmp(&id.distance_with(b)));
out.into_iter()
}

/// Same as `find_closest`, but includes the local peer as well.
pub fn find_closest_with_self<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
where
TPeerId: Clone + KBucketsPeerId<TOther>,
{
// TODO: this method shouldn't be necessary, as per the Kademlia algorithm?!
pub fn find_closest_with_self(&mut self, id: &impl KBucketsPeerId<TPeerId>) -> VecIntoIter<TPeerId> {
// TODO: optimize
let mut intermediate: Vec<_> = self.find_closest(id).collect();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(id) >= self.my_id.distance_with(id))
.position(|e| id.distance_with(e) >= id.distance_with(&self.my_id))
{
if intermediate[pos] != self.my_id {
intermediate.insert(pos, self.my_id.clone());
Expand Down
46 changes: 40 additions & 6 deletions protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,49 @@
// `KademliaSystem`.
//

pub use self::behaviour::{Kademlia, KademliaOut};
pub use self::kbucket::KBucketsPeerId;
pub use self::protocol::KadConnectionType;
pub use self::libp2p::{Kademlia, KademliaOut};
pub use self::libp2p::protocol::KadConnectionType;

pub mod handler;
pub mod kbucket;
pub mod protocol;
pub mod parity;

mod addresses;
mod behaviour;
mod protobuf_structs;
mod libp2p;
mod query;

use libp2p_core::PeerId;
use std::cmp::Ordering;

/// Generates a random `PeerId` that belongs to the given bucket.
///
/// Returns an error if `bucket_num` is out of range.
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
let my_id_len = my_id.as_bytes().len();

// TODO: this 2 is magic here; it is the length of the hash of the multihash
let bits_diff = bucket_num + 1;
if bits_diff > 8 * (my_id_len - 2) {
return Err(());
}

let mut random_id = [0; 64];
for byte in 0..my_id_len {
match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) {
Ordering::Less => {
random_id[byte] = my_id.as_bytes()[byte];
}
Ordering::Equal => {
let mask: u8 = (1 << (bits_diff % 8)) - 1;
random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::<u8>() & mask);
}
Ordering::Greater => {
random_id[byte] = rand::random();
}
}
}

let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned())
.expect("randomly-generated peer ID should always be valid");
Ok(peer_id)
}
27 changes: 27 additions & 0 deletions protocols/kad/src/libp2p.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod behaviour;
mod protobuf_structs;

pub use self::behaviour::{Kademlia, KademliaOut};

pub mod handler;
pub mod protocol;
Loading

0 comments on commit 6b5321b

Please sign in to comment.