Skip to content

Commit

Permalink
Add Swarm::next_extended (#1374)
Browse files Browse the repository at this point in the history
* Add Swarm::next_extended

* Fix ipfs-kad example

* Fix tests

* Renames
  • Loading branch information
tomaka authored Jan 7, 2020
1 parent 56ca671 commit 84b6a7d
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 78 deletions.
4 changes: 2 additions & 2 deletions examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
//! peer ID will be generated randomly.
use async_std::task;
use futures::prelude::*;
use libp2p::{
Swarm,
PeerId,
Expand Down Expand Up @@ -90,7 +89,8 @@ fn main() -> Result<(), Box<dyn Error>> {

// Kick it off!
task::block_on(async move {
while let Some(event) = swarm.try_next().await? {
loop {
let event = swarm.next().await;
if let KademliaEvent::GetClosestPeersResult(result) = event {
match result {
Ok(ok) =>
Expand Down
35 changes: 22 additions & 13 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,20 +254,18 @@ pub enum IdentifyEvent {
#[cfg(test)]
mod tests {
use crate::{Identify, IdentifyEvent};
use futures::prelude::*;
use futures::{prelude::*, pin_mut};
use libp2p_core::{
identity,
PeerId,
muxing::StreamMuxer,
Multiaddr,
Transport,
upgrade
};
use libp2p_tcp::TcpConfig;
use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_mplex::MplexConfig;
use rand::{Rng, thread_rng};
use std::{fmt, io};

fn transport() -> (identity::PublicKey, impl Transport<
Expand Down Expand Up @@ -303,30 +301,41 @@ mod tests {
(swarm, pubkey)
};

let addr: Multiaddr = {
let port = thread_rng().gen_range(49152, std::u16::MAX);
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()
};
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

Swarm::listen_on(&mut swarm1, addr.clone()).unwrap();
Swarm::dial_addr(&mut swarm2, addr.clone()).unwrap();
let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.next_event();
pin_mut!(swarm1_fut);
match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr,
_ => {}
}
}
});
Swarm::dial_addr(&mut swarm2, listen_addr).unwrap();

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
match future::select(swarm1.next(), swarm2.next()).await.factor_second().0 {
future::Either::Left(Some(Ok(IdentifyEvent::Received { info, .. }))) => {
let swarm1_fut = swarm1.next();
pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.next();
pin_mut!(swarm2_fut);

match future::select(swarm1_fut, swarm2_fut).await.factor_second().0 {
future::Either::Left(IdentifyEvent::Received { info, .. }) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(Some(Ok(IdentifyEvent::Received { info, .. }))) => {
future::Either::Right(IdentifyEvent::Received { info, .. }) => {
assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
Expand Down
42 changes: 21 additions & 21 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn bootstrap() {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::BootstrapResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => {
assert_eq!(i, 0);
assert_eq!(ok.peer, swarm_ids[0]);
let known = swarm.kbuckets.iter()
Expand All @@ -138,7 +138,7 @@ fn bootstrap() {
return Poll::Ready(())
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -186,7 +186,7 @@ fn query_iter() {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(swarm_ids[i], expected_swarm_id);
assert_eq!(swarm.queries.size(), 0);
Expand All @@ -196,7 +196,7 @@ fn query_iter() {
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -234,13 +234,13 @@ fn unresponsive_not_returned_direct() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 0);
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -278,14 +278,14 @@ fn unresponsive_not_returned_indirect() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id);
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -314,7 +314,7 @@ fn get_record_not_found() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Err(e))))) => {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => {
if let GetRecordError::NotFound { key, closest_peers, } = e {
assert_eq!(key, target_key);
assert_eq!(closest_peers.len(), 2);
Expand All @@ -326,7 +326,7 @@ fn get_record_not_found() {
}
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -375,8 +375,8 @@ fn put_record() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::PutRecordResult(res)))) |
Poll::Ready(Some(Ok(KademliaEvent::RepublishRecordResult(res)))) => {
Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => {
match res {
Err(e) => panic!(e),
Ok(ok) => {
Expand All @@ -387,7 +387,7 @@ fn put_record() {
}
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -474,13 +474,13 @@ fn get_value() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), 1);
assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -513,13 +513,13 @@ fn get_value_many() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), num_results);
assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -561,8 +561,8 @@ fn add_provider() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::StartProvidingResult(res)))) |
Poll::Ready(Some(Ok(KademliaEvent::RepublishProviderResult(res)))) => {
Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) |
Poll::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => {
match res {
Err(e) => panic!(e),
Ok(ok) => {
Expand All @@ -572,7 +572,7 @@ fn add_provider() {
}
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -669,7 +669,7 @@ fn exceed_jobs_max_queries() {
for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) {
if let Ok(KademliaEvent::BootstrapResult(r)) = e {
if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error")
} else {
panic!("Unexpected event: {:?}", e)
Expand Down
4 changes: 2 additions & 2 deletions protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn ping() {
}

loop {
match swarm1.next().await.unwrap().unwrap() {
match swarm1.next().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid1.clone(), peer, rtt)
},
Expand All @@ -74,7 +74,7 @@ fn ping() {
Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap();

loop {
match swarm2.next().await.unwrap().unwrap() {
match swarm2.next().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid2.clone(), peer, rtt)
},
Expand Down
Loading

0 comments on commit 84b6a7d

Please sign in to comment.