Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provide only root, and only at the end of add #406

Merged
merged 6 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::p2p::MockP2p;
use crate::p2p::{ClientP2p, P2p};
use crate::{AddEvent, IpfsPath};
use anyhow::Result;
use cid::Cid;
use futures::future::{BoxFuture, LocalBoxFuture};
use futures::stream::LocalBoxStream;
use futures::FutureExt;
Expand Down Expand Up @@ -40,6 +41,8 @@ pub trait Api {

fn p2p(&self) -> Result<Self::P>;

fn provide(&self, cid: Cid) -> LocalBoxFuture<'_, Result<()>>;

/// Produces a asynchronous stream of file descriptions
/// Each description is a tuple of a relative path, and either a `Directory` or a `Reader`
/// with the file contents.
Expand Down Expand Up @@ -100,6 +103,10 @@ impl Iroh {
impl Api for Iroh {
type P = ClientP2p;

fn provide(&self, cid: Cid) -> LocalBoxFuture<'_, Result<()>> {
async move { self.client.try_p2p()?.start_providing(&cid).await }.boxed_local()
}

fn p2p(&self) -> Result<ClientP2p> {
let p2p_client = self.client.try_p2p()?;
Ok(ClientP2p::new(p2p_client))
Expand Down
3 changes: 1 addition & 2 deletions iroh-api/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{anyhow, Error};
use std::io;
use thiserror::Error as ThisError;
// use std::error::Error;
use anyhow::{anyhow, Error};

/// LockError is the set of known program lock errors
#[derive(ThisError, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,9 @@ impl Store for StoreAndProvideClient {
}

async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.client.try_store()?.put(cid, blob, links).await?;
self.client.try_p2p()?.start_providing(&cid).await
self.client.try_store()?.put(cid, blob, links).await
// we provide after insertion is finished
// self.client.try_p2p()?.start_providing(&cid).await
}
}

Expand Down
19 changes: 19 additions & 0 deletions iroh/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::str::FromStr;

use futures::StreamExt;
use iroh_api::{AddEvent, Cid, Lookup, MockApi, MockP2p, OutType, PeerId};
use iroh_api::{ServiceStatus, StatusRow, StatusTable};
use relative_path::RelativePathBuf;

type GetFixture = fn() -> MockApi;
Expand Down Expand Up @@ -59,6 +60,13 @@ fn fixture_get() -> MockApi {

fn fixture_add_file() -> MockApi {
let mut api = MockApi::default();
api.expect_check().returning(|| {
Box::pin(future::ready(StatusTable::new(
Some(StatusRow::new("gateway", 1, ServiceStatus::Serving)),
Some(StatusRow::new("p2p", 1, ServiceStatus::Serving)),
Some(StatusRow::new("store", 1, ServiceStatus::Serving)),
)))
});
api.expect_add_file().returning(|_ipfs_path, _| {
let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap();
let add_event = AddEvent::ProgressDelta { cid, size: Some(0) };
Expand All @@ -68,11 +76,20 @@ fn fixture_add_file() -> MockApi {
)])
.boxed_local())))
});
api.expect_provide()
.returning(|_| Box::pin(future::ready(Ok(()))));
api
}

fn fixture_add_directory() -> MockApi {
let mut api = MockApi::default();
api.expect_check().returning(|| {
Box::pin(future::ready(StatusTable::new(
Some(StatusRow::new("gateway", 1, ServiceStatus::Serving)),
Some(StatusRow::new("p2p", 1, ServiceStatus::Serving)),
Some(StatusRow::new("store", 1, ServiceStatus::Serving)),
)))
});
api.expect_add_dir().returning(|_ipfs_path, _| {
let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap();
let add_event = AddEvent::ProgressDelta { cid, size: Some(0) };
Expand All @@ -82,6 +99,8 @@ fn fixture_add_directory() -> MockApi {
)])
.boxed_local())))
});
api.expect_provide()
.returning(|_| Box::pin(future::ready(Ok(()))));
api
}

Expand Down
62 changes: 52 additions & 10 deletions iroh/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{ensure, Result};
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use console::style;
use futures::StreamExt;
Expand All @@ -14,6 +14,7 @@ use crate::doc;
#[cfg(feature = "testing")]
use crate::fixture::get_fixture_api;
use crate::p2p::{run_command as run_p2p_command, P2p};
use crate::services::require_services;
use crate::size::size_stream;

#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -64,6 +65,9 @@ enum Commands {
#[clap(after_help = doc::START_LONG_DESCRIPTION )]
Start {
service: Vec<String>,
/// Start all services
#[clap(short, long)]
all: bool,
},
/// status checks the health of the different processes
#[clap(about = "Check the health of the different iroh services")]
Expand Down Expand Up @@ -134,8 +138,16 @@ impl Cli {
println!("Saving file(s) to {}", root_path.to_str().unwrap());
}
Commands::P2p(p2p) => run_p2p_command(&api.p2p()?, p2p).await?,
Commands::Start { service } => {
crate::services::start(api, service).await?;
Commands::Start { service, all } => {
let mut svc = &vec![
String::from("store"),
String::from("p2p"),
String::from("gateway"),
];
if !*all {
svc = service;
};
crate::services::start(api, svc).await?;
}
Commands::Status { watch } => {
crate::services::status(api, *watch).await?;
Expand All @@ -162,7 +174,14 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
path.display()
);
}
println!("{} Calculating size...", style("[1/2]").bold().dim());

