Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Simplified node internals and creation v3 #394

Merged
merged 8 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/dag_creation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use futures::join;
use ipfs::{make_ipld, Ipfs, IpfsPath, Types, UninitializedIpfs};
use ipfs::{make_ipld, Ipfs, IpfsOptions, IpfsPath, TestTypes, UninitializedIpfs};
use tokio::task;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

// Initialize the repo and start a daemon
let (ipfs, fut): (Ipfs<Types>, _) = UninitializedIpfs::default().await.start().await.unwrap();
let opts = IpfsOptions::inmemory_with_generated_keys();
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts).start().await.unwrap();
task::spawn(fut);

// Create a DAG
Expand Down
6 changes: 1 addition & 5 deletions examples/fetch_and_cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ async fn main() {

// UninitializedIpfs will handle starting up the repository and return the facade (ipfs::Ipfs)
// and the background task (ipfs::IpfsFuture).
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts, None)
.await
.start()
.await
.unwrap();
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts).start().await.unwrap();

// The background task must be spawned to use anything other than the repository; most notably,
// the libp2p.
Expand Down
16 changes: 8 additions & 8 deletions http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,17 @@ fn main() {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");

rt.block_on(async move {
let opts: IpfsOptions = IpfsOptions::new(
home.clone(),
let opts = IpfsOptions {
ipfs_path: home.clone(),
keypair,
Vec::new(),
false,
None,
bootstrap: Vec::new(),
mdns: false,
kad_protocol: None,
listening_addrs,
);
span: None,
};

let (ipfs, task): (Ipfs<ipfs::Types>, _) = UninitializedIpfs::new(opts, None)
.await
let (ipfs, task): (Ipfs<ipfs::Types>, _) = UninitializedIpfs::new(opts)
.start()
.await
.expect("Initialization failed");
Expand Down
7 changes: 2 additions & 5 deletions http/src/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,8 @@ mod tests {
use ipfs::{IpfsOptions, UninitializedIpfs};

let options = IpfsOptions::inmemory_with_generated_keys();
let (ipfs, _): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(options, None)
.await
.start()
.await
.unwrap();
let (ipfs, _): (Ipfs<TestTypes>, _) =
UninitializedIpfs::new(options).start().await.unwrap();

let (shutdown_tx, _) = tokio::sync::mpsc::channel::<()>(1);

Expand Down
6 changes: 1 addition & 5 deletions http/src/v0/root_files/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,7 @@ mod tests {

async fn tokio_ipfs() -> ipfs::Ipfs<ipfs::TestTypes> {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let (ipfs, fut) = ipfs::UninitializedIpfs::new(options, None)
.await
.start()
.await
.unwrap();
let (ipfs, fut) = ipfs::UninitializedIpfs::new(options).start().await.unwrap();

tokio::spawn(fut);
ipfs
Expand Down
97 changes: 51 additions & 46 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,23 @@ pub struct IpfsOptions {
/// Enables mdns for peer discovery and announcement when true.
pub mdns: bool,

/// Custom Kademlia protocol name.
/// Custom Kademlia protocol name. When set to `None`, the global DHT name is used instead of
/// the LAN dht name.
///
/// The name given here is passed to [`libp2p_kad::KademliaConfig::set_protocol_name`].
///
/// [`libp2p_kad::KademliaConfig::set_protocol_name`]: https://docs.rs/libp2p-kad/*/libp2p_kad/struct.KademliaConfig.html##method.set_protocol_name
pub kad_protocol: Option<String>,

/// Bound listening addresses; by default the node will not listen on any address.
pub listening_addrs: Vec<Multiaddr>,

/// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`.
///
/// All futures returned by `Ipfs`, background task actions and swarm actions are instrumented
/// with this span or spans referring to this as their parent. Setting this other than `None`
/// default is useful when running multiple nodes.
pub span: Option<Span>,
}

impl fmt::Debug for IpfsOptions {
Expand All @@ -153,6 +162,7 @@ impl fmt::Debug for IpfsOptions {
.field("mdns", &self.mdns)
.field("kad_protocol", &self.kad_protocol)
.field("listening_addrs", &self.listening_addrs)
.field("span", &self.span)
.finish()
}
}
Expand All @@ -170,6 +180,7 @@ impl IpfsOptions {
// default to lan kad for go-ipfs use in tests
kad_protocol: Some("/ipfs/lan/kad/1.0.0".to_owned()),
listening_addrs: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()],
span: None,
}
}
}
Expand Down Expand Up @@ -205,6 +216,7 @@ impl IpfsOptions {
mdns: bool,
kad_protocol: Option<String>,
listening_addrs: Vec<Multiaddr>,
span: Option<Span>,
) -> Self {
Self {
ipfs_path,
Expand All @@ -213,6 +225,7 @@ impl IpfsOptions {
mdns,
kad_protocol,
listening_addrs,
span,
}
}
}
Expand Down Expand Up @@ -263,6 +276,7 @@ impl Default for IpfsOptions {
mdns: true,
kad_protocol: None,
listening_addrs: Vec::new(),
span: None,
}
}
}
Expand All @@ -275,24 +289,24 @@ impl Default for IpfsOptions {
///
/// The facade is created through [`UninitializedIpfs`] which is configured with [`IpfsOptions`].
#[derive(Debug)]
pub struct Ipfs<Types: IpfsTypes>(Arc<IpfsInner<Types>>);
pub struct Ipfs<Types: IpfsTypes> {
span: Span,
repo: Arc<Repo<Types>>,
keys: DebuggableKeypair<Keypair>,
to_task: Sender<IpfsEvent>,
}

impl<Types: IpfsTypes> Clone for Ipfs<Types> {
fn clone(&self) -> Self {
Ipfs(Arc::clone(&self.0))
Ipfs {
span: self.span.clone(),
repo: Arc::clone(&self.repo),
keys: self.keys.clone(),
to_task: self.to_task.clone(),
}
}
}

/// The internal shared implementation of [`Ipfs`].
#[derive(Debug)]
#[doc(hidden)]
pub struct IpfsInner<Types: IpfsTypes> {
pub span: Span,
repo: Repo<Types>,
keys: DebuggableKeypair<Keypair>,
to_task: Sender<IpfsEvent>,
}

type Channel<T> = OneshotSender<Result<T, Error>>;

/// Events used internally to communicate with the swarm, which is executed in the the background
Expand Down Expand Up @@ -355,8 +369,7 @@ enum IpfsEvent {

/// Configured Ipfs which can only be started.
pub struct UninitializedIpfs<Types: IpfsTypes> {
repo: Repo<Types>,
span: Span,
repo: Arc<Repo<Types>>,
keys: Keypair,
options: IpfsOptions,
repo_events: Receiver<RepoEvent>,
Expand All @@ -369,23 +382,21 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
/// The span is attached to all operations called on the later created `Ipfs` along with all
/// operations done in the background task as well as tasks spawned by the underlying
/// `libp2p::Swarm`.
pub async fn new(options: IpfsOptions, span: Option<Span>) -> Self {
pub fn new(options: IpfsOptions) -> Self {
let repo_options = RepoOptions::from(&options);
let (repo, repo_events) = create_repo(repo_options);
let keys = options.keypair.clone();
let span = span.unwrap_or_else(|| trace_span!("ipfs"));

UninitializedIpfs {
repo,
span,
repo: Arc::new(repo),
keys,
options,
repo_events,
}
}

pub async fn default() -> Self {
Self::new(IpfsOptions::default(), None).await
pub fn default() -> Self {
Self::new(IpfsOptions::default())
}

/// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the
Expand All @@ -395,25 +406,31 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {

let UninitializedIpfs {
repo,
span,
keys,
repo_events,
options,
mut options,
} = self;

repo.init().await?;

let (to_task, receiver) = channel::<IpfsEvent>(1);

let ipfs = Ipfs(Arc::new(IpfsInner {
span,
repo,
let facade_span = options
.span
.take()
.unwrap_or_else(|| tracing::trace_span!("ipfs"));

let swarm_span = tracing::trace_span!(parent: facade_span.clone(), "swarm");

let ipfs = Ipfs {
span: facade_span,
repo: repo.clone(),
keys: DebuggableKeypair(keys),
to_task,
}));
};

let swarm_options = SwarmOptions::from(&options);
let swarm = create_swarm(swarm_options, ipfs.clone()).await?;
let swarm = create_swarm(swarm_options, swarm_span, repo).await?;

let IpfsOptions {
listening_addrs, ..
Expand All @@ -434,14 +451,6 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
}
}

impl<Types: IpfsTypes> Deref for Ipfs<Types> {
type Target = IpfsInner<Types>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<Types: IpfsTypes> Ipfs<Types> {
/// Return an [`IpldDag`] for DAG operations
pub fn dag(&self) -> IpldDag<Types> {
Expand Down Expand Up @@ -1243,13 +1252,13 @@ impl<Types: IpfsTypes> Ipfs<Types> {
}

/// Exit daemon.
pub async fn exit_daemon(self) {
pub async fn exit_daemon(mut self) {
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
// the background task or stream. After that this could be handled by dropping.
self.repo.shutdown();

// ignoring the error because it'd mean that the background task had already been dropped
let _ = self.to_task.clone().try_send(IpfsEvent::Exit);
let _ = self.to_task.try_send(IpfsEvent::Exit);
}
}

Expand Down Expand Up @@ -1675,10 +1684,9 @@ mod node {

impl Node {
pub async fn new<T: AsRef<str>>(name: T) -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys();
Node::with_options(opts)
.instrument(trace_span!("ipfs", node = name.as_ref()))
.await
let mut opts = IpfsOptions::inmemory_with_generated_keys();
opts.span = Some(trace_span!("ipfs", node = name.as_ref()));
Self::with_options(opts).await
}

pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
Expand All @@ -1687,12 +1695,9 @@ mod node {
}

pub async fn with_options(opts: IpfsOptions) -> Self {
let span = Some(Span::current());
let id = opts.keypair.public().into_peer_id();

let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts, span)
.in_current_span()
.await
let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts)
.start()
.in_current_span()
.await
Expand Down
Loading