Skip to content

Commit

Permalink
kad: Providers part 8: unit, e2e, and libp2p conformance tests (#258)
Browse files Browse the repository at this point in the history
This PR concludes the series of content providers PRs with introduction
of the tests:
1. Unit tests in `Kademlia`
2. E2E tests with `litep2p` connecting to `litep2p`.
3. Conformance tests with `itep2p` connecting to `libp2p` and vice
versa.
  • Loading branch information
dmitry-markin authored Sep 30, 2024
1 parent 1a0325c commit d765111
Show file tree
Hide file tree
Showing 5 changed files with 901 additions and 10 deletions.
6 changes: 6 additions & 0 deletions src/protocol/libp2p/kademlia/futures_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ impl<F> FuturesStream<F> {
}
}

/// Number of futeres in the stream.
#[cfg(test)]
pub fn len(&self) -> usize {
self.futures.len()
}

/// Push a future for processing.
pub fn push(&mut self, future: F) {
self.futures.push(future);
Expand Down
245 changes: 241 additions & 4 deletions src/protocol/libp2p/kademlia/query/get_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ pub struct GetProvidersContext {

impl GetProvidersContext {
/// Create new [`GetProvidersContext`].
pub fn new(config: GetProvidersConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
pub fn new(config: GetProvidersConfig, candidate_peers: VecDeque<KademliaPeer>) -> Self {
let mut candidates = BTreeMap::new();

for candidate in &in_peers {
let distance = config.target.distance(&candidate.key);
candidates.insert(distance, candidate.clone());
for peer in &candidate_peers {
let distance = config.target.distance(&peer.key);
candidates.insert(distance, peer.clone());
}

let kad_message =
Expand Down Expand Up @@ -273,3 +273,240 @@ impl GetProvidersContext {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::libp2p::kademlia::types::ConnectionType;
use multiaddr::multiaddr;

fn default_config() -> GetProvidersConfig {
GetProvidersConfig {
local_peer_id: PeerId::random(),
parallelism_factor: 3,
query: QueryId(0),
target: Key::new(vec![1, 2, 3].into()),
known_providers: vec![],
}
}

fn peer_to_kad(peer: PeerId) -> KademliaPeer {
KademliaPeer {
peer,
key: Key::from(peer),
addresses: vec![],
connection: ConnectionType::NotConnected,
}
}

fn peer_to_kad_with_addresses(peer: PeerId, addresses: Vec<Multiaddr>) -> KademliaPeer {
KademliaPeer {
peer,
key: Key::from(peer),
addresses,
connection: ConnectionType::NotConnected,
}
}

#[test]
fn completes_when_no_candidates() {
let config = default_config();

let mut context = GetProvidersContext::new(config, VecDeque::new());
assert!(context.is_done());

let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
}

#[test]
fn fulfill_parallelism() {
let config = GetProvidersConfig {
parallelism_factor: 3,
..default_config()
};

let candidate_peer_set: HashSet<_> =
[PeerId::random(), PeerId::random(), PeerId::random()].into_iter().collect();
assert_eq!(candidate_peer_set.len(), 3);

let candidate_peers = candidate_peer_set.iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetProvidersContext::new(config, candidate_peers);

for num in 0..3 {
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), num + 1);
assert!(context.pending.contains_key(&peer));

// Check the peer is the one provided.
assert!(candidate_peer_set.contains(&peer));
}
_ => panic!("Unexpected event"),
}
}

// Fulfilled parallelism.
assert!(context.next_action().is_none());
}

#[test]
fn completes_when_responses() {
let config = GetProvidersConfig {
parallelism_factor: 3,
..default_config()
};

let peer_a = PeerId::random();
let peer_b = PeerId::random();
let peer_c = PeerId::random();

let candidate_peer_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect();
assert_eq!(candidate_peer_set.len(), 3);

let candidate_peers =
[peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetProvidersContext::new(config, candidate_peers);

let [provider1, provider2, provider3, provider4] = (0..4)
.map(|_| ContentProvider {
peer: PeerId::random(),
addresses: vec![],
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

// Schedule peer queries.
for num in 0..3 {
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), num + 1);
assert!(context.pending.contains_key(&peer));

// Check the peer is the one provided.
assert!(candidate_peer_set.contains(&peer));
}
_ => panic!("Unexpected event"),
}
}

// Checks a failed query that was not initiated.
let peer_d = PeerId::random();
context.register_response_failure(peer_d);
assert_eq!(context.pending.len(), 3);
assert!(context.queried.is_empty());

// Provide responses back.
let providers = vec![provider1.clone().into(), provider2.clone().into()];
context.register_response(peer_a, providers, vec![]);
assert_eq!(context.pending.len(), 2);
assert_eq!(context.queried.len(), 1);
assert_eq!(context.found_providers.len(), 2);

// Provide different response from peer b with peer d as candidate.
let providers = vec![provider2.clone().into(), provider3.clone().into()];
let candidates = vec![peer_to_kad(peer_d.clone())];
context.register_response(peer_b, providers, candidates);
assert_eq!(context.pending.len(), 1);
assert_eq!(context.queried.len(), 2);
assert_eq!(context.found_providers.len(), 4);
assert_eq!(context.candidates.len(), 1);

// Peer C fails.
context.register_response_failure(peer_c);
assert!(context.pending.is_empty());
assert_eq!(context.queried.len(), 3);
assert_eq!(context.found_providers.len(), 4);

// Drain the last candidate.
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), 1);
assert_eq!(peer, peer_d);
}
_ => panic!("Unexpected event"),
}