// we require p2p for adding right now because we don't have a mechanism for
// hydrating only the root CID to the p2p node for providing if a CID were
// ingested offline. Offline adding should happen, but this is the current
// path of least confusion
require_services(api, HashSet::from(["store", "p2p"])).await?;

println!("{} Calculating size...", style("[1/3]").bold().dim());

let pb = ProgressBar::new_spinner();
let mut total_size: u64 = 0;
Expand All @@ -184,7 +203,7 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res

println!(
"{} Importing content {}...",
style("[2/2]").bold().dim(),
style("[2/3]").bold().dim(),
human::format_bytes(total_size)
);

Expand All @@ -197,20 +216,43 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
pb.inc(0);

let mut progress = api.add_stream(path, !no_wrap).await?;
let mut root = None;
let mut cids = Vec::new();
while let Some(add_event) = progress.next().await {
match add_event? {
AddEvent::ProgressDelta { cid, size } => {
root = Some(cid);
cids.push(cid);
if let Some(size) = size {
pb.inc(size);
}
}
}
}
pb.finish_and_clear();
ensure!(root.is_some(), "File processing failed");
println!("/ipfs/{}", root.unwrap());

let pb = ProgressBar::new(cids.len().try_into().unwrap());
let root = *cids.last().context("File processing failed")?;
// remove everything but the root
cids.splice(0..cids.len() - 1, []);
let rec_str = if cids.len() == 1 { "record" } else { "records" };
println!(
"{} Providing {} {} to the distributed hash table ...",
style("[3/3]").bold().dim(),
cids.len(),
rec_str,
);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos}/{len} ({per_sec}) {msg}",
)
.unwrap(),
);
pb.inc(0);
for cid in cids {
api.provide(cid).await?;
pb.inc(1);
}
pb.finish_and_clear();
println!("/ipfs/{}", root);

Ok(())
}
22 changes: 18 additions & 4 deletions iroh/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::SystemTime;
use sysinfo::PidExt;
use tracing::info;

use iroh_api::{Api, ServiceStatus, StatusRow, StatusTable};
use iroh_api::{Api, ApiError, ServiceStatus, StatusRow, StatusTable};
use iroh_util::lock::{LockError, ProgramLock};

const SERVICE_START_TIMEOUT_SECONDS: u64 = 15;
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn start_services(api: &impl Api, services: HashSet<&str>) -> Result<()> {

iroh_localops::process::daemonize(bin_path, log_path.clone())?;

let is_up = ensure_status(api, service, iroh_api::ServiceStatus::Serving).await?;
let is_up = poll_until_status(api, service, iroh_api::ServiceStatus::Serving).await?;
if is_up {
println!("{}", "success".green());
} else {
Expand Down Expand Up @@ -134,7 +134,7 @@ pub async fn stop_services(api: &impl Api, services: HashSet<&str>) -> Result<()
print!("stopping {}... ", &daemon_name);
match iroh_localops::process::stop(pid.as_u32()) {
Ok(_) => {
let is_down = ensure_status(
let is_down = poll_until_status(
api,
service,
iroh_api::ServiceStatus::Down(tonic::Status::unavailable(
Expand Down Expand Up @@ -252,9 +252,23 @@ where
Ok(())
}

/// require a set of services is up
pub async fn require_services(api: &impl Api, services: HashSet<&str>) -> Result<()> {
let table = api.check().await;
for service in table.iter() {
if services.contains(service.name()) && service.status() != iroh_api::ServiceStatus::Serving
{
return Err(anyhow!(ApiError::ConnectionRefused {
service: service.name()
}));
}
}
Ok(())
}

/// poll until a service matches the desired status. returns Ok(true) if status was matched,
/// and Ok(false) if desired status isn't reported before SERVICE_START_TIMEOUT_SECONDS
async fn ensure_status(
async fn poll_until_status(
api: &impl Api,
service: &str,
status: iroh_api::ServiceStatus,
Expand Down
5 changes: 3 additions & 2 deletions iroh/tests/cmd/add_directory.trycmd
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
```
$ iroh add -r mydir
[1/2] Calculating size...
[2/2] Importing content 5 B...
[1/3] Calculating size...
[2/3] Importing content 5 B...
[3/3] Providing 1 record to the distributed hash table ...
/ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR

```
5 changes: 3 additions & 2 deletions iroh/tests/cmd/add_file.trycmd
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
```
$ iroh add file.txt
[1/2] Calculating size...
[2/2] Importing content 20 B...
[1/3] Calculating size...
[2/3] Importing content 20 B...
[3/3] Providing 1 record to the distributed hash table ...
/ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR

```