Gateway: Fetch recursive DAG as CAR #304

Merged: 6 commits, Oct 10, 2022
Changes from 1 commit
4 changes: 4 additions & 0 deletions iroh-car/src/header.rs
@@ -12,6 +12,10 @@ pub enum CarHeader {
}

impl CarHeader {
pub fn new_v1(roots: Vec<Cid>) -> Self {
Self::V1(roots.into())
}

pub fn decode(buffer: &[u8]) -> Result<Self, Error> {
let header: CarHeaderV1 = DagCborCodec
.decode(buffer)
1 change: 1 addition & 0 deletions iroh-gateway/Cargo.toml
@@ -37,6 +37,7 @@ anyhow = "1"
futures = "0.3.21"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
iroh-resolver = { path = "../iroh-resolver" }
iroh-car = { path = "../iroh-car" }
tokio-util = { version = "0.7", features = ["io"] }
bytes = "1.1.0"
tower-layer = { version = "0.3" }
103 changes: 72 additions & 31 deletions iroh-gateway/src/client.rs
@@ -5,6 +5,7 @@ use anyhow::Result;
use bytes::Bytes;
use futures::{StreamExt, TryStream};
use http::HeaderMap;
use iroh_car::{CarHeader, CarWriter};
use iroh_metrics::{
core::{MObserver, MRecorder},
gateway::{GatewayHistograms, GatewayMetrics},
@@ -13,7 +14,7 @@ use iroh_metrics::{
use iroh_resolver::resolver::{
CidOrDomain, ContentLoader, Metadata, Out, OutMetrics, OutPrettyReader, Resolver, Source,
};
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncWrite};
use tokio_util::io::ReaderStream;
use tracing::{info, warn};

@@ -85,22 +86,8 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
.resolve(path)
.await
.map_err(|e| e.to_string())?;
record!(
GatewayMetrics::TimeToFetchFirstBlock,
start_time.elapsed().as_millis() as u64
);
let metadata = res.metadata().clone();
if metadata.source == Source::Bitswap {
observe!(
GatewayHistograms::TimeToFetchFirstBlock,
start_time.elapsed().as_millis() as f64
);
} else {
observe!(
GatewayHistograms::TimeToFetchFirstBlockCached,
start_time.elapsed().as_millis() as f64
);
}
record_ttfb_metrics(start_time, &metadata);

if res.is_dir() {
let body = FileResult::Directory(res);
@@ -117,6 +104,26 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
}
}

#[tracing::instrument(skip(self))]
pub async fn get_car_recursive(
self,
path: iroh_resolver::resolver::Path,
start_time: std::time::Instant,
) -> Result<axum::body::StreamBody<ReaderStream<tokio::io::DuplexStream>>, String> {
info!("get car {}", path);
// TODO: Find out what a good buffer size is here.
let (writer, reader) = tokio::io::duplex(1024 * 64);
let body = axum::body::StreamBody::new(ReaderStream::new(reader));
let client = self.clone();
tokio::task::spawn(async move {
if let Err(e) = fetch_car_recursive(&client.resolver, path, writer, start_time).await {
warn!("failed to load recursively: {:?}", e);
}
});

Ok(body)
}

#[tracing::instrument(skip(self))]
pub async fn get_file_recursive(
self,
@@ -133,22 +140,8 @@
while let Some(res) = res.next().await {
match res {
Ok(res) => {
record!(
GatewayMetrics::TimeToFetchFirstBlock,
start_time.elapsed().as_millis() as u64
);
let metadata = res.metadata().clone();
if metadata.source == Source::Bitswap {
observe!(
GatewayHistograms::TimeToFetchFirstBlock,
start_time.elapsed().as_millis() as f64
);
} else {
observe!(
GatewayHistograms::TimeToFetchFirstBlockCached,
start_time.elapsed().as_millis() as f64
);
}
record_ttfb_metrics(start_time, &metadata);
let reader =
res.pretty(self.resolver.clone(), OutMetrics { start: start_time });
match reader {
@@ -187,3 +180,51 @@ pub struct Request {
pub download: bool,
pub query_params: GetParams,
}

async fn fetch_car_recursive<T, W>(
resolver: &Resolver<T>,
path: iroh_resolver::resolver::Path,
writer: W,
start_time: std::time::Instant,
) -> Result<(), anyhow::Error>
where
T: ContentLoader,
W: AsyncWrite + Send + Unpin,
{
let stream = resolver.resolve_recursive_raw(path);
tokio::pin!(stream);

let root = stream
.next()
.await
.ok_or_else(|| anyhow::anyhow!("root cid not found"))??;

let header = CarHeader::new_v1(vec![*root.cid()]);
let mut writer = CarWriter::new(header, writer);
writer.write(*root.cid(), root.content()).await?;

while let Some(block) = stream.next().await {
let block = block?;
record_ttfb_metrics(start_time, block.metadata());
writer.write(*block.cid(), block.content()).await?;
}
Ok(())
}

fn record_ttfb_metrics(start_time: std::time::Instant, metadata: &Metadata) {
record!(
GatewayMetrics::TimeToFetchFirstBlock,
start_time.elapsed().as_millis() as u64
);
if metadata.source == Source::Bitswap {
observe!(
GatewayHistograms::TimeToFetchFirstBlock,
start_time.elapsed().as_millis() as f64
);
} else {
observe!(
GatewayHistograms::TimeToFetchFirstBlockCached,
start_time.elapsed().as_millis() as f64
);
}
}
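
For context on the streaming setup in get_car_recursive above: the CAR bytes are produced by a background task writing into one half of an in-memory duplex pipe, while the other half is exposed as the HTTP response body. The snippet below is a minimal, self-contained sketch of that tokio duplex + ReaderStream pattern, not code from this PR; the example payloads are made up, and the 64 KiB buffer simply mirrors the value in the PR's TODO.

use futures::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Writes back-pressure once this in-memory buffer is full.
    let (mut writer, reader) = tokio::io::duplex(1024 * 64);

    // Producer side: in the gateway this role is played by fetch_car_recursive,
    // which writes CAR blocks as they are resolved.
    tokio::spawn(async move {
        for chunk in ["block-1", "block-2", "block-3"] {
            if let Err(e) = writer.write_all(chunk.as_bytes()).await {
                eprintln!("write failed: {:?}", e);
                break;
            }
        }
        // Dropping the writer closes the pipe, which ends the reader stream below.
    });

    // Consumer side: the gateway wraps this stream in axum::body::StreamBody,
    // so bytes reach the client while the DAG is still being fetched.
    let mut stream = ReaderStream::new(reader);
    while let Some(chunk) = stream.next().await {
        println!("received {} bytes", chunk?.len());
    }
    Ok(())
}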
4 changes: 1 addition & 3 deletions iroh-gateway/src/handlers.rs
@@ -406,12 +406,10 @@ async fn serve_car_recursive<T: ContentLoader + std::marker::Unpin>(
mut headers: HeaderMap,
start_time: std::time::Instant,
) -> Result<GatewayResponse, GatewayError> {
// FIXME: actually package as car file

let body = state
.client
.clone()
.get_file_recursive(req.resolved_path.clone(), start_time)
.get_car_recursive(req.resolved_path.clone(), start_time)
.await
.map_err(|e| error(StatusCode::INTERNAL_SERVER_ERROR, &e, &state))?;

72 changes: 72 additions & 0 deletions iroh-resolver/src/resolver.rs
@@ -154,6 +154,42 @@ impl FromStr for Path {
}
}

#[derive(Debug, Clone)]
pub struct OutRaw {
metadata: Metadata,
content: Bytes,
cid: Cid,
}

impl OutRaw {
pub fn from_loaded(cid: Cid, loaded: LoadedCid) -> Self {
let metadata = Metadata {
path: Path::from_cid(cid),
size: None,
typ: OutType::Raw,
unixfs_type: None,
source: loaded.source,
resolved_path: vec![cid],
};
Self {
metadata,
content: loaded.data,
cid,
}
}
pub fn metadata(&self) -> &Metadata {
&self.metadata
}

pub fn cid(&self) -> &Cid {
&self.cid
}

pub fn content(&self) -> &Bytes {
&self.content
}
}

#[derive(Debug, Clone)]
pub struct Out {
metadata: Metadata,
@@ -581,6 +617,42 @@ impl<T: ContentLoader> Resolver<T> {
}
}

/// Resolve a path recursively and yield the raw bytes plus metadata.
#[tracing::instrument(skip(self))]
pub fn resolve_recursive_raw(&self, root: Path) -> impl Stream<Item = Result<OutRaw>> {
let mut cids = VecDeque::new();
let this = self.clone();
async_stream::try_stream! {
let root_block = this.resolve_root(&root).await.map(|(cid, loaded)| OutRaw::from_loaded(cid, loaded));
Contributor:
I think this is effectively the same code as resolve_recursive, other than mapping things differently. Maybe this could be abstracted by passing a closure to a shared function?

Member Author:
Sounds good. I implemented an abstraction to be shared between the two resolve_recursive functions. resolve_recursive_with_path doesn't use it yet because there's additional logic around keeping the path. It could be added to the generic one if always maintaining the path isn't considered too expensive (I don't think it would matter).

Member Author (@Frando, Oct 9, 2022):
@dignifiedquire I've looked into having the remaining resolve_recursive_with_path also use the resolve_recursive_mapped function from the latest commit in this PR. I made it work, but at the cost of a couple of additional clones of the path even when it isn't used. See this commit (not on this PR): Frando@91c0440
I wasn't sure whether it's worth it, and I couldn't find a way to avoid the clones (because of lifetime issues from the async move in the closure).

cids.push_back(root_block?);
loop {
if let Some(current) = cids.pop_front() {
// let (cid, data) = &current;
let links = parse_links(current.cid(), current.content())?;
// TODO: configurable limit
for link_chunk in links.chunks(8) {
let next = futures::future::join_all(
link_chunk.iter().map(|link| {
let this = this.clone();
async move {
this.load_cid(&link).await.map(|loaded| OutRaw::from_loaded(*link, loaded))
}
})
).await;
for res in next.into_iter() {
let res = res?;
cids.push_back(res);
}
}
yield current;
} else {
// no links left to resolve
break;
}
}
}
}

/// Resolves through a given path, returning the [`Cid`] and raw bytes of the final leaf.
#[tracing::instrument(skip(self))]
pub async fn resolve(&self, path: Path) -> Result<Out> {
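
Regarding the review thread above about sharing the recursion between resolve_recursive and resolve_recursive_raw by passing a closure: the snippet below is a deliberately simplified, hypothetical sketch of that shape, using a synchronous iterator and stand-in types instead of iroh's async stream, Cid, and LoadedCid. It only illustrates the idea of one breadth-first walk parameterized by a mapping closure (what the discussion calls resolve_recursive_mapped); it is not the actual implementation, which also has to handle async lifetimes and the path-cloning question raised above.

use std::collections::{HashMap, VecDeque};

// Stand-ins for the real Cid and LoadedCid types.
type Cid = u64;

struct Loaded {
    data: Vec<u8>,
    links: Vec<Cid>,
}

struct Resolver {
    store: HashMap<Cid, Loaded>,
}

impl Resolver {
    // Shared breadth-first walk over the DAG; `map` decides what each block
    // is turned into, so "raw" and "pretty" outputs can reuse the same loop.
    fn resolve_recursive_mapped<'a, T>(
        &'a self,
        root: Cid,
        map: impl Fn(Cid, &Loaded) -> T + 'a,
    ) -> impl Iterator<Item = T> + 'a {
        let mut queue = VecDeque::from([root]);
        std::iter::from_fn(move || {
            let cid = queue.pop_front()?;
            let block = self.store.get(&cid)?;
            // Enqueue the block's links before yielding the mapped block.
            queue.extend(block.links.iter().copied());
            Some(map(cid, block))
        })
    }
}

fn main() {
    let store = HashMap::from([
        (1, Loaded { data: b"root".to_vec(), links: vec![2, 3] }),
        (2, Loaded { data: b"leaf a".to_vec(), links: vec![] }),
        (3, Loaded { data: b"leaf b".to_vec(), links: vec![] }),
    ]);
    let resolver = Resolver { store };

    // The "raw" variant is just one choice of mapping closure.
    for (cid, len) in resolver.resolve_recursive_mapped(1, |cid, block| (cid, block.data.len())) {
        println!("cid {cid}: {len} bytes");
    }
}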