Skip to content

Commit 2fb1ee8

Browse files
feat(kad): report get_providers call event based
1 parent 292b168 commit 2fb1ee8

File tree

11 files changed

+421
-275
lines changed

11 files changed

+421
-275
lines changed

examples/distributed-key-value-store.rs

+122-121
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use libp2p::{
5353
swarm::SwarmEvent,
5454
NetworkBehaviour, PeerId, Swarm,
5555
};
56+
use libp2p_kad::GetProvidersOk;
5657
use std::error::Error;
5758

5859
#[async_std::main]
@@ -110,147 +111,147 @@ async fn main() -> Result<(), Box<dyn Error>> {
110111
// Kick it off.
111112
loop {
112113
select! {
113-
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
114-
event = swarm.select_next_some() => match event {
115-
SwarmEvent::NewListenAddr { address, .. } => {
116-
println!("Listening in {:?}", address);
117-
},
118-
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
119-
for (peer_id, multiaddr) in list {
120-
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
114+
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
115+
event = swarm.select_next_some() => match event {
116+
SwarmEvent::NewListenAddr { address, .. } => {
117+
println!("Listening in {:?}", address);
118+
},
119+
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
120+
for (peer_id, multiaddr) in list {
121+
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
122+
}
121123
}
122-
}
123-
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
124-
match result {
125-
QueryResult::GetProviders(Ok(ok)) => {
126-
for peer in ok.providers {
124+
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
125+
match result {
126+
QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
127+
for peer in providers {
128+
println!(
129+
"Peer {:?} provides key {:?}",
130+
peer,
131+
std::str::from_utf8(key.as_ref()).unwrap()
132+
);
133+
}
134+
}
135+
QueryResult::GetProviders(Err(err)) => {
136+
eprintln!("Failed to get providers: {:?}", err);
137+
}
138+
QueryResult::GetRecord(Ok(ok)) => {
139+
for PeerRecord {
140+
record: Record { key, value, .. },
141+
..
142+
} in ok.records
143+
{
144+
println!(
145+
"Got record {:?} {:?}",
146+
std::str::from_utf8(key.as_ref()).unwrap(),
147+
std::str::from_utf8(&value).unwrap(),
148+
);
149+
}
150+
}
151+
QueryResult::GetRecord(Err(err)) => {
152+
eprintln!("Failed to get record: {:?}", err);
153+
}
154+
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
127155
println!(
128-
"Peer {:?} provides key {:?}",
129-
peer,
130-
std::str::from_utf8(ok.key.as_ref()).unwrap()
156+
"Successfully put record {:?}",
157+
std::str::from_utf8(key.as_ref()).unwrap()
131158
);
132159
}
133-
}
134-
QueryResult::GetProviders(Err(err)) => {
135-
eprintln!("Failed to get providers: {:?}", err);
136-
}
137-
QueryResult::GetRecord(Ok(ok)) => {
138-
for PeerRecord {
139-
record: Record { key, value, .. },
140-
..
141-
} in ok.records
142-
{
160+
QueryResult::PutRecord(Err(err)) => {
161+
eprintln!("Failed to put record: {:?}", err);
162+
}
163+
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
143164
println!(
144-
"Got record {:?} {:?}",
145-
std::str::from_utf8(key.as_ref()).unwrap(),
146-
std::str::from_utf8(&value).unwrap(),
165+
"Successfully put provider record {:?}",
166+
std::str::from_utf8(key.as_ref()).unwrap()
147167
);
148168
}
169+
QueryResult::StartProviding(Err(err)) => {
170+
eprintln!("Failed to put provider record: {:?}", err);
171+
}
172+
_ => {}
149173
}
150-
QueryResult::GetRecord(Err(err)) => {
151-
eprintln!("Failed to get record: {:?}", err);
152-
}
153-
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
154-
println!(
155-
"Successfully put record {:?}",
156-
std::str::from_utf8(key.as_ref()).unwrap()
157-
);
158-
}
159-
QueryResult::PutRecord(Err(err)) => {
160-
eprintln!("Failed to put record: {:?}", err);
161-
}
162-
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
163-
println!(
164-
"Successfully put provider record {:?}",
165-
std::str::from_utf8(key.as_ref()).unwrap()
166-
);
167-
}
168-
QueryResult::StartProviding(Err(err)) => {
169-
eprintln!("Failed to put provider record: {:?}", err);
170-
}
171-
_ => {}
172174
}
175+
_ => {}
173176
}
174-
_ => {}
175-
}
176177
}
177178
}
178-
}
179179