// Peer D responds.
let providers = vec![provider4.clone().into()];
context.register_response(peer_d, providers, vec![]);

// Produces the result.
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });

// Check results.
let found_providers = context.found_providers();
assert_eq!(found_providers.len(), 4);
assert!(found_providers.contains(&provider1));
assert!(found_providers.contains(&provider2));
assert!(found_providers.contains(&provider3));
assert!(found_providers.contains(&provider4));
}

#[test]
fn providers_sorted_by_distance() {
let target = Key::new(vec![1, 2, 3].into());

let mut peers = (0..10).map(|_| PeerId::random()).collect::<Vec<_>>();
let providers = peers.iter().map(|peer| peer_to_kad(peer.clone())).collect::<Vec<_>>();

let found_providers =
GetProvidersContext::merge_and_sort_providers(providers, target.clone());

peers.sort_by(|p1, p2| {
Key::from(*p1).distance(&target).cmp(&Key::from(*p2).distance(&target))
});

assert!(
std::iter::zip(found_providers.into_iter(), peers.into_iter())
.all(|(provider, peer)| provider.peer == peer)
);
}

#[test]
fn provider_addresses_merged() {
let peer = PeerId::random();

let address1 = multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10000u16));
let address2 = multiaddr!(Ip4([192, 168, 0, 1]), Tcp(10000u16));
let address3 = multiaddr!(Ip4([10, 0, 0, 1]), Tcp(10000u16));
let address4 = multiaddr!(Ip4([1, 1, 1, 1]), Tcp(10000u16));
let address5 = multiaddr!(Ip4([8, 8, 8, 8]), Tcp(10000u16));

let provider1 = peer_to_kad_with_addresses(peer.clone(), vec![address1.clone()]);
let provider2 = peer_to_kad_with_addresses(
peer.clone(),
vec![address2.clone(), address3.clone(), address4.clone()],
);
let provider3 =
peer_to_kad_with_addresses(peer.clone(), vec![address4.clone(), address5.clone()]);

let providers = vec![provider1, provider2, provider3];

let found_providers = GetProvidersContext::merge_and_sort_providers(
providers,
Key::new(vec![1, 2, 3].into()),
);

assert_eq!(found_providers.len(), 1);

let addresses = &found_providers.get(0).unwrap().addresses;
assert_eq!(addresses.len(), 5);
assert!(addresses.contains(&address1));
assert!(addresses.contains(&address2));
assert!(addresses.contains(&address3));
assert!(addresses.contains(&address4));
assert!(addresses.contains(&address5));
}
}
Loading

0 comments on commit d765111

Please sign in to comment.