From ba02c64f112345ba5b94d12bf282db61056fe4ec Mon Sep 17 00:00:00 2001
From: Salma Elsoly
Date: Wed, 30 Apr 2025 15:07:22 +0300
Subject: [PATCH 1/4] feat: add merge command

---
 rfs/src/lib.rs   | 118 +++++++++++++++++++++++++++++++++++++++++++++++
 rfs/src/main.rs  |  30 ++++++++++++
 rfs/src/merge.rs | 107 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 255 insertions(+)
 create mode 100644 rfs/src/merge.rs

diff --git a/rfs/src/lib.rs b/rfs/src/lib.rs
index a3299d8..f8aee90 100644
--- a/rfs/src/lib.rs
+++ b/rfs/src/lib.rs
@@ -1,6 +1,7 @@
 #[macro_use]
 extern crate log;
+
 pub mod cache;
 pub mod fungi;
 pub mod store;
@@ -12,6 +13,8 @@ pub use unpack::unpack;
 mod clone;
 pub use clone::clone;
 pub mod config;
+mod merge;
+pub use merge::merge;
 
 const PARALLEL_UPLOAD: usize = 10; // number of files we can upload in parallel
 
@@ -104,4 +107,119 @@ mod test {
 
         assert!(status.success());
     }
+
+    #[tokio::test]
+    async fn test_merge(){
+        const ROOT: &str = "/tmp/merge-test";
+        let _ = fs::remove_dir_all(ROOT).await;
+
+        println!("declaring directories");
+
+        let root: PathBuf = ROOT.into();
+        let source1 = root.join("source1");
+        let source2 = root.join("source2");
+        let merged_dest = root.join("merged");
+        let cache_dir = root.join("cache");
+
+        println!("creating directories");
+
+        fs::create_dir_all(&source1).await.unwrap();
+        fs::create_dir_all(&source2).await.unwrap();
+        fs::create_dir_all(&cache_dir).await.unwrap();
+
+        println!("creating test files");
+
+        create_test_files(&source1, "file1.txt", 1024).await;
+        create_test_files(&source1, "file2.txt", 2048).await;
+
+        create_test_files(&source2, "file3.txt", 2048).await;
+        create_test_files(&source2, "file4.txt", 512).await;
+
+        println!("test files created");
+        println!("packing source1");
+        let meta1_path = root.join("meta1.fl");
+        let writer1 = meta::Writer::new(&meta1_path, true).await.unwrap();
+        let store1 = DirStore::new(root.join("store1")).await.unwrap();
+        let mut router1 = Router::new();
+        router1.add(0x00, 0xFF, store1);
+
+        pack(writer1, router1, &source1, false, None).await.unwrap();
+        println!("packing complete for source1");
+
+        println!("packing source2");
+        let meta2_path = root.join("meta2.fl");
+        let writer2 = meta::Writer::new(&meta2_path, true).await.unwrap();
+        let store2 = DirStore::new(root.join("store2")).await.unwrap();
+        let mut router2 = Router::new();
+        router2.add(0x00, 0xFF, store2);
+        pack(writer2, router2, &source2, false, None).await.unwrap();
+
+        println!("packing complete for source2");
+        let merged_meta_path = root.join("merged.fl");
+        let merged_writer = meta::Writer::new(&merged_meta_path, true).await.unwrap();
+        let merged_store = DirStore::new(root.join("merged_store")).await.unwrap();
+        let block_store = store::BlockStore::from(merged_store);
+
+        println!("merging");
+
+        merge(
+            merged_writer,
+            block_store,
+            vec![meta1_path.to_string_lossy().to_string(), meta2_path.to_string_lossy().to_string()],
+            cache_dir.to_string_lossy().to_string(),
+        ).await.unwrap();
+
+        println!("merge complete");
+        let merged_reader = meta::Reader::new(&merged_meta_path).await.unwrap();
+        let merged_router = store::get_router(&merged_reader).await.unwrap();
+        let merged_cache = Cache::new(root.join("merged_cache"), merged_router);
+
+        unpack(&merged_reader, &merged_cache, &merged_dest, false)
+            .await
+            .unwrap();
+
+        assert!(merged_dest.join("file1.txt").exists());
+        assert!(merged_dest.join("file2.txt").exists());
+        assert!(merged_dest.join("file3.txt").exists());
+        assert!(merged_dest.join("file4.txt").exists());
+
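+        // every file from both sources must survive the merge with its
+        // original size; verify_file_content reads each one back and checks it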
+        verify_file_content(merged_dest.join("file1.txt"), 1024).await;
+        verify_file_content(merged_dest.join("file2.txt"), 2048).await;
+        verify_file_content(merged_dest.join("file3.txt"), 2048).await;
+        verify_file_content(merged_dest.join("file4.txt"), 512).await;
+
+
+    }
+
+    async fn create_test_files<P: AsRef<Path>>(dir: P, name: &str, size: usize) {
+        let mut urandom = fs::OpenOptions::default()
+            .read(true)
+            .open("/dev/urandom")
+            .await
+            .unwrap()
+            .take(size as u64);
+
+        let p = dir.as_ref().join(name);
+        let mut file = fs::OpenOptions::default()
+            .create(true)
+            .write(true)
+            .open(p)
+            .await
+            .unwrap();
+
+        tokio::io::copy(&mut urandom, &mut file).await.unwrap();
+    }
+
+    async fn verify_file_content<P: AsRef<Path>>(path: P, expected_size: usize) {
+        let mut file = fs::OpenOptions::default()
+            .read(true)
+            .open(path)
+            .await
+            .unwrap();
+
+        let mut buffer = vec![0; expected_size];
+        let size = file.read(&mut buffer).await.unwrap();
+        assert_eq!(size, expected_size);
+    }
 }
diff --git a/rfs/src/main.rs b/rfs/src/main.rs
index 14d4a00..d0039dc 100644
--- a/rfs/src/main.rs
+++ b/rfs/src/main.rs
@@ -37,6 +37,8 @@ enum Commands {
     Clone(CloneOptions),
     /// list or modify FL metadata and stores
     Config(ConfigOptions),
+    /// merge 2 or more FLs into a new one
+    Merge(MergeOptions),
 }
 
 #[derive(Args, Debug)]
@@ -115,6 +117,22 @@ struct CloneOptions {
     cache: String,
 }
 
+#[derive(Args, Debug)]
+struct MergeOptions{
+    /// path to metadata file (flist)
+    #[clap(short, long)]
+    meta: String,
+
+    #[clap(short, long, action=ArgAction::Append)]
+    store: Vec<String>,
+
+    #[clap(short, long, action=ArgAction::Append)]
+    target_flists: Vec<String>,
+
+    #[clap(short, long, default_value_t = String::from("/tmp/cache"))]
+    cache: String,
+}
+
 #[derive(Args, Debug)]
 struct ConfigOptions {
     /// path to metadata file (flist)
@@ -219,6 +237,7 @@ fn main() -> Result<()> {
         Commands::Unpack(opts) => unpack(opts),
         Commands::Clone(opts) => clone(opts),
        Commands::Config(opts) => config(opts),
+        Commands::Merge(opts) => merge(opts),
     }
 }
 
@@ -381,3 +400,14 @@ fn config(opts: ConfigOptions) -> Result<()> {
         Ok(())
     })
 }
+
+fn merge(opts: MergeOptions) -> Result<()> {
+    let rt = tokio::runtime::Runtime::new()?;
+
+    rt.block_on(async move {
+        let store = store::parse_router(opts.store.as_slice()).await?;
+        let meta = fungi::Writer::new(opts.meta, true).await?;
+        rfs::merge(meta, store.into(), opts.target_flists, opts.cache).await?;
+        Ok(())
+    })
+}
diff --git a/rfs/src/merge.rs b/rfs/src/merge.rs
new file mode 100644
index 0000000..18dc0be
--- /dev/null
+++ b/rfs/src/merge.rs
@@ -0,0 +1,107 @@
+use crate::{
+    cache::Cache,
+    fungi::{meta::{FileType, Inode, Walk, WalkVisitor}, Reader, Result, Writer},
+    store::{get_router, BlockStore, Router, Store, Stores},
+};
+use std::path::Path;
+use hex::ToHex;
+use tokio::io::AsyncReadExt;
+use std::collections::HashSet;
+
+pub async fn merge<S: Store>(
+    writer: Writer,
+    store: BlockStore<S>,
+    target_flists: Vec<String>,
+    cache: String
+) -> Result<()> {
+
+    for target_flist in target_flists {
+        let reader = Reader::new(&target_flist).await?;
+        let router = get_router(&reader).await?;
+        let cache = Cache::new(cache.clone(), router);
+
+        let mut visitor = MergeVisitor::new(writer.clone(), reader.clone(), &store, cache);
+        reader.walk(&mut visitor).await?;
+    }
+    Ok(())
+}
+
+struct MergeVisitor<'a, S>
+where
+    S: Store,
+{
+    writer: Writer,
+    reader: Reader,
+    store: &'a BlockStore<S>,
+    cache: Cache<Router<Stores>>,
+    visited: HashSet<u64>
+}
+
+impl<'a, S> MergeVisitor<'a, S>
+where
+    S: Store,
+{
+    pub fn new(writer: Writer, reader: Reader, store: &'a BlockStore<S>, cache: Cache<Router<Stores>>) -> Self {
+        Self {
+            writer,
+            reader,
+            store,
+            cache,
+            visited: HashSet::new(),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<'a, S> WalkVisitor for MergeVisitor<'a, S>
+where
+    S: Store,
+{
+    async fn visit(&mut self, _path: &Path, node: &Inode) -> Result<Walk> {
+        if self.visited.contains(&node.ino) {
+            return Ok(Walk::Break);
+        }
+        self.visited.insert(node.ino);
+
+        match node.mode.file_type() {
+            FileType::Dir => {
+                self.writer.inode(node.clone()).await?;
+                return Ok(Walk::Continue);
+            }
+            FileType::Regular => {
+                let ino = self.writer.inode(node.clone()).await?;
+                let blocks = self.reader.blocks(node.ino).await?;
+
+                for block in &blocks {
+                    self.writer.block(ino, &block.id, &block.key).await?;
+                }
+
+                let mut blocks_to_store = Vec::new();
+                for block in blocks {
+                    if self.store.get(&block).await.is_err() {
+                        blocks_to_store.push(block);
+                    }
+                }
+                for block in blocks_to_store {
+
+                    let (_, mut file) = self.cache.get(&block).await?;
+                    let mut data = Vec::new();
+                    if let Err(e) = file.read_to_end(&mut data).await {
+                        log::error!("failed to read block {}: {}", block.id.as_slice().encode_hex::<String>(), e);
+                        return Err(e.into());
+                    }
+                    if let Err(e) = self.store.set(&data).await {
+                        log::error!("failed to set block {}: {}", block.id.as_slice().encode_hex::<String>(), e);
+                        return Err(e.into());
+                    }
+                }
+
+            }
+            _ => {
+                log::warn!("Unknown file type for node: {:?}", node);
+            }
+        }
+
+        Ok(Walk::Continue)
+    }
+}
\ No newline at end of file

From 8f38980a61f9509ed9b9b8f58f99b86e339bbdfd Mon Sep 17 00:00:00 2001
From: Salma Elsoly
Date: Wed, 30 Apr 2025 16:38:57 +0300
Subject: [PATCH 2/4] fix: store routes in resulting flist

---
 rfs/src/lib.rs   | 27 ++++++++++--------
 rfs/src/main.rs  | 14 ++++++++--
 rfs/src/merge.rs | 72 +++++++++++++++++++++++++++++++++---------------
 3 files changed, 77 insertions(+), 36 deletions(-)

diff --git a/rfs/src/lib.rs b/rfs/src/lib.rs
index f8aee90..1d2ae05 100644
--- a/rfs/src/lib.rs
+++ b/rfs/src/lib.rs
@@ -1,7 +1,6 @@
 #[macro_use]
 extern crate log;
-
 pub mod cache;
 pub mod fungi;
 pub mod store;
@@ -109,7 +108,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_merge(){
+    async fn test_merge() {
         const ROOT: &str = "/tmp/merge-test";
         let _ = fs::remove_dir_all(ROOT).await;
 
@@ -122,7 +121,7 @@ mod test {
         let cache_dir = root.join("cache");
 
         println!("creating directories");
-        
+
         fs::create_dir_all(&source1).await.unwrap();
         fs::create_dir_all(&source2).await.unwrap();
         fs::create_dir_all(&cache_dir).await.unwrap();
@@ -142,7 +141,7 @@ mod test {
         let store1 = DirStore::new(root.join("store1")).await.unwrap();
         let mut router1 = Router::new();
         router1.add(0x00, 0xFF, store1);
-        
+
         pack(writer1, router1, &source1, false, None).await.unwrap();
         println!("packing complete for source1");
 
@@ -158,22 +157,29 @@ mod test {
         let merged_meta_path = root.join("merged.fl");
         let merged_writer = meta::Writer::new(&merged_meta_path, true).await.unwrap();
         let merged_store = DirStore::new(root.join("merged_store")).await.unwrap();
-        let block_store = store::BlockStore::from(merged_store);
+        let mut router = Router::new();
+        router.add(0x00, 0xFF, merged_store);
 
         println!("merging");
 
         merge(
             merged_writer,
-            block_store,
-            vec![meta1_path.to_string_lossy().to_string(), meta2_path.to_string_lossy().to_string()],
+            router,
+            false,
+            vec![
+                meta1_path.to_string_lossy().to_string(),
+                meta2_path.to_string_lossy().to_string(),
+            ],
             cache_dir.to_string_lossy().to_string(),
-        ).await.unwrap();
+        )
+        .await
+        .unwrap();
 
         println!("merge complete");
         let merged_reader = meta::Reader::new(&merged_meta_path).await.unwrap();
         let merged_router = store::get_router(&merged_reader).await.unwrap();
         let merged_cache = Cache::new(root.join("merged_cache"), merged_router);
-        
+
         unpack(&merged_reader, &merged_cache, &merged_dest, false)
             .await
             .unwrap();
@@ -183,13 +189,10 @@ mod test {
         assert!(merged_dest.join("file3.txt").exists());
         assert!(merged_dest.join("file4.txt").exists());
 
-
         verify_file_content(merged_dest.join("file1.txt"), 1024).await;
         verify_file_content(merged_dest.join("file2.txt"), 2048).await;
         verify_file_content(merged_dest.join("file3.txt"), 2048).await;
         verify_file_content(merged_dest.join("file4.txt"), 512).await;
-
-
     }
 
     async fn create_test_files<P: AsRef<Path>>(dir: P, name: &str, size: usize) {
diff --git a/rfs/src/main.rs b/rfs/src/main.rs
index d0039dc..a13dda7 100644
--- a/rfs/src/main.rs
+++ b/rfs/src/main.rs
@@ -118,7 +118,7 @@ struct CloneOptions {
 }
 
 #[derive(Args, Debug)]
-struct MergeOptions{
+struct MergeOptions {
     /// path to metadata file (flist)
     #[clap(short, long)]
     meta: String,
@@ -126,6 +126,9 @@ struct MergeOptions {
     #[clap(short, long, action=ArgAction::Append)]
     store: Vec<String>,
 
+    #[clap(long, default_value_t = false)]
+    no_strip_password: bool,
+
     #[clap(short, long, action=ArgAction::Append)]
     target_flists: Vec<String>,
 
@@ -407,7 +410,14 @@ fn merge(opts: MergeOptions) -> Result<()> {
     rt.block_on(async move {
         let store = store::parse_router(opts.store.as_slice()).await?;
         let meta = fungi::Writer::new(opts.meta, true).await?;
-        rfs::merge(meta, store.into(), opts.target_flists, opts.cache).await?;
+        rfs::merge(
+            meta,
+            store,
+            !opts.no_strip_password,
+            opts.target_flists,
+            opts.cache,
+        )
+        .await?;
         Ok(())
     })
 }
diff --git a/rfs/src/merge.rs b/rfs/src/merge.rs
index 18dc0be..71014eb 100644
--- a/rfs/src/merge.rs
+++ b/rfs/src/merge.rs
@@ -1,20 +1,43 @@
 use crate::{
     cache::Cache,
-    fungi::{meta::{FileType, Inode, Walk, WalkVisitor}, Reader, Result, Writer},
+    fungi::{
+        meta::{FileType, Inode, Walk, WalkVisitor},
+        Reader, Result, Writer,
+    },
     store::{get_router, BlockStore, Router, Store, Stores},
 };
-use std::path::Path;
+use anyhow::Context;
 use hex::ToHex;
+use std::path::Path;
 use tokio::io::AsyncReadExt;
-use std::collections::HashSet;
 
 pub async fn merge<S: Store>(
     writer: Writer,
-    store: BlockStore<S>,
-    target_flists: Vec<String>,
-    cache: String
+    store: S,
+    strip_password: bool,
+    target_flists: Vec<String>,
+    cache: String,
 ) -> Result<()> {
-
+    for route in store.routes() {
+        let mut store_url = route.url;
+
+        if strip_password {
+            let mut url = url::Url::parse(&store_url).context("failed to parse store url")?;
+            if url.password().is_some() {
+                url.set_password(None)
+                    .map_err(|_| anyhow::anyhow!("failed to strip password"))?;
+
+                store_url = url.to_string();
+            }
+        }
+
+        let range_start = route.start.unwrap_or_default();
+        let range_end = route.end.unwrap_or(u8::MAX);
+
+        writer.route(range_start, range_end, store_url).await?;
+    }
+
+    let store = store.into();
     for target_flist in target_flists {
         let reader = Reader::new(&target_flist).await?;
         let router = get_router(&reader).await?;
@@ -23,6 +46,7 @@ pub async fn merge<S: Store>(
         let mut visitor = MergeVisitor::new(writer.clone(), reader.clone(), &store, cache);
         reader.walk(&mut visitor).await?;
     }
+
     Ok(())
 }
 
@@ -34,35 +58,33 @@ where
     reader: Reader,
     store: &'a BlockStore<S>,
     cache: Cache<Router<Stores>>,
-    visited: HashSet<u64>
 }
 
 impl<'a, S> MergeVisitor<'a, S>
 where
     S: Store,
 {
-    pub fn new(writer: Writer, reader: Reader, store: &'a BlockStore<S>, cache: Cache<Router<Stores>>) -> Self {
+    pub fn new(
+        writer: Writer,
+        reader: Reader,
+        store: &'a BlockStore<S>,
+        cache: Cache<Router<Stores>>,
+    ) -> Self {
         Self {
             writer,
             reader,
             store,
             cache,
-            visited: HashSet::new(),
         }
     }
 }
 
 #[async_trait::async_trait]
 impl<'a, S> WalkVisitor for MergeVisitor<'a, S>
 where
     S: Store,
 {
     async fn visit(&mut self, _path: &Path, node: &Inode) -> Result<Walk> {
-        if self.visited.contains(&node.ino) {
-            return Ok(Walk::Break);
-        }
-        self.visited.insert(node.ino);
-
         match node.mode.file_type() {
             FileType::Dir => {
                 self.writer.inode(node.clone()).await?;
@@ -83,25 +105,31 @@ where
                     }
                 }
                 for block in blocks_to_store {
-
                     let (_, mut file) = self.cache.get(&block).await?;
                     let mut data = Vec::new();
                     if let Err(e) = file.read_to_end(&mut data).await {
-                        log::error!("failed to read block {}: {}", block.id.as_slice().encode_hex::<String>(), e);
+                        log::error!(
+                            "failed to read block {}: {}",
+                            block.id.as_slice().encode_hex::<String>(),
+                            e
+                        );
                         return Err(e.into());
                     }
                     if let Err(e) = self.store.set(&data).await {
-                        log::error!("failed to set block {}: {}", block.id.as_slice().encode_hex::<String>(), e);
+                        log::error!(
+                            "failed to set block {}: {}",
+                            block.id.as_slice().encode_hex::<String>(),
+                            e
+                        );
                         return Err(e.into());
                     }
                 }
-
             }
             _ => {
                 log::warn!("Unknown file type for node: {:?}", node);
             }
         }
-
-        Ok(Walk::Continue)
+
+        Ok(Walk::Continue)
     }
-}
\ No newline at end of file
+}

From 244c6d5691103c6053199c1127b38052254a73f9 Mon Sep 17 00:00:00 2001
From: Salma Elsoly
Date: Mon, 5 May 2025 13:28:46 +0300
Subject: [PATCH 3/4] docs: add documentation for merge command

---
 rfs/README.md   | 37 ++++++++++++++++++++++++++++++++++++-
 rfs/src/main.rs | 20 ++++++++++++++++----
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/rfs/README.md b/rfs/README.md
index 048190f..b59fda0 100644
--- a/rfs/README.md
+++ b/rfs/README.md
@@ -1,4 +1,3 @@
-
 # Introduction
 
 `rfs` is the main tool to create, mount and extract FungiStore lists (FungiList) `fl` for short. An `fl` is a simple format
@@ -41,6 +40,7 @@ The simplest form of `<store-specs>` is a `url`. the store `url` defines the sto
 - `s3`: aws-s3 is used for storing and retrieving large amounts of data (blobs) in buckets (directories). An example `s3://<username>:<password>@<host>:<port>/<bucket>`
 
   `region` is an optional param for s3 stores, if you want to provide one you can add it as a query to the url `?region=<region-name>`
+
 - `http`: http is a store mostly used for wrapping a dir store to fetch data through http requests. It does not support uploading, just fetching the data.
   It can be set in the FL file as the store to fetch the data with `rfs config`. Example: `http://localhost:9000/store` (https works too).
@@ -144,6 +144,41 @@ Options:
 By default when unpacking the `-p` flag is not set, which means downloaded files will be `owned` by the current user/group. If the `-p` flag is set, file ownership will be the same as the original files used to create the fl (preserving the `uid` and `gid` of the files and directories); this normally requires `sudo` while unpacking.
+
+# Merge an `fl`
+
+rfs provides a `merge` subcommand that combines multiple file lists (FL files) into a single unified file list.
+
+```bash
+rfs merge -s dir:///tmp/store merged.fl flist1.fl flist2.fl
+```
+
+This tells rfs to create an `fl` named `merged.fl` by combining the file lists `flist1.fl` and `flist2.fl`. A store must be specified with the `-s` option to handle block storage during the merge operation.
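+
+As a quick sanity check you can extract the merged `fl` again; the sketch below assumes the `unpack` flags documented earlier in this README and uses an illustrative output directory:
+
+```bash
+# unpack the merged flist and confirm files from both inputs are present
+rfs unpack -m merged.fl ./merged-out
+ls ./merged-out
+```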
+
+## Requirements for Merge
+
+- At least 2 input file lists must be passed as positional arguments, after the output flist path
+- A store must be specified with the `-s` option to store blocks that need to be copied or moved
+
+## Full Command Help
+
+```bash
+# rfs merge --help
+
+merge 2 or more FLs into a new one
+
+Usage: rfs merge [OPTIONS] --store <STORE> <META> <TARGET_FLISTS>...
+
+Arguments:
+  <META>              path to metadata file (flist)
+  <TARGET_FLISTS>...
+
+Options:
+  -s, --store <STORE>      store url in the format [xx-xx=]<url>
+      --no-strip-password  disables automatic password stripping from store url
+  -c, --cache <CACHE>      directory used as cache for downloaded file chunks [default: /tmp/cache]
+  -h, --help               Print help
+```
+
 # Specifications
 
 Please check [docs](../docs)
diff --git a/rfs/src/main.rs b/rfs/src/main.rs
index a13dda7..60dbb05 100644
--- a/rfs/src/main.rs
+++ b/rfs/src/main.rs
@@ -120,22 +120,31 @@ struct CloneOptions {
 #[derive(Args, Debug)]
 struct MergeOptions {
     /// path to metadata file (flist)
-    #[clap(short, long)]
     meta: String,
 
-    #[clap(short, long, action=ArgAction::Append)]
+    #[clap(short, long, action=ArgAction::Append, required = true)]
     store: Vec<String>,
 
     #[clap(long, default_value_t = false)]
     no_strip_password: bool,
 
-    #[clap(short, long, action=ArgAction::Append)]
+    #[clap(action=ArgAction::Append, required = true)]
     target_flists: Vec<String>,
 
-    #[clap(short, long, default_value_t = String::from("/tmp/cache"))]
+    #[clap(short, long, default_value_t = String::from("/tmp/cache"))]
     cache: String,
 }
 
+impl MergeOptions {
+
+    fn validate(&self) -> Result<()> {
+        if self.target_flists.len() < 2 {
+            return Err(anyhow::anyhow!("At least 2 target file lists are required for merge operation"));
+        }
+        Ok(())
+    }
+}
+
 #[derive(Args, Debug)]
 struct ConfigOptions {
     /// path to metadata file (flist)
@@ -405,6 +414,9 @@ fn config(opts: ConfigOptions) -> Result<()> {
 }
 
 fn merge(opts: MergeOptions) -> Result<()> {
+
+    opts.validate()?;
+
     let rt = tokio::runtime::Runtime::new()?;
 
     rt.block_on(async move {

From 0058487afd72411d70778a830be92801e3d747c4 Mon Sep 17 00:00:00 2001
From: Salma Elsoly
Date: Mon, 5 May 2025 14:56:55 +0300
Subject: [PATCH 4/4] fix: merge of subdirectories of flists

---
 rfs/src/main.rs  |  10 +--
 rfs/src/merge.rs | 198 +++++++++++++++++++++++++++++++++++------------
 2 files changed, 154 insertions(+), 54 deletions(-)

diff --git a/rfs/src/main.rs b/rfs/src/main.rs
index 60dbb05..3c4b06d 100644
--- a/rfs/src/main.rs
+++ b/rfs/src/main.rs
@@ -131,15 +131,16 @@ struct MergeOptions {
     #[clap(action=ArgAction::Append, required = true)]
     target_flists: Vec<String>,
 
-    #[clap(short, long, default_value_t = String::from("/tmp/cache"))]
+    #[clap(short, long, default_value_t = String::from("/tmp/cache"))]
     cache: String,
 }
 
 impl MergeOptions {
-
     fn validate(&self) -> Result<()> {
         if self.target_flists.len() < 2 {
-            return Err(anyhow::anyhow!("At least 2 target file lists are required for merge operation"));
+            return Err(anyhow::anyhow!(
+                "At least 2 target file lists are required for merge operation"
+            ));
         }
         Ok(())
     }
@@ -414,9 +415,8 @@ fn config(opts: ConfigOptions) -> Result<()> {
 }
 
 fn merge(opts: MergeOptions) -> Result<()> {
-
     opts.validate()?;
-    
+
     let rt = tokio::runtime::Runtime::new()?;
 
     rt.block_on(async move {
diff --git a/rfs/src/merge.rs b/rfs/src/merge.rs
index 71014eb..002bd21 100644
--- a/rfs/src/merge.rs
+++ b/rfs/src/merge.rs
@@ -1,16 +1,19 @@
 use crate::{
     cache::Cache,
     fungi::{
-        meta::{FileType, Inode, Walk, WalkVisitor},
+        meta::{FileType, Inode, Mode, Walk, WalkVisitor},
         Reader, Result, Writer,
     },
     store::{get_router, BlockStore, Router, Store, Stores},
 };
 use anyhow::Context;
 use hex::ToHex;
-use std::path::Path;
+use std::collections::{HashMap, HashSet};
+use std::path::{Path, PathBuf};
 use tokio::io::AsyncReadExt;
 
+const ROOT_PATH: &str = "/";
+
 pub async fn merge<S: Store>(
     writer: Writer,
     store: S,
     strip_password: bool,
@@ -38,12 +41,33 @@ pub async fn merge<S: Store>(
     }
 
     let store = store.into();
+
+    let mut path_to_inode_map = HashMap::new();
+    let root_path = PathBuf::from(ROOT_PATH);
+
+    let root_inode = Inode {
+        name: ROOT_PATH.into(),
+        mode: Mode::new(FileType::Dir, 0o755),
+        ..Default::default()
+    };
+    let root_ino = writer.inode(root_inode).await?;
+    path_to_inode_map.insert(root_path, root_ino);
+
     for target_flist in target_flists {
         let reader = Reader::new(&target_flist).await?;
         let router = get_router(&reader).await?;
-        let cache = Cache::new(cache.clone(), router);
+        let cache_instance = Cache::new(cache.clone(), router);
+
+        let mut visited = HashSet::new();
+        let mut visitor = MergeVisitor {
+            writer: writer.clone(),
+            reader: reader.clone(),
+            store: &store,
+            cache: cache_instance,
+            path_to_inode: &mut path_to_inode_map,
+            visited: &mut visited,
+        };
 
-        let mut visitor = MergeVisitor::new(writer.clone(), reader.clone(), &store, cache);
         reader.walk(&mut visitor).await?;
     }
 
@@ -58,24 +82,80 @@ where
     reader: Reader,
     store: &'a BlockStore<S>,
     cache: Cache<Router<Stores>>,
+    path_to_inode: &'a mut HashMap<PathBuf, u64>,
+    visited: &'a mut HashSet<u64>,
 }
 
 impl<'a, S> MergeVisitor<'a, S>
 where
     S: Store,
 {
-    pub fn new(
-        writer: Writer,
-        reader: Reader,
-        store: &'a BlockStore<S>,
-        cache: Cache<Router<Stores>>,
-    ) -> Self {
-        Self {
-            writer,
-            reader,
-            store,
-            cache,
+    async fn ensure_parent_directory(&mut self, path: &Path) -> Result<u64> {
+        if path.to_str() == Some(ROOT_PATH) {
+            return Ok(*self.path_to_inode.get(path).unwrap_or(&1));
         }
+
+        if let Some(ino) = self.path_to_inode.get(path) {
+            return Ok(*ino);
+        }
+
+        let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
+        let parent_ino = Box::pin(self.ensure_parent_directory(parent_path)).await?;
+
+        let dir_name = path
+            .file_name()
+            .map(|name| name.to_string_lossy().to_string())
+            .unwrap_or_default();
+
+        let dir_inode = Inode {
+            parent: parent_ino,
+            name: dir_name,
+            mode: Mode::new(FileType::Dir, 0o755),
+            ..Default::default()
+        };
+
+        let new_ino = self.writer.inode(dir_inode).await?;
+        self.path_to_inode.insert(path.to_path_buf(), new_ino);
+
+        Ok(new_ino)
+    }
+
+    async fn copy_blocks(&mut self, source_ino: u64, dest_ino: u64) -> Result<()> {
+        let blocks = self.reader.blocks(source_ino).await?;
+
+        for block in &blocks {
+            self.writer.block(dest_ino, &block.id, &block.key).await?;
+        }
+
+        let mut blocks_to_store = Vec::new();
+        for block in blocks {
+            if self.store.get(&block).await.is_err() {
+                blocks_to_store.push(block);
+            }
+        }
+
+        for block in blocks_to_store {
+            let (_, mut file) = self.cache.get(&block).await?;
+            let mut data = Vec::new();
+            if let Err(e) = file.read_to_end(&mut data).await {
+                log::error!(
+                    "failed to read block {}: {}",
+                    block.id.as_slice().encode_hex::<String>(),
+                    e
+                );
+                return Err(e.into());
+            }
+            if let Err(e) = self.store.set(&data).await {
+                log::error!(
+                    "failed to set block {}: {}",
+                    block.id.as_slice().encode_hex::<String>(),
+                    e
+                );
+                return Err(e.into());
+            }
+        }
+
+        Ok(())
     }
 }
 
@@ -84,46 +164,66 @@ impl<'a, S> WalkVisitor for MergeVisitor<'a, S>
 where
     S: Store,
 {
-    async fn visit(&mut self, _path: &Path, node: &Inode) -> Result<Walk> {
+    async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk> {
+        if !self.visited.insert(node.ino) {
+            return Ok(Walk::Continue);
+        }
+
         match node.mode.file_type() {
             FileType::Dir => {
-                self.writer.inode(node.clone()).await?;
-                return Ok(Walk::Continue);
+                if path.to_str() == Some(ROOT_PATH) {
+                    return Ok(Walk::Continue);
+                }
+
+                let dir_name = path
+                    .file_name()
+                    .map(|name| name.to_string_lossy().to_string())
+                    .unwrap_or_default();
+
+                let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
+                let parent_ino = self.ensure_parent_directory(parent_path).await?;
+
+                let dir_node = Inode {
+                    parent: parent_ino,
+                    name: dir_name,
+                    mode: node.mode.clone(),
+                    uid: node.uid,
+                    gid: node.gid,
+                    rdev: node.rdev,
+                    ctime: node.ctime,
+                    mtime: node.mtime,
+                    data: node.data.clone(),
+                    ..Default::default()
+                };
+
+                let dir_ino = self.writer.inode(dir_node).await?;
+                self.path_to_inode.insert(path.to_path_buf(), dir_ino);
             }
             FileType::Regular => {
-                let ino = self.writer.inode(node.clone()).await?;
-                let blocks = self.reader.blocks(node.ino).await?;
+                let file_name = path
+                    .file_name()
+                    .map(|name| name.to_string_lossy().to_string())
+                    .unwrap_or_default();
 
-                for block in &blocks {
-                    self.writer.block(ino, &block.id, &block.key).await?;
-                }
+                let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
+                let parent_ino = self.ensure_parent_directory(parent_path).await?;
 
-                let mut blocks_to_store = Vec::new();
-                for block in blocks {
-                    if self.store.get(&block).await.is_err() {
-                        blocks_to_store.push(block);
-                    }
-                }
-                for block in blocks_to_store {
-                    let (_, mut file) = self.cache.get(&block).await?;
-                    let mut data = Vec::new();
-                    if let Err(e) = file.read_to_end(&mut data).await {
-                        log::error!(
-                            "failed to read block {}: {}",
-                            block.id.as_slice().encode_hex::<String>(),
-                            e
-                        );
-                        return Err(e.into());
-                    }
-                    if let Err(e) = self.store.set(&data).await {
-                        log::error!(
-                            "failed to set block {}: {}",
-                            block.id.as_slice().encode_hex::<String>(),
-                            e
-                        );
-                        return Err(e.into());
-                    }
-                }
+                let file_node = Inode {
+                    parent: parent_ino,
+                    name: file_name,
+                    size: node.size,
+                    uid: node.uid,
+                    gid: node.gid,
+                    mode: node.mode.clone(),
+                    rdev: node.rdev,
+                    ctime: node.ctime,
+                    mtime: node.mtime,
+                    data: node.data.clone(),
+                    ..Default::default()
+                };
+
+                let ino = self.writer.inode(file_node).await?;
+                self.copy_blocks(node.ino, ino).await?;
             }
             _ => {
                 log::warn!("Unknown file type for node: {:?}", node);