Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add export subcommand to nydus-image #1159

Merged
merged 3 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ nydus-api = { version = "0.2.2", path = "api", features = ["handler"] }
nydus-app = { version = "0.3.2", path = "app" }
nydus-error = { version = "0.2.3", path = "error" }
nydus-rafs = { version = "0.2.2", path = "rafs", features = ["builder"] }
nydus-service = { version = "0.2.0", path = "service" }
nydus-service = { version = "0.2.0", path = "service", features = ["block-device"] }
nydus-storage = { version = "0.6.2", path = "storage" }
nydus-utils = { version = "0.4.1", path = "utils" }

Expand Down
204 changes: 200 additions & 4 deletions service/src/block_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,25 @@
//! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block
//! device, so it can be directly mounted by Linux EROFS fs driver.

use std::cmp::min;
use std::cmp::{max, min};
use std::fs::OpenOptions;
use std::io::Result;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};
use nydus_api::BlobCacheEntry;
use nydus_rafs::metadata::layout::v6::{
EROFS_BLOCK_BITS_12, EROFS_BLOCK_BITS_9, EROFS_BLOCK_SIZE_4096, EROFS_BLOCK_SIZE_512,
};
use nydus_storage::utils::alloc_buf;
use tokio_uring::buf::IoBufMut;

use crate::blob_cache::{BlobCacheMgr, BlobConfig, DataBlob, MetaBlob};
use crate::blob_cache::{generate_blob_key, BlobCacheMgr, BlobConfig, DataBlob, MetaBlob};

const BLOCK_DEVICE_EXPORT_BATCH_SIZE: usize = 0x80000;

enum BlockRange {
Hole,
Expand Down Expand Up @@ -81,7 +89,7 @@ impl BlockDevice {
meta_blob_config.blob_id()
))
})?;
ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone()));
ranges.update(&range, BlockRange::MetaBlob(meta_blob));

let mut pos = blocks;
let data_blobs = meta_blob_config.get_blobs();
Expand Down Expand Up @@ -272,6 +280,194 @@ impl BlockDevice {

(Ok(total_size), buf)
}

/// Export a RAFS filesystem as a raw block disk image.
pub fn export(
blob_entry: BlobCacheEntry,
output: Option<String>,
localfs_dir: Option<String>,
threads: u32,
) -> Result<()> {
let cache_mgr = Arc::new(BlobCacheMgr::new());
cache_mgr.add_blob_entry(&blob_entry).map_err(|e| {
eother!(format!(
"block_device: failed to add blob into CacheMgr, {}",
e
))
})?;
let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id);
let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone()).map_err(|e| {
eother!(format!(
"block_device: failed to create block device object, {}",
e
))
})?;
let block_device = Arc::new(block_device);

let path = match output {
Some(v) => PathBuf::from(v),
None => {
let path = match cache_mgr.get_config(&blob_id) {
Some(BlobConfig::MetaBlob(meta)) => meta.path().to_path_buf(),
_ => return Err(enoent!("block_device: failed to get meta blob")),
};
if !path.is_file() {
return Err(eother!(format!(
"block_device: meta blob {} is not a file",
path.display()
)));
}
let name = path
.file_name()
.ok_or_else(|| {
eother!(format!(
"block_device: failed to get file name from {}",
path.display()
))
})?
.to_str()
.ok_or_else(|| {
eother!(format!(
"block_device: failed to get file name from {}",
path.display()
))
})?;
let dir = localfs_dir
.ok_or_else(|| einval!("block_device: parameter `localfs_dir` is missing"))?;
let path = PathBuf::from(dir);
path.join(name.to_string() + ".disk")
}
};

let output_file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.open(&path)
.map_err(|e| {
eother!(format!(
"block_device: failed to create output file {}, {}",
path.display(),
e
))
})?;
let output_file = Arc::new(tokio_uring::fs::File::from_std(output_file));

let blocks = block_device.blocks();
let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32;
assert_eq!(batch_size.count_ones(), 1);
let threads = max(threads, 1);
let mut threads = min(threads, 32);
while blocks / threads < batch_size && threads > 1 {
threads /= 2;
}

if threads == 1 {
tokio_uring::start(async move {
Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await
})?;
} else {
let mut thread_handlers: Vec<JoinHandle<Result<()>>> =
Vec::with_capacity(threads as usize);
let step = (blocks + batch_size - 1) & !(batch_size - 1);
let mut pos = 0;

for _i in 0..threads {
let count = min(blocks - pos, step);
let mgr = cache_mgr.clone();
let id = blob_id.clone();
let path = path.to_path_buf();

let handler = thread::spawn(move || {
let output_file = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.map_err(|e| {
eother!(format!(
"block_device: failed to create output file {}, {}",
path.display(),
e
))
})?;
let file = Arc::new(tokio_uring::fs::File::from_std(output_file));
let block_device = BlockDevice::new(id, mgr).map_err(|e| {
eother!(format!(
"block_device: failed to create block device object, {}",
e
))
})?;
let device = Arc::new(block_device);

tokio_uring::start(
async move { Self::do_export(device, file, pos, count).await },
)?;
Ok(())
});
pos += count;
thread_handlers.push(handler);
}
assert_eq!(pos, blocks);
assert_eq!(thread_handlers.len(), threads as usize);

for handler in thread_handlers {
handler
.join()
.map_err(|e| {
eother!(format!(
"block_device: failed to wait for worker thread, {:?}",
e
))
})?
.map_err(|e| {
eother!(format!("block_device: failed to export disk image, {}", e))
})?;
}
}
Ok(())
}

async fn do_export(
block_device: Arc<BlockDevice>,
output_file: Arc<tokio_uring::fs::File>,
start: u32,
mut blocks: u32,
) -> Result<()> {
let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32;
let mut pos = start;
let mut buf = alloc_buf(BLOCK_DEVICE_EXPORT_BATCH_SIZE);

while blocks > 0 {
let count = min(batch_size, blocks);
let (res, buf1) = block_device.async_read(pos, count, buf).await;
let sz = res?;
if sz != count as usize * block_device.block_size() as usize {
return Err(eio!(
"block_device: failed to read data, got less data than requested"
));
}
buf = buf1;

if sz != buf.len() {
buf.resize(sz, 0);
}
let (res, buf2) = output_file
.write_at(buf, block_device.blocks_to_size(pos))
.await;
let sz1 = res?;
if sz1 != sz {
return Err(eio!(
"block_device: failed to write data to disk image file, written less data than requested"
));
}
buf = buf2;

pos += count;
blocks -= count;
}

Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -337,7 +533,7 @@ mod tests {
assert!(mgr.get_config(&key).is_some());

let mgr = Arc::new(mgr);
let device = BlockDevice::new(blob_id.clone(), mgr).unwrap();
let device = BlockDevice::new(blob_id, mgr).unwrap();
assert_eq!(device.blocks(), 0x209);

tokio_uring::start(async move {
Expand Down
Loading