Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit a7418ae

Browse files
bors[bot]ljedrzJoonas Koivunen
authored
Merge #394
394: Simplified node internals and creation v3 r=koivunej a=koivunej Supercedes #345. Rebased and fixed on top of #392 master. Co-authored-by: ljedrz <ljedrz@gmail.com> Co-authored-by: Joonas Koivunen <joonas@equilibrium.co>
2 parents 43c670d + ab0aec3 commit a7418ae

File tree

8 files changed

+84
-89
lines changed

8 files changed

+84
-89
lines changed

examples/dag_creation.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use futures::join;
2-
use ipfs::{make_ipld, Ipfs, IpfsPath, Types, UninitializedIpfs};
2+
use ipfs::{make_ipld, Ipfs, IpfsOptions, IpfsPath, TestTypes, UninitializedIpfs};
33
use tokio::task;
44

55
#[tokio::main]
66
async fn main() {
77
tracing_subscriber::fmt::init();
88

99
// Initialize the repo and start a daemon
10-
let (ipfs, fut): (Ipfs<Types>, _) = UninitializedIpfs::default().await.start().await.unwrap();
10+
let opts = IpfsOptions::inmemory_with_generated_keys();
11+
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts).start().await.unwrap();
1112
task::spawn(fut);
1213

1314
// Create a DAG

examples/fetch_and_cat.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@ async fn main() {
4848

4949
// UninitializedIpfs will handle starting up the repository and return the facade (ipfs::Ipfs)
5050
// and the background task (ipfs::IpfsFuture).
51-
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts, None)
52-
.await
53-
.start()
54-
.await
55-
.unwrap();
51+
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts).start().await.unwrap();
5652

5753
// The background task must be spawned to use anything other than the repository; most notably,
5854
// the libp2p.

