Skip to content

Commit

Permalink
[libp2p-kad] Scope pending RPCs to queries. (#1217)
Browse files Browse the repository at this point in the history
* Remove pending RPCs on query completion.

Ensure that any still pending RPCs related to a query are removed
once the query terminates (successfully or through timeout) by
scoping pending RPCs to the lifetime of a query.

* Cleanup.
  • Loading branch information
romanb authored and tomaka committed Aug 7, 2019
1 parent bcfb647 commit 5696b3e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
31 changes: 17 additions & 14 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
mod test;

use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
Expand Down Expand Up @@ -58,10 +59,6 @@ pub struct Kademlia<TSubstream, TStore> {
/// This is a superset of the connected peers currently in the routing table.
connected_peers: FnvHashSet<PeerId>,

/// A list of pending request to peers that are not currently connected.
/// These requests are sent as soon as a connection to the peer is established.
pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); 8]>,

/// Periodic job for re-publication of provider records for keys
/// provided by the local node.
add_provider_job: Option<AddProviderJob>,
Expand Down Expand Up @@ -233,7 +230,6 @@ where
/// Creates a new `Kademlia` network behaviour with the given configuration.
pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
let local_key = kbucket::Key::new(id.clone());
let pending_rpcs = SmallVec::with_capacity(config.query_config.replication_factor.get());

let put_record_job = config
.record_replication_interval
Expand All @@ -256,7 +252,6 @@ where
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
connected_peers: Default::default(),
pending_rpcs,
add_provider_job,
put_record_job,
record_ttl: config.record_ttl,
Expand Down Expand Up @@ -1054,12 +1049,14 @@ where
}

fn inject_connected(&mut self, peer: PeerId, endpoint: ConnectedPoint) {
while let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &peer) {
let (_, rpc) = self.pending_rpcs.remove(pos);
self.queued_events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer.clone(),
event: rpc,
});
// Queue events for sending pending RPCs to the connected peer.
// There can be only one pending RPC for a particular peer and query per definition.
for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
q.inner.pending_rpcs.iter()
.position(|(p, _)| p == &peer)
.map(|p| q.inner.pending_rpcs.remove(p)))
{
self.queued_events.push_back(NetworkBehaviourAction::SendEvent { peer_id, event });
}

// The remote's address can only be put into the routing table,
Expand Down Expand Up @@ -1396,7 +1393,7 @@ where
peer_id, event
});
} else if &peer_id != self.kbuckets.local_key().preimage() {
self.pending_rpcs.push((peer_id.clone(), event));
query.inner.pending_rpcs.push((peer_id.clone(), event));
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id
});
Expand Down Expand Up @@ -1741,13 +1738,19 @@ struct QueryInner {
info: QueryInfo,
/// Addresses of peers discovered during a query.
addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
/// A map of pending requests to peers.
///
/// A request is pending if the targeted peer is not currently connected
/// and these requests are sent as soon as a connection to the peer is established.
pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
}

impl QueryInner {
fn new(info: QueryInfo) -> Self {
QueryInner {
info,
addresses: Default::default()
addresses: Default::default(),
pending_rpcs: SmallVec::default()
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ fn query_iter() {
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(swarm_ids[i], expected_swarm_id);
assert_eq!(swarm.queries.size(), 0);
assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p)));
let key = kbucket::Key::new(ok.key);
assert_eq!(expected_distances, distances(&key, ok.peers));
Expand Down Expand Up @@ -420,6 +421,7 @@ fn put_record() {

if republished {
assert_eq!(swarms[0].store.records().count(), records.len());
assert_eq!(swarms[0].queries.size(), 0);
for k in records.keys() {
swarms[0].store.remove(&k);
}
Expand Down

0 comments on commit 5696b3e

Please sign in to comment.