From 02c2c2588b970fbdabafe3cc15e9ccd01fb917f5 Mon Sep 17 00:00:00 2001 From: Wenhao Ren Date: Wed, 1 Nov 2023 14:27:24 +0800 Subject: [PATCH] nydusd: add the config support of `amplify_io` Add the support of `amplify_io` in the config file of nydusd to configure read amplification. Signed-off-by: Wenhao Ren --- api/src/config.rs | 23 +++++++++++++---------- service/src/fs_cache.rs | 4 ++-- storage/src/cache/cachedfile.rs | 24 ++++++++++++------------ storage/src/cache/filecache/mod.rs | 8 ++++++-- storage/src/cache/fscache/mod.rs | 8 ++++++-- storage/src/cache/worker.rs | 2 +- storage/src/factory.rs | 13 ++++++++++++- 7 files changed, 52 insertions(+), 30 deletions(-) diff --git a/api/src/config.rs b/api/src/config.rs index b9cc387b2eb..2dc86d175c5 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -820,7 +820,7 @@ pub struct RafsConfigV2 { #[serde(default = "default_rafs_mode")] pub mode: String, /// Batch size to read data from storage cache layer. - #[serde(rename = "batch_size", default = "default_batch_size")] + #[serde(rename = "batch_size", default = "default_user_io_batch_size")] pub user_io_batch_size: usize, /// Whether to validate data digest. #[serde(default)] @@ -874,8 +874,8 @@ pub struct PrefetchConfigV2 { /// Number of data prefetching working threads. #[serde(rename = "threads", default = "default_prefetch_threads_count")] pub threads_count: usize, - /// The batch size to prefetch data from backend. - #[serde(rename = "batch_size", default = "default_prefetch_batch_size")] + /// The amplify batch size to prefetch data from backend. + #[serde(rename = "batch_size", default = "default_prefetch_request_batch_size")] pub prefetch_request_batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. #[serde(default)] @@ -1194,11 +1194,11 @@ fn default_work_dir() -> String { ".".to_string() } -pub fn default_batch_size() -> usize { - 128 * 1024 +fn default_user_io_batch_size() -> usize { + 1024 * 1024 } -fn default_prefetch_batch_size() -> usize { +pub fn default_prefetch_request_batch_size() -> usize { 1024 * 1024 } @@ -1364,7 +1364,7 @@ struct RafsConfig { #[serde(default)] pub latest_read_files: bool, // ZERO value means, amplifying user io is not enabled. - #[serde(rename = "amplify_io", default = "default_batch_size")] + #[serde(rename = "amplify_io", default = "default_user_io_batch_size")] pub user_io_batch_size: usize, } @@ -1410,8 +1410,11 @@ struct FsPrefetchControl { #[serde(default = "default_prefetch_threads_count")] pub threads_count: usize, - /// Window size in unit of bytes to merge request to backend. - #[serde(rename = "merging_size", default = "default_batch_size")] + /// The amplify batch size to prefetch data from backend. + #[serde( + rename = "merging_size", + default = "default_prefetch_request_batch_size" + )] pub prefetch_request_batch_size: usize, /// Network bandwidth limitation for prefetching. @@ -1449,7 +1452,7 @@ struct BlobPrefetchConfig { pub enable: bool, /// Number of data prefetching working threads. pub threads_count: usize, - /// The maximum size of a merged IO request. + /// The amplify batch size to prefetch data from backend. #[serde(rename = "merging_size")] pub prefetch_request_batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index cb699f2875f..fdca45ed16a 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -522,8 +522,8 @@ impl FsCacheHandler { .prefetch_request_batch_size .checked_next_power_of_two() { - None => nydus_api::default_batch_size() as u64, - Some(1) => nydus_api::default_batch_size() as u64, + None => nydus_api::default_prefetch_request_batch_size() as u64, + Some(1) => nydus_api::default_prefetch_request_batch_size() as u64, Some(s) => s as u64, }; let size = std::cmp::max(0x4_0000u64, size); diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index d9c8d32b612..8662effc0ce 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -164,7 +164,7 @@ pub(crate) struct FileCacheEntry { pub(crate) dio_enabled: bool, // Data from the file cache should be validated before use. pub(crate) need_validation: bool, - pub(crate) batch_size: u64, + pub(crate) user_io_batch_size: u64, pub(crate) prefetch_config: Arc, } @@ -300,7 +300,7 @@ impl FileCacheEntry { } } - fn prefetch_batch_size(&self) -> u64 { + fn prefetch_request_batch_size(&self) -> u64 { if self.prefetch_config.prefetch_request_batch_size < 0x2_0000 { 0x2_0000 } else { @@ -308,11 +308,11 @@ impl FileCacheEntry { } } - fn ondemand_batch_size(&self) -> u64 { - if self.batch_size < 0x2_0000 { + fn user_io_batch_size(&self) -> u64 { + if self.user_io_batch_size < 0x2_0000 { 0x2_0000 } else { - self.batch_size + self.user_io_batch_size } } @@ -559,7 +559,7 @@ impl BlobCache for FileCacheEntry { } // Then handle fs prefetch - let max_comp_size = self.prefetch_batch_size(); + let max_comp_size = self.prefetch_request_batch_size(); let mut bios = bios.to_vec(); bios.sort_unstable_by_key(|entry| entry.chunkinfo.compressed_offset()); self.metrics.prefetch_unmerged_chunks.add(bios.len() as u64); @@ -719,7 +719,7 @@ impl BlobObject for FileCacheEntry { let meta = self.meta.as_ref().ok_or_else(|| enoent!())?; let meta = meta.get_blob_meta().ok_or_else(|| einval!())?; let mut chunks = - meta.get_chunks_compressed(offset, size, self.prefetch_batch_size(), prefetch)?; + meta.get_chunks_compressed(offset, size, self.prefetch_request_batch_size(), prefetch)?; if !chunks.is_empty() { if let Some(meta) = self.get_blob_meta_info()? { chunks = self.strip_ready_chunks(meta, None, chunks); @@ -745,7 +745,7 @@ impl BlobObject for FileCacheEntry { let meta = self.meta.as_ref().ok_or_else(|| einval!())?; let meta = meta.get_blob_meta().ok_or_else(|| einval!())?; - let mut chunks = meta.get_chunks_uncompressed(offset, size, self.ondemand_batch_size())?; + let mut chunks = meta.get_chunks_uncompressed(offset, size, self.user_io_batch_size())?; if let Some(meta) = self.get_blob_meta_info()? { chunks = self.strip_ready_chunks(meta, None, chunks); } @@ -764,7 +764,7 @@ impl BlobObject for FileCacheEntry { let chunks_extended; let mut chunks = &range.chunks; - if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_batch_size())? { + if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_request_batch_size())? { chunks_extended = v; chunks = &chunks_extended; } @@ -934,7 +934,7 @@ impl FileCacheEntry { fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result { // Merge requests with continuous blob addresses. let requests = self - .merge_requests_for_user(bios, self.ondemand_batch_size()) + .merge_requests_for_user(bios, self.user_io_batch_size()) .ok_or_else(|| { for bio in bios.iter() { self.update_chunk_pending_status(&bio.chunkinfo, false); @@ -1100,14 +1100,14 @@ impl FileCacheEntry { + region.chunks[idx].compressed_size() as u64; let start = region.chunks[idx + 1].compressed_offset(); assert!(end <= start); - assert!(start - end <= self.ondemand_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT); + assert!(start - end <= self.user_io_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT); assert!(region.chunks[idx].id() < region.chunks[idx + 1].id()); } } // Try to extend requests. let mut region_hold; - if let Some(v) = self.extend_pending_chunks(®ion.chunks, self.ondemand_batch_size())? { + if let Some(v) = self.extend_pending_chunks(®ion.chunks, self.user_io_batch_size())? { if v.len() > r.chunks.len() { let mut tag_set = HashSet::new(); for (idx, chunk) in region.chunks.iter().enumerate() { diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs index 2b158ca09b1..edb06952505 100644 --- a/storage/src/cache/filecache/mod.rs +++ b/storage/src/cache/filecache/mod.rs @@ -23,7 +23,6 @@ use crate::cache::state::{ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; use crate::device::{BlobFeatures, BlobInfo}; -use crate::RAFS_DEFAULT_CHUNK_SIZE; pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw"; pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data"; @@ -46,6 +45,7 @@ pub struct FileCacheMgr { cache_convergent_encryption: bool, cache_encryption_key: String, closed: Arc, + user_io_batch_size: Option, } impl FileCacheMgr { @@ -55,6 +55,7 @@ impl FileCacheMgr { backend: Arc, runtime: Arc, id: &str, + user_io_batch_size: Option, ) -> Result { let blob_cfg = config.get_filecache_config()?; let work_dir = blob_cfg.get_work_dir()?; @@ -77,6 +78,7 @@ impl FileCacheMgr { cache_convergent_encryption: blob_cfg.enable_convergent_encryption, cache_encryption_key: blob_cfg.encryption_key.clone(), closed: Arc::new(AtomicBool::new(false)), + user_io_batch_size, }) } @@ -339,7 +341,9 @@ impl FileCacheEntry { is_zran, dio_enabled: false, need_validation, - batch_size: RAFS_DEFAULT_CHUNK_SIZE, + // If none, use default 0 since it's not used. + // e.g., at build time. + user_io_batch_size: mgr.user_io_batch_size.unwrap_or(0), prefetch_config, }) } diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs index cf624f4f427..e11b83936d9 100644 --- a/storage/src/cache/fscache/mod.rs +++ b/storage/src/cache/fscache/mod.rs @@ -20,7 +20,6 @@ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; use crate::device::{BlobFeatures, BlobInfo, BlobObject}; use crate::factory::BLOB_FACTORY; -use crate::RAFS_DEFAULT_CHUNK_SIZE; use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX; @@ -40,6 +39,7 @@ pub struct FsCacheMgr { need_validation: bool, blobs_check_count: Arc, closed: Arc, + user_io_batch_size: Option, } impl FsCacheMgr { @@ -49,6 +49,7 @@ impl FsCacheMgr { backend: Arc, runtime: Arc, id: &str, + user_io_batch_size: Option, ) -> Result { if config.cache_compressed { return Err(enosys!("fscache doesn't support compressed cache mode")); @@ -73,6 +74,7 @@ impl FsCacheMgr { need_validation: config.cache_validate, blobs_check_count: Arc::new(AtomicU8::new(0)), closed: Arc::new(AtomicBool::new(false)), + user_io_batch_size, }) } @@ -290,7 +292,9 @@ impl FileCacheEntry { is_zran, dio_enabled: true, need_validation, - batch_size: RAFS_DEFAULT_CHUNK_SIZE, + // If none, use default 0 since it's not used. + // e.g., at build time. + user_io_batch_size: mgr.user_io_batch_size.unwrap_or(0), prefetch_config, }) } diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 14bc6d5eb17..0a3cb241ca6 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -25,7 +25,7 @@ pub(crate) struct AsyncPrefetchConfig { pub enable: bool, /// Number of working threads. pub threads_count: usize, - /// Window size to merge/amplify requests. + /// The amplify batch size to prefetch data from backend. pub prefetch_request_batch_size: usize, /// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set. #[allow(unused)] diff --git a/storage/src/factory.rs b/storage/src/factory.rs index cc37a4e913c..f7ec7805103 100644 --- a/storage/src/factory.rs +++ b/storage/src/factory.rs @@ -117,6 +117,10 @@ impl BlobFactory { ) -> IOResult> { let backend_cfg = config.get_backend_config()?; let cache_cfg = config.get_cache_config()?; + let user_io_batch_size = match config.get_rafs_config() { + Ok(v) => Some(v.user_io_batch_size as u64), + Err(_) => None, + }; let key = BlobCacheMgrKey { config: config.clone(), }; @@ -128,7 +132,13 @@ impl BlobFactory { let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?; let mgr = match cache_cfg.cache_type.as_str() { "blobcache" | "filecache" => { - let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?; + let mgr = FileCacheMgr::new( + cache_cfg, + backend, + ASYNC_RUNTIME.clone(), + &config.id, + user_io_batch_size, + )?; mgr.init()?; Arc::new(mgr) as Arc } @@ -139,6 +149,7 @@ impl BlobFactory { backend, ASYNC_RUNTIME.clone(), &config.id, + user_io_batch_size, )?; mgr.init()?; Arc::new(mgr) as Arc