Skip to content

Commit 2b89ae9

Browse files
feat(kad): report get_providers call event based
1 parent 97fd72a commit 2b89ae9

File tree

10 files changed

+223
-130
lines changed

10 files changed

+223
-130
lines changed

examples/distributed-key-value-store.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use libp2p::{
5353
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
5454
NetworkBehaviour, PeerId, Swarm,
5555
};
56+
use libp2p_kad::{GetProvidersProgress, QueryProgress};
5657
use std::error::Error;
5758

5859
#[async_std::main]
@@ -89,16 +90,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
8990
// Called when `kademlia` produces an event.
9091
fn inject_event(&mut self, message: KademliaEvent) {
9192
match message {
92-
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
93-
QueryResult::GetProviders(Ok(ok)) => {
94-
for peer in ok.providers {
95-
println!(
96-
"Peer {:?} provides key {:?}",
97-
peer,
98-
std::str::from_utf8(ok.key.as_ref()).unwrap()
99-
);
100-
}
93+
KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
94+
QueryProgress::GetProviders(GetProvidersProgress { key, provider, .. }) => {
95+
println!(
96+
"Peer {:?} provides key {:?}",
97+
provider,
98+
std::str::from_utf8(key.as_ref()).unwrap()
99+
);
101100
}
101+
},
102+
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
102103
QueryResult::GetProviders(Err(err)) => {
103104
eprintln!("Failed to get providers: {:?}", err);
104105
}

examples/file-sharing.rs

+22-8
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,15 @@ mod network {
211211
use libp2p::identity;
212212
use libp2p::identity::ed25519;
213213
use libp2p::kad::record::store::MemoryStore;
214-
use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult};
214+
use libp2p::kad::{Kademlia, KademliaEvent, QueryId, QueryResult};
215215
use libp2p::multiaddr::Protocol;
216216
use libp2p::request_response::{
217217
ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent,
218218
RequestResponseMessage, ResponseChannel,
219219
};
220220
use libp2p::swarm::{ConnectionHandlerUpgrErr, SwarmBuilder, SwarmEvent};
221221
use libp2p::{NetworkBehaviour, Swarm};
222+
use libp2p_kad::{GetProvidersProgress, QueryProgress};
222223
use std::collections::{HashMap, HashSet};
223224
use std::iter;
224225

@@ -324,12 +325,12 @@ mod network {
324325

325326
/// Find the providers for the given file on the DHT.
326327
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
327-
let (sender, receiver) = oneshot::channel();
328+
let (sender, receiver) = mpsc::channel(0);
328329
self.sender
329330
.send(Command::GetProviders { file_name, sender })
330331
.await
331332
.expect("Command receiver not to be dropped.");
332-
receiver.await.expect("Sender not to be dropped.")
333+
receiver.collect().await
333334
}
334335

335336
/// Request the content of the given file from the given peer.
@@ -365,7 +366,7 @@ mod network {
365366
event_sender: mpsc::Sender<Event>,
366367
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
367368
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
368-
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
369+
pending_get_providers: HashMap<QueryId, mpsc::Sender<PeerId>>,
369370
pending_request_file:
370371
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>,
371372
}
@@ -421,18 +422,31 @@ mod network {
421422
.expect("Completed query to be previously pending.");
422423
let _ = sender.send(());
423424
}
425+
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
426+
KademliaEvent::OutboundQueryProgressed {
427+
id,
428+
result: QueryProgress::GetProviders(GetProvidersProgress { provider, .. }),
429+
..
430+
},
431+
)) => {
432+
let _ = self
433+
.pending_get_providers
434+
.get_mut(&id)
435+
.expect("Completed query to be previously pending.")
436+
.send(provider);
437+
}
424438
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
425439
KademliaEvent::OutboundQueryCompleted {
426440
id,
427-
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
441+
result: QueryResult::GetProviders(..),
428442
..
429443
},
430444
)) => {
445+
// Drop channel to signal query is complete.
431446
let _ = self
432447
.pending_get_providers
433448
.remove(&id)
434-
.expect("Completed query to be previously pending.")
435-
.send(providers);
449+
.expect("Completed query to be previously pending.");
436450
}
437451
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
438452
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
@@ -620,7 +634,7 @@ mod network {
620634
},
621635
GetProviders {
622636
file_name: String,
623-
sender: oneshot::Sender<HashSet<PeerId>>,
637+
sender: mpsc::Sender<PeerId>,
624638
},
625639
RequestFile {
626640
file_name: String,

misc/metrics/src/kad.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
208208
}
209209
},
210210
libp2p_kad::QueryResult::GetProviders(result) => match result {
211-
Ok(ok) => self
212-
.kad
213-
.query_result_get_providers_ok
214-
.observe(ok.providers.len() as f64),
211+
Ok(_) => {}
215212
Err(error) => {
216213
self.kad
217214
.query_result_get_providers_error
@@ -222,6 +219,11 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
222219
_ => {}
223220
}
224221
}
222+
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
223+
libp2p_kad::QueryProgress::GetProviders(_) => {
224+
self.kad.query_result_get_providers_ok.observe(1.);
225+
}
226+
},
225227
libp2p_kad::KademliaEvent::RoutingUpdated {
226228
is_new_peer,
227229
old_peer,

0 commit comments

Comments
 (0)