Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: offline adding #415

Merged
merged 2 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 64 additions & 27 deletions iroh/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use console::style;
use crossterm::style::Stylize;
use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use iroh_api::{AddEvent, Api, ApiExt, IpfsPath, Iroh};
use iroh_api::{AddEvent, Api, ApiExt, IpfsPath, Iroh, ServiceStatus};
use iroh_metrics::config::Config as MetricsConfig;
use iroh_util::human;

Expand Down Expand Up @@ -52,6 +53,9 @@ enum Commands {
/// Do not wrap added content with a directory
#[clap(long)]
no_wrap: bool,
/// Don't provide added content to the network
#[clap(long)]
offline: bool,
},
#[clap(about = "Fetch IPFS content and write it to disk")]
#[clap(after_help = doc::GET_LONG_DESCRIPTION )]
Expand Down Expand Up @@ -127,8 +131,9 @@ impl Cli {
path,
recursive,
no_wrap,
offline,
} => {
add(api, path, *no_wrap, *recursive).await?;
add(api, path, *no_wrap, *recursive, !*offline).await?;
}
Commands::Get {
ipfs_path: path,
Expand Down Expand Up @@ -161,7 +166,13 @@ impl Cli {
}
}

async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Result<()> {
async fn add(
api: &impl Api,
path: &Path,
no_wrap: bool,
recursive: bool,
provide: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the next person touching this needs to create a config struct

) -> Result<()> {
if !path.exists() {
anyhow::bail!("Path does not exist");
}
Expand All @@ -175,13 +186,35 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
);
}

let mut steps = 3;
// we require p2p for adding right now because we don't have a mechanism for
// hydrating only the root CID to the p2p node for providing if a CID were
// ingested offline. Offline adding should happen, but this is the current
// path of least confusion
require_services(api, HashSet::from(["store", "p2p"])).await?;
let svc_status = require_services(api, HashSet::from(["store"])).await?;
match (provide, svc_status.p2p.status()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR, but we have to make sure that we refuse to speak to services with different build version

(true, ServiceStatus::Down(_status)) => {
anyhow::bail!("Add provides content to the IPFS network by default, but the p2p service is not running.\n{}",
"hint: try using the --offline flag, or run 'iroh start p2p'".yellow()
)
}
(true, ServiceStatus::Unknown)
| (true, ServiceStatus::NotServing)
| (true, ServiceStatus::ServiceUnknown) => {
anyhow::bail!("Add provides content to the IPFS network by default, but the p2p service is not running.\n{}",
"hint: try using the --offline flag, or run 'iroh start p2p'".yellow()
)
}
(true, ServiceStatus::Serving) => {}
(false, _) => {
steps -= 1;
}
}

println!("{} Calculating size...", style("[1/3]").bold().dim());
println!(
"{} Calculating size...",
style(format!("[1/{}]", steps)).bold().dim()
);

let pb = ProgressBar::new_spinner();
let mut total_size: u64 = 0;
Expand All @@ -203,7 +236,7 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res

println!(
"{} Importing content {}...",
style("[2/3]").bold().dim(),
style(format!("[2/{}]", steps)).bold().dim(),
human::format_bytes(total_size)
);

Expand All @@ -229,29 +262,33 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
}
pb.finish_and_clear();

let pb = ProgressBar::new(cids.len().try_into().unwrap());
let root = *cids.last().context("File processing failed")?;
// remove everything but the root
cids.splice(0..cids.len() - 1, []);
let rec_str = if cids.len() == 1 { "record" } else { "records" };
println!(
"{} Providing {} {} to the distributed hash table ...",
style("[3/3]").bold().dim(),
cids.len(),
rec_str,
);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos}/{len} ({per_sec}) {msg}",
)
.unwrap(),
);
pb.inc(0);
for cid in cids {
api.provide(cid).await?;
pb.inc(1);

if provide {
let pb = ProgressBar::new(cids.len().try_into().unwrap());
// remove everything but the root
cids.splice(0..cids.len() - 1, []);
let rec_str = if cids.len() == 1 { "record" } else { "records" };
println!(
"{} Providing {} {} to the distributed hash table ...",
style(format!("[3/{}]", steps)).bold().dim(),
cids.len(),
rec_str,
);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos}/{len} ({per_sec}) {msg}",
)
.unwrap(),
);
pb.inc(0);
for cid in cids {
api.provide(cid).await?;
pb.inc(1);
}
pb.finish_and_clear();
}
pb.finish_and_clear();

println!("/ipfs/{}", root);

Ok(())
Expand Down
10 changes: 7 additions & 3 deletions iroh/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,12 @@ where
Ok(())
}

/// require a set of services is up
pub async fn require_services(api: &impl Api, services: HashSet<&str>) -> Result<()> {
/// require a set of services is up. returns the underlying status table of all
/// services for additional scrutiny
pub async fn require_services(
api: &impl Api,
services: HashSet<&str>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer BTreeSet in such cases because it is more deterministic. But in this case it does not really matter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah totally. I'll come through & refactor all uses of HashSet to BTreeSet in a follow up

) -> Result<iroh_api::StatusTable> {
let table = api.check().await;
for service in table.iter() {
if services.contains(service.name()) && service.status() != iroh_api::ServiceStatus::Serving
Expand All @@ -263,7 +267,7 @@ pub async fn require_services(api: &impl Api, services: HashSet<&str>) -> Result
}));
}
}
Ok(())
Ok(table)
}

/// poll until a service matches the desired status. returns Ok(true) if status was matched,
Expand Down