Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

some groundwork for peer/content discovery #256

Merged
merged 9 commits into from
Jul 22, 2020
2 changes: 1 addition & 1 deletion http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn main() {

rt.block_on(async move {
let opts: IpfsOptions<ipfs::TestTypes> =
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false);
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None);

let (ipfs, task) = UninitializedIpfs::new(opts)
.await
Expand Down
2 changes: 1 addition & 1 deletion http/src/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ mod tests {
use super::routes;
use ipfs::{IpfsOptions, UninitializedIpfs};

let options = IpfsOptions::inmemory_with_generated_keys(false);
let options = IpfsOptions::inmemory_with_generated_keys();

let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
drop(fut);
Expand Down
2 changes: 1 addition & 1 deletion http/src/v0/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ mod tests {
}

async fn preloaded_testing_ipfs() -> Ipfs<ipfs::TestTypes> {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()
Expand Down
4 changes: 2 additions & 2 deletions http/src/v0/root_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ mod tests {

#[tokio::test]
async fn very_long_file_and_symlink_names() {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()
Expand Down Expand Up @@ -395,7 +395,7 @@ mod tests {

#[tokio::test]
async fn get_multiblock_file() {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()
Expand Down
2 changes: 1 addition & 1 deletion http/src/v0/root_files/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ mod tests {
}

async fn testing_ipfs() -> ipfs::Ipfs<ipfs::TestTypes> {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let (ipfs, fut) = ipfs::UninitializedIpfs::new(options)
.await
.start()
Expand Down
85 changes: 67 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#[macro_use]
extern crate log;

use anyhow::format_err;
use anyhow::{anyhow, format_err};
use async_std::path::PathBuf;
pub use bitswap::{BitswapEvent, Block, Stats};
pub use cid::Cid;
Expand Down Expand Up @@ -91,6 +91,8 @@ pub struct IpfsOptions<Types: IpfsTypes> {
pub bootstrap: Vec<(Multiaddr, PeerId)>,
/// Enables mdns for peer discovery when true.
pub mdns: bool,
/// Custom Kademlia protocol name.
pub kad_protocol: Option<String>,
ljedrz marked this conversation as resolved.
Show resolved Hide resolved
}

impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
Expand All @@ -102,19 +104,22 @@ impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
.field("bootstrap", &self.bootstrap)
.field("keypair", &DebuggableKeypair(&self.keypair))
.field("mdns", &self.mdns)
.field("kad_protocol", &self.kad_protocol)
.finish()
}
}

impl IpfsOptions<TestTypes> {
/// Creates an inmemory store backed node for tests
pub fn inmemory_with_generated_keys(mdns: bool) -> Self {
Self::new(
std::env::temp_dir().into(),
Keypair::generate_ed25519(),
vec![],
mdns,
)
pub fn inmemory_with_generated_keys() -> Self {
Self {
_marker: PhantomData,
ipfs_path: std::env::temp_dir().into(),
keypair: Keypair::generate_ed25519(),
mdns: Default::default(),
bootstrap: Default::default(),
kad_protocol: Default::default(),
}
}
}

Expand Down Expand Up @@ -147,13 +152,15 @@ impl<Types: IpfsTypes> IpfsOptions<Types> {
keypair: Keypair,
bootstrap: Vec<(Multiaddr, PeerId)>,
mdns: bool,
kad_protocol: Option<String>,
) -> Self {
Self {
_marker: PhantomData,
ipfs_path,
keypair,
bootstrap,
mdns,
kad_protocol,
}
}
}
Expand Down Expand Up @@ -203,6 +210,7 @@ impl<T: IpfsTypes> Default for IpfsOptions<T> {
keypair,
bootstrap,
mdns: true,
kad_protocol: None,
}
}
}
Expand All @@ -226,16 +234,14 @@ pub struct IpfsInner<Types: IpfsTypes> {
}

type Channel<T> = OneshotSender<Result<T, Error>>;
type FutureSubscription<T, E> = SubscriptionFuture<Result<T, E>>;

/// Events used internally to communicate with the swarm, which is executed in the the background
/// task.
#[derive(Debug)]
enum IpfsEvent {
/// Connect
Connect(
Multiaddr,
OneshotSender<SubscriptionFuture<Result<(), String>>>,
),
Connect(Multiaddr, OneshotSender<FutureSubscription<(), String>>),
/// Addresses
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
/// Local addresses
Expand All @@ -255,6 +261,9 @@ enum IpfsEvent {
BitswapStats(OneshotSender<BitswapStats>),
AddListeningAddress(Multiaddr, Channel<Multiaddr>),
RemoveListeningAddress(Multiaddr, Channel<()>),
Bootstrap(OneshotSender<Result<FutureSubscription<(), String>, Error>>),
AddPeer(PeerId, Multiaddr),
GetClosestPeers(PeerId, OneshotSender<FutureSubscription<(), String>>),
Exit,
}

Expand Down Expand Up @@ -869,6 +878,17 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {

let _ = ret.send(removed);
}
IpfsEvent::Bootstrap(ret) => {
let future = self.swarm.bootstrap();
let _ = ret.send(future);
}
IpfsEvent::AddPeer(peer_id, addr) => {
self.swarm.add_peer(peer_id, addr);
}
IpfsEvent::GetClosestPeers(self_peer, ret) => {
let future = self.swarm.get_closest_peers(self_peer);
let _ = ret.send(future);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
Expand Down Expand Up @@ -930,7 +950,7 @@ impl From<(bitswap::Stats, Vec<PeerId>, Vec<(Cid, bitswap::Priority)>)> for Bits
pub use node::Node;

mod node {
use super::{subscription, Block, Ipfs, IpfsOptions, TestTypes, UninitializedIpfs};
use super::*;

/// Node encapsulates everything to setup a testing instance so that multi-node tests become
/// easier.
Expand All @@ -940,8 +960,8 @@ mod node {
}

impl Node {
pub async fn new(mdns: bool) -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys(mdns);
pub async fn new() -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys();
let (ipfs, fut) = UninitializedIpfs::new(opts)
.await
.start()
Expand All @@ -962,6 +982,37 @@ mod node {
&self.ipfs.repo.subscriptions.subscriptions
}

pub async fn get_closest_peers(&self) -> Result<(), Error> {
let self_peer = PeerId::from_public_key(self.identity().await?.0);
let (tx, rx) = oneshot_channel::<FutureSubscription<(), String>>();

self.to_task
.clone()
.send(IpfsEvent::GetClosestPeers(self_peer, tx))
.await?;

rx.await?.await?.map_err(|e| anyhow!(e))
}

/// Initiate a query for random key to discover peers.
pub async fn bootstrap(&self) -> Result<(), Error> {
let (tx, rx) = oneshot_channel::<Result<FutureSubscription<(), String>, Error>>();

self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;

rx.await??.await?.map_err(|e| anyhow!(e))
}

/// Add a known peer to the DHT.
pub async fn add_peer(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
self.to_task
.clone()
.send(IpfsEvent::AddPeer(peer_id, addr))
.await?;

Ok(())
}

pub async fn shutdown(self) {
self.ipfs.exit_daemon().await;
self.background_task.await;
Expand Down Expand Up @@ -1036,10 +1087,8 @@ mod tests {
use libp2p::build_multiaddr;
use multihash::Sha2_256;

const MDNS: bool = false;

pub async fn create_mock_ipfs() -> Ipfs<TestTypes> {
let options = IpfsOptions::inmemory_with_generated_keys(MDNS);
let options = IpfsOptions::inmemory_with_generated_keys();
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
task::spawn(fut);

Expand Down
Loading