Skip to content

Commit

Permalink
tests & fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Arqu committed Oct 14, 2022
1 parent b0ea7b1 commit 235c654
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 2 deletions.
147 changes: 146 additions & 1 deletion iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ mod tests {
use cid::multihash::{Code, MultihashDigest};
use futures::{StreamExt, TryStreamExt};
use libipld::{codec::Encode, Ipld, IpldCodec};
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

#[async_trait]
impl<S: BuildHasher + Clone + Send + Sync + 'static> ContentLoader for HashMap<Cid, Bytes, S> {
Expand Down Expand Up @@ -1285,6 +1285,28 @@ mod tests {
String::from_utf8(read_to_vec(reader).await).unwrap()
}

/// Builds a content reader over `node` that yields exactly the bytes in
/// `range`: the response is clipped at `range.end` and the reader is seeked
/// forward to `range.start` (fewer bytes are yielded if the content ends
/// before `range.end`).
///
/// Panics (via `unwrap`/`assert`) if the node has no content reader or the
/// seek does not land on `range.start` — fine for test-helper use.
async fn seek_and_clip<T: ContentLoader + Unpin>(
    node: &UnixfsNode,
    resolver: Resolver<T>,
    range: std::ops::Range<u64>,
) -> UnixfsContentReader<T> {
    // `resolver` is owned and not used again, so move it in directly
    // instead of cloning it.
    let mut cr = node
        .clone()
        .into_content_reader(
            resolver,
            OutMetrics::default(),
            ResponseClip::Clip(range.end as usize),
        )
        .unwrap()
        .unwrap();
    // Seek to the start of the requested range; the reader must report
    // landing exactly there.
    let n = cr
        .seek(tokio::io::SeekFrom::Start(range.start))
        .await
        .unwrap();
    assert_eq!(n, range.start);
    cr
}

#[test]
fn test_paths() {
let roundtrip_tests = [
Expand Down Expand Up @@ -1678,6 +1700,129 @@ mod tests {
}
}

#[tokio::test]
async fn test_resolver_seeking() {
// Test content
// ------------
// QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN foo/hello.txt
// contains: "hello"
// QmdkGfDx42RNdAZFALHn5hjHqUq7L9o6Ef4zLnFEu3Y4Go foo

let hello_txt_cid_str = "QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN";
let hello_txt_block_bytes = load_fixture(hello_txt_cid_str).await;

// read root
let root_cid_str = "QmdkGfDx42RNdAZFALHn5hjHqUq7L9o6Ef4zLnFEu3Y4Go";
let root_cid: Cid = root_cid_str.parse().unwrap();
let root_block_bytes = load_fixture(root_cid_str).await;

let loader: HashMap<Cid, Bytes> = [
(root_cid, root_block_bytes.clone()),
(hello_txt_cid_str.parse().unwrap(), hello_txt_block_bytes),
]
.into_iter()
.collect();
let loader = Arc::new(loader);
let resolver = Resolver::new(loader.clone());

let path = format!("/ipfs/{root_cid_str}/hello.txt");
let ipld_hello_txt = resolver.resolve(path.parse().unwrap()).await.unwrap();

if let OutContent::Unixfs(node) = ipld_hello_txt.content {
// clip response
let cr = seek_and_clip(&node, resolver.clone(), 0..2).await;
assert_eq!(read_to_string(cr).await, "he");

let cr = seek_and_clip(&node, resolver.clone(), 0..5).await;
assert_eq!(read_to_string(cr).await, "hello");

// clip to the end
let cr = seek_and_clip(&node, resolver.clone(), 0..6).await;
assert_eq!(read_to_string(cr).await, "hello\n");

// clip beyond the end
let cr = seek_and_clip(&node, resolver.clone(), 0..100).await;
assert_eq!(read_to_string(cr).await, "hello\n");

// seek
let cr = seek_and_clip(&node, resolver.clone(), 1..100).await;
assert_eq!(read_to_string(cr).await, "ello\n");

// seek and clip
let cr = seek_and_clip(&node, resolver.clone(), 1..3).await;
assert_eq!(read_to_string(cr).await, "el");
} else {
panic!("invalid result: {:?}", ipld_hello_txt);
}
}

#[tokio::test]
async fn test_resolver_seeking_chunked() {
    // Test content
    // ------------
    // QmUr9cs4mhWxabKqm9PYPSQQ6AQGbHJBtyrNmxtKgxqUx9 README.md
    //
    // imported with `go-ipfs add --chunker size-100`

    // CIDs of the five 100-byte chunks the README was split into; the
    // interesting property here is that seeks/clips cross chunk boundaries.
    let pieces_cid_str = [
        "QmccJ8pV5hG7DEbq66ih1ZtowxgvqVS6imt98Ku62J2WRw",
        "QmUajVwSkEp9JvdW914Qh1BCMRSUf2ztiQa6jqy1aWhwJv",
        "QmNyLad1dWGS6mv2zno4iEviBSYSUR2SrQ8JoZNDz1UHYy",
        "QmcXoBdCgmFMoNbASaQCNVswRuuuqbw4VvA7e5GtHbhRNp",
        "QmP9yKRwuji5i7RTgrevwJwXp7uqQu1prv88nxq9uj99rW",
    ];

    // read root
    let root_cid_str = "QmUr9cs4mhWxabKqm9PYPSQQ6AQGbHJBtyrNmxtKgxqUx9";
    let root_cid: Cid = root_cid_str.parse().unwrap();
    let root_block_bytes = load_fixture(root_cid_str).await;
    let root_block = UnixfsNode::decode(&root_cid, root_block_bytes.clone()).unwrap();

    // The root node should link to exactly the five chunks above.
    let links: Vec<_> = root_block.links().collect::<Result<_>>().unwrap();
    assert_eq!(links.len(), 5);

    // Build an in-memory loader containing the root block plus every chunk.
    let mut loader: HashMap<Cid, Bytes> =
        [(root_cid, root_block_bytes.clone())].into_iter().collect();

    for c in &pieces_cid_str {
        let bytes = load_fixture(c).await;
        loader.insert(c.parse().unwrap(), bytes);
    }

    let loader = Arc::new(loader);
    let resolver = Resolver::new(loader.clone());

    {
        let path = format!("/ipfs/{root_cid_str}");
        let ipld_readme = resolver.resolve(path.parse().unwrap()).await.unwrap();

        // Sanity-check the resolved metadata before exercising seeks.
        let m = ipld_readme.metadata();
        assert_eq!(m.unixfs_type, Some(UnixfsType::File));
        assert_eq!(m.path.to_string(), path);
        assert_eq!(m.typ, OutType::Unixfs);
        assert_eq!(m.size, Some(426));
        assert_eq!(m.resolved_path, vec![root_cid_str.parse().unwrap(),]);

        let size = m.size.unwrap();

        if let OutContent::Unixfs(node) = ipld_readme.content {
            // Seek 1 byte in and clip 1 byte off the end: the content stays
            // within the first/last chunks but drops one byte at each edge.
            let cr = seek_and_clip(&node, resolver.clone(), 1..size - 1).await;
            let content = read_to_string(cr).await;
            assert_eq!(content.len(), (size - 2) as usize);
            assert!(content.starts_with(" iroh")); // without seeking '# iroh'
            assert!(content.ends_with("</sub>\n")); // without clipping '</sub>\n\n'

            // Seek past the entire first chunk (100 bytes) and clip past the
            // entire last chunk, so reading starts and ends mid-chunk.
            let cr = seek_and_clip(&node, resolver.clone(), 101..size - 101).await;
            let content = read_to_string(cr).await;
            assert_eq!(content.len(), (size - 202) as usize);
            assert!(content.starts_with("2.0</a>"));
            assert!(content.ends_with("the Apac"));
        } else {
            panic!("invalid result: {:?}", ipld_readme);
        }
    }
}

#[tokio::test]
async fn test_resolve_recursive_unixfs_basics_cid_v0() {
// Test content
Expand Down
31 changes: 30 additions & 1 deletion iroh-resolver/src/unixfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl UnixfsNode {
Ok(Some(UnixfsContentReader::File {
root_node: self,
pos: 0,
skip_pos: 0,
pos_max,
current_node: CurrentNodeState::Outer,
current_links,
Expand Down Expand Up @@ -413,6 +414,8 @@ pub enum UnixfsContentReader<T: ContentLoader> {
pos: usize,
/// Absolute max position in bytes, only used for clipping responses
pos_max: ResponseClip,
/// Amount of bytes to skip to seek up to pos
skip_pos: usize,
/// Current node being operated on, only used for nested nodes (not the root).
current_node: CurrentNodeState,
/// Stack of links left to traverse.
Expand Down Expand Up @@ -463,6 +466,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncRead for UnixfsContentReader<T> {
root_node,
pos,
pos_max,
skip_pos,
current_node,
current_links,
loader,
Expand All @@ -480,6 +484,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncRead for UnixfsContentReader<T> {
node,
loader.loader().clone(),
pos,
skip_pos,
*pos_max,
buf,
current_links,
Expand Down Expand Up @@ -510,6 +515,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncSeek for UnixfsContentReader<T> {
root_node,
pos,
pos_max: _,
skip_pos,
current_node: _,
current_links: _,
loader: _,
Expand All @@ -526,6 +532,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncSeek for UnixfsContentReader<T> {
i = std::cmp::min(i, data_len - 1);
}
*pos = i;
*skip_pos = i;
}
std::io::SeekFrom::End(offset) => {
let data_len = root_node.size();
Expand All @@ -539,6 +546,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncSeek for UnixfsContentReader<T> {
i += data_len as i64;
}
*pos = i as usize;
*skip_pos = i as usize;
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand All @@ -559,6 +567,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncSeek for UnixfsContentReader<T> {
i = std::cmp::min(i, data_len as i64 - 1);
}
*pos = i as usize;
*skip_pos = i as usize;
}
},
}
Expand All @@ -574,6 +583,7 @@ impl<T: ContentLoader + Unpin + 'static> AsyncSeek for UnixfsContentReader<T> {
root_node: _,
pos,
pos_max: _,
skip_pos: _,
current_node: _,
current_links: _,
loader: _,
Expand Down Expand Up @@ -667,6 +677,7 @@ fn poll_read_file_at<T: ContentLoader + 'static>(
root_node: &Node,
loader: T,
pos: &mut usize,
skip_pos: &mut usize,
pos_max: ResponseClip,
buf: &mut tokio::io::ReadBuf<'_>,
current_links: &mut Vec<VecDeque<Link>>,
Expand Down Expand Up @@ -735,6 +746,16 @@ fn poll_read_file_at<T: ContentLoader + 'static>(
let ty = current_node_inner.typ();
match current_node_inner {
UnixfsNode::Raw(data) => {
if node_pos < skip_pos {
if *node_pos + data.len() < *skip_pos {
*skip_pos -= data.len();
*node_pos += data.len();
} else {
*node_pos += *skip_pos - *node_pos;
*skip_pos = 0;
}
}

let old = *node_pos;
let mut node_pos_max = data.len();
if let ResponseClip::Clip(n) = pos_max {
Expand Down Expand Up @@ -764,6 +785,15 @@ fn poll_read_file_at<T: ContentLoader + 'static>(
UnixfsNode::File(node) | UnixfsNode::RawNode(node) => {
// read direct node data
if let Some(ref data) = node.inner.data {
if node_pos < skip_pos {
if *node_pos + data.len() < *skip_pos {
*skip_pos -= data.len();
*node_pos += data.len();
} else {
*node_pos += *skip_pos - *node_pos;
*skip_pos = 0;
}
}
let old = *node_pos;
let mut node_pos_max = data.len();
if let ResponseClip::Clip(n) = pos_max {
Expand All @@ -784,7 +814,6 @@ fn poll_read_file_at<T: ContentLoader + 'static>(
return Poll::Ready(res);
}
}

// follow links
if load_next_node(current_node, current_links, loader.clone()) {
return Poll::Ready(Ok(()));
Expand Down

0 comments on commit 235c654

Please sign in to comment.