Skip to content

Commit

Permalink
allow offline adding
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Oct 25, 2022
1 parent 96c6148 commit 2bce4db
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 31 deletions.
93 changes: 65 additions & 28 deletions iroh/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use clap::{Parser, Subcommand};
use console::style;
use crossterm::style::Stylize;
use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use iroh_api::{AddEvent, Api, ApiExt, IpfsPath, Iroh};
use iroh_api::{AddEvent, Api, ApiExt, IpfsPath, Iroh, ServiceStatus};
use iroh_metrics::config::Config as MetricsConfig;
use iroh_util::human;

Expand Down Expand Up @@ -52,6 +53,9 @@ enum Commands {
/// Do not wrap added content with a directory
#[clap(long)]
no_wrap: bool,
/// Don't provide added content to the network
#[clap(long)]
offline: bool,
},
#[clap(about = "Fetch IPFS content and write it to disk")]
#[clap(after_help = doc::GET_LONG_DESCRIPTION )]
Expand Down Expand Up @@ -127,8 +131,9 @@ impl Cli {
path,
recursive,
no_wrap,
offline,
} => {
add(api, path, *no_wrap, *recursive).await?;
add(api, path, *no_wrap, *recursive, !*offline).await?;
}
Commands::Get {
ipfs_path: path,
Expand Down Expand Up @@ -161,7 +166,13 @@ impl Cli {
}
}

async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Result<()> {
async fn add(
api: &impl Api,
path: &Path,
no_wrap: bool,
recursive: bool,
provide: bool,
) -> Result<()> {
if !path.exists() {
anyhow::bail!("Path does not exist");
}
Expand All @@ -175,13 +186,35 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
);
}

let mut steps = 3;
// we require p2p for adding right now because we don't have a mechanism for
// hydrating only the root CID to the p2p node for providing if a CID were
// ingested offline. Offline adding should happen, but this is the current
// path of least confusion
require_services(api, HashSet::from(["store", "p2p"])).await?;
let svc_status = require_services(api, HashSet::from(["store"])).await?;
match (provide, svc_status.p2p.status()) {
(true, ServiceStatus::Down(_status)) => {
return Err(anyhow!("Add provides content to the IPFS network by default, but the p2p service is not running.\n{}",
"hint: try using the --offline flag, or run 'iroh start p2p'".yellow()
));
}
(true, ServiceStatus::Unknown)
| (true, ServiceStatus::NotServing)
| (true, ServiceStatus::ServiceUnknown) => {
return Err(anyhow!("Add provides content to the IPFS network by default, but the p2p service is not running.\n{}",
"hint: try using the --offline flag, or run 'iroh start p2p'".yellow()
));
}
(true, ServiceStatus::Serving) => {}
(false, _) => {
steps -= 1;
}
}

println!("{} Calculating size...", style("[1/3]").bold().dim());
println!(
"{} Calculating size...",
style(format!("[1/{}]", steps)).bold().dim()
);

let pb = ProgressBar::new_spinner();
let mut total_size: u64 = 0;
Expand All @@ -203,7 +236,7 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res

println!(
"{} Importing content {}...",
style("[2/3]").bold().dim(),
style(format!("[2/{}]", steps)).bold().dim(),
human::format_bytes(total_size)
);

Expand All @@ -229,29 +262,33 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
}
pb.finish_and_clear();

let pb = ProgressBar::new(cids.len().try_into().unwrap());
let root = *cids.last().context("File processing failed")?;
// remove everything but the root
cids.splice(0..cids.len() - 1, []);
let rec_str = if cids.len() == 1 { "record" } else { "records" };
println!(
"{} Providing {} {} to the distributed hash table ...",
style("[3/3]").bold().dim(),
cids.len(),
rec_str,
);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos}/{len} ({per_sec}) {msg}",
)
.unwrap(),
);
pb.inc(0);
for cid in cids {
api.provide(cid).await?;
pb.inc(1);

if provide {
let pb = ProgressBar::new(cids.len().try_into().unwrap());
// remove everything but the root
cids.splice(0..cids.len() - 1, []);
let rec_str = if cids.len() == 1 { "record" } else { "records" };
println!(
"{} Providing {} {} to the distributed hash table ...",
style(format!("[3/{}]", steps)).bold().dim(),
cids.len(),
rec_str,
);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos}/{len} ({per_sec}) {msg}",
)
.unwrap(),
);
pb.inc(0);
for cid in cids {
api.provide(cid).await?;
pb.inc(1);
}
pb.finish_and_clear();
}
pb.finish_and_clear();

println!("/ipfs/{}", root);

Ok(())
Expand Down
10 changes: 7 additions & 3 deletions iroh/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,12 @@ where
Ok(())
}

/// require a set of services is up
pub async fn require_services(api: &impl Api, services: HashSet<&str>) -> Result<()> {
/// require a set of services is up. returns the underlying status table of all
/// services for additional scrutiny
pub async fn require_services(
api: &impl Api,
services: HashSet<&str>,
) -> Result<iroh_api::StatusTable> {
let table = api.check().await;
for service in table.iter() {
if services.contains(service.name()) && service.status() != iroh_api::ServiceStatus::Serving
Expand All @@ -263,7 +267,7 @@ pub async fn require_services(api: &impl Api, services: HashSet<&str>) -> Result
}));
}
}
Ok(())
Ok(table)
}

/// poll until a service matches the desired status. returns Ok(true) if status was matched,
Expand Down

0 comments on commit 2bce4db

Please sign in to comment.