From 235c654836483e6e29deee2178375c0c5af479a9 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Fri, 14 Oct 2022 15:20:42 +0200 Subject: [PATCH] tests & fixes --- iroh-resolver/src/resolver.rs | 147 +++++++++++++++++++++++++++++++++- iroh-resolver/src/unixfs.rs | 31 ++++++- 2 files changed, 176 insertions(+), 2 deletions(-) diff --git a/iroh-resolver/src/resolver.rs b/iroh-resolver/src/resolver.rs index 2adece17629..da009ca2146 100644 --- a/iroh-resolver/src/resolver.rs +++ b/iroh-resolver/src/resolver.rs @@ -1253,7 +1253,7 @@ mod tests { use cid::multihash::{Code, MultihashDigest}; use futures::{StreamExt, TryStreamExt}; use libipld::{codec::Encode, Ipld, IpldCodec}; - use tokio::io::AsyncReadExt; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; #[async_trait] impl ContentLoader for HashMap { @@ -1285,6 +1285,28 @@ mod tests { String::from_utf8(read_to_vec(reader).await).unwrap() } + async fn seek_and_clip( + node: &UnixfsNode, + resolver: Resolver, + range: std::ops::Range, + ) -> UnixfsContentReader { + let mut cr = node + .clone() + .into_content_reader( + resolver.clone(), + OutMetrics::default(), + ResponseClip::Clip(range.end as usize), + ) + .unwrap() + .unwrap(); + let n = cr + .seek(tokio::io::SeekFrom::Start(range.start)) + .await + .unwrap(); + assert_eq!(n, range.start); + cr + } + #[test] fn test_paths() { let roundtrip_tests = [ @@ -1678,6 +1700,129 @@ mod tests { } } + #[tokio::test] + async fn test_resolver_seeking() { + // Test content + // ------------ + // QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN foo/hello.txt + // contains: "hello" + // QmdkGfDx42RNdAZFALHn5hjHqUq7L9o6Ef4zLnFEu3Y4Go foo + + let hello_txt_cid_str = "QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN"; + let hello_txt_block_bytes = load_fixture(hello_txt_cid_str).await; + + // read root + let root_cid_str = "QmdkGfDx42RNdAZFALHn5hjHqUq7L9o6Ef4zLnFEu3Y4Go"; + let root_cid: Cid = root_cid_str.parse().unwrap(); + let root_block_bytes = load_fixture(root_cid_str).await; + + let loader: HashMap = [ + (root_cid, root_block_bytes.clone()), + (hello_txt_cid_str.parse().unwrap(), hello_txt_block_bytes), + ] + .into_iter() + .collect(); + let loader = Arc::new(loader); + let resolver = Resolver::new(loader.clone()); + + let path = format!("/ipfs/{root_cid_str}/hello.txt"); + let ipld_hello_txt = resolver.resolve(path.parse().unwrap()).await.unwrap(); + + if let OutContent::Unixfs(node) = ipld_hello_txt.content { + // clip response + let cr = seek_and_clip(&node, resolver.clone(), 0..2).await; + assert_eq!(read_to_string(cr).await, "he"); + + let cr = seek_and_clip(&node, resolver.clone(), 0..5).await; + assert_eq!(read_to_string(cr).await, "hello"); + + // clip to the end + let cr = seek_and_clip(&node, resolver.clone(), 0..6).await; + assert_eq!(read_to_string(cr).await, "hello\n"); + + // clip beyond the end + let cr = seek_and_clip(&node, resolver.clone(), 0..100).await; + assert_eq!(read_to_string(cr).await, "hello\n"); + + // seek + let cr = seek_and_clip(&node, resolver.clone(), 1..100).await; + assert_eq!(read_to_string(cr).await, "ello\n"); + + // seek and clip + let cr = seek_and_clip(&node, resolver.clone(), 1..3).await; + assert_eq!(read_to_string(cr).await, "el"); + } else { + panic!("invalid result: {:?}", ipld_hello_txt); + } + } + + #[tokio::test] + async fn test_resolver_seeking_chunked() { + // Test content + // ------------ + // QmUr9cs4mhWxabKqm9PYPSQQ6AQGbHJBtyrNmxtKgxqUx9 README.md + // + // imported with `go-ipfs add --chunker size-100` + + let pieces_cid_str = [ + "QmccJ8pV5hG7DEbq66ih1ZtowxgvqVS6imt98Ku62J2WRw", + "QmUajVwSkEp9JvdW914Qh1BCMRSUf2ztiQa6jqy1aWhwJv", + "QmNyLad1dWGS6mv2zno4iEviBSYSUR2SrQ8JoZNDz1UHYy", + "QmcXoBdCgmFMoNbASaQCNVswRuuuqbw4VvA7e5GtHbhRNp", + "QmP9yKRwuji5i7RTgrevwJwXp7uqQu1prv88nxq9uj99rW", + ]; + + // read root + let root_cid_str = "QmUr9cs4mhWxabKqm9PYPSQQ6AQGbHJBtyrNmxtKgxqUx9"; + let root_cid: Cid = root_cid_str.parse().unwrap(); + let root_block_bytes = load_fixture(root_cid_str).await; + let root_block = UnixfsNode::decode(&root_cid, root_block_bytes.clone()).unwrap(); + + let links: Vec<_> = root_block.links().collect::>().unwrap(); + assert_eq!(links.len(), 5); + + let mut loader: HashMap = + [(root_cid, root_block_bytes.clone())].into_iter().collect(); + + for c in &pieces_cid_str { + let bytes = load_fixture(c).await; + loader.insert(c.parse().unwrap(), bytes); + } + + let loader = Arc::new(loader); + let resolver = Resolver::new(loader.clone()); + + { + let path = format!("/ipfs/{root_cid_str}"); + let ipld_readme = resolver.resolve(path.parse().unwrap()).await.unwrap(); + + let m = ipld_readme.metadata(); + assert_eq!(m.unixfs_type, Some(UnixfsType::File)); + assert_eq!(m.path.to_string(), path); + assert_eq!(m.typ, OutType::Unixfs); + assert_eq!(m.size, Some(426)); + assert_eq!(m.resolved_path, vec![root_cid_str.parse().unwrap(),]); + + let size = m.size.unwrap(); + + if let OutContent::Unixfs(node) = ipld_readme.content { + let cr = seek_and_clip(&node, resolver.clone(), 1..size - 1).await; + let content = read_to_string(cr).await; + assert_eq!(content.len(), (size - 2) as usize); + assert!(content.starts_with(" iroh")); // without seeking '# iroh' + assert!(content.ends_with("\n")); // without clipping '\n\n' + + let cr = seek_and_clip(&node, resolver.clone(), 101..size - 101).await; + let content = read_to_string(cr).await; + assert_eq!(content.len(), (size - 202) as usize); + assert!(content.starts_with("2.0")); + assert!(content.ends_with("the Apac")); + } else { + panic!("invalid result: {:?}", ipld_readme); + } + } + } + #[tokio::test] async fn test_resolve_recursive_unixfs_basics_cid_v0() { // Test content diff --git a/iroh-resolver/src/unixfs.rs b/iroh-resolver/src/unixfs.rs index ba3c9dbbb63..f7a2a186d9d 100644 --- a/iroh-resolver/src/unixfs.rs +++ b/iroh-resolver/src/unixfs.rs @@ -382,6 +382,7 @@ impl UnixfsNode { Ok(Some(UnixfsContentReader::File { root_node: self, pos: 0, + skip_pos: 0, pos_max, current_node: CurrentNodeState::Outer, current_links, @@ -413,6 +414,8 @@ pub enum UnixfsContentReader { pos: usize, /// Absolute max position in bytes, only used for clipping responses pos_max: ResponseClip, + /// Amount of bytes to skip to seek up to pos + skip_pos: usize, /// Current node being operated on, only used for nested nodes (not the root). current_node: CurrentNodeState, /// Stack of links left to traverse. @@ -463,6 +466,7 @@ impl AsyncRead for UnixfsContentReader { root_node, pos, pos_max, + skip_pos, current_node, current_links, loader, @@ -480,6 +484,7 @@ impl AsyncRead for UnixfsContentReader { node, loader.loader().clone(), pos, + skip_pos, *pos_max, buf, current_links, @@ -510,6 +515,7 @@ impl AsyncSeek for UnixfsContentReader { root_node, pos, pos_max: _, + skip_pos, current_node: _, current_links: _, loader: _, @@ -526,6 +532,7 @@ impl AsyncSeek for UnixfsContentReader { i = std::cmp::min(i, data_len - 1); } *pos = i; + *skip_pos = i; } std::io::SeekFrom::End(offset) => { let data_len = root_node.size(); @@ -539,6 +546,7 @@ impl AsyncSeek for UnixfsContentReader { i += data_len as i64; } *pos = i as usize; + *skip_pos = i as usize; } else { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, @@ -559,6 +567,7 @@ impl AsyncSeek for UnixfsContentReader { i = std::cmp::min(i, data_len as i64 - 1); } *pos = i as usize; + *skip_pos = i as usize; } }, } @@ -574,6 +583,7 @@ impl AsyncSeek for UnixfsContentReader { root_node: _, pos, pos_max: _, + skip_pos: _, current_node: _, current_links: _, loader: _, @@ -667,6 +677,7 @@ fn poll_read_file_at( root_node: &Node, loader: T, pos: &mut usize, + skip_pos: &mut usize, pos_max: ResponseClip, buf: &mut tokio::io::ReadBuf<'_>, current_links: &mut Vec>, @@ -735,6 +746,16 @@ fn poll_read_file_at( let ty = current_node_inner.typ(); match current_node_inner { UnixfsNode::Raw(data) => { + if node_pos < skip_pos { + if *node_pos + data.len() < *skip_pos { + *skip_pos -= data.len(); + *node_pos += data.len(); + } else { + *node_pos += *skip_pos - *node_pos; + *skip_pos = 0; + } + } + let old = *node_pos; let mut node_pos_max = data.len(); if let ResponseClip::Clip(n) = pos_max { @@ -764,6 +785,15 @@ fn poll_read_file_at( UnixfsNode::File(node) | UnixfsNode::RawNode(node) => { // read direct node data if let Some(ref data) = node.inner.data { + if node_pos < skip_pos { + if *node_pos + data.len() < *skip_pos { + *skip_pos -= data.len(); + *node_pos += data.len(); + } else { + *node_pos += *skip_pos - *node_pos; + *skip_pos = 0; + } + } let old = *node_pos; let mut node_pos_max = data.len(); if let ResponseClip::Clip(n) = pos_max { @@ -784,7 +814,6 @@ fn poll_read_file_at( return Poll::Ready(res); } } - // follow links if load_next_node(current_node, current_links, loader.clone()) { return Poll::Ready(Ok(()));