From 8e957c2423aea7b4a1febd86a6df627af931b7f2 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 20 Jul 2020 16:36:59 +0200 Subject: [PATCH 1/9] feat: initial Kademlia extensions Signed-off-by: ljedrz --- src/p2p/behaviour.rs | 193 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 164 insertions(+), 29 deletions(-) diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 8a4bf8669..8b1bcbdce 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -10,13 +10,14 @@ use cid::Cid; use libp2p::core::{Multiaddr, PeerId}; use libp2p::identify::{Identify, IdentifyEvent}; use libp2p::kad::record::store::MemoryStore; -use libp2p::kad::{Kademlia, KademliaEvent}; +use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryId}; use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::ping::{Ping, PingEvent}; use libp2p::swarm::toggle::Toggle; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourEventProcess}; use libp2p::NetworkBehaviour; -use std::sync::Arc; +use multibase::Base; +use std::{collections::HashSet, sync::Arc}; /// Behaviour type. #[derive(NetworkBehaviour)] @@ -25,6 +26,8 @@ pub struct Behaviour { ipfs: Ipfs, mdns: Toggle, kademlia: Kademlia, + #[behaviour(ignore)] + kad_queries: HashSet, bitswap: Bitswap, ping: Ping, identify: Identify, @@ -60,40 +63,166 @@ impl NetworkBehaviourEventProcess for Behaviour NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: KademliaEvent) { - use libp2p::kad::{GetProvidersError, GetProvidersOk, QueryResult}; + use libp2p::kad::{ + AddProviderError, AddProviderOk, BootstrapError, BootstrapOk, GetClosestPeersError, + GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk, + KademliaEvent::*, PutRecordError, PutRecordOk, QueryResult::*, + }; match event { - KademliaEvent::QueryResult { - result: - QueryResult::GetProviders(Ok(GetProvidersOk { + QueryResult { result, id, .. } => { + self.kad_queries.remove(&id); + + match result { + Bootstrap(Ok(BootstrapOk { .. })) => { + info!("kad: finished bootstrapping"); + } + Bootstrap(Err(BootstrapError::Timeout { .. })) => { + warn!("kad: failed to bootstrap"); + } + GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => { + let key = multibase::encode(Base::Base58Btc, key); + + for peer in peers { + info!("kad: peer {} is close to key {}", peer, key); + } + } + GetClosestPeers(Err(GetClosestPeersError::Timeout { key, peers })) => { + let key = multibase::encode(Base::Base58Btc, key); + + warn!("kad: timed out trying to find all peers closest to key {}; got the following:", key); + for peer in peers { + info!("kad: peer {} is close to key {}", peer, key); + } + } + GetProviders(Ok(GetProvidersOk { key, providers, .. })) => { + let key = multibase::encode(Base::Base58Btc, key); + if providers.is_empty() { + // FIXME: not sure if this is possible + info!("kad: could not find a provider for {}", key); + } else { + for peer in providers { + info!("kad: {} provided by {}", key, peer); + self.bitswap.connect(peer); + } + } + } + GetProviders(Err(GetProvidersError::Timeout { key, .. })) => { + let key = multibase::encode(Base::Base58Btc, key); + warn!("kad: timed out trying to get providers for {}", key); + } + StartProviding(Ok(AddProviderOk { key })) => { + let key = multibase::encode(Base::Base58Btc, key); + info!("kad: added provider {}", key); + } + StartProviding(Err(AddProviderError::Timeout { key })) => { + let key = multibase::encode(Base::Base58Btc, key); + warn!("kad: timed out trying to add provider {}", key); + } + RepublishProvider(Ok(AddProviderOk { key })) => { + let key = multibase::encode(Base::Base58Btc, key); + info!("kad: republished provider {}", key); + } + RepublishProvider(Err(AddProviderError::Timeout { key })) => { + let key = multibase::encode(Base::Base58Btc, key); + warn!("kad: timed out trying to republish provider {}", key); + } + GetRecord(Ok(GetRecordOk { records })) => { + for record in records { + let key = multibase::encode(Base::Base58Btc, record.record.key); + info!("kad: got record {}:{:?}", key, record.record.value); + } + } + GetRecord(Err(GetRecordError::NotFound { key, - providers, - closest_peers, - })), - .. - } => { - // FIXME: really wasteful to run this through Vec - let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58(); - if providers.is_empty() { - // FIXME: not sure if this is possible - info!("kad: Could not find provider for {}", cid); - } else { - for peer in closest_peers { - info!("kad: {} provided by {}", cid, peer.to_base58()); - self.bitswap.connect(peer); + closest_peers: _, + })) => { + let key = multibase::encode(Base::Base58Btc, key); + warn!("kad: couldn't find record {}", key); + } + GetRecord(Err(GetRecordError::QuorumFailed { + key, + records, + quorum, + })) => { + let key = multibase::encode(Base::Base58Btc, key); + + warn!( + "kad: quorum failed {} trying to get key {}; got the following:", + quorum, key + ); + for record in records { + let key = multibase::encode(Base::Base58Btc, record.record.key); + info!("kad: got record {}:{:?}", key, record.record.value); + } + } + GetRecord(Err(GetRecordError::Timeout { + key, + records, + quorum: _, + })) => { + let key = multibase::encode(Base::Base58Btc, key); + + warn!( + "kad: timed out trying to get key {}; got the following:", + key + ); + for record in records { + let key = multibase::encode(Base::Base58Btc, record.record.key); + info!("kad: got record {}:{:?}", key, record.record.value); + } + } + PutRecord(Ok(PutRecordOk { key })) + | RepublishRecord(Ok(PutRecordOk { key })) => { + let key = multibase::encode(Base::Base58Btc, key); + info!("kad: successfully put record {}", key); + } + PutRecord(Err(PutRecordError::QuorumFailed { + key, + success: _, + quorum, + })) + | RepublishRecord(Err(PutRecordError::QuorumFailed { + key, + success: _, + quorum, + })) => { + let key = multibase::encode(Base::Base58Btc, key); + info!( + "kad: quorum failed ({}) trying to put record {}", + quorum, key + ); + } + PutRecord(Err(PutRecordError::Timeout { + key, + success: _, + quorum: _, + })) + | RepublishRecord(Err(PutRecordError::Timeout { + key, + success: _, + quorum: _, + })) => { + let key = multibase::encode(Base::Base58Btc, key); + info!("kad: timed out trying to put record {}", key); } } } - KademliaEvent::QueryResult { - result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { key, .. })), - .. + RoutingUpdated { + peer, + addresses, + old_peer: _, } => { - // FIXME: really wasteful to run this through Vec - let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58(); - warn!("kad: timed out get providers query for {}", cid); + trace!("kad: routing updated; {}: {:?}", peer, addresses); + } + UnroutablePeer { peer } => { + warn!("kad: peer {} is unroutable", peer); + } + RoutablePeer { peer, address } => { + trace!("kad: peer {} ({}) is routable", peer, address); } - event => { - log::trace!("kad: {:?}", event); + PendingRoutablePeer { peer, address } => { + trace!("kad: pending routable peer {} ({})", peer, address); } } } @@ -219,7 +348,12 @@ impl Behaviour { .into(); let store = MemoryStore::new(options.peer_id.to_owned()); - let mut kademlia = Kademlia::new(options.peer_id.to_owned(), store); + + let mut kad_config = KademliaConfig::default(); + kad_config.disjoint_query_paths(true); + kad_config.set_query_timeout(std::time::Duration::from_secs(300)); + let mut kademlia = Kademlia::with_config(options.peer_id.to_owned(), store, kad_config); + for (addr, peer_id) in &options.bootstrap { kademlia.add_address(peer_id, addr.to_owned()); } @@ -238,6 +372,7 @@ impl Behaviour { ipfs, mdns, kademlia, + kad_queries: Default::default(), bitswap, ping, identify, From 3bb0a65e042ea6ab9579a6a5726ff20144b57e94 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Tue, 21 Jul 2020 14:05:48 +0200 Subject: [PATCH 2/9] feat: add and test peer discovery Signed-off-by: ljedrz --- http/src/main.rs | 9 ++++-- src/lib.rs | 66 +++++++++++++++++++++++++++++++++++----- src/p2p/behaviour.rs | 71 +++++++++++++++++++++++++++++++------------- src/p2p/mod.rs | 7 +++-- src/subscription.rs | 11 ++++++- tests/connect_two.rs | 10 +++++-- tests/kademlia.rs | 51 +++++++++++++++++++++++++++++++ 7 files changed, 189 insertions(+), 36 deletions(-) create mode 100644 tests/kademlia.rs diff --git a/http/src/main.rs b/http/src/main.rs index 45bde1e15..5e35eccff 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -134,8 +134,13 @@ fn main() { let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop"); rt.block_on(async move { - let opts: IpfsOptions = - IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false); + let opts: IpfsOptions = IpfsOptions::new( + home.clone().into(), + keypair, + Vec::new(), + false, + Some("/ipfs/kad/1.0.0".into()), + ); let (ipfs, task) = UninitializedIpfs::new(opts) .await diff --git a/src/lib.rs b/src/lib.rs index bc47b3f76..ba9121882 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ #[macro_use] extern crate log; -use anyhow::format_err; +use anyhow::{anyhow, format_err}; use async_std::path::PathBuf; pub use bitswap::{BitswapEvent, Block, Stats}; pub use cid::Cid; @@ -91,6 +91,8 @@ pub struct IpfsOptions { pub bootstrap: Vec<(Multiaddr, PeerId)>, /// Enables mdns for peer discovery when true. pub mdns: bool, + /// Custom Kademlia protocol name. + pub kad_protocol: Option, } impl fmt::Debug for IpfsOptions { @@ -102,18 +104,20 @@ impl fmt::Debug for IpfsOptions { .field("bootstrap", &self.bootstrap) .field("keypair", &DebuggableKeypair(&self.keypair)) .field("mdns", &self.mdns) + .field("kad_protocol", &self.kad_protocol) .finish() } } impl IpfsOptions { /// Creates an inmemory store backed node for tests - pub fn inmemory_with_generated_keys(mdns: bool) -> Self { + pub fn inmemory_with_generated_keys(mdns: bool, kad_protocol: Option) -> Self { Self::new( std::env::temp_dir().into(), Keypair::generate_ed25519(), vec![], mdns, + kad_protocol, ) } } @@ -147,6 +151,7 @@ impl IpfsOptions { keypair: Keypair, bootstrap: Vec<(Multiaddr, PeerId)>, mdns: bool, + kad_protocol: Option, ) -> Self { Self { _marker: PhantomData, @@ -154,6 +159,7 @@ impl IpfsOptions { keypair, bootstrap, mdns, + kad_protocol, } } } @@ -203,6 +209,7 @@ impl Default for IpfsOptions { keypair, bootstrap, mdns: true, + kad_protocol: Some("/ipfs/kad/1.0.0".into()), } } } @@ -226,16 +233,14 @@ pub struct IpfsInner { } type Channel = OneshotSender>; +type FutureSubscription = SubscriptionFuture>; /// Events used internally to communicate with the swarm, which is executed in the the background /// task. #[derive(Debug)] enum IpfsEvent { /// Connect - Connect( - Multiaddr, - OneshotSender>>, - ), + Connect(Multiaddr, OneshotSender>), /// Addresses Addresses(Channel)>>), /// Local addresses @@ -255,6 +260,9 @@ enum IpfsEvent { BitswapStats(OneshotSender), AddListeningAddress(Multiaddr, Channel), RemoveListeningAddress(Multiaddr, Channel<()>), + Bootstrap(OneshotSender, Error>>), + AddPeer(PeerId, Multiaddr), + GetClosestPeers(PeerId, OneshotSender>), Exit, } @@ -608,6 +616,35 @@ impl Ipfs { rx.await? } + pub async fn bootstrap(&self) -> Result<(), Error> { + let (tx, rx) = oneshot_channel::, Error>>(); + + self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?; + + rx.await??.await?.map_err(|e| anyhow!(e)) + } + + pub async fn add_peer(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> { + self.to_task + .clone() + .send(IpfsEvent::AddPeer(peer_id, addr)) + .await?; + + Ok(()) + } + + pub async fn get_closest_peers(&self) -> Result<(), Error> { + let self_peer = PeerId::from_public_key(self.identity().await?.0); + let (tx, rx) = oneshot_channel::>(); + + self.to_task + .clone() + .send(IpfsEvent::GetClosestPeers(self_peer, tx)) + .await?; + + rx.await?.await?.map_err(|e| anyhow!(e)) + } + /// Exit daemon. pub async fn exit_daemon(self) { // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of @@ -869,6 +906,17 @@ impl Future for IpfsFuture { let _ = ret.send(removed); } + IpfsEvent::Bootstrap(ret) => { + let future = self.swarm.bootstrap(); + let _ = ret.send(future); + } + IpfsEvent::AddPeer(peer_id, addr) => { + self.swarm.add_peer(peer_id, addr); + } + IpfsEvent::GetClosestPeers(self_peer, ret) => { + let future = self.swarm.get_closest_peers(self_peer); + let _ = ret.send(future); + } IpfsEvent::Exit => { // FIXME: we could do a proper teardown return Poll::Ready(()); @@ -941,7 +989,8 @@ mod node { impl Node { pub async fn new(mdns: bool) -> Self { - let opts = IpfsOptions::inmemory_with_generated_keys(mdns); + let opts = + IpfsOptions::inmemory_with_generated_keys(mdns, Some("/ipfs/lan/kad/1.0.0".into())); let (ipfs, fut) = UninitializedIpfs::new(opts) .await .start() @@ -1039,7 +1088,8 @@ mod tests { const MDNS: bool = false; pub async fn create_mock_ipfs() -> Ipfs { - let options = IpfsOptions::inmemory_with_generated_keys(MDNS); + let options = + IpfsOptions::inmemory_with_generated_keys(MDNS, Some("/ipfs/lan/kad/1.0.0".into())); let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); task::spawn(fut); diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 8b1bcbdce..a7ba7c1ae 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -2,22 +2,23 @@ use super::pubsub::Pubsub; use super::swarm::{Connection, Disconnector, SwarmApi}; use crate::p2p::{SwarmOptions, SwarmTypes}; use crate::repo::BlockPut; -use crate::subscription::SubscriptionFuture; +use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; use crate::{Ipfs, IpfsTypes}; +use anyhow::anyhow; use async_std::task; use bitswap::{Bitswap, BitswapEvent}; use cid::Cid; use libp2p::core::{Multiaddr, PeerId}; use libp2p::identify::{Identify, IdentifyEvent}; use libp2p::kad::record::store::MemoryStore; -use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryId}; +use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent}; use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::ping::{Ping, PingEvent}; use libp2p::swarm::toggle::Toggle; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourEventProcess}; use libp2p::NetworkBehaviour; use multibase::Base; -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; /// Behaviour type. #[derive(NetworkBehaviour)] @@ -27,7 +28,7 @@ pub struct Behaviour { mdns: Toggle, kademlia: Kademlia, #[behaviour(ignore)] - kad_queries: HashSet, + kad_subscriptions: SubscriptionRegistry>, bitswap: Bitswap, ping: Ping, identify: Identify, @@ -46,9 +47,9 @@ impl NetworkBehaviourEventProcess for Behaviour { - for (peer, _) in list { + for (peer, addr) in list { log::trace!("mdns: Discovered peer {}", peer.to_base58()); - self.add_peer(peer); + self.add_peer(peer, addr); } } MdnsEvent::Expired(list) => { @@ -71,35 +72,34 @@ impl NetworkBehaviourEventProcess for Behaviour match event { QueryResult { result, id, .. } => { - self.kad_queries.remove(&id); + self.kad_subscriptions + .finish_subscription(id.into(), Ok(())); match result { Bootstrap(Ok(BootstrapOk { .. })) => { - info!("kad: finished bootstrapping"); + debug!("kad: finished bootstrapping"); } Bootstrap(Err(BootstrapError::Timeout { .. })) => { warn!("kad: failed to bootstrap"); } - GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => { - let key = multibase::encode(Base::Base58Btc, key); - + GetClosestPeers(Ok(GetClosestPeersOk { key: _, peers })) => { for peer in peers { - info!("kad: peer {} is close to key {}", peer, key); + info!("kad: peer {} is close", peer); } } - GetClosestPeers(Err(GetClosestPeersError::Timeout { key, peers })) => { - let key = multibase::encode(Base::Base58Btc, key); - - warn!("kad: timed out trying to find all peers closest to key {}; got the following:", key); + GetClosestPeers(Err(GetClosestPeersError::Timeout { key: _, peers })) => { + warn!( + "kad: timed out trying to find all closest peers; got the following:" + ); for peer in peers { - info!("kad: peer {} is close to key {}", peer, key); + info!("kad: peer {} is close", peer); } } GetProviders(Ok(GetProvidersOk { key, providers, .. })) => { let key = multibase::encode(Base::Base58Btc, key); if providers.is_empty() { // FIXME: not sure if this is possible - info!("kad: could not find a provider for {}", key); + warn!("kad: could not find a provider for {}", key); } else { for peer in providers { info!("kad: {} provided by {}", key, peer); @@ -338,7 +338,7 @@ impl Behaviour { options: SwarmOptions, ipfs: Ipfs, ) -> Self { - info!("Local peer id: {}", options.peer_id.to_base58()); + info!("net: starting with peer id {}", options.peer_id); let mdns = if options.mdns { Some(Mdns::new().expect("Failed to create mDNS service")) @@ -352,6 +352,9 @@ impl Behaviour { let mut kad_config = KademliaConfig::default(); kad_config.disjoint_query_paths(true); kad_config.set_query_timeout(std::time::Duration::from_secs(300)); + if let Some(protocol) = options.kad_protocol { + kad_config.set_protocol_name(protocol.into_bytes()); + } let mut kademlia = Kademlia::with_config(options.peer_id.to_owned(), store, kad_config); for (addr, peer_id) in &options.bootstrap { @@ -372,7 +375,7 @@ impl Behaviour { ipfs, mdns, kademlia, - kad_queries: Default::default(), + kad_subscriptions: Default::default(), bitswap, ping, identify, @@ -381,7 +384,16 @@ impl Behaviour { } } - pub fn add_peer(&mut self, peer: PeerId) { + pub fn add_peer(&mut self, peer: PeerId, addr: Multiaddr) { + self.kademlia.add_address(&peer, addr); + /* + match self.kademlia.start_providing(peer.to_base58().into_bytes().into()) { + Ok(id) => { + self.kad_queries.insert(id); + }, + Err(e) => error!("kad: can't provide peer {}: {:?}", peer, e), + } + */ self.swarm.add_peer(peer.clone()); self.pubsub.add_node_to_partial_view(peer); // TODO self.bitswap.add_node_to_partial_view(peer); @@ -440,6 +452,23 @@ impl Behaviour { pub fn bitswap(&mut self) -> &mut Bitswap { &mut self.bitswap } + + pub fn bootstrap(&mut self) -> Result>, anyhow::Error> { + match self.kademlia.bootstrap() { + Ok(id) => Ok(self.kad_subscriptions.create_subscription(id.into(), None)), + Err(e) => { + error!("kad: can't bootstrap the node: {:?}", e); + Err(anyhow!("kad: can't bootstrap the node: {:?}", e)) + } + } + } + + pub fn get_closest_peers(&mut self, id: PeerId) -> SubscriptionFuture> { + let id = id.to_base58(); + + self.kad_subscriptions + .create_subscription(self.kademlia.get_closest_peers(id.as_bytes()).into(), None) + } } /// Create a IPFS behaviour with the IPFS bootstrap nodes. diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index b5f7d5be4..1ea4d910b 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -24,6 +24,7 @@ pub struct SwarmOptions { pub peer_id: PeerId, pub bootstrap: Vec<(Multiaddr, PeerId)>, pub mdns: bool, + pub kad_protocol: Option, } impl From<&IpfsOptions> for SwarmOptions { @@ -32,12 +33,15 @@ impl From<&IpfsOptions> for SwarmOptions( let mut swarm = libp2p::Swarm::new(transport, behaviour, peer_id); // Listen on all interfaces and whatever port the OS assigns - let addr = Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - info!("Listening on {:?}", addr); + Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); swarm } diff --git a/src/subscription.rs b/src/subscription.rs index 67ce2534e..fffaff046 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -12,7 +12,7 @@ use core::pin::Pin; use futures::channel::mpsc::Sender; use futures::lock::Mutex; use libipld::Cid; -use libp2p::Multiaddr; +use libp2p::{kad::QueryId, Multiaddr}; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -33,6 +33,8 @@ pub enum RequestKind { Connect(Multiaddr), /// A request to obtain a `Block` with a specific `Cid`. GetBlock(Cid), + /// A DHT request to Kademlia. + KadQuery(QueryId), #[cfg(test)] Num(u32), } @@ -49,6 +51,12 @@ impl From for RequestKind { } } +impl From for RequestKind { + fn from(id: QueryId) -> Self { + Self::KadQuery(id) + } +} + impl fmt::Display for RequestKind { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -62,6 +70,7 @@ impl fmt::Display for RequestKind { .map(|b| format!("{:02x}", b)) .collect::() ), + Self::KadQuery(id) => write!(fmt, "Kad request {:?}", id), #[cfg(test)] Self::Num(n) => write!(fmt, "A test request for {}", n), } diff --git a/tests/connect_two.rs b/tests/connect_two.rs index 56da7ae8a..7b6608c32 100644 --- a/tests/connect_two.rs +++ b/tests/connect_two.rs @@ -11,7 +11,10 @@ fn connect_two_nodes() { let (tx, rx) = futures::channel::oneshot::channel(); let node_a = task::spawn(async move { - let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns); + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys( + mdns, + Some("/ipfs/lan/kad/1.0.0".into()), + ); let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) .await .start() @@ -33,7 +36,10 @@ fn connect_two_nodes() { println!("got back from the other node: {:?}", other_addrs); - let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns); + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys( + mdns, + Some("/ipfs/lan/kad/1.0.0".into()), + ); let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) .await .start() diff --git a/tests/kademlia.rs b/tests/kademlia.rs new file mode 100644 index 000000000..787196beb --- /dev/null +++ b/tests/kademlia.rs @@ -0,0 +1,51 @@ +use ipfs::Node; +use libp2p::PeerId; +use log::LevelFilter; + +const PEER_COUNT: usize = 20; + +#[async_std::test] +async fn kademlia() { + let _ = env_logger::builder() + .is_test(true) + .filter(Some("async_std"), LevelFilter::Error) + .init(); + + // start up PEER_COUNT bootstrapper nodes + let mut nodes = Vec::with_capacity(PEER_COUNT); + for _ in 0..PEER_COUNT { + nodes.push(Node::new(false).await); + } + + // register the bootstrappers' ids and addresses + let mut peers = Vec::with_capacity(PEER_COUNT); + for node in &nodes { + let (id, addrs) = node.identity().await.unwrap(); + let id = PeerId::from_public_key(id); + + peers.push((id, addrs)); + } + + // connect all the bootstrappers to one another + for (i, (node_id, _)) in peers.iter().enumerate() { + for (peer_id, addrs) in peers.iter().filter(|(peer_id, _)| peer_id != node_id) { + nodes[i] + .add_peer(peer_id.clone(), addrs[0].clone()) + .await + .unwrap(); + } + } + + // introduce an extra peer and connect it to one of the bootstrappers + let extra_peer = Node::new(false).await; + assert!(extra_peer + .add_peer(peers[0].0.clone(), peers[0].1[0].clone()) + .await + .is_ok()); + + // call kad::bootstrap + assert!(extra_peer.bootstrap().await.is_ok()); + + // call kad::get_closest_peers + assert!(nodes[0].get_closest_peers().await.is_ok()); +} From e2297fd53a186c70f2419579e6898e120bae5a70 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 13:30:48 +0200 Subject: [PATCH 3/9] fix: update a few tests Signed-off-by: ljedrz --- http/src/v0.rs | 3 ++- http/src/v0/refs.rs | 5 ++++- http/src/v0/root_files.rs | 10 ++++++++-- http/src/v0/root_files/add.rs | 5 ++++- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/http/src/v0.rs b/http/src/v0.rs index db9623680..791caa214 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -119,7 +119,8 @@ mod tests { use super::routes; use ipfs::{IpfsOptions, UninitializedIpfs}; - let options = IpfsOptions::inmemory_with_generated_keys(false); + let options = + IpfsOptions::inmemory_with_generated_keys(false, Some("/ipfs/lan/kad/1.0.0".into())); let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); drop(fut); diff --git a/http/src/v0/refs.rs b/http/src/v0/refs.rs index 46dd07f80..ceacaeb97 100644 --- a/http/src/v0/refs.rs +++ b/http/src/v0/refs.rs @@ -817,7 +817,10 @@ mod tests { } async fn preloaded_testing_ipfs() -> Ipfs { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys( + false, + Some("/ipfs/lan/kad/1.0.0".into()), + ); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/http/src/v0/root_files.rs b/http/src/v0/root_files.rs index e0f6ed748..d5e0865ac 100644 --- a/http/src/v0/root_files.rs +++ b/http/src/v0/root_files.rs @@ -334,7 +334,10 @@ mod tests { #[tokio::test] async fn very_long_file_and_symlink_names() { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys( + false, + Some("/ipfs/lan/kad/1.0.0".into()), + ); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() @@ -395,7 +398,10 @@ mod tests { #[tokio::test] async fn get_multiblock_file() { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys( + false, + Some("/ipfs/lan/kad/1.0.0".into()), + ); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/http/src/v0/root_files/add.rs b/http/src/v0/root_files/add.rs index 2924a554e..7e66f88a6 100644 --- a/http/src/v0/root_files/add.rs +++ b/http/src/v0/root_files/add.rs @@ -199,7 +199,10 @@ mod tests { } async fn testing_ipfs() -> ipfs::Ipfs { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys( + false, + Some("/ipfs/lan/kad/1.0.0".into()), + ); let (ipfs, fut) = ipfs::UninitializedIpfs::new(options) .await .start() From 499744b20c5b42e9d93a6b593d9a1f3d03d41965 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 15:23:12 +0200 Subject: [PATCH 4/9] chore: remove a stray commented-out block Signed-off-by: ljedrz --- src/p2p/behaviour.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index a7ba7c1ae..8224f59ae 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -386,14 +386,6 @@ impl Behaviour { pub fn add_peer(&mut self, peer: PeerId, addr: Multiaddr) { self.kademlia.add_address(&peer, addr); - /* - match self.kademlia.start_providing(peer.to_base58().into_bytes().into()) { - Ok(id) => { - self.kad_queries.insert(id); - }, - Err(e) => error!("kad: can't provide peer {}: {:?}", peer, e), - } - */ self.swarm.add_peer(peer.clone()); self.pubsub.add_node_to_partial_view(peer); // TODO self.bitswap.add_node_to_partial_view(peer); From ff1765ba103f759d12c408213b4c7f0031a01c1f Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 15:35:25 +0200 Subject: [PATCH 5/9] fix: remove unused params in Node::new and IpfsOptions::inmemory_with_generated_keys Signed-off-by: ljedrz --- http/src/v0.rs | 3 +-- http/src/v0/refs.rs | 5 +---- http/src/v0/root_files.rs | 10 ++-------- http/src/v0/root_files/add.rs | 5 +---- src/lib.rs | 24 +++++++++--------------- tests/connect_two.rs | 19 ++++--------------- tests/exchange_block.rs | 5 ++--- tests/kademlia.rs | 4 ++-- tests/multiple_listening_addresses.rs | 18 ++++++------------ tests/pubsub.rs | 15 ++++++--------- tests/wantlist_and_cancellation.rs | 2 +- 11 files changed, 35 insertions(+), 75 deletions(-) diff --git a/http/src/v0.rs b/http/src/v0.rs index 791caa214..a99c8a2a2 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -119,8 +119,7 @@ mod tests { use super::routes; use ipfs::{IpfsOptions, UninitializedIpfs}; - let options = - IpfsOptions::inmemory_with_generated_keys(false, Some("/ipfs/lan/kad/1.0.0".into())); + let options = IpfsOptions::inmemory_with_generated_keys(); let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); drop(fut); diff --git a/http/src/v0/refs.rs b/http/src/v0/refs.rs index ceacaeb97..42dd542d0 100644 --- a/http/src/v0/refs.rs +++ b/http/src/v0/refs.rs @@ -817,10 +817,7 @@ mod tests { } async fn preloaded_testing_ipfs() -> Ipfs { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys( - false, - Some("/ipfs/lan/kad/1.0.0".into()), - ); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/http/src/v0/root_files.rs b/http/src/v0/root_files.rs index d5e0865ac..8f8a4d1d4 100644 --- a/http/src/v0/root_files.rs +++ b/http/src/v0/root_files.rs @@ -334,10 +334,7 @@ mod tests { #[tokio::test] async fn very_long_file_and_symlink_names() { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys( - false, - Some("/ipfs/lan/kad/1.0.0".into()), - ); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() @@ -398,10 +395,7 @@ mod tests { #[tokio::test] async fn get_multiblock_file() { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys( - false, - Some("/ipfs/lan/kad/1.0.0".into()), - ); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/http/src/v0/root_files/add.rs b/http/src/v0/root_files/add.rs index 7e66f88a6..5cd76651a 100644 --- a/http/src/v0/root_files/add.rs +++ b/http/src/v0/root_files/add.rs @@ -199,10 +199,7 @@ mod tests { } async fn testing_ipfs() -> ipfs::Ipfs { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys( - false, - Some("/ipfs/lan/kad/1.0.0".into()), - ); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); let (ipfs, fut) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/src/lib.rs b/src/lib.rs index ba9121882..0ee2c53ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,14 +111,12 @@ impl fmt::Debug for IpfsOptions { impl IpfsOptions { /// Creates an inmemory store backed node for tests - pub fn inmemory_with_generated_keys(mdns: bool, kad_protocol: Option) -> Self { - Self::new( - std::env::temp_dir().into(), - Keypair::generate_ed25519(), - vec![], - mdns, - kad_protocol, - ) + pub fn inmemory_with_generated_keys() -> Self { + Self { + ipfs_path: std::env::temp_dir().into(), + keypair: Keypair::generate_ed25519(), + ..Default::default() + } } } @@ -988,9 +986,8 @@ mod node { } impl Node { - pub async fn new(mdns: bool) -> Self { - let opts = - IpfsOptions::inmemory_with_generated_keys(mdns, Some("/ipfs/lan/kad/1.0.0".into())); + pub async fn new() -> Self { + let opts = IpfsOptions::inmemory_with_generated_keys(); let (ipfs, fut) = UninitializedIpfs::new(opts) .await .start() @@ -1085,11 +1082,8 @@ mod tests { use libp2p::build_multiaddr; use multihash::Sha2_256; - const MDNS: bool = false; - pub async fn create_mock_ipfs() -> Ipfs { - let options = - IpfsOptions::inmemory_with_generated_keys(MDNS, Some("/ipfs/lan/kad/1.0.0".into())); + let options = IpfsOptions::inmemory_with_generated_keys(); let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); task::spawn(fut); diff --git a/tests/connect_two.rs b/tests/connect_two.rs index 7b6608c32..252b1a4d6 100644 --- a/tests/connect_two.rs +++ b/tests/connect_two.rs @@ -5,16 +5,10 @@ use async_std::task; fn connect_two_nodes() { // env_logger::init(); - // make sure the connection will only happen through explicit connect - let mdns = false; - let (tx, rx) = futures::channel::oneshot::channel(); let node_a = task::spawn(async move { - let opts = ipfs::IpfsOptions::inmemory_with_generated_keys( - mdns, - Some("/ipfs/lan/kad/1.0.0".into()), - ); + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) .await .start() @@ -36,10 +30,7 @@ fn connect_two_nodes() { println!("got back from the other node: {:?}", other_addrs); - let opts = ipfs::IpfsOptions::inmemory_with_generated_keys( - mdns, - Some("/ipfs/lan/kad/1.0.0".into()), - ); + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) .await .start() @@ -80,11 +71,9 @@ fn connect_two_nodes() { /// one should dial both of the addresses, resulting in two connections. #[test] fn connect_two_nodes_with_two_connections_doesnt_panic() { - const MDNS: bool = false; - task::block_on(async move { - let node_a = ipfs::Node::new(MDNS).await; - let node_b = ipfs::Node::new(MDNS).await; + let node_a = ipfs::Node::new().await; + let node_b = ipfs::Node::new().await; node_a .add_listening_address(libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16))) diff --git a/tests/exchange_block.rs b/tests/exchange_block.rs index b65ff5050..28427631f 100644 --- a/tests/exchange_block.rs +++ b/tests/exchange_block.rs @@ -7,13 +7,12 @@ use std::time::Duration; #[async_std::test] async fn exchange_block() { env_logger::init(); - let mdns = false; let data = b"hello block\n".to_vec().into_boxed_slice(); let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data)); - let a = Node::new(mdns).await; - let b = Node::new(mdns).await; + let a = Node::new().await; + let b = Node::new().await; let (_, mut addrs) = b.identity().await.unwrap(); diff --git a/tests/kademlia.rs b/tests/kademlia.rs index 787196beb..dc67c4805 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -14,7 +14,7 @@ async fn kademlia() { // start up PEER_COUNT bootstrapper nodes let mut nodes = Vec::with_capacity(PEER_COUNT); for _ in 0..PEER_COUNT { - nodes.push(Node::new(false).await); + nodes.push(Node::new().await); } // register the bootstrappers' ids and addresses @@ -37,7 +37,7 @@ async fn kademlia() { } // introduce an extra peer and connect it to one of the bootstrappers - let extra_peer = Node::new(false).await; + let extra_peer = Node::new().await; assert!(extra_peer .add_peer(peers[0].0.clone(), peers[0].1[0].clone()) .await diff --git a/tests/multiple_listening_addresses.rs b/tests/multiple_listening_addresses.rs index b225b7443..486beb7d2 100644 --- a/tests/multiple_listening_addresses.rs +++ b/tests/multiple_listening_addresses.rs @@ -2,9 +2,8 @@ use async_std::task; #[test] fn multiple_consecutive_ephemeral_listening_addresses() { - const MDNS: bool = false; task::block_on(async move { - let node = ipfs::Node::new(MDNS).await; + let node = ipfs::Node::new().await; let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)); @@ -19,9 +18,8 @@ fn multiple_consecutive_ephemeral_listening_addresses() { #[test] fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() { - const MDNS: bool = false; task::block_on(async move { - let node = ipfs::Node::new(MDNS).await; + let node = ipfs::Node::new().await; let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)); @@ -52,9 +50,8 @@ fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() { #[test] #[cfg(not(target_os = "macos"))] fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() { - const MDNS: bool = false; task::block_on(async move { - let node = ipfs::Node::new(MDNS).await; + let node = ipfs::Node::new().await; // it doesnt work on mac os x as 127.0.0.2 is not enabled by default. let first = @@ -72,9 +69,8 @@ fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() { #[test] fn adding_unspecified_addr_resolves_with_first() { - const MDNS: bool = false; task::block_on(async move { - let node = ipfs::Node::new(MDNS).await; + let node = ipfs::Node::new().await; // there is no test in trying to match this with others as ... that would be quite // perilous. node.add_listening_address(libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16))) @@ -85,9 +81,8 @@ fn adding_unspecified_addr_resolves_with_first() { #[test] fn listening_for_multiple_unspecified_addresses() { - const MDNS: bool = false; task::block_on(async move { - let node = ipfs::Node::new(MDNS).await; + let node = ipfs::Node::new().await; // there is no test in trying to match this with others as ... that would be quite // perilous. let target = libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16)); @@ -112,9 +107,8 @@ fn listening_for_multiple_unspecified_addresses() { #[test] fn remove_listening_address() { - const MDNS: bool = false; task::block_on(async move { - let node = ipfs::Node::new(MDNS).await; + let node = ipfs::Node::new().await; let unbound = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)); let first = node.add_listening_address(unbound.clone()).await.unwrap(); diff --git a/tests/pubsub.rs b/tests/pubsub.rs index afdf795c3..edca47559 100644 --- a/tests/pubsub.rs +++ b/tests/pubsub.rs @@ -3,19 +3,16 @@ use futures::stream::StreamExt; use ipfs::{Node, PeerId}; use std::time::Duration; -// Disable mdns for these tests not to connect to any local go-ipfs node -const MDNS: bool = false; - #[async_std::test] async fn subscribe_only_once() { - let a = Node::new(MDNS).await; + let a = Node::new().await; let _stream = a.pubsub_subscribe("some_topic".into()).await.unwrap(); a.pubsub_subscribe("some_topic".into()).await.unwrap_err(); } #[async_std::test] async fn resubscribe_after_unsubscribe() { - let a = Node::new(MDNS).await; + let a = Node::new().await; let mut stream = a.pubsub_subscribe("topic".into()).await.unwrap(); a.pubsub_unsubscribe("topic").await.unwrap(); @@ -27,7 +24,7 @@ async fn resubscribe_after_unsubscribe() { #[async_std::test] async fn unsubscribe_via_drop() { - let a = Node::new(MDNS).await; + let a = Node::new().await; let msgs = a.pubsub_subscribe("topic".into()).await.unwrap(); assert_eq!(a.pubsub_subscribed().await.unwrap(), &["topic"]); @@ -40,7 +37,7 @@ async fn unsubscribe_via_drop() { #[async_std::test] async fn can_publish_without_subscribing() { - let a = Node::new(MDNS).await; + let a = Node::new().await; a.pubsub_publish("topic".into(), b"foobar".to_vec()) .await .unwrap() @@ -135,8 +132,8 @@ async fn publish_between_two_nodes() { } async fn two_connected_nodes() -> ((Node, PeerId), (Node, PeerId)) { - let a = Node::new(MDNS).await; - let b = Node::new(MDNS).await; + let a = Node::new().await; + let b = Node::new().await; let (a_pk, _) = a.identity().await.unwrap(); let a_id = a_pk.into_peer_id(); diff --git a/tests/wantlist_and_cancellation.rs b/tests/wantlist_and_cancellation.rs index 68117003d..76a24a491 100644 --- a/tests/wantlist_and_cancellation.rs +++ b/tests/wantlist_and_cancellation.rs @@ -50,7 +50,7 @@ async fn check_cid_subscriptions(ipfs: &Node, cid: &Cid, expected_count: usize) #[async_std::test] async fn wantlist_cancellation() { // start a single node - let ipfs = Node::new(false).await; + let ipfs = Node::new().await; // execute a get_block request let cid = Cid::try_from("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KaGa").unwrap(); From 962f389e46225bd19b2d865aebbbcd2e0cda308c Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 15:37:37 +0200 Subject: [PATCH 6/9] fix: move get_closest_peers from Ipfs to Node Signed-off-by: ljedrz --- src/lib.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0ee2c53ca..126e3b5e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -631,18 +631,6 @@ impl Ipfs { Ok(()) } - pub async fn get_closest_peers(&self) -> Result<(), Error> { - let self_peer = PeerId::from_public_key(self.identity().await?.0); - let (tx, rx) = oneshot_channel::>(); - - self.to_task - .clone() - .send(IpfsEvent::GetClosestPeers(self_peer, tx)) - .await?; - - rx.await?.await?.map_err(|e| anyhow!(e)) - } - /// Exit daemon. pub async fn exit_daemon(self) { // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of @@ -976,7 +964,7 @@ impl From<(bitswap::Stats, Vec, Vec<(Cid, bitswap::Priority)>)> for Bits pub use node::Node; mod node { - use super::{subscription, Block, Ipfs, IpfsOptions, TestTypes, UninitializedIpfs}; + use super::*; /// Node encapsulates everything to setup a testing instance so that multi-node tests become /// easier. @@ -1008,6 +996,18 @@ mod node { &self.ipfs.repo.subscriptions.subscriptions } + pub async fn get_closest_peers(&self) -> Result<(), Error> { + let self_peer = PeerId::from_public_key(self.identity().await?.0); + let (tx, rx) = oneshot_channel::>(); + + self.to_task + .clone() + .send(IpfsEvent::GetClosestPeers(self_peer, tx)) + .await?; + + rx.await?.await?.map_err(|e| anyhow!(e)) + } + pub async fn shutdown(self) { self.ipfs.exit_daemon().await; self.background_task.await; From c14f9e60cb8b22fc16e6ee9d601c3c63b4bcdc4a Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 15:40:07 +0200 Subject: [PATCH 7/9] chore: document new DHT-related Ipfs methods Co-authored-by: Joonas Koivunen Signed-off-by: ljedrz --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 126e3b5e6..cafd45e01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -614,6 +614,7 @@ impl Ipfs { rx.await? } + /// Initiate a query for random key to discover peers. pub async fn bootstrap(&self) -> Result<(), Error> { let (tx, rx) = oneshot_channel::, Error>>(); @@ -622,6 +623,7 @@ impl Ipfs { rx.await??.await?.map_err(|e| anyhow!(e)) } + /// Add a known peer to the DHT. pub async fn add_peer(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> { self.to_task .clone() From b41490fe3186b0d6d9c2a9e5fc6f4452474a51a9 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 15:47:25 +0200 Subject: [PATCH 8/9] fix: classic Default::default() fun plus remove explicit kad protocol names Signed-off-by: ljedrz --- http/src/main.rs | 9 ++------- src/lib.rs | 7 +++++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/http/src/main.rs b/http/src/main.rs index 5e35eccff..0aad4b91f 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -134,13 +134,8 @@ fn main() { let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop"); rt.block_on(async move { - let opts: IpfsOptions = IpfsOptions::new( - home.clone().into(), - keypair, - Vec::new(), - false, - Some("/ipfs/kad/1.0.0".into()), - ); + let opts: IpfsOptions = + IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None); let (ipfs, task) = UninitializedIpfs::new(opts) .await diff --git a/src/lib.rs b/src/lib.rs index cafd45e01..b29bff512 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,9 +113,12 @@ impl IpfsOptions { /// Creates an inmemory store backed node for tests pub fn inmemory_with_generated_keys() -> Self { Self { + _marker: PhantomData, ipfs_path: std::env::temp_dir().into(), keypair: Keypair::generate_ed25519(), - ..Default::default() + mdns: Default::default(), + bootstrap: Default::default(), + kad_protocol: Default::default(), } } } @@ -207,7 +210,7 @@ impl Default for IpfsOptions { keypair, bootstrap, mdns: true, - kad_protocol: Some("/ipfs/kad/1.0.0".into()), + kad_protocol: None, } } } From 5b02b62b8518dec8e062536ab3d8d956d676b182 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 15:59:58 +0200 Subject: [PATCH 9/9] fix: move bootstrap and add_peer methods to Node Signed-off-by: ljedrz --- src/lib.rs | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b29bff512..455ecf58e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -617,25 +617,6 @@ impl Ipfs { rx.await? } - /// Initiate a query for random key to discover peers. - pub async fn bootstrap(&self) -> Result<(), Error> { - let (tx, rx) = oneshot_channel::, Error>>(); - - self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?; - - rx.await??.await?.map_err(|e| anyhow!(e)) - } - - /// Add a known peer to the DHT. - pub async fn add_peer(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> { - self.to_task - .clone() - .send(IpfsEvent::AddPeer(peer_id, addr)) - .await?; - - Ok(()) - } - /// Exit daemon. pub async fn exit_daemon(self) { // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of @@ -1013,6 +994,25 @@ mod node { rx.await?.await?.map_err(|e| anyhow!(e)) } + /// Initiate a query for random key to discover peers. + pub async fn bootstrap(&self) -> Result<(), Error> { + let (tx, rx) = oneshot_channel::, Error>>(); + + self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?; + + rx.await??.await?.map_err(|e| anyhow!(e)) + } + + /// Add a known peer to the DHT. + pub async fn add_peer(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> { + self.to_task + .clone() + .send(IpfsEvent::AddPeer(peer_id, addr)) + .await?; + + Ok(()) + } + pub async fn shutdown(self) { self.ipfs.exit_daemon().await; self.background_task.await;