Skip to content

Commit

Permalink
refactor(resolver): share code for recursive resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 9, 2022
1 parent 3055aad commit 91c0440
Showing 1 changed file with 41 additions and 42 deletions.
83 changes: 41 additions & 42 deletions iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ impl FromStr for Path {
pub trait LinksContainer: Sync + Send + std::fmt::Debug + Clone + 'static {
/// Extract links out of a container struct.
fn links(&self) -> Result<Vec<Cid>>;

/// Returns links with an associated file or directory name if the content
/// is unixfs
fn named_links(&self) -> Result<Vec<(Option<&str>, Cid)>> {
self.links()
.map(|links| links.into_iter().map(|l| (None, l)).collect())
}
}

#[async_trait]
Expand All @@ -172,6 +179,19 @@ impl LinksContainer for Out {
fn links(&self) -> Result<Vec<Cid>> {
Out::links(self)
}
fn named_links(&self) -> Result<Vec<(Option<&str>, Cid)>> {
Out::named_links(self)
}
}

#[async_trait]
impl LinksContainer for (Path, Out) {
fn links(&self) -> Result<Vec<Cid>> {
Out::links(&self.1)
}
fn named_links(&self) -> Result<Vec<(Option<&str>, Cid)>> {
Out::named_links(&self.1)
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -559,47 +579,21 @@ impl<T: ContentLoader> Resolver<T> {
&self,
root: Path,
) -> impl Stream<Item = Result<(Path, Out)>> {
let mut blocks = VecDeque::new();
let this = self.clone();
async_stream::try_stream! {
let output_path = root.clone();
blocks.push_back((output_path, this.resolve(root).await));
loop {
if let Some((current_output_path, current_out)) = blocks.pop_front() {
let current = current_out?;
let links = current.named_links()?;
// TODO: configurable limit
for link_chunk in links.chunks(8) {
let next = futures::future::join_all(
link_chunk.iter().map(|(link_name, link)| {
let this = this.clone();
let mut this_path = current_output_path.clone();
match link_name {
None => this_path.push(link.to_string()),
Some(p) => this_path.push(p),
};
async move {
(this_path, this.resolve(Path::from_cid(*link)).await)
}
})
).await;
for res in next.into_iter() {
blocks.push_back(res);
}
}
yield (current_output_path, current);
} else {
// no links left to resolve
break;
}
self.resolve_recursive_mapped(root, None, move |cid, path| {
let this = this.clone();
async move {
this.resolve(Path::from_cid(cid))
.await
.map(|out| (path, out))
}
}
})
}

#[tracing::instrument(skip(self))]
pub fn resolve_recursive(&self, root: Path) -> impl Stream<Item = Result<Out>> {
let this = self.clone();
self.resolve_recursive_mapped(root, None, move |cid| {
self.resolve_recursive_mapped(root, None, move |cid, _path| {
let this = this.clone();
async move { this.resolve(Path::from_cid(cid)).await }
})
Expand All @@ -613,7 +607,7 @@ impl<T: ContentLoader> Resolver<T> {
recursion_limit: Option<usize>,
) -> impl Stream<Item = Result<OutRaw>> {
let this = self.clone();
self.resolve_recursive_mapped(root, recursion_limit, move |cid| {
self.resolve_recursive_mapped(root, recursion_limit, move |cid, _path| {
let this = this.clone();
async move {
this.load_cid(&cid)
Expand All @@ -633,7 +627,7 @@ impl<T: ContentLoader> Resolver<T> {
) -> impl Stream<Item = Result<O>>
where
O: LinksContainer,
M: Fn(Cid) -> F + Clone,
M: Fn(Cid, Path) -> F + Clone,
F: Future<Output = Result<O>> + Send + 'static,
{
let mut cids = VecDeque::new();
Expand All @@ -642,11 +636,11 @@ impl<T: ContentLoader> Resolver<T> {
let chunk_size = 8;
async_stream::try_stream! {
let root_cid = this.resolve_path_to_cid(&root).await?;
let root_block = resolve(root_cid).await?;
cids.push_back(root_block);
let root_block = resolve(root_cid, root.clone()).await?;
cids.push_back((root_block, root));
loop {
if let Some(current) = cids.pop_front() {
let links = current.links()?;
if let Some((current, current_output_path)) = cids.pop_front() {
let links = current.named_links()?;
counter += links.len();
if let Some(limit) = recursion_limit {
if counter > limit {
Expand All @@ -657,10 +651,15 @@ impl<T: ContentLoader> Resolver<T> {
// TODO: configurable limit
for link_chunk in links.chunks(chunk_size) {
let next = futures::future::join_all(
link_chunk.iter().map(|link| {
link_chunk.iter().map(|(link_name, link)| {
let resolve = resolve.clone();
let mut this_path = current_output_path.clone();
match link_name {
None => this_path.push(link.to_string()),
Some(p) => this_path.push(p),
};
async move {
resolve(*link).await
resolve(*link, this_path.clone()).await.map(|out| (out, this_path))
}
})
).await;
Expand Down

0 comments on commit 91c0440

Please sign in to comment.