Skip to content

Commit 013eeaf

Browse files
feat(kad): report get_providers call event based
1 parent 832eb63 commit 013eeaf

File tree

10 files changed

+240
-162
lines changed

10 files changed

+240
-162
lines changed

examples/distributed-key-value-store.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,20 @@ async fn main() -> Result<(), Box<dyn Error>> {
8989
// Called when `kademlia` produces an event.
9090
fn inject_event(&mut self, message: KademliaEvent) {
9191
match message {
92-
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
92+
KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
9393
QueryResult::GetProviders(Ok(ok)) => {
94-
for peer in ok.providers {
95-
println!(
96-
"Peer {:?} provides key {:?}",
97-
peer,
98-
std::str::from_utf8(ok.key.as_ref()).unwrap()
99-
);
100-
}
94+
println!(
95+
"Peer {:?} provides key {:?}",
96+
ok.provider,
97+
std::str::from_utf8(ok.key.as_ref()).unwrap()
98+
);
10199
}
100+
_ => {}
101+
},
102+
KademliaEvent::OutboundQueryCompleted {
103+
result: Some(result),
104+
..
105+
} => match result {
102106
QueryResult::GetProviders(Err(err)) => {
103107
eprintln!("Failed to get providers: {:?}", err);
104108
}

examples/file-sharing.rs

+20-9
Original file line numberDiff line numberDiff line change
@@ -324,12 +324,12 @@ mod network {
324324

325325
/// Find the providers for the given file on the DHT.
326326
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
327-
let (sender, receiver) = oneshot::channel();
327+
let (sender, receiver) = mpsc::channel(0);
328328
self.sender
329329
.send(Command::GetProviders { file_name, sender })
330330
.await
331331
.expect("Command receiver not to be dropped.");
332-
receiver.await.expect("Sender not to be dropped.")
332+
receiver.collect().await
333333
}
334334

335335
/// Request the content of the given file from the given peer.
@@ -365,7 +365,7 @@ mod network {
365365
event_sender: mpsc::Sender<Event>,
366366
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
367367
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
368-
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
368+
pending_get_providers: HashMap<QueryId, mpsc::Sender<PeerId>>,
369369
pending_request_file:
370370
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>,
371371
}
@@ -411,7 +411,7 @@ mod network {
411411
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
412412
KademliaEvent::OutboundQueryCompleted {
413413
id,
414-
result: QueryResult::StartProviding(_),
414+
result: Some(QueryResult::StartProviding(_)),
415415
..
416416
},
417417
)) => {
@@ -422,17 +422,28 @@ mod network {
422422
let _ = sender.send(());
423423
}
424424
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
425-
KademliaEvent::OutboundQueryCompleted {
425+
KademliaEvent::OutboundQueryProgressed {
426426
id,
427-
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
427+
result: QueryResult::GetProviders(Ok(GetProvidersOk { provider, .. })),
428428
..
429429
},
430430
)) => {
431431
let _ = self
432432
.pending_get_providers
433-
.remove(&id)
433+
.get_mut(&id)
434434
.expect("Completed query to be previously pending.")
435-
.send(providers);
435+
.send(provider);
436+
}
437+
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
438+
KademliaEvent::OutboundQueryCompleted {
439+
id, result: None, ..
440+
},
441+
)) => {
442+
// Drop channel to signal query is complete.
443+
let _ = self
444+
.pending_get_providers
445+
.remove(&id)
446+
.expect("Completed query to be previously pending.");
436447
}
437448
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
438449
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
@@ -620,7 +631,7 @@ mod network {
620631
},
621632
GetProviders {
622633
file_name: String,
623-
sender: oneshot::Sender<HashSet<PeerId>>,
634+
sender: mpsc::Sender<PeerId>,
624635
},
625636
RequestFile {
626637
file_name: String,

examples/ipfs-kad.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
8686
loop {
8787
let event = swarm.select_next_some().await;
8888
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
89-
result: QueryResult::GetClosestPeers(result),
89+
result: Some(QueryResult::GetClosestPeers(result)),
9090
..
9191
}) = event
9292
{

misc/metrics/src/kad.rs

+17-13
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,11 @@ impl Metrics {
162162
impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
163163
fn record(&self, event: &libp2p_kad::KademliaEvent) {
164164
match event {
165-
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
165+
libp2p_kad::KademliaEvent::OutboundQueryCompleted {
166+
result: Some(result),
167+
stats,
168+
..
169+
} => {
166170
self.kad
167171
.query_result_num_requests
168172
.get_or_create(&result.into())
@@ -207,21 +211,21 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
207211
.inc();
208212
}
209213
},
210-
libp2p_kad::QueryResult::GetProviders(result) => match result {
211-
Ok(ok) => self
212-
.kad
213-
.query_result_get_providers_ok
214-
.observe(ok.providers.len() as f64),
215-
Err(error) => {
216-
self.kad
217-
.query_result_get_providers_error
218-
.get_or_create(&error.into())
219-
.inc();
220-
}
221-
},
222214
_ => {}
223215
}
224216
}
217+
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
218+
libp2p_kad::QueryResult::GetProviders(result) => match result {
219+
Ok(_ok) => self.kad.query_result_get_providers_ok.observe(1.),
220+
Err(error) => {
221+
self.kad
222+
.query_result_get_providers_error
223+
.get_or_create(&error.into())
224+
.inc();
225+
}
226+
},
227+
_ => {}
228+
},
225229
libp2p_kad::KademliaEvent::RoutingUpdated {
226230
is_new_peer,
227231
old_peer,

0 commit comments

Comments
 (0)