Skip to content

Commit

Permalink
implement unpack with no recurssion
Browse files Browse the repository at this point in the history
add cmdline to do unpacking
  • Loading branch information
muhamadazmy committed Sep 22, 2023
1 parent cb5becc commit f0390fe
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 71 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ clap = { version = "4.2", features = ["derive"] }
tar = "0.4"
flate2 = "1.0"
snap = "1.0.5"
async-recursion = "1.0.0"
bb8-redis = "0.13"
async-trait = "0.1.53"
url = "2.3.1"
Expand Down
44 changes: 26 additions & 18 deletions src/fungi/meta.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::path::{Path, PathBuf};
use std::{
collections::LinkedList,
path::{Path, PathBuf},
};

use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool};

Expand Down Expand Up @@ -196,6 +199,8 @@ pub trait WalkVisitor {
async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk>;
}

struct WalkItem(PathBuf, Inode);

#[derive(Clone)]
pub struct Reader {
pool: SqlitePool,
Expand Down Expand Up @@ -271,21 +276,25 @@ impl Reader {

pub async fn walk<W: WalkVisitor + Send>(&self, visitor: &mut W) -> Result<()> {
let node = self.inode(1).await?;
let path: PathBuf = "".into();
self.walk_node(&path, &node, visitor).await?;
let mut list = LinkedList::default();
let path: PathBuf = "/".into();
list.push_back(WalkItem(path, node));
while !list.is_empty() {
let item = list.pop_back().unwrap();
self.walk_node(&mut list, &item, visitor).await?;
}

Ok(())
}

#[async_recursion::async_recursion]
async fn walk_node<W: WalkVisitor + Send>(
&self,
path: &Path,
node: &Inode,
list: &mut LinkedList<WalkItem>,
WalkItem(path, node): &WalkItem,
visitor: &mut W,
) -> Result<Walk> {
let path = path.join(&node.name);
) -> Result<()> {
if visitor.visit(&path, node).await? == Walk::Break {
return Ok(Walk::Break);
return Ok(());
}

let mut offset = 0;
Expand All @@ -297,20 +306,19 @@ impl Reader {

for child in children {
offset += 1;
if self.walk_node(&path, &child, visitor).await? == Walk::Break {
// if a file return break, we stop scanning this directory
if child.mode.is(FileType::Regular) {
return Ok(Walk::Continue);
}
// if child was a directory we continue because it means
// a directory returned a break on first visit so the
// entire directory is skipped anyway
let child_path = path.join(&child.name);
if child.mode.is(FileType::Dir) {
list.push_back(WalkItem(child_path, child));
continue;
}

if visitor.visit(&child_path, &child).await? == Walk::Break {
return Ok(());
}
}
}

Ok(Walk::Continue)
Ok(())
}
}

Expand Down
30 changes: 0 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,34 +347,4 @@ mod test {

assert!(status.success());
}

#[tokio::test]
async fn test_ubuntu() {
const ROOT: &str = "/tmp/pack-unpack-test";
let _ = fs::remove_dir_all(ROOT).await;
let root: PathBuf = ROOT.into();

fs::create_dir_all(&root).await.unwrap();
let source = Path::new("/home/azmy/tmp/jammy");

println!("file generation complete");
let writer = meta::Writer::new(root.join("meta.fl")).await.unwrap();

// while we at it we can already create 2 stores and create a router store on top
// of that.
let store0 = store::make("zdb://localhost:9900").await.unwrap();

pack(writer, store0, &source).await.unwrap();

println!("packing complete");
}
struct WalkTest;

#[async_trait::async_trait]
impl WalkVisitor for WalkTest {
async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk> {
println!("{} = {:?}", node.ino, path);
Ok(Walk::Continue)
}
}
}
60 changes: 50 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use clap::{ArgAction, Args, Parser, Subcommand};

use rfs::cache;
use rfs::fungi;
use rfs::store;
use rfs::store::{self, Router};

mod fs;
/// mount flists
Expand All @@ -28,8 +28,11 @@ struct Options {
enum Commands {
/// mount an FL
Mount(MountOptions),
/// create an FL and upload blocks
Create(CreateOptions),
/// create an FL and upload blocks to provided storage
Pack(PackOptions),

/// unpack (downloads) content of an FL the provided location
Unpack(UnpackOptions),
}

#[derive(Args, Debug)]
Expand Down Expand Up @@ -58,7 +61,7 @@ struct MountOptions {
}

#[derive(Args, Debug)]
struct CreateOptions {
struct PackOptions {
/// path to metadata file (flist)
#[clap(short, long)]
meta: String,
Expand All @@ -71,6 +74,20 @@ struct CreateOptions {
target: String,
}

#[derive(Args, Debug)]
struct UnpackOptions {
/// path to metadata file (flist)
#[clap(short, long)]
meta: String,

/// directory used as cache for downloaded file chuncks
#[clap(short, long, default_value_t = String::from("/tmp/cache"))]
cache: String,

/// target directory to upload
target: String,
}

fn main() -> Result<()> {
let opts = Options::parse();

Expand All @@ -90,11 +107,12 @@ fn main() -> Result<()> {

match opts.command {
Commands::Mount(opts) => mount(opts),
Commands::Create(opts) => create(opts),
Commands::Pack(opts) => pack(opts),
Commands::Unpack(opts) => unpack(opts),
}
}

fn create(opts: CreateOptions) -> Result<()> {
fn pack(opts: PackOptions) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;

rt.block_on(async move {
Expand All @@ -106,6 +124,22 @@ fn create(opts: CreateOptions) -> Result<()> {
})
}

fn unpack(opts: UnpackOptions) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;

rt.block_on(async move {
let meta = fungi::Reader::new(opts.meta)
.await
.context("failed to initialize metadata database")?;

let router = get_router(&meta).await?;

let cache = cache::Cache::new(opts.cache, router);
rfs::unpack(&meta, &cache, opts.target).await?;
Ok(())
})
}

fn mount(opts: MountOptions) -> Result<()> {
if is_mountpoint(&opts.target)? {
eprintln!("target {} is already a mount point", opts.target);
Expand Down Expand Up @@ -180,6 +214,15 @@ async fn fuse(opts: MountOptions) -> Result<()> {
.await
.context("failed to initialize metadata database")?;

let router = get_router(&meta).await?;

let cache = cache::Cache::new(opts.cache, router);
let filesystem = fs::Filesystem::new(meta, cache);

filesystem.mount(opts.target).await
}

async fn get_router(meta: &fungi::Reader) -> Result<Router> {
let mut router = store::Router::new();

for route in meta.routes().await.context("failed to get store routes")? {
Expand All @@ -189,8 +232,5 @@ async fn fuse(opts: MountOptions) -> Result<()> {
router.add(route.start, route.end, store);
}

let cache = cache::Cache::new(opts.cache, router);
let filesystem = fs::Filesystem::new(meta, cache);

filesystem.mount(opts.target).await
Ok(router)
}

0 comments on commit f0390fe

Please sign in to comment.