180-
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
181-
let mut args = line.split(' ');
180+
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
181+
let mut args = line.split(' ');
182182

183-
match args.next() {
184-
Some("GET") => {
185-
let key = {
186-
match args.next() {
187-
Some(key) => Key::new(&key),
188-
None => {
189-
eprintln!("Expected key");
190-
return;
183+
match args.next() {
184+
Some("GET") => {
185+
let key = {
186+
match args.next() {
187+
Some(key) => Key::new(&key),
188+
None => {
189+
eprintln!("Expected key");
190+
return;
191+
}
191192
}
192-
}
193-
};
194-
kademlia.get_record(key, Quorum::One);
195-
}
196-
Some("GET_PROVIDERS") => {
197-
let key = {
198-
match args.next() {
199-
Some(key) => Key::new(&key),
200-
None => {
201-
eprintln!("Expected key");
202-
return;
193+
};
194+
kademlia.get_record(key, Quorum::One);
195+
}
196+
Some("GET_PROVIDERS") => {
197+
let key = {
198+
match args.next() {
199+
Some(key) => Key::new(&key),
200+
None => {
201+
eprintln!("Expected key");
202+
return;
203+
}
203204
}
204-
}
205-
};
206-
kademlia.get_providers(key);
207-
}
208-
Some("PUT") => {
209-
let key = {
210-
match args.next() {
211-
Some(key) => Key::new(&key),
212-
None => {
213-
eprintln!("Expected key");
214-
return;
205+
};
206+
kademlia.get_providers(key);
207+
}
208+
Some("PUT") => {
209+
let key = {
210+
match args.next() {
211+
Some(key) => Key::new(&key),
212+
None => {
213+
eprintln!("Expected key");
214+
return;
215+
}
215216
}
216-
}
217-
};
218-
let value = {
219-
match args.next() {
220-
Some(value) => value.as_bytes().to_vec(),
221-
None => {
222-
eprintln!("Expected value");
223-
return;
217+
};
218+
let value = {
219+
match args.next() {
220+
Some(value) => value.as_bytes().to_vec(),
221+
None => {
222+
eprintln!("Expected value");
223+
return;
224+
}
224225
}
225-
}
226-
};
227-
let record = Record {
228-
key,
229-
value,
230-
publisher: None,
231-
expires: None,
232-
};
233-
kademlia
234-
.put_record(record, Quorum::One)
235-
.expect("Failed to store record locally.");
236-
}
237-
Some("PUT_PROVIDER") => {
238-
let key = {
239-
match args.next() {
240-
Some(key) => Key::new(&key),
241-
None => {
242-
eprintln!("Expected key");
243-
return;
226+
};
227+
let record = Record {
228+
key,
229+
value,
230+
publisher: None,
231+
expires: None,
232+
};
233+
kademlia
234+
.put_record(record, Quorum::One)
235+
.expect("Failed to store record locally.");
236+
}
237+
Some("PUT_PROVIDER") => {
238+
let key = {
239+
match args.next() {
240+
Some(key) => Key::new(&key),
241+
None => {
242+
eprintln!("Expected key");
243+
return;
244+
}
244245
}
245-
}
246-
};
246+
};
247247

248-
kademlia
249-
.start_providing(key)
250-
.expect("Failed to start providing key");
251-
}
252-
_ => {
253-
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
248+
kademlia
249+
.start_providing(key)
250+
.expect("Failed to start providing key");
251+
}
252+
_ => {
253+
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
254+
}
254255
}
255256
}
256257
}

examples/file-sharing.rs

+24-7
Original file line numberDiff line numberDiff line change
@@ -326,12 +326,16 @@ mod network {
326326

327327
/// Find the providers for the given file on the DHT.
328328
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
329-
let (sender, receiver) = oneshot::channel();
329+
let (sender, mut receiver) = mpsc::channel(0);
330330
self.sender
331331
.send(Command::GetProviders { file_name, sender })
332332
.await
333333
.expect("Command receiver not to be dropped.");
334-
receiver.await.expect("Sender not to be dropped.")
334+
let mut out = HashSet::new();
335+
while let Some(h) = receiver.next().await {
336+
out.extend(h);
337+
}
338+
out
335339
}
336340

337341
/// Request the content of the given file from the given peer.
@@ -371,7 +375,7 @@ mod network {
371375
event_sender: mpsc::Sender<Event>,
372376
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
373377
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
374-
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
378+
pending_get_providers: HashMap<QueryId, mpsc::Sender<HashSet<PeerId>>>,
375379
pending_request_file:
376380
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
377381
}
@@ -415,7 +419,7 @@ mod network {
415419
) {
416420
match event {
417421
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
418-
KademliaEvent::OutboundQueryCompleted {
422+
KademliaEvent::OutboundQueryProgressed {
419423
id,
420424
result: QueryResult::StartProviding(_),
421425
..
@@ -428,18 +432,31 @@ mod network {
428432
let _ = sender.send(());
429433
}
430434
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
431-
KademliaEvent::OutboundQueryCompleted {
435+
KademliaEvent::OutboundQueryProgressed {
432436
id,
433437
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
434438
..
435439
},
436440
)) => {
437441
let _ = self
438442
.pending_get_providers
439-
.remove(&id)
443+
.get_mut(&id)
440444
.expect("Completed query to be previously pending.")
441445
.send(providers);
442446
}
447+
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
448+
KademliaEvent::OutboundQueryProgressed {
449+
id,
450+
result: QueryResult::GetProviders(..),
451+
..
452+
},
453+
)) => {
454+
// Drop channel to signal query is complete.
455+
let _ = self
456+
.pending_get_providers
457+
.remove(&id)
458+
.expect("Completed query to be previously pending.");
459+
}
443460
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
444461
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
445462
RequestResponseEvent::Message { message, .. },
@@ -626,7 +643,7 @@ mod network {
626643
},
627644
GetProviders {
628645
file_name: String,
629-
sender: oneshot::Sender<HashSet<PeerId>>,
646+
sender: mpsc::Sender<HashSet<PeerId>>,
630647
},
631648
RequestFile {
632649
file_name: String,

examples/ipfs-kad.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
8585
task::block_on(async move {
8686
loop {
8787
let event = swarm.select_next_some().await;
88-
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
88+
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
8989
result: QueryResult::GetClosestPeers(result),
9090
..
9191
}) = event

misc/metrics/src/kad.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21+
use libp2p_kad::GetProvidersOk;
2122
use prometheus_client::encoding::text::Encode;
2223
use prometheus_client::metrics::counter::Counter;
2324
use prometheus_client::metrics::family::Family;
@@ -162,7 +163,7 @@ impl Metrics {
162163
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
163164
fn record(&self, event: &libp2p_kad::KademliaEvent) {
164165
match event {
165-
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
166+
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
166167
self.query_result_num_requests
167168
.get_or_create(&result.into())
168169
.observe(stats.num_requests().into());
@@ -200,9 +201,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
200201
}
201202
},
202203
libp2p_kad::QueryResult::GetProviders(result) => match result {
203-
Ok(ok) => self
204-
.query_result_get_providers_ok
205-
.observe(ok.providers.len() as f64),
204+
Ok(GetProvidersOk { providers, .. }) => {
205+
self.query_result_get_providers_ok
206+
.observe(providers.len() as f64);
207+
}
206208
Err(error) => {
207209
self.query_result_get_providers_error
208210
.get_or_create(&error.into())

0 commit comments

Comments
 (0)