From 87230f2b1763113fb9438da466ba976e595f2e02 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Fri, 28 Aug 2020 15:57:16 +0200 Subject: [PATCH 1/8] chore: make IpfsInner.span private --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 39a25cbb6..56f13bb9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -287,7 +287,7 @@ impl Clone for Ipfs { #[derive(Debug)] #[doc(hidden)] pub struct IpfsInner { - pub span: Span, + span: Span, repo: Repo, keys: DebuggableKeypair, to_task: Sender, From 23b59ff47e1b2fb01ce14db855e6c3d5ce672810 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Fri, 28 Aug 2020 16:02:02 +0200 Subject: [PATCH 2/8] refactor: remove InnerIpfs, wrap Ipfs.repo in an Arc --- src/lib.rs | 44 ++++++++++++++++++-------------------------- src/p2p/mod.rs | 2 +- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 56f13bb9b..db5b5e744 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -275,24 +275,24 @@ impl Default for IpfsOptions { /// /// The facade is created through [`UninitializedIpfs`] which is configured with [`IpfsOptions`]. #[derive(Debug)] -pub struct Ipfs(Arc>); +pub struct Ipfs { + span: Span, + repo: Arc>, + keys: DebuggableKeypair, + to_task: Sender, +} impl Clone for Ipfs { fn clone(&self) -> Self { - Ipfs(Arc::clone(&self.0)) + Ipfs { + span: self.span.clone(), + repo: Arc::clone(&self.repo), + keys: self.keys.clone(), + to_task: self.to_task.clone(), + } } } -/// The internal shared implementation of [`Ipfs`]. -#[derive(Debug)] -#[doc(hidden)] -pub struct IpfsInner { - span: Span, - repo: Repo, - keys: DebuggableKeypair, - to_task: Sender, -} - type Channel = OneshotSender>; /// Events used internally to communicate with the swarm, which is executed in the the background @@ -355,7 +355,7 @@ enum IpfsEvent { /// Configured Ipfs which can only be started. pub struct UninitializedIpfs { - repo: Repo, + repo: Arc>, span: Span, keys: Keypair, options: IpfsOptions, @@ -376,7 +376,7 @@ impl UninitializedIpfs { let span = span.unwrap_or_else(|| trace_span!("ipfs")); UninitializedIpfs { - repo, + repo: Arc::new(repo), span, keys, options, @@ -405,12 +405,12 @@ impl UninitializedIpfs { let (to_task, receiver) = channel::(1); - let ipfs = Ipfs(Arc::new(IpfsInner { + let ipfs = Ipfs { span, repo, keys: DebuggableKeypair(keys), to_task, - })); + }; let swarm_options = SwarmOptions::from(&options); let swarm = create_swarm(swarm_options, ipfs.clone()).await?; @@ -434,14 +434,6 @@ impl UninitializedIpfs { } } -impl Deref for Ipfs { - type Target = IpfsInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl Ipfs { /// Return an [`IpldDag`] for DAG operations pub fn dag(&self) -> IpldDag { @@ -1243,13 +1235,13 @@ impl Ipfs { } /// Exit daemon. - pub async fn exit_daemon(self) { + pub async fn exit_daemon(mut self) { // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of // the background task or stream. After that this could be handled by dropping. self.repo.shutdown(); // ignoring the error because it'd mean that the background task had already been dropped - let _ = self.to_task.clone().try_send(IpfsEvent::Exit); + let _ = self.to_task.try_send(IpfsEvent::Exit); } } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index a7ed3065d..e71d49978 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -53,7 +53,7 @@ pub async fn create_swarm( // Set up an encrypted TCP transport over the Mplex protocol. let transport = transport::build_transport(options.keypair.clone())?; - let swarm_span = ipfs.0.span.clone(); + let swarm_span = ipfs.span.clone(); // Create a Kademlia behaviour let behaviour = behaviour::build_behaviour(options, ipfs).await; From 248740060a12ddbef415b8af2ee08498a33bd377 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Fri, 28 Aug 2020 16:05:49 +0200 Subject: [PATCH 3/8] feat: UninitializedIpfs::new doesn't need to be async --- http/src/main.rs | 1 - src/lib.rs | 8 +++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/http/src/main.rs b/http/src/main.rs index 7d2ad7c8a..441be6fd3 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -145,7 +145,6 @@ fn main() { ); let (ipfs, task): (Ipfs, _) = UninitializedIpfs::new(opts, None) - .await .start() .await .expect("Initialization failed"); diff --git a/src/lib.rs b/src/lib.rs index db5b5e744..5b23a2aaf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -369,7 +369,7 @@ impl UninitializedIpfs { /// The span is attached to all operations called on the later created `Ipfs` along with all /// operations done in the background task as well as tasks spawned by the underlying /// `libp2p::Swarm`. - pub async fn new(options: IpfsOptions, span: Option) -> Self { + pub fn new(options: IpfsOptions, span: Option) -> Self { let repo_options = RepoOptions::from(&options); let (repo, repo_events) = create_repo(repo_options); let keys = options.keypair.clone(); @@ -384,8 +384,8 @@ impl UninitializedIpfs { } } - pub async fn default() -> Self { - Self::new(IpfsOptions::default(), None).await + pub fn default() -> Self { + Self::new(IpfsOptions::default(), None) } /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the @@ -1683,8 +1683,6 @@ mod node { let id = opts.keypair.public().into_peer_id(); let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts, span) - .in_current_span() - .await .start() .in_current_span() .await From 74ac0ede901cf838c230972deea2d6eebfaeee25 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Fri, 28 Aug 2020 16:12:07 +0200 Subject: [PATCH 4/8] refactor: move the span from UninitializedIpfs to IpfsOptions --- http/src/main.rs | 15 ++++++++------- src/lib.rs | 34 ++++++++++++++++++++-------------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/http/src/main.rs b/http/src/main.rs index 441be6fd3..638a4f1d9 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -135,16 +135,17 @@ fn main() { let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop"); rt.block_on(async move { - let opts: IpfsOptions = IpfsOptions::new( - home.clone(), + let opts = IpfsOptions { + ipfs_path: home.clone(), keypair, - Vec::new(), - false, - None, + bootstrap: Vec::new(), + mdns: false, + kad_protocol: None, listening_addrs, - ); + span: None, + }; - let (ipfs, task): (Ipfs, _) = UninitializedIpfs::new(opts, None) + let (ipfs, task): (Ipfs, _) = UninitializedIpfs::new(opts) .start() .await .expect("Initialization failed"); diff --git a/src/lib.rs b/src/lib.rs index 5b23a2aaf..b128323da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -138,8 +138,12 @@ pub struct IpfsOptions { /// /// [`libp2p_kad::KademliaConfig::set_protocol_name`]: https://docs.rs/libp2p-kad/*/libp2p_kad/struct.KademliaConfig.html##method.set_protocol_name pub kad_protocol: Option, + /// Bound listening addresses; by default the node will not listen on any address. pub listening_addrs: Vec, + + /// The span for tracing purposes. + span: Option, } impl fmt::Debug for IpfsOptions { @@ -153,6 +157,7 @@ impl fmt::Debug for IpfsOptions { .field("mdns", &self.mdns) .field("kad_protocol", &self.kad_protocol) .field("listening_addrs", &self.listening_addrs) + .field("span", &self.span) .finish() } } @@ -170,6 +175,7 @@ impl IpfsOptions { // default to lan kad for go-ipfs use in tests kad_protocol: Some("/ipfs/lan/kad/1.0.0".to_owned()), listening_addrs: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()], + span: None, } } } @@ -205,6 +211,7 @@ impl IpfsOptions { mdns: bool, kad_protocol: Option, listening_addrs: Vec, + span: Option, ) -> Self { Self { ipfs_path, @@ -213,6 +220,7 @@ impl IpfsOptions { mdns, kad_protocol, listening_addrs, + span, } } } @@ -263,6 +271,7 @@ impl Default for IpfsOptions { mdns: true, kad_protocol: None, listening_addrs: Vec::new(), + span: None, } } } @@ -356,7 +365,6 @@ enum IpfsEvent { /// Configured Ipfs which can only be started. pub struct UninitializedIpfs { repo: Arc>, - span: Span, keys: Keypair, options: IpfsOptions, repo_events: Receiver, @@ -369,15 +377,13 @@ impl UninitializedIpfs { /// The span is attached to all operations called on the later created `Ipfs` along with all /// operations done in the background task as well as tasks spawned by the underlying /// `libp2p::Swarm`. - pub fn new(options: IpfsOptions, span: Option) -> Self { + pub fn new(options: IpfsOptions) -> Self { let repo_options = RepoOptions::from(&options); let (repo, repo_events) = create_repo(repo_options); let keys = options.keypair.clone(); - let span = span.unwrap_or_else(|| trace_span!("ipfs")); UninitializedIpfs { repo: Arc::new(repo), - span, keys, options, repo_events, @@ -385,7 +391,7 @@ impl UninitializedIpfs { } pub fn default() -> Self { - Self::new(IpfsOptions::default(), None) + Self::new(IpfsOptions::default()) } /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the @@ -395,10 +401,9 @@ impl UninitializedIpfs { let UninitializedIpfs { repo, - span, keys, repo_events, - options, + mut options, } = self; repo.init().await?; @@ -406,7 +411,10 @@ impl UninitializedIpfs { let (to_task, receiver) = channel::(1); let ipfs = Ipfs { - span, + span: options + .span + .take() + .unwrap_or_else(|| tracing::trace_span!("ipfs")), repo, keys: DebuggableKeypair(keys), to_task, @@ -1667,10 +1675,9 @@ mod node { impl Node { pub async fn new>(name: T) -> Self { - let opts = IpfsOptions::inmemory_with_generated_keys(); - Node::with_options(opts) - .instrument(trace_span!("ipfs", node = name.as_ref())) - .await + let mut opts = IpfsOptions::inmemory_with_generated_keys(); + opts.span = Some(trace_span!("ipfs", node = name.as_ref())); + Self::with_options(opts).await } pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> { @@ -1679,10 +1686,9 @@ mod node { } pub async fn with_options(opts: IpfsOptions) -> Self { - let span = Some(Span::current()); let id = opts.keypair.public().into_peer_id(); - let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts, span) + let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts) .start() .in_current_span() .await From f6ec5fa8f1259aede9af5f3c40321c60a5686aaa Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 23 Sep 2020 14:05:50 +0300 Subject: [PATCH 5/8] fix: examples and tests --- examples/dag_creation.rs | 5 +++-- examples/fetch_and_cat.rs | 6 +----- http/src/v0.rs | 7 ++----- http/src/v0/root_files/add.rs | 6 +----- src/lib.rs | 5 +++-- 5 files changed, 10 insertions(+), 19 deletions(-) diff --git a/examples/dag_creation.rs b/examples/dag_creation.rs index b9e5c448d..61b8a3d57 100644 --- a/examples/dag_creation.rs +++ b/examples/dag_creation.rs @@ -1,5 +1,5 @@ use futures::join; -use ipfs::{make_ipld, Ipfs, IpfsPath, Types, UninitializedIpfs}; +use ipfs::{make_ipld, Ipfs, IpfsOptions, IpfsPath, TestTypes, UninitializedIpfs}; use tokio::task; #[tokio::main] @@ -7,7 +7,8 @@ async fn main() { tracing_subscriber::fmt::init(); // Initialize the repo and start a daemon - let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::default().await.start().await.unwrap(); + let opts = IpfsOptions::inmemory_with_generated_keys(); + let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts).start().await.unwrap(); task::spawn(fut); // Create a DAG diff --git a/examples/fetch_and_cat.rs b/examples/fetch_and_cat.rs index 530c3fb74..6beb62794 100644 --- a/examples/fetch_and_cat.rs +++ b/examples/fetch_and_cat.rs @@ -48,11 +48,7 @@ async fn main() { // UninitializedIpfs will handle starting up the repository and return the facade (ipfs::Ipfs) // and the background task (ipfs::IpfsFuture). - let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts, None) - .await - .start() - .await - .unwrap(); + let (ipfs, fut): (Ipfs, _) = UninitializedIpfs::new(opts).start().await.unwrap(); // The background task must be spawned to use anything other than the repository; most notably, // the libp2p. diff --git a/http/src/v0.rs b/http/src/v0.rs index c4f2a1354..0fc291259 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -192,11 +192,8 @@ mod tests { use ipfs::{IpfsOptions, UninitializedIpfs}; let options = IpfsOptions::inmemory_with_generated_keys(); - let (ipfs, _): (Ipfs, _) = UninitializedIpfs::new(options, None) - .await - .start() - .await - .unwrap(); + let (ipfs, _): (Ipfs, _) = + UninitializedIpfs::new(options).start().await.unwrap(); let (shutdown_tx, _) = tokio::sync::mpsc::channel::<()>(1); diff --git a/http/src/v0/root_files/add.rs b/http/src/v0/root_files/add.rs index 0c53d3533..ecee819ad 100644 --- a/http/src/v0/root_files/add.rs +++ b/http/src/v0/root_files/add.rs @@ -410,11 +410,7 @@ mod tests { async fn tokio_ipfs() -> ipfs::Ipfs { let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); - let (ipfs, fut) = ipfs::UninitializedIpfs::new(options, None) - .await - .start() - .await - .unwrap(); + let (ipfs, fut) = ipfs::UninitializedIpfs::new(options).start().await.unwrap(); tokio::spawn(fut); ipfs diff --git a/src/lib.rs b/src/lib.rs index b128323da..f972281a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -142,8 +142,9 @@ pub struct IpfsOptions { /// Bound listening addresses; by default the node will not listen on any address. pub listening_addrs: Vec, - /// The span for tracing purposes. - span: Option, + /// The span for tracing purposes. All futures returned by `Ipfs`, background task actions and + /// swarm actions are instrumented with this span or spans referring to this as their parent. + pub span: Option, } impl fmt::Debug for IpfsOptions { From 9e934446fa8f894a85e7bc6df9bdfaeb754b613f Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 23 Sep 2020 14:21:26 +0300 Subject: [PATCH 6/8] doc: adjust IpfsOptions::span --- src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f972281a1..4044c3b23 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -142,8 +142,11 @@ pub struct IpfsOptions { /// Bound listening addresses; by default the node will not listen on any address. pub listening_addrs: Vec, - /// The span for tracing purposes. All futures returned by `Ipfs`, background task actions and - /// swarm actions are instrumented with this span or spans referring to this as their parent. + /// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`. + /// + /// All futures returned by `Ipfs`, background task actions and swarm actions are instrumented + /// with this span or spans referring to this as their parent. Setting this other than `None` + /// default is useful when running multiple nodes. pub span: Option, } From 2180135e1ad7133912dd08da6c78c8e6584f4dc3 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 23 Sep 2020 14:23:51 +0300 Subject: [PATCH 7/8] doc: describe the default for Kademlia protocol name --- src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 4044c3b23..e9a14bbb4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,7 +132,8 @@ pub struct IpfsOptions { /// Enables mdns for peer discovery and announcement when true. pub mdns: bool, - /// Custom Kademlia protocol name. + /// Custom Kademlia protocol name. When set to `None`, the global DHT name is used instead of + /// the LAN dht name. /// /// The name given here is passed to [`libp2p_kad::KademliaConfig::set_protocol_name`]. /// From ab0aec378065d7eaaa6887d0d86a4a3c8a1b01b6 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 23 Sep 2020 14:42:39 +0300 Subject: [PATCH 8/8] refactor: depend in p2p::Behaviour on Arc> ... instead of Ipfs. this was the original idea in #343 perhaps. --- src/lib.rs | 16 ++++++++++------ src/p2p/behaviour.rs | 25 ++++++++++++------------- src/p2p/mod.rs | 11 ++++++----- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e9a14bbb4..5a23fcf1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -415,18 +415,22 @@ impl UninitializedIpfs { let (to_task, receiver) = channel::(1); + let facade_span = options + .span + .take() + .unwrap_or_else(|| tracing::trace_span!("ipfs")); + + let swarm_span = tracing::trace_span!(parent: facade_span.clone(), "swarm"); + let ipfs = Ipfs { - span: options - .span - .take() - .unwrap_or_else(|| tracing::trace_span!("ipfs")), - repo, + span: facade_span, + repo: repo.clone(), keys: DebuggableKeypair(keys), to_task, }; let swarm_options = SwarmOptions::from(&options); - let swarm = create_swarm(swarm_options, ipfs.clone()).await?; + let swarm = create_swarm(swarm_options, swarm_span, repo).await?; let IpfsOptions { listening_addrs, .. diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index c8ade98b4..2199748d9 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -2,9 +2,9 @@ use super::pubsub::Pubsub; use super::swarm::{Connection, Disconnector, SwarmApi}; use crate::config::BOOTSTRAP_NODES; use crate::p2p::{MultiaddrWithPeerId, SwarmOptions}; -use crate::repo::BlockPut; +use crate::repo::{BlockPut, Repo}; use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; -use crate::{Ipfs, IpfsTypes}; +use crate::IpfsTypes; use anyhow::anyhow; use bitswap::{Bitswap, BitswapEvent}; use cid::Cid; @@ -16,16 +16,15 @@ use libp2p::mdns::{MdnsEvent, TokioMdns}; use libp2p::ping::{Ping, PingEvent}; use libp2p::swarm::toggle::Toggle; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourEventProcess}; -use libp2p::NetworkBehaviour; use multibase::Base; use std::{convert::TryInto, sync::Arc}; use tokio::task; /// Behaviour type. -#[derive(NetworkBehaviour)] +#[derive(libp2p::NetworkBehaviour)] pub struct Behaviour { #[behaviour(ignore)] - ipfs: Ipfs, + repo: Arc>, mdns: Toggle, kademlia: Kademlia, #[behaviour(ignore)] @@ -314,11 +313,11 @@ impl NetworkBehaviourEventProcess for Behaviour< fn inject_event(&mut self, event: BitswapEvent) { match event { BitswapEvent::ReceivedBlock(peer_id, block) => { - let ipfs = self.ipfs.clone(); + let repo = self.repo.clone(); let peer_stats = Arc::clone(&self.bitswap.stats.get(&peer_id).unwrap()); task::spawn(async move { let bytes = block.data().len() as u64; - let res = ipfs.repo.put_block(block.clone()).await; + let res = repo.put_block(block.clone()).await; match res { Ok((_, uniqueness)) => match uniqueness { BlockPut::NewBlock => peer_stats.update_incoming_unique(bytes), @@ -343,10 +342,10 @@ impl NetworkBehaviourEventProcess for Behaviour< ); let queued_blocks = self.bitswap().queued_blocks.clone(); - let ipfs = self.ipfs.clone(); + let repo = self.repo.clone(); task::spawn(async move { - match ipfs.repo.get_block_now(&cid).await { + match repo.get_block_now(&cid).await { Ok(Some(block)) => { let _ = queued_blocks.unbounded_send((peer_id, block)); } @@ -413,7 +412,7 @@ impl NetworkBehaviourEventProcess for Behaviour impl Behaviour { /// Create a Kademlia behaviour with the IPFS bootstrap nodes. - pub async fn new(options: SwarmOptions, ipfs: Ipfs) -> Self { + pub async fn new(options: SwarmOptions, repo: Arc>) -> Self { info!("net: starting with peer id {}", options.peer_id); let mdns = if options.mdns { @@ -454,7 +453,7 @@ impl Behaviour { } Behaviour { - ipfs, + repo, mdns, kademlia, kad_subscriptions: Default::default(), @@ -646,7 +645,7 @@ impl Behaviour { /// Create a IPFS behaviour with the IPFS bootstrap nodes. pub async fn build_behaviour( options: SwarmOptions, - ipfs: Ipfs, + repo: Arc>, ) -> Behaviour { - Behaviour::new(options, ipfs).await + Behaviour::new(options, repo).await } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index e71d49978..8657c4714 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -1,9 +1,11 @@ //! P2P handling for IPFS nodes. -use crate::{Ipfs, IpfsOptions, IpfsTypes}; +use crate::repo::Repo; +use crate::{IpfsOptions, IpfsTypes}; use libp2p::identity::Keypair; use libp2p::Swarm; use libp2p::{Multiaddr, PeerId}; use std::io; +use std::sync::Arc; use tracing::Span; pub(crate) mod addr; @@ -46,17 +48,16 @@ impl From<&IpfsOptions> for SwarmOptions { /// Creates a new IPFS swarm. pub async fn create_swarm( options: SwarmOptions, - ipfs: Ipfs, + swarm_span: Span, + repo: Arc>, ) -> io::Result> { let peer_id = options.peer_id.clone(); // Set up an encrypted TCP transport over the Mplex protocol. let transport = transport::build_transport(options.keypair.clone())?; - let swarm_span = ipfs.span.clone(); - // Create a Kademlia behaviour - let behaviour = behaviour::build_behaviour(options, ipfs).await; + let behaviour = behaviour::build_behaviour(options, repo).await; // Create a Swarm let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)