Development merge myfs #100

Open · wants to merge 4 commits into `development`
37 changes: 36 additions & 1 deletion rfs/README.md
@@ -1,4 +1,3 @@

# Introduction

`rfs` is the main tool to create, mount and extract FungiStore lists (FungiList), `fl` for short. An `fl` is a simple format
@@ -41,6 +40,7 @@ The simplest form of `<store-specs>` is a `url`. the store `url` defines the sto
- `s3`: aws-s3 is used for storing and retrieving large amounts of data (blobs) in buckets (directories). An example: `s3://<username>:<password>@<host>:<port>/<bucket-name>`

  `region` is an optional parameter for s3 stores; to provide one, add it as a query to the url: `?region=<region-name>` (see the example after this list).

- `http`: an http store is mostly used to wrap a dir store so that data can be fetched through http requests. It does not support uploading, only fetching.
  It can be set in the FL file as the store to fetch the data from with `rfs config`. Example: `http://localhost:9000/store` (https works too).
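For example, an s3 store URL carrying the optional `region` query could be passed to `pack` like this (a hypothetical invocation; the host, bucket and credentials are made up):

```bash
# hypothetical: pack a directory into an fl backed by an s3 store in a specific region
rfs pack -m output.fl -s "s3://user:secret@minio.example.com:9000/flist-blobs?region=us-east-1" /path/to/dir
```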

@@ -144,6 +144,41 @@ Options:

By default, when unpacking, the `-p` flag is not set, which means downloaded files will be owned by the current user/group. If the `-p` flag is set, the files' ownership will be the same as that of the original files used to create the fl (the `uid` and `gid` of the files and directories are preserved); this normally requires `sudo` while unpacking.
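For example (a sketch, assuming the `unpack` invocation documented earlier in this README):

```bash
# default: extracted files are owned by the invoking user/group
rfs unpack -m example.fl /tmp/extracted

# with -p the original uid/gid are preserved, which usually requires root
sudo rfs unpack -p -m example.fl /tmp/extracted
```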

# Merge an `fl`

rfs provides a `merge` subcommand that combines multiple file lists (FL files) into a single unified file list.

```bash
rfs merge -s dir:///tmp/store merged.fl flist1.fl flist2.fl
```

This tells rfs to create an `fl` named `merged.fl` by combining the file lists `flist1.fl` and `flist2.fl`. A store must be specified with the `-s` option to handle block storage during the merge operation.

## Requirements for Merge

- At least 2 input file lists must be passed as positional arguments (`<TARGET_FLISTS>...`) after the output `fl` path
- A store must be specified with the `-s` option to store blocks that need to be copied or moved
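Since `-s` accepts the same `[xx-xx=]<url>` range syntax as the other subcommands (see the full help below), the copied blocks can also be sharded across several stores; a hypothetical example:

```bash
rfs merge -s 00-7f=dir:///tmp/store0 -s 80-ff=dir:///tmp/store1 merged.fl flist1.fl flist2.fl
```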

## Full Command Help

```bash
# rfs merge --help

merge 2 or more FLs into a new one

Usage: rfs merge [OPTIONS] --store <STORE> <META> <TARGET_FLISTS>...

Arguments:
<META> path to metadata file (flist)
<TARGET_FLISTS>...

Options:
-s, --store <STORE> store url in the format [xx-xx=]<url>
--no-strip-password disables automatic password stripping from store url
-c, --cache <CACHE> directory used as cache for downloaded file chunks [default: /tmp/cache]
-h, --help Print help
```

# Specifications

Please check [docs](../docs)
121 changes: 121 additions & 0 deletions rfs/src/lib.rs
@@ -12,6 +12,8 @@ pub use unpack::unpack;
mod clone;
pub use clone::clone;
pub mod config;
mod merge;
pub use merge::merge;

const PARALLEL_UPLOAD: usize = 10; // number of files we can upload in parallel

@@ -104,4 +106,123 @@ mod test {

assert!(status.success());
}

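// End-to-end merge test: pack two source trees into separate flists, merge
// them into a new flist, then unpack the result and verify that every file
// from both sources is present with the expected size.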
#[tokio::test]
async fn test_merge() {
const ROOT: &str = "/tmp/merge-test";
let _ = fs::remove_dir_all(ROOT).await;

println!("declaring directories");

let root: PathBuf = ROOT.into();
let source1 = root.join("source1");
let source2 = root.join("source2");
let merged_dest = root.join("merged");
let cache_dir = root.join("cache");

println!("creating directories");

fs::create_dir_all(&source1).await.unwrap();
fs::create_dir_all(&source2).await.unwrap();
fs::create_dir_all(&cache_dir).await.unwrap();

println!("creating test files");

create_test_files(&source1, "file1.txt", 1024).await;
create_test_files(&source1, "file2.txt", 2048).await;

create_test_files(&source2, "file3.txt", 2048).await;
create_test_files(&source2, "file4.txt", 512).await;

println!("test files created");
println!("packing source1");
let meta1_path = root.join("meta1.fl");
let writer1 = meta::Writer::new(&meta1_path, true).await.unwrap();
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut router1 = Router::new();
router1.add(0x00, 0xFF, store1);

pack(writer1, router1, &source1, false, None).await.unwrap();
println!("packing complete for source1");

println!("packing source2");
let meta2_path = root.join("meta2.fl");
let writer2 = meta::Writer::new(&meta2_path, true).await.unwrap();
let store2 = DirStore::new(root.join("store2")).await.unwrap();
let mut router2 = Router::new();
router2.add(0x00, 0xFF, store2);
pack(writer2, router2, &source2, false, None).await.unwrap();

println!("packing complete for source2");
let merged_meta_path = root.join("merged.fl");
let merged_writer = meta::Writer::new(&merged_meta_path, true).await.unwrap();
let merged_store = DirStore::new(root.join("merged_store")).await.unwrap();
let mut router = Router::new();
router.add(0x00, 0xFF, merged_store);

println!("merging");

merge(
merged_writer,
router,
false,
vec![
meta1_path.to_string_lossy().to_string(),
meta2_path.to_string_lossy().to_string(),
],
cache_dir.to_string_lossy().to_string(),
)
.await
.unwrap();

println!("merge complete");
let merged_reader = meta::Reader::new(&merged_meta_path).await.unwrap();
let merged_router = store::get_router(&merged_reader).await.unwrap();
let merged_cache = Cache::new(root.join("merged_cache"), merged_router);

unpack(&merged_reader, &merged_cache, &merged_dest, false)
.await
.unwrap();

assert!(merged_dest.join("file1.txt").exists());
assert!(merged_dest.join("file2.txt").exists());
assert!(merged_dest.join("file3.txt").exists());
assert!(merged_dest.join("file4.txt").exists());

verify_file_content(merged_dest.join("file1.txt"), 1024).await;
verify_file_content(merged_dest.join("file2.txt"), 2048).await;
verify_file_content(merged_dest.join("file3.txt"), 2048).await;
verify_file_content(merged_dest.join("file4.txt"), 512).await;
}

async fn create_test_files<P: AsRef<std::path::Path>>(dir: P, name: &str, size: usize) {
let mut urandom = fs::OpenOptions::default()
.read(true)
.open("/dev/urandom")
.await
.unwrap()
.take(size as u64);

let p = dir.as_ref().join(name);
let mut file = fs::OpenOptions::default()
.create(true)
.write(true)
.open(p)
.await
.unwrap();

tokio::io::copy(&mut urandom, &mut file).await.unwrap();
}

async fn verify_file_content<P: AsRef<std::path::Path>>(path: P, expected_size: usize) {
let mut file = fs::OpenOptions::default()
.read(true)
.open(path)
.await
.unwrap();

let mut buffer = Vec::new();
let size = file.read_to_end(&mut buffer).await.unwrap();
assert_eq!(size, expected_size);
}
}
52 changes: 52 additions & 0 deletions rfs/src/main.rs
@@ -37,6 +37,8 @@ enum Commands {
Clone(CloneOptions),
/// list or modify FL metadata and stores
Config(ConfigOptions),
/// merge 2 or more FLs into a new one
Merge(MergeOptions),
}

#[derive(Args, Debug)]
@@ -115,6 +117,35 @@ struct CloneOptions {
cache: String,
}

#[derive(Args, Debug)]
struct MergeOptions {
/// path to metadata file (flist)
meta: String,

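/// store url in the format [xx-xx=]<url>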
#[clap(short, long, action=ArgAction::Append, required = true)]
store: Vec<String>,

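/// disables automatic password stripping from store url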
#[clap(long, default_value_t = false)]
no_strip_password: bool,

#[clap(action=ArgAction::Append, required = true)]
target_flists: Vec<String>,

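/// directory used as cache for downloaded file chunks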
#[clap(short, long, default_value_t = String::from("/tmp/cache"))]
cache: String,
}

impl MergeOptions {
fn validate(&self) -> Result<()> {
if self.target_flists.len() < 2 {
return Err(anyhow::anyhow!(
"At least 2 target file lists are required for merge operation"
));
}
Ok(())
}
}

#[derive(Args, Debug)]
struct ConfigOptions {
/// path to metadata file (flist)
@@ -219,6 +250,7 @@ fn main() -> Result<()> {
Commands::Unpack(opts) => unpack(opts),
Commands::Clone(opts) => clone(opts),
Commands::Config(opts) => config(opts),
Commands::Merge(opts) => merge(opts),
}
}

@@ -381,3 +413,23 @@ fn config(opts: ConfigOptions) -> Result<()> {
Ok(())
})
}

fn merge(opts: MergeOptions) -> Result<()> {
opts.validate()?;

let rt = tokio::runtime::Runtime::new()?;

rt.block_on(async move {
let store = store::parse_router(opts.store.as_slice()).await?;
let meta = fungi::Writer::new(opts.meta, true).await?;
rfs::merge(
meta,
store,
!opts.no_strip_password,
opts.target_flists,
opts.cache,
)
.await?;
Ok(())
})
}
235 changes: 235 additions & 0 deletions rfs/src/merge.rs
@@ -0,0 +1,235 @@
use crate::{
cache::Cache,
fungi::{
meta::{FileType, Inode, Mode, Walk, WalkVisitor},
Reader, Result, Writer,
},
store::{get_router, BlockStore, Router, Store, Stores},
};
use anyhow::Context;
use hex::ToHex;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt;

const ROOT_PATH: &str = "/";

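/// Merge the given source flists into a single new flist written through `writer`.
/// The routes of `store` are recorded in the new flist metadata (optionally with
/// passwords stripped from the URLs), and any block missing from `store` is fetched
/// through the per-flist cache and uploaded to it.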
pub async fn merge<S: Store>(
writer: Writer,
store: S,
strip_password: bool,
target_flists: Vec<String>,
cache: String,
) -> Result<()> {
for route in store.routes() {
let mut store_url = route.url;

if strip_password {
let mut url = url::Url::parse(&store_url).context("failed to parse store url")?;
if url.password().is_some() {
url.set_password(None)
.map_err(|_| anyhow::anyhow!("failed to strip password"))?;

store_url = url.to_string();
}
}

let range_start = route.start.unwrap_or_default();
let range_end = route.end.unwrap_or(u8::MAX);

writer.route(range_start, range_end, store_url).await?;
}

let store = store.into();

let mut path_to_inode_map = HashMap::new();
let root_path = PathBuf::from(ROOT_PATH);

let root_inode = Inode {
name: ROOT_PATH.into(),
mode: Mode::new(FileType::Dir, 0o755),
..Default::default()
};
let root_ino = writer.inode(root_inode).await?;
path_to_inode_map.insert(root_path, root_ino);

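// Walk each source flist in turn, re-creating its tree under the shared root
// and copying file blocks into the destination store as needed.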
for target_flist in target_flists {
let reader = Reader::new(&target_flist).await?;
let router = get_router(&reader).await?;
let cache_instance = Cache::new(cache.clone(), router);

let mut visited = HashSet::new();
let mut visitor = MergeVisitor {
writer: writer.clone(),
reader: reader.clone(),
store: &store,
cache: cache_instance,
path_to_inode: &mut path_to_inode_map,
visited: &mut visited,
};

reader.walk(&mut visitor).await?;
}

Ok(())
}

struct MergeVisitor<'a, S>
where
S: Store,
{
writer: Writer,
reader: Reader,
store: &'a BlockStore<S>,
cache: Cache<Router<Stores>>,
path_to_inode: &'a mut HashMap<PathBuf, u64>,
visited: &'a mut HashSet<u64>,
}

impl<'a, S> MergeVisitor<'a, S>
where
S: Store,
{
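/// Return the inode of `path` in the merged flist, creating any missing parent
/// directories (mode 0o755) along the way and memoizing them in `path_to_inode`.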
async fn ensure_parent_directory(&mut self, path: &Path) -> Result<u64> {
if path.to_str() == Some(ROOT_PATH) {
return Ok(*self.path_to_inode.get(path).unwrap_or(&1));
}

if let Some(ino) = self.path_to_inode.get(path) {
return Ok(*ino);
}

let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
let parent_ino = Box::pin(self.ensure_parent_directory(parent_path)).await?;

let dir_name = path
.file_name()
.map(|name| name.to_string_lossy().to_string())
.unwrap_or_default();

let dir_inode = Inode {
parent: parent_ino,
name: dir_name,
mode: Mode::new(FileType::Dir, 0o755),
..Default::default()
};

let new_ino = self.writer.inode(dir_inode).await?;
self.path_to_inode.insert(path.to_path_buf(), new_ino);

Ok(new_ino)
}

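/// Attach the source inode's blocks to the destination inode, then upload any
/// block not already present in the destination store, reading its content
/// through the source flist's cache.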
async fn copy_blocks(&mut self, source_ino: u64, dest_ino: u64) -> Result<()> {
let blocks = self.reader.blocks(source_ino).await?;

for block in &blocks {
self.writer.block(dest_ino, &block.id, &block.key).await?;
}

let mut blocks_to_store = Vec::new();
for block in blocks {
if self.store.get(&block).await.is_err() {
blocks_to_store.push(block);
}
}

for block in blocks_to_store {
let (_, mut file) = self.cache.get(&block).await?;
let mut data = Vec::new();
if let Err(e) = file.read_to_end(&mut data).await {
log::error!(
"failed to read block {}: {}",
block.id.as_slice().encode_hex::<String>(),
e
);
return Err(e.into());
}
if let Err(e) = self.store.set(&data).await {
log::error!(
"failed to set block {}: {}",
block.id.as_slice().encode_hex::<String>(),
e
);
return Err(e.into());
}
}

Ok(())
}
}

#[async_trait::async_trait]
impl<'a, S> WalkVisitor for MergeVisitor<'a, S>
where
S: Store,
{
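/// Called for every entry of the source flist: directories and regular files are
/// re-created under the merged root with their original metadata (and, for files,
/// their blocks are copied); other file types are skipped with a warning. Entries
/// already seen (by inode number) are ignored.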
async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk> {
if !self.visited.insert(node.ino) {
return Ok(Walk::Continue);
}

match node.mode.file_type() {
FileType::Dir => {
if path.to_str() == Some(ROOT_PATH) {
return Ok(Walk::Continue);
}

let dir_name = path
.file_name()
.map(|name| name.to_string_lossy().to_string())
.unwrap_or_default();

let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
let parent_ino = self.ensure_parent_directory(parent_path).await?;

let dir_node = Inode {
parent: parent_ino,
name: dir_name,
mode: node.mode.clone(),
uid: node.uid,
gid: node.gid,
rdev: node.rdev,
ctime: node.ctime,
mtime: node.mtime,
data: node.data.clone(),
..Default::default()
};

let dir_ino = self.writer.inode(dir_node).await?;
self.path_to_inode.insert(path.to_path_buf(), dir_ino);
}
FileType::Regular => {
let file_name = path
.file_name()
.map(|name| name.to_string_lossy().to_string())
.unwrap_or_default();

let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
let parent_ino = self.ensure_parent_directory(parent_path).await?;

let file_node = Inode {
parent: parent_ino,
name: file_name,
size: node.size,
uid: node.uid,
gid: node.gid,
mode: node.mode.clone(),
rdev: node.rdev,
ctime: node.ctime,
mtime: node.mtime,
data: node.data.clone(),
..Default::default()
};

let ino = self.writer.inode(file_node).await?;
self.copy_blocks(node.ino, ino).await?;
}
_ => {
log::warn!("Unknown file type for node: {:?}", node);
}
}

Ok(Walk::Continue)
}
}