Skip to content

Commit

Permalink
nydusd: add the config support of amplify_io
Browse files Browse the repository at this point in the history
Add the support of `amplify_io` in the config file of nydusd
to configure read amplification.

Signed-off-by: Wenhao Ren <wenhaoren@mail.dlut.edu.cn>
  • Loading branch information
hangvane committed Nov 1, 2023
1 parent 8c573cb commit 02c2c25
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 30 deletions.
23 changes: 13 additions & 10 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ pub struct RafsConfigV2 {
#[serde(default = "default_rafs_mode")]
pub mode: String,
/// Batch size to read data from storage cache layer.
#[serde(rename = "batch_size", default = "default_batch_size")]
#[serde(rename = "batch_size", default = "default_user_io_batch_size")]
pub user_io_batch_size: usize,
/// Whether to validate data digest.
#[serde(default)]
Expand Down Expand Up @@ -874,8 +874,8 @@ pub struct PrefetchConfigV2 {
/// Number of data prefetching working threads.
#[serde(rename = "threads", default = "default_prefetch_threads_count")]
pub threads_count: usize,
/// The batch size to prefetch data from backend.
#[serde(rename = "batch_size", default = "default_prefetch_batch_size")]
/// The amplify batch size to prefetch data from backend.
#[serde(rename = "batch_size", default = "default_prefetch_request_batch_size")]
pub prefetch_request_batch_size: usize,
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
#[serde(default)]
Expand Down Expand Up @@ -1194,11 +1194,11 @@ fn default_work_dir() -> String {
".".to_string()
}

pub fn default_batch_size() -> usize {
128 * 1024
fn default_user_io_batch_size() -> usize {
1024 * 1024
}

fn default_prefetch_batch_size() -> usize {
pub fn default_prefetch_request_batch_size() -> usize {
1024 * 1024
}

Expand Down Expand Up @@ -1364,7 +1364,7 @@ struct RafsConfig {
#[serde(default)]
pub latest_read_files: bool,
// ZERO value means, amplifying user io is not enabled.
#[serde(rename = "amplify_io", default = "default_batch_size")]
#[serde(rename = "amplify_io", default = "default_user_io_batch_size")]
pub user_io_batch_size: usize,
}

Expand Down Expand Up @@ -1410,8 +1410,11 @@ struct FsPrefetchControl {
#[serde(default = "default_prefetch_threads_count")]
pub threads_count: usize,

/// Window size in unit of bytes to merge request to backend.
#[serde(rename = "merging_size", default = "default_batch_size")]
/// The amplify batch size to prefetch data from backend.
#[serde(
rename = "merging_size",
default = "default_prefetch_request_batch_size"
)]
pub prefetch_request_batch_size: usize,

/// Network bandwidth limitation for prefetching.
Expand Down Expand Up @@ -1449,7 +1452,7 @@ struct BlobPrefetchConfig {
pub enable: bool,
/// Number of data prefetching working threads.
pub threads_count: usize,
/// The maximum size of a merged IO request.
/// The amplify batch size to prefetch data from backend.
#[serde(rename = "merging_size")]
pub prefetch_request_batch_size: usize,
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
Expand Down
4 changes: 2 additions & 2 deletions service/src/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,8 @@ impl FsCacheHandler {
.prefetch_request_batch_size
.checked_next_power_of_two()
{
None => nydus_api::default_batch_size() as u64,
Some(1) => nydus_api::default_batch_size() as u64,
None => nydus_api::default_prefetch_request_batch_size() as u64,
Some(1) => nydus_api::default_prefetch_request_batch_size() as u64,
Some(s) => s as u64,
};
let size = std::cmp::max(0x4_0000u64, size);
Expand Down
24 changes: 12 additions & 12 deletions storage/src/cache/cachedfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub(crate) struct FileCacheEntry {
pub(crate) dio_enabled: bool,
// Data from the file cache should be validated before use.
pub(crate) need_validation: bool,
pub(crate) batch_size: u64,
pub(crate) user_io_batch_size: u64,
pub(crate) prefetch_config: Arc<AsyncPrefetchConfig>,
}

Expand Down Expand Up @@ -300,19 +300,19 @@ impl FileCacheEntry {
}
}

fn prefetch_batch_size(&self) -> u64 {
fn prefetch_request_batch_size(&self) -> u64 {
if self.prefetch_config.prefetch_request_batch_size < 0x2_0000 {
0x2_0000
} else {
self.prefetch_config.prefetch_request_batch_size as u64
}
}

fn ondemand_batch_size(&self) -> u64 {
if self.batch_size < 0x2_0000 {
fn user_io_batch_size(&self) -> u64 {
if self.user_io_batch_size < 0x2_0000 {
0x2_0000
} else {
self.batch_size
self.user_io_batch_size
}
}

Expand Down Expand Up @@ -559,7 +559,7 @@ impl BlobCache for FileCacheEntry {
}

// Then handle fs prefetch
let max_comp_size = self.prefetch_batch_size();
let max_comp_size = self.prefetch_request_batch_size();
let mut bios = bios.to_vec();
bios.sort_unstable_by_key(|entry| entry.chunkinfo.compressed_offset());
self.metrics.prefetch_unmerged_chunks.add(bios.len() as u64);
Expand Down Expand Up @@ -719,7 +719,7 @@ impl BlobObject for FileCacheEntry {
let meta = self.meta.as_ref().ok_or_else(|| enoent!())?;
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let mut chunks =
meta.get_chunks_compressed(offset, size, self.prefetch_batch_size(), prefetch)?;
meta.get_chunks_compressed(offset, size, self.prefetch_request_batch_size(), prefetch)?;
if !chunks.is_empty() {
if let Some(meta) = self.get_blob_meta_info()? {
chunks = self.strip_ready_chunks(meta, None, chunks);
Expand All @@ -745,7 +745,7 @@ impl BlobObject for FileCacheEntry {

let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let mut chunks = meta.get_chunks_uncompressed(offset, size, self.ondemand_batch_size())?;
let mut chunks = meta.get_chunks_uncompressed(offset, size, self.user_io_batch_size())?;
if let Some(meta) = self.get_blob_meta_info()? {
chunks = self.strip_ready_chunks(meta, None, chunks);
}
Expand All @@ -764,7 +764,7 @@ impl BlobObject for FileCacheEntry {

let chunks_extended;
let mut chunks = &range.chunks;
if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_batch_size())? {
if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_request_batch_size())? {
chunks_extended = v;
chunks = &chunks_extended;
}
Expand Down Expand Up @@ -934,7 +934,7 @@ impl FileCacheEntry {
fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result<usize> {
// Merge requests with continuous blob addresses.
let requests = self
.merge_requests_for_user(bios, self.ondemand_batch_size())
.merge_requests_for_user(bios, self.user_io_batch_size())
.ok_or_else(|| {
for bio in bios.iter() {
self.update_chunk_pending_status(&bio.chunkinfo, false);
Expand Down Expand Up @@ -1100,14 +1100,14 @@ impl FileCacheEntry {
+ region.chunks[idx].compressed_size() as u64;
let start = region.chunks[idx + 1].compressed_offset();
assert!(end <= start);
assert!(start - end <= self.ondemand_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT);
assert!(start - end <= self.user_io_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT);
assert!(region.chunks[idx].id() < region.chunks[idx + 1].id());
}
}

// Try to extend requests.
let mut region_hold;
if let Some(v) = self.extend_pending_chunks(&region.chunks, self.ondemand_batch_size())? {
if let Some(v) = self.extend_pending_chunks(&region.chunks, self.user_io_batch_size())? {
if v.len() > r.chunks.len() {
let mut tag_set = HashSet::new();
for (idx, chunk) in region.chunks.iter().enumerate() {
Expand Down
8 changes: 6 additions & 2 deletions storage/src/cache/filecache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::cache::state::{
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::device::{BlobFeatures, BlobInfo};
use crate::RAFS_DEFAULT_CHUNK_SIZE;

pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
Expand All @@ -46,6 +45,7 @@ pub struct FileCacheMgr {
cache_convergent_encryption: bool,
cache_encryption_key: String,
closed: Arc<AtomicBool>,
user_io_batch_size: Option<u64>,
}

impl FileCacheMgr {
Expand All @@ -55,6 +55,7 @@ impl FileCacheMgr {
backend: Arc<dyn BlobBackend>,
runtime: Arc<Runtime>,
id: &str,
user_io_batch_size: Option<u64>,
) -> Result<FileCacheMgr> {
let blob_cfg = config.get_filecache_config()?;
let work_dir = blob_cfg.get_work_dir()?;
Expand All @@ -77,6 +78,7 @@ impl FileCacheMgr {
cache_convergent_encryption: blob_cfg.enable_convergent_encryption,
cache_encryption_key: blob_cfg.encryption_key.clone(),
closed: Arc::new(AtomicBool::new(false)),
user_io_batch_size,
})
}

Expand Down Expand Up @@ -339,7 +341,9 @@ impl FileCacheEntry {
is_zran,
dio_enabled: false,
need_validation,
batch_size: RAFS_DEFAULT_CHUNK_SIZE,
// If none, use default 0 since it's not used.
// e.g., at build time.
user_io_batch_size: mgr.user_io_batch_size.unwrap_or(0),
prefetch_config,
})
}
Expand Down
8 changes: 6 additions & 2 deletions storage/src/cache/fscache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::device::{BlobFeatures, BlobInfo, BlobObject};
use crate::factory::BLOB_FACTORY;
use crate::RAFS_DEFAULT_CHUNK_SIZE;

use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;

Expand All @@ -40,6 +39,7 @@ pub struct FsCacheMgr {
need_validation: bool,
blobs_check_count: Arc<AtomicU8>,
closed: Arc<AtomicBool>,
user_io_batch_size: Option<u64>,
}

impl FsCacheMgr {
Expand All @@ -49,6 +49,7 @@ impl FsCacheMgr {
backend: Arc<dyn BlobBackend>,
runtime: Arc<Runtime>,
id: &str,
user_io_batch_size: Option<u64>,
) -> Result<FsCacheMgr> {
if config.cache_compressed {
return Err(enosys!("fscache doesn't support compressed cache mode"));
Expand All @@ -73,6 +74,7 @@ impl FsCacheMgr {
need_validation: config.cache_validate,
blobs_check_count: Arc::new(AtomicU8::new(0)),
closed: Arc::new(AtomicBool::new(false)),
user_io_batch_size,
})
}

Expand Down Expand Up @@ -290,7 +292,9 @@ impl FileCacheEntry {
is_zran,
dio_enabled: true,
need_validation,
batch_size: RAFS_DEFAULT_CHUNK_SIZE,
// If none, use default 0 since it's not used.
// e.g., at build time.
user_io_batch_size: mgr.user_io_batch_size.unwrap_or(0),
prefetch_config,
})
}
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) struct AsyncPrefetchConfig {
pub enable: bool,
/// Number of working threads.
pub threads_count: usize,
/// Window size to merge/amplify requests.
/// The amplify batch size to prefetch data from backend.
pub prefetch_request_batch_size: usize,
/// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set.
#[allow(unused)]
Expand Down
13 changes: 12 additions & 1 deletion storage/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl BlobFactory {
) -> IOResult<Arc<dyn BlobCache>> {
let backend_cfg = config.get_backend_config()?;
let cache_cfg = config.get_cache_config()?;
let user_io_batch_size = match config.get_rafs_config() {
Ok(v) => Some(v.user_io_batch_size as u64),
Err(_) => None,
};
let key = BlobCacheMgrKey {
config: config.clone(),
};
Expand All @@ -128,7 +132,13 @@ impl BlobFactory {
let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?;
let mgr = match cache_cfg.cache_type.as_str() {
"blobcache" | "filecache" => {
let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?;
let mgr = FileCacheMgr::new(
cache_cfg,
backend,
ASYNC_RUNTIME.clone(),
&config.id,
user_io_batch_size,
)?;
mgr.init()?;
Arc::new(mgr) as Arc<dyn BlobCacheMgr>
}
Expand All @@ -139,6 +149,7 @@ impl BlobFactory {
backend,
ASYNC_RUNTIME.clone(),
&config.id,
user_io_batch_size,
)?;
mgr.init()?;
Arc::new(mgr) as Arc<dyn BlobCacheMgr>
Expand Down

0 comments on commit 02c2c25

Please sign in to comment.