diff --git a/examples/dag_creation.rs b/examples/dag_creation.rs index b9e5c448d..61b8a3d57 100644 --- a/examples/dag_creation.rs +++ b/examples/dag_creation.rs @@ -1,5 +1,5 @@ use futures::join; -use ipfs::{make_ipld, Ipfs, IpfsPath, Types, UninitializedIpfs}; +use ipfs::{make_ipld, Ipfs, IpfsOptions, IpfsPath, TestTypes, UninitializedIpfs}; use tokio::task; #[tokio::main] @@ -7,7 +7,8 @@ async fn main() { tracing_subscriber::fmt::init(); // Initialize the repo and start a daemon - let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::default().await.start().await.unwrap(); + let opts = IpfsOptions::inmemory_with_generated_keys(); + let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts).start().await.unwrap(); task::spawn(fut); // Create a DAG diff --git a/examples/fetch_and_cat.rs b/examples/fetch_and_cat.rs index 530c3fb74..6beb62794 100644 --- a/examples/fetch_and_cat.rs +++ b/examples/fetch_and_cat.rs @@ -48,11 +48,7 @@ async fn main() { // UninitializedIpfs will handle starting up the repository and return the facade (ipfs::Ipfs) // and the background task (ipfs::IpfsFuture). - let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts, None) - .await - .start() - .await - .unwrap(); + let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts).start().await.unwrap(); // The background task must be spawned to use anything other than the repository; most notably, // the libp2p. diff --git a/http/src/main.rs b/http/src/main.rs index 7d2ad7c8a..638a4f1d9 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -135,17 +135,17 @@ fn main() { let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop"); rt.block_on(async move { - let opts: IpfsOptions = IpfsOptions::new( - home.clone(), + let opts = IpfsOptions { + ipfs_path: home.clone(), keypair, - Vec::new(), - false, - None, + bootstrap: Vec::new(), + mdns: false, + kad_protocol: None, listening_addrs, - ); + span: None, + }; - let (ipfs, task): (Ipfs, _) = UninitializedIpfs::new(opts, None) - .await + let (ipfs, task): (Ipfs, _) = UninitializedIpfs::new(opts) .start() .await .expect("Initialization failed"); diff --git a/http/src/v0.rs b/http/src/v0.rs index c4f2a1354..0fc291259 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -192,11 +192,8 @@ mod tests { use ipfs::{IpfsOptions, UninitializedIpfs}; let options = IpfsOptions::inmemory_with_generated_keys(); - let (ipfs, _): (Ipfs, _) = UninitializedIpfs::new(options, None) - .await - .start() - .await - .unwrap(); + let (ipfs, _): (Ipfs, _) = + UninitializedIpfs::new(options).start().await.unwrap(); let (shutdown_tx, _) = tokio::sync::mpsc::channel::<()>(1); diff --git a/http/src/v0/root_files/add.rs b/http/src/v0/root_files/add.rs index 0c53d3533..ecee819ad 100644 --- a/http/src/v0/root_files/add.rs +++ b/http/src/v0/root_files/add.rs @@ -410,11 +410,7 @@ mod tests { async fn tokio_ipfs() -> ipfs::Ipfs { let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); - let (ipfs, fut) = ipfs::UninitializedIpfs::new(options, None) - .await - .start() - .await - .unwrap(); + let (ipfs, fut) = ipfs::UninitializedIpfs::new(options).start().await.unwrap(); tokio::spawn(fut); ipfs diff --git a/src/lib.rs b/src/lib.rs index 39a25cbb6..5a23fcf1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,14 +132,23 @@ pub struct IpfsOptions { /// Enables mdns for peer discovery and announcement when true. pub mdns: bool, - /// Custom Kademlia protocol name. + /// Custom Kademlia protocol name. When set to `None`, the global DHT name is used instead of + /// the LAN dht name. /// /// The name given here is passed to [`libp2p_kad::KademliaConfig::set_protocol_name`]. /// /// [`libp2p_kad::KademliaConfig::set_protocol_name`]: https://docs.rs/libp2p-kad/*/libp2p_kad/struct.KademliaConfig.html##method.set_protocol_name pub kad_protocol: Option, + /// Bound listening addresses; by default the node will not listen on any address. pub listening_addrs: Vec, + + /// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`. + /// + /// All futures returned by `Ipfs`, background task actions and swarm actions are instrumented + /// with this span or spans referring to this as their parent. Setting this other than `None` + /// default is useful when running multiple nodes. + pub span: Option, } impl fmt::Debug for IpfsOptions { @@ -153,6 +162,7 @@ impl fmt::Debug for IpfsOptions { .field("mdns", &self.mdns) .field("kad_protocol", &self.kad_protocol) .field("listening_addrs", &self.listening_addrs) + .field("span", &self.span) .finish() } } @@ -170,6 +180,7 @@ impl IpfsOptions { // default to lan kad for go-ipfs use in tests kad_protocol: Some("/ipfs/lan/kad/1.0.0".to_owned()), listening_addrs: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()], + span: None, } } } @@ -205,6 +216,7 @@ impl IpfsOptions { mdns: bool, kad_protocol: Option, listening_addrs: Vec, + span: Option, ) -> Self { Self { ipfs_path, @@ -213,6 +225,7 @@ impl IpfsOptions { mdns, kad_protocol, listening_addrs, + span, } } } @@ -263,6 +276,7 @@ impl Default for IpfsOptions { mdns: true, kad_protocol: None, listening_addrs: Vec::new(), + span: None, } } } @@ -275,24 +289,24 @@ impl Default for IpfsOptions { /// /// The facade is created through [`UninitializedIpfs`] which is configured with [`IpfsOptions`]. #[derive(Debug)] -pub struct Ipfs(Arc>); +pub struct Ipfs { + span: Span, + repo: Arc>, + keys: DebuggableKeypair, + to_task: Sender, +} impl Clone for Ipfs { fn clone(&self) -> Self { - Ipfs(Arc::clone(&self.0)) + Ipfs { + span: self.span.clone(), + repo: Arc::clone(&self.repo), + keys: self.keys.clone(), + to_task: self.to_task.clone(), + } } } -/// The internal shared implementation of [`Ipfs`]. -#[derive(Debug)] -#[doc(hidden)] -pub struct IpfsInner { - pub span: Span, - repo: Repo, - keys: DebuggableKeypair, - to_task: Sender, -} - type Channel = OneshotSender>; /// Events used internally to communicate with the swarm, which is executed in the the background @@ -355,8 +369,7 @@ enum IpfsEvent { /// Configured Ipfs which can only be started. pub struct UninitializedIpfs { - repo: Repo, - span: Span, + repo: Arc>, keys: Keypair, options: IpfsOptions, repo_events: Receiver, @@ -369,23 +382,21 @@ impl UninitializedIpfs { /// The span is attached to all operations called on the later created `Ipfs` along with all /// operations done in the background task as well as tasks spawned by the underlying /// `libp2p::Swarm`. - pub async fn new(options: IpfsOptions, span: Option) -> Self { + pub fn new(options: IpfsOptions) -> Self { let repo_options = RepoOptions::from(&options); let (repo, repo_events) = create_repo(repo_options); let keys = options.keypair.clone(); - let span = span.unwrap_or_else(|| trace_span!("ipfs")); UninitializedIpfs { - repo, - span, + repo: Arc::new(repo), keys, options, repo_events, } } - pub async fn default() -> Self { - Self::new(IpfsOptions::default(), None).await + pub fn default() -> Self { + Self::new(IpfsOptions::default()) } /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the @@ -395,25 +406,31 @@ impl UninitializedIpfs { let UninitializedIpfs { repo, - span, keys, repo_events, - options, + mut options, } = self; repo.init().await?; let (to_task, receiver) = channel::(1); - let ipfs = Ipfs(Arc::new(IpfsInner { - span, - repo, + let facade_span = options + .span + .take() + .unwrap_or_else(|| tracing::trace_span!("ipfs")); + + let swarm_span = tracing::trace_span!(parent: facade_span.clone(), "swarm"); + + let ipfs = Ipfs { + span: facade_span, + repo: repo.clone(), keys: DebuggableKeypair(keys), to_task, - })); + }; let swarm_options = SwarmOptions::from(&options); - let swarm = create_swarm(swarm_options, ipfs.clone()).await?; + let swarm = create_swarm(swarm_options, swarm_span, repo).await?; let IpfsOptions { listening_addrs, .. @@ -434,14 +451,6 @@ impl UninitializedIpfs { } } -impl Deref for Ipfs { - type Target = IpfsInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl Ipfs { /// Return an [`IpldDag`] for DAG operations pub fn dag(&self) -> IpldDag { @@ -1243,13 +1252,13 @@ impl Ipfs { } /// Exit daemon. - pub async fn exit_daemon(self) { + pub async fn exit_daemon(mut self) { // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of // the background task or stream. After that this could be handled by dropping. self.repo.shutdown(); // ignoring the error because it'd mean that the background task had already been dropped - let _ = self.to_task.clone().try_send(IpfsEvent::Exit); + let _ = self.to_task.try_send(IpfsEvent::Exit); } } @@ -1675,10 +1684,9 @@ mod node { impl Node { pub async fn new>(name: T) -> Self { - let opts = IpfsOptions::inmemory_with_generated_keys(); - Node::with_options(opts) - .instrument(trace_span!("ipfs", node = name.as_ref())) - .await + let mut opts = IpfsOptions::inmemory_with_generated_keys(); + opts.span = Some(trace_span!("ipfs", node = name.as_ref())); + Self::with_options(opts).await } pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> { @@ -1687,12 +1695,9 @@ mod node { } pub async fn with_options(opts: IpfsOptions) -> Self { - let span = Some(Span::current()); let id = opts.keypair.public().into_peer_id(); - let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts, span) - .in_current_span() - .await + let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts) .start() .in_current_span() .await diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index c8ade98b4..2199748d9 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -2,9 +2,9 @@ use super::pubsub::Pubsub; use super::swarm::{Connection, Disconnector, SwarmApi}; use crate::config::BOOTSTRAP_NODES; use crate::p2p::{MultiaddrWithPeerId, SwarmOptions}; -use crate::repo::BlockPut; +use crate::repo::{BlockPut, Repo}; use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; -use crate::{Ipfs, IpfsTypes}; +use crate::IpfsTypes; use anyhow::anyhow; use bitswap::{Bitswap, BitswapEvent}; use cid::Cid; @@ -16,16 +16,15 @@ use libp2p::mdns::{MdnsEvent, TokioMdns}; use libp2p::ping::{Ping, PingEvent}; use libp2p::swarm::toggle::Toggle; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourEventProcess}; -use libp2p::NetworkBehaviour; use multibase::Base; use std::{convert::TryInto, sync::Arc}; use tokio::task; /// Behaviour type. -#[derive(NetworkBehaviour)] +#[derive(libp2p::NetworkBehaviour)] pub struct Behaviour { #[behaviour(ignore)] - ipfs: Ipfs, + repo: Arc>, mdns: Toggle, kademlia: Kademlia, #[behaviour(ignore)] @@ -314,11 +313,11 @@ impl NetworkBehaviourEventProcess for Behaviour< fn inject_event(&mut self, event: BitswapEvent) { match event { BitswapEvent::ReceivedBlock(peer_id, block) => { - let ipfs = self.ipfs.clone(); + let repo = self.repo.clone(); let peer_stats = Arc::clone(&self.bitswap.stats.get(&peer_id).unwrap()); task::spawn(async move { let bytes = block.data().len() as u64; - let res = ipfs.repo.put_block(block.clone()).await; + let res = repo.put_block(block.clone()).await; match res { Ok((_, uniqueness)) => match uniqueness { BlockPut::NewBlock => peer_stats.update_incoming_unique(bytes), @@ -343,10 +342,10 @@ impl NetworkBehaviourEventProcess for Behaviour< ); let queued_blocks = self.bitswap().queued_blocks.clone(); - let ipfs = self.ipfs.clone(); + let repo = self.repo.clone(); task::spawn(async move { - match ipfs.repo.get_block_now(&cid).await { + match repo.get_block_now(&cid).await { Ok(Some(block)) => { let _ = queued_blocks.unbounded_send((peer_id, block)); } @@ -413,7 +412,7 @@ impl NetworkBehaviourEventProcess for Behaviour impl Behaviour { /// Create a Kademlia behaviour with the IPFS bootstrap nodes. - pub async fn new(options: SwarmOptions, ipfs: Ipfs) -> Self { + pub async fn new(options: SwarmOptions, repo: Arc>) -> Self { info!("net: starting with peer id {}", options.peer_id); let mdns = if options.mdns { @@ -454,7 +453,7 @@ impl Behaviour { } Behaviour { - ipfs, + repo, mdns, kademlia, kad_subscriptions: Default::default(), @@ -646,7 +645,7 @@ impl Behaviour { /// Create a IPFS behaviour with the IPFS bootstrap nodes. pub async fn build_behaviour( options: SwarmOptions, - ipfs: Ipfs, + repo: Arc>, ) -> Behaviour { - Behaviour::new(options, ipfs).await + Behaviour::new(options, repo).await } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index a7ed3065d..8657c4714 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -1,9 +1,11 @@ //! P2P handling for IPFS nodes. -use crate::{Ipfs, IpfsOptions, IpfsTypes}; +use crate::repo::Repo; +use crate::{IpfsOptions, IpfsTypes}; use libp2p::identity::Keypair; use libp2p::Swarm; use libp2p::{Multiaddr, PeerId}; use std::io; +use std::sync::Arc; use tracing::Span; pub(crate) mod addr; @@ -46,17 +48,16 @@ impl From<&IpfsOptions> for SwarmOptions { /// Creates a new IPFS swarm. pub async fn create_swarm( options: SwarmOptions, - ipfs: Ipfs, + swarm_span: Span, + repo: Arc>, ) -> io::Result> { let peer_id = options.peer_id.clone(); // Set up an encrypted TCP transport over the Mplex protocol. let transport = transport::build_transport(options.keypair.clone())?; - let swarm_span = ipfs.0.span.clone(); - // Create a Kademlia behaviour - let behaviour = behaviour::build_behaviour(options, ipfs).await; + let behaviour = behaviour::build_behaviour(options, repo).await; // Create a Swarm let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)