From ff5e62e3dcc6658dea5c8db86ecc64013a0cd89c Mon Sep 17 00:00:00 2001
From: Yadong Ding
Date: Fri, 27 Sep 2024 09:42:00 +0800
Subject: [PATCH] storage: enable chunk deduplication for file cache

Enable chunk deduplication for the file cache. It works in this way:
- When a chunk is not in the blob cache file yet, query the CAS database
  to check whether another blob data file already contains the required
  chunk. If there is a duplicated chunk, copy the chunk data into the
  current blob cache file by using copy_file_range().
- After downloading a data chunk from the remote backend, save the
  file/offset/chunk-id tuple into the CAS database, so it can be reused
  later.

Co-authored-by: Jiang Liu
Co-authored-by: Yading Ding
Signed-off-by: Yadong Ding
---
 Cargo.toml                         |  3 +++
 src/bin/nydusd/main.rs             | 23 +++++++++++++++++++--
 storage/Cargo.toml                 |  1 -
 storage/src/cache/cachedfile.rs    | 33 ++++++++++++++++++++++++++++--
 storage/src/cache/filecache/mod.rs | 29 ++++++++++++++++++++++----
 storage/src/cache/fscache/mod.rs   | 29 ++++++++++++++++++++++----
 6 files changed, 105 insertions(+), 13 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index fbd64232437..9656f5a92c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -95,6 +95,7 @@ default = [
     "backend-s3",
     "backend-http-proxy",
     "backend-localdisk",
+    "dedup",
 ]
 virtiofs = [
     "nydus-service/virtiofs",
@@ -116,6 +117,8 @@ backend-oss = ["nydus-storage/backend-oss"]
 backend-registry = ["nydus-storage/backend-registry"]
 backend-s3 = ["nydus-storage/backend-s3"]
 
+dedup = ["nydus-storage/dedup"]
+
 [workspace]
 members = [
     "api",
diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs
index fc5e4b7a6b8..ab138442f6b 100644
--- a/src/bin/nydusd/main.rs
+++ b/src/bin/nydusd/main.rs
@@ -26,6 +26,7 @@ use nydus_service::{
     create_daemon, create_fuse_daemon, create_vfs_backend, validate_threads_configuration,
     Error as NydusError, FsBackendMountCmd, FsBackendType, ServiceArgs,
 };
+use nydus_storage::cache::CasMgr;
 
 use crate::api_server_glue::ApiServerController;
 
@@ -50,7 +51,7 @@ fn thread_validator(v: &str) -> std::result::Result<usize, String> {
 }
 
 fn append_fs_options(app: Command) -> Command {
-    app.arg(
+    let mut app = app.arg(
         Arg::new("bootstrap")
             .long("bootstrap")
             .short('B')
@@ -87,7 +88,18 @@
             .help("Mountpoint within the FUSE/virtiofs device to mount the RAFS/passthroughfs filesystem")
             .default_value("/")
             .required(false),
-    )
+    );
+
+    #[cfg(feature = "dedup")]
+    {
+        app = app.arg(
+            Arg::new("dedup-db")
+                .long("dedup-db")
+                .help("Database file for chunk deduplication"),
+        );
+    }
+
+    app
 }
 
 fn append_fuse_options(app: Command) -> Command {
@@ -750,6 +762,13 @@ fn main() -> Result<()> {
     dump_program_info();
     handle_rlimit_nofile_option(&args, "rlimit-nofile")?;
 
+    #[cfg(feature = "dedup")]
+    if let Some(db) = args.get_one::<String>("dedup-db") {
+        let mgr = CasMgr::new(db).map_err(|e| eother!(format!("{}", e)))?;
+        info!("Enable chunk deduplication by using database at {}", db);
+        CasMgr::set_singleton(mgr);
+    }
+
     match args.subcommand_name() {
         Some("singleton") => {
             // Safe to unwrap because the subcommand is `singleton`.
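Note on the CasMgr wiring used above: the patch assumes a process-wide CasMgr singleton exposed by nydus-storage (CasMgr::new(), set_singleton(), get_singleton()), whose internals are not part of this diff. Below is a minimal sketch of how such a singleton could be wired up; the struct fields, and the in-memory table standing in for the on-disk CAS database, are illustrative assumptions only, not the nydus-storage implementation.

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, OnceLock};

/// Sketch of a content-addressable-storage manager: maps a chunk digest to the
/// blob cache file and offset that already hold that chunk's data.
pub struct CasMgr {
    /// In-memory stand-in for the on-disk database passed via `--dedup-db`.
    chunks: Mutex<HashMap<String, (PathBuf, u64)>>,
    /// Kept only for illustration; a real implementation would hold a DB connection.
    #[allow(dead_code)]
    db_path: PathBuf,
}

static CAS_MGR: OnceLock<Arc<CasMgr>> = OnceLock::new();

impl CasMgr {
    pub fn new<P: Into<PathBuf>>(db_path: P) -> std::io::Result<Self> {
        // A real implementation would open or create the database file here.
        Ok(CasMgr {
            chunks: Mutex::new(HashMap::new()),
            db_path: db_path.into(),
        })
    }

    /// Install the global instance once at startup (done in nydusd's main()).
    pub fn set_singleton(mgr: CasMgr) {
        let _ = CAS_MGR.set(Arc::new(mgr));
    }

    /// Returns None when `--dedup-db` was not given, so caches silently skip dedup.
    pub fn get_singleton() -> Option<Arc<CasMgr>> {
        CAS_MGR.get().cloned()
    }
}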
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index a45ba0a1fe1..8636d5cb53c 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -58,7 +58,6 @@ regex = "1.7.0"
 toml = "0.5"
 
 [features]
-default = ["dedup"]
 backend-localdisk = []
 backend-localdisk-gpt = ["gpt", "backend-localdisk"]
 backend-localfs = []
diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs
index d30bcb1762b..a2432cb5fa7 100644
--- a/storage/src/cache/cachedfile.rs
+++ b/storage/src/cache/cachedfile.rs
@@ -13,6 +13,7 @@ use std::collections::HashSet;
 use std::fs::File;
 use std::io::{ErrorKind, Read, Result};
 use std::mem::ManuallyDrop;
+use std::ops::Deref;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::{Arc, Mutex};
@@ -29,7 +30,7 @@ use tokio::runtime::Runtime;
 use crate::backend::BlobReader;
 use crate::cache::state::ChunkMap;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobIoMergeState};
+use crate::cache::{BlobCache, BlobIoMergeState, CasMgr};
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
     BlobObject, BlobPrefetchRequest,
@@ -184,8 +185,10 @@ pub(crate) struct FileCacheEntry {
     pub(crate) blob_info: Arc<BlobInfo>,
     pub(crate) cache_cipher_object: Arc<Cipher>,
     pub(crate) cache_cipher_context: Arc<CipherContext>,
+    pub(crate) cas_mgr: Option<Arc<CasMgr>>,
     pub(crate) chunk_map: Arc<dyn ChunkMap>,
     pub(crate) file: Arc<File>,
+    pub(crate) file_path: Arc<String>,
     pub(crate) meta: Option<FileCacheMeta>,
     pub(crate) metrics: Arc<BlobcacheMetrics>,
     pub(crate) prefetch_state: Arc<AtomicU32>,
@@ -233,13 +236,16 @@ impl FileCacheEntry {
     }
 
     fn delay_persist_chunk_data(&self, chunk: Arc<dyn BlobChunkInfo>, buffer: Arc<DataBuffer>) {
+        let blob_info = self.blob_info.clone();
         let delayed_chunk_map = self.chunk_map.clone();
         let file = self.file.clone();
+        let file_path = self.file_path.clone();
         let metrics = self.metrics.clone();
         let is_raw_data = self.is_raw_data;
         let is_cache_encrypted = self.is_cache_encrypted;
         let cipher_object = self.cache_cipher_object.clone();
         let cipher_context = self.cache_cipher_context.clone();
+        let cas_mgr = self.cas_mgr.clone();
 
         metrics.buffered_backend_size.add(buffer.size() as u64);
         self.runtime.spawn_blocking(move || {
@@ -291,6 +297,11 @@ impl FileCacheEntry {
             };
             let res = Self::persist_cached_data(&file, offset, buf);
             Self::_update_chunk_pending_status(&delayed_chunk_map, chunk.as_ref(), res.is_ok());
+            if let Some(mgr) = cas_mgr {
+                if let Err(e) = mgr.record_chunk(&blob_info, chunk.deref(), file_path.as_ref()) {
+                    warn!("failed to record chunk state for dedup, {}", e);
+                }
+            }
         });
     }
 
@@ -1051,13 +1062,21 @@
         trace!("dispatch single io range {:?}", req);
         let mut blob_cci = BlobCCI::new();
         for (i, chunk) in req.chunks.iter().enumerate() {
-            let is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
+            let mut is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
                 Ok(true) => true,
                 Ok(false) => false,
                 Err(StorageError::Timeout) => false, // Retry if waiting for inflight IO timeouts
                 Err(e) => return Err(einval!(e)),
            };
+            if !is_ready {
+                if let Some(mgr) = self.cas_mgr.as_ref() {
+                    is_ready = mgr.dedup_chunk(&self.blob_info, chunk.deref(), &self.file);
+                    if is_ready {
+                        self.update_chunk_pending_status(chunk.deref(), true);
+                    }
+                }
+            }
 
             // Directly read chunk data from file cache into user buffer iff:
             // - the chunk is ready in the file cache
             // - data in the file cache is plaintext.
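The two CasMgr calls used above are record_chunk(), invoked after a downloaded chunk has been persisted, and dedup_chunk(), tried when a chunk is not yet ready in the local cache. The commit message describes the intent: look up the chunk in the CAS database and, on a hit, copy the bytes across cache files with copy_file_range(). The sketch below continues the illustrative CasMgr above and shows one way to implement that contract; it assumes a plain uncompressed file cache laid out at uncompressed chunk offsets, keys records by the chunk digest rendered as a hex string, and is not the actual nydus-storage code.

use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;

use crate::device::{BlobChunkInfo, BlobInfo};

impl CasMgr {
    /// Record that `file_path` now holds this chunk, so other blobs can reuse it.
    pub fn record_chunk(
        &self,
        _blob: &BlobInfo,
        chunk: &dyn BlobChunkInfo,
        file_path: &str,
    ) -> std::io::Result<()> {
        self.chunks.lock().unwrap().insert(
            chunk.chunk_id().to_string(), // assumes the digest renders as a hex string
            (PathBuf::from(file_path), chunk.uncompressed_offset()),
        );
        Ok(())
    }

    /// Try to fill `cache_file` from another blob cache file that already holds
    /// the same chunk. Returns true when the chunk data is now ready locally.
    pub fn dedup_chunk(
        &self,
        _blob: &BlobInfo,
        chunk: &dyn BlobChunkInfo,
        cache_file: &File,
    ) -> bool {
        let key = chunk.chunk_id().to_string();
        let Some((src_path, src_offset)) = self.chunks.lock().unwrap().get(&key).cloned() else {
            return false;
        };
        let Ok(src) = File::open(&src_path) else {
            return false;
        };

        // Copy the chunk bytes between the two cache files in-kernel, as the
        // commit message describes. A production version would loop on short copies.
        let mut off_in = src_offset as i64;
        let mut off_out = chunk.uncompressed_offset() as i64;
        let len = chunk.uncompressed_size() as usize;
        let copied = unsafe {
            libc::copy_file_range(
                src.as_raw_fd(),
                &mut off_in,
                cache_file.as_raw_fd(),
                &mut off_out,
                len,
                0,
            )
        };
        copied == len as isize
    }
}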
@@ -1454,6 +1473,16 @@ impl FileCacheEntry {
     }
 }
 
+impl Drop for FileCacheEntry {
+    fn drop(&mut self) {
+        if let Some(cas_mgr) = &self.cas_mgr {
+            if let Err(e) = cas_mgr.gc() {
+                warn!("cas_mgr gc failed: {}", e);
+            }
+        }
+    }
+}
+
 /// An enum to reuse existing buffers for IO operations, and CoW on demand.
 #[allow(dead_code)]
 enum DataBuffer {
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index e6b8c5b80da..11c7ac1143d 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -21,8 +21,9 @@ use crate::cache::state::{
     BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
 };
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobCacheMgr};
+use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
 use crate::device::{BlobFeatures, BlobInfo};
+use crate::utils::get_path_from_file;
 
 pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
 pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
@@ -209,10 +210,19 @@ impl FileCacheEntry {
             reader.clone()
         };
 
+        // Turn off chunk deduplication if cache data encryption is enabled or the blob is in tarfs mode.
+        let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs {
+            warn!("chunk deduplication is turned off");
+            None
+        } else {
+            CasMgr::get_singleton()
+        };
+
         let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
         let blob_uncompressed_size = blob_info.uncompressed_size();
         let is_legacy_stargz = blob_info.is_legacy_stargz();
+        let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
 
         let (
             file,
             meta,
@@ -221,7 +231,6 @@ impl FileCacheEntry {
             is_get_blob_object_supported,
             need_validation,
         ) = if is_tarfs {
-            let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
             let file = OpenOptions::new()
                 .create(false)
                 .write(false)
@@ -231,7 +240,6 @@ impl FileCacheEntry {
                 Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc<dyn ChunkMap>;
             (file, None, chunk_map, true, true, false)
         } else {
-            let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
             let (chunk_map, is_direct_chunkmap) =
                 Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?;
             // Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array.
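The Drop implementation added above gives the CAS manager a chance to clean up whenever a cache entry goes away. What gc() actually does is not part of this diff; one plausible policy, continuing the illustrative sketch, is to drop records whose backing cache file has disappeared so that a later dedup_chunk() never tries to copy from a deleted file.

impl CasMgr {
    /// Garbage-collect records whose backing blob cache file no longer exists.
    /// The criteria here are an assumption; the real gc() may do more or less.
    pub fn gc(&self) -> std::io::Result<()> {
        let mut chunks = self.chunks.lock().unwrap();
        chunks.retain(|_digest, (path, _offset)| path.exists());
        Ok(())
    }
}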
@@ -266,6 +274,7 @@ impl FileCacheEntry {
             );
             return Err(einval!(msg));
         }
+        let load_chunk_digest = need_validation || cas_mgr.is_some();
         let meta = if blob_info.meta_ci_is_valid()
             || blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
         {
@@ -275,7 +284,7 @@ impl FileCacheEntry {
                 Some(blob_meta_reader),
                 Some(runtime.clone()),
                 false,
-                need_validation,
+                load_chunk_digest,
             )?;
             Some(meta)
         } else {
@@ -307,6 +316,16 @@ impl FileCacheEntry {
             (Default::default(), Default::default())
         };
 
+        let mut blob_data_file_path = String::new();
+        if cas_mgr.is_some() {
+            blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
+                path
+            } else {
+                warn!("can't get path from file");
+                "".to_string()
+            }
+        }
+
         trace!(
             "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
             mgr.cache_raw_data,
@@ -322,8 +341,10 @@ impl FileCacheEntry {
             blob_info,
             cache_cipher_object,
             cache_cipher_context,
+            cas_mgr,
             chunk_map,
             file: Arc::new(file),
+            file_path: Arc::new(blob_data_file_path),
             meta,
             metrics: mgr.metrics.clone(),
             prefetch_state: Arc::new(AtomicU32::new(0)),
diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs
index 5b2285c9b0e..6b22386b2cd 100644
--- a/storage/src/cache/fscache/mod.rs
+++ b/storage/src/cache/fscache/mod.rs
@@ -15,13 +15,13 @@ use tokio::runtime::Runtime;
 
 use crate::backend::BlobBackend;
 use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
+use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
 use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobCacheMgr};
+use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
 use crate::factory::BLOB_FACTORY;
-
-use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
+use crate::utils::get_path_from_file;
 
 const FSCACHE_BLOBS_CHECK_NUM: u8 = 1;
 
@@ -240,9 +240,18 @@ impl FileCacheEntry {
         };
         let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
 
+        // Turn off chunk deduplication in tarfs mode.
+        let cas_mgr = if is_tarfs {
+            warn!("chunk deduplication is turned off");
+            None
+        } else {
+            CasMgr::get_singleton()
+        };
+
         let need_validation = mgr.need_validation
             && !blob_info.is_legacy_stargz()
             && blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
+        let load_chunk_digest = need_validation || cas_mgr.is_some();
         let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id);
         let meta = if blob_info.meta_ci_is_valid() {
             FileCacheMeta::new(
@@ -251,7 +260,7 @@ impl FileCacheEntry {
                 Some(blob_meta_reader),
                 None,
                 true,
-                need_validation,
+                load_chunk_digest,
             )?
         } else {
             return Err(enosys!(
@@ -266,13 +275,25 @@ impl FileCacheEntry {
         )?));
         Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map);
 
+        let mut blob_data_file_path = String::new();
+        if cas_mgr.is_some() {
+            blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
+                path
+            } else {
+                warn!("can't get path from file");
+                "".to_string()
+            }
+        }
+
         Ok(FileCacheEntry {
             blob_id,
             blob_info: blob_info.clone(),
             cache_cipher_object: Default::default(),
             cache_cipher_context: Default::default(),
+            cas_mgr,
             chunk_map,
             file,
+            file_path: Arc::new(blob_data_file_path),
             meta: Some(meta),
             metrics: mgr.metrics.clone(),
             prefetch_state: Arc::new(AtomicU32::new(0)),
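Both cache backends recover the path of the already-opened blob cache file with crate::utils::get_path_from_file() so it can be handed to record_chunk(). The helper itself is not part of this diff; on Linux it can be implemented by reading the /proc/self/fd symlink, roughly as sketched below. The real helper in nydus-storage may differ.

use std::fs::File;
use std::os::unix::io::AsRawFd;

/// Best-effort recovery of the filesystem path behind an open file descriptor.
pub fn get_path_from_file(file: &File) -> Option<String> {
    let link = format!("/proc/self/fd/{}", file.as_raw_fd());
    std::fs::read_link(link)
        .ok()
        .and_then(|path| path.to_str().map(|s| s.to_string()))
}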