http/src/main.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,17 @@ fn main() {
135135
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");
136136

137137
rt.block_on(async move {
138-
let opts: IpfsOptions = IpfsOptions::new(
139-
home.clone(),
138+
let opts = IpfsOptions {
139+
ipfs_path: home.clone(),
140140
keypair,
141-
Vec::new(),
142-
false,
143-
None,
141+
bootstrap: Vec::new(),
142+
mdns: false,
143+
kad_protocol: None,
144144
listening_addrs,
145-
);
145+
span: None,
146+
};
146147

147-
let (ipfs, task): (Ipfs<ipfs::Types>, _) = UninitializedIpfs::new(opts, None)
148-
.await
148+
let (ipfs, task): (Ipfs<ipfs::Types>, _) = UninitializedIpfs::new(opts)
149149
.start()
150150
.await
151151
.expect("Initialization failed");

http/src/v0.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,8 @@ mod tests {
192192
use ipfs::{IpfsOptions, UninitializedIpfs};
193193

194194
let options = IpfsOptions::inmemory_with_generated_keys();
195-
let (ipfs, _): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(options, None)
196-
.await
197-
.start()
198-
.await
199-
.unwrap();
195+
let (ipfs, _): (Ipfs<TestTypes>, _) =
196+
UninitializedIpfs::new(options).start().await.unwrap();
200197

201198
let (shutdown_tx, _) = tokio::sync::mpsc::channel::<()>(1);
202199

http/src/v0/root_files/add.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,7 @@ mod tests {
410410

411411
async fn tokio_ipfs() -> ipfs::Ipfs<ipfs::TestTypes> {
412412
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
413-
let (ipfs, fut) = ipfs::UninitializedIpfs::new(options, None)
414-
.await
415-
.start()
416-
.await
417-
.unwrap();
413+
let (ipfs, fut) = ipfs::UninitializedIpfs::new(options).start().await.unwrap();
418414

419415
tokio::spawn(fut);
420416
ipfs

src/lib.rs

+51-46
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,23 @@ pub struct IpfsOptions {
132132
/// Enables mdns for peer discovery and announcement when true.
133133
pub mdns: bool,
134134

135-
/// Custom Kademlia protocol name.
135+
/// Custom Kademlia protocol name. When set to `None`, the global DHT name is used instead of
136+
/// the LAN dht name.
136137
///
137138
/// The name given here is passed to [`libp2p_kad::KademliaConfig::set_protocol_name`].
138139
///
139140
/// [`libp2p_kad::KademliaConfig::set_protocol_name`]: https://docs.rs/libp2p-kad/*/libp2p_kad/struct.KademliaConfig.html##method.set_protocol_name
140141
pub kad_protocol: Option<String>,
142+
141143
/// Bound listening addresses; by default the node will not listen on any address.
142144
pub listening_addrs: Vec<Multiaddr>,
145+
146+
/// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`.
147+
///
148+
/// All futures returned by `Ipfs`, background task actions and swarm actions are instrumented
149+
/// with this span or spans referring to this as their parent. Setting this other than `None`
150+
/// default is useful when running multiple nodes.
151+
pub span: Option<Span>,
143152
}
144153

145154
impl fmt::Debug for IpfsOptions {
@@ -153,6 +162,7 @@ impl fmt::Debug for IpfsOptions {
153162
.field("mdns", &self.mdns)
154163
.field("kad_protocol", &self.kad_protocol)
155164
.field("listening_addrs", &self.listening_addrs)
165+
.field("span", &self.span)
156166
.finish()
157167
}
158168
}
@@ -170,6 +180,7 @@ impl IpfsOptions {
170180
// default to lan kad for go-ipfs use in tests
171181
kad_protocol: Some("/ipfs/lan/kad/1.0.0".to_owned()),
172182
listening_addrs: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()],
183+
span: None,
173184
}
174185
}
175186
}
@@ -205,6 +216,7 @@ impl IpfsOptions {
205216
mdns: bool,
206217
kad_protocol: Option<String>,
207218
listening_addrs: Vec<Multiaddr>,
219+
span: Option<Span>,
208220
) -> Self {
209221
Self {
210222
ipfs_path,
@@ -213,6 +225,7 @@ impl IpfsOptions {
213225
mdns,
214226
kad_protocol,
215227
listening_addrs,
228+
span,
216229
}
217230
}
218231
}
@@ -263,6 +276,7 @@ impl Default for IpfsOptions {
263276
mdns: true,
264277
kad_protocol: None,
265278
listening_addrs: Vec::new(),
279+
span: None,
266280
}
267281
}
268282
}
@@ -275,24 +289,24 @@ impl Default for IpfsOptions {
275289
///
276290
/// The facade is created through [`UninitializedIpfs`] which is configured with [`IpfsOptions`].
277291
#[derive(Debug)]
278-
pub struct Ipfs<Types: IpfsTypes>(Arc<IpfsInner<Types>>);
292+
pub struct Ipfs<Types: IpfsTypes> {
293+
span: Span,
294+
repo: Arc<Repo<Types>>,
295+
keys: DebuggableKeypair<Keypair>,
296+
to_task: Sender<IpfsEvent>,
297+
}
279298

280299
impl<Types: IpfsTypes> Clone for Ipfs<Types> {
281300
fn clone(&self) -> Self {
282-
Ipfs(Arc::clone(&self.0))
301+
Ipfs {
302+
span: self.span.clone(),
303+
repo: Arc::clone(&self.repo),
304+
keys: self.keys.clone(),
305+
to_task: self.to_task.clone(),
306+
}
283307
}
284308
}
285309

286-
/// The internal shared implementation of [`Ipfs`].
287-
#[derive(Debug)]
288-
#[doc(hidden)]
289-
pub struct IpfsInner<Types: IpfsTypes> {
290-
pub span: Span,
291-
repo: Repo<Types>,
292-
keys: DebuggableKeypair<Keypair>,
293-
to_task: Sender<IpfsEvent>,
294-
}
295-
296310
type Channel<T> = OneshotSender<Result<T, Error>>;
297311

298312
/// Events used internally to communicate with the swarm, which is executed in the the background
@@ -355,8 +369,7 @@ enum IpfsEvent {
355369

356370
/// Configured Ipfs which can only be started.
357371
pub struct UninitializedIpfs<Types: IpfsTypes> {
358-
repo: Repo<Types>,
359-
span: Span,
372+
repo: Arc<Repo<Types>>,
360373
keys: Keypair,
361374
options: IpfsOptions,
362375
repo_events: Receiver<RepoEvent>,
@@ -369,23 +382,21 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
369382
/// The span is attached to all operations called on the later created `Ipfs` along with all
370383
/// operations done in the background task as well as tasks spawned by the underlying
371384
/// `libp2p::Swarm`.
372-
pub async fn new(options: IpfsOptions, span: Option<Span>) -> Self {
385+
pub fn new(options: IpfsOptions) -> Self {
373386
let repo_options = RepoOptions::from(&options);
374387
let (repo, repo_events) = create_repo(repo_options);
375388
let keys = options.keypair.clone();
376-
let span = span.unwrap_or_else(|| trace_span!("ipfs"));
377389

378390
UninitializedIpfs {
379-
repo,
380-
span,
391+
repo: Arc::new(repo),
381392
keys,
382393
options,
383394
repo_events,
384395
}
385396
}
386397

387-
pub async fn default() -> Self {
388-
Self::new(IpfsOptions::default(), None).await
398+
pub fn default() -> Self {
399+
Self::new(IpfsOptions::default())
389400
}
390401

391402
/// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the
@@ -395,25 +406,31 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
395406

396407
let UninitializedIpfs {
397408
repo,
398-
span,
399409
keys,
400410
repo_events,
401-
options,
411+
mut options,
402412
} = self;
403413

404414
repo.init().await?;
405415

406416
let (to_task, receiver) = channel::<IpfsEvent>(1);
407417

408-
let ipfs = Ipfs(Arc::new(IpfsInner {
409-
span,
410-
repo,
418+
let facade_span = options
419+
.span
420+
.take()
421+
.unwrap_or_else(|| tracing::trace_span!("ipfs"));
422+
423+
let swarm_span = tracing::trace_span!(parent: facade_span.clone(), "swarm");
424+
425+
let ipfs = Ipfs {
426+
span: facade_span,
427+
repo: repo.clone(),
411428
keys: DebuggableKeypair(keys),
412429
to_task,
413-
}));
430+
};
414431

415432
let swarm_options = SwarmOptions::from(&options);
416-
let swarm = create_swarm(swarm_options, ipfs.clone()).await?;
433+
let swarm = create_swarm(swarm_options, swarm_span, repo).await?;
417434

418435
let IpfsOptions {
419436
listening_addrs, ..
@@ -434,14 +451,6 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
434451
}
435452
}
436453

437-
impl<Types: IpfsTypes> Deref for Ipfs<Types> {
438-
type Target = IpfsInner<Types>;
439-
440-
fn deref(&self) -> &Self::Target {
441-
&self.0
442-
}
443-
}
444-
445454
impl<Types: IpfsTypes> Ipfs<Types> {
446455
/// Return an [`IpldDag`] for DAG operations
447456
pub fn dag(&self) -> IpldDag<Types> {
@@ -1243,13 +1252,13 @@ impl<Types: IpfsTypes> Ipfs<Types> {
12431252
}
12441253

12451254
/// Exit daemon.
1246-
pub async fn exit_daemon(self) {
1255+
pub async fn exit_daemon(mut self) {
12471256
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
12481257
// the background task or stream. After that this could be handled by dropping.
12491258
self.repo.shutdown();
12501259

12511260
// ignoring the error because it'd mean that the background task had already been dropped
1252-
let _ = self.to_task.clone().try_send(IpfsEvent::Exit);
1261+
let _ = self.to_task.try_send(IpfsEvent::Exit);
12531262
}
12541263
}
12551264

@@ -1675,10 +1684,9 @@ mod node {
16751684

16761685
impl Node {
16771686
pub async fn new<T: AsRef<str>>(name: T) -> Self {
1678-
let opts = IpfsOptions::inmemory_with_generated_keys();
1679-
Node::with_options(opts)
1680-
.instrument(trace_span!("ipfs", node = name.as_ref()))
1681-
.await
1687+
let mut opts = IpfsOptions::inmemory_with_generated_keys();
1688+
opts.span = Some(trace_span!("ipfs", node = name.as_ref()));
1689+
Self::with_options(opts).await
16821690
}
16831691

16841692
pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
@@ -1687,12 +1695,9 @@ mod node {
16871695
}
16881696

16891697
pub async fn with_options(opts: IpfsOptions) -> Self {
1690-
let span = Some(Span::current());
16911698
let id = opts.keypair.public().into_peer_id();
16921699

1693-
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts, span)
1694-
.in_current_span()
1695-
.await
1700+
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts)
16961701
.start()
16971702
.in_current_span()
16981703
.await

0 commit comments

Comments
 (0)