From fbe54a24677b0c6881d1a97f8b42eb57596e9e9f Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Thu, 2 Feb 2023 02:31:33 +0800
Subject: [PATCH] storage: use global thread pool to support prefetch

Currently we use one prefetch manager per backend. This design has two
drawbacks:
- it spawns too many worker threads for prefetch
- the network rate limit has become defunct

So change the design to use a global prefetch manager with a thread pool
to serve all prefetch requests.

Signed-off-by: Jiang Liu
---
 storage/src/cache/cachedfile.rs    |  20 +-
 storage/src/cache/filecache/mod.rs |  17 +-
 storage/src/cache/fscache/mod.rs   |  18 +-
 storage/src/cache/worker.rs        | 413 +++++++++++++++--------------
 4 files changed, 243 insertions(+), 225 deletions(-)

diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs
index be59aea9e65..6afc65481ac 100644
--- a/storage/src/cache/cachedfile.rs
+++ b/storage/src/cache/cachedfile.rs
@@ -27,7 +27,7 @@ use tokio::runtime::Runtime;
 
 use crate::backend::BlobReader;
 use crate::cache::state::ChunkMap;
-use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
+use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, PrefetchMgr};
 use crate::cache::{BlobCache, BlobIoMergeState};
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
@@ -134,9 +134,9 @@ pub(crate) struct FileCacheEntry {
     pub(crate) meta: Option<FileCacheMeta>,
     pub(crate) metrics: Arc<BlobcacheMetrics>,
     pub(crate) prefetch_state: Arc<AtomicU32>,
+    pub(crate) prefetch_mgr: Arc<PrefetchMgr>,
     pub(crate) reader: Arc<dyn BlobReader>,
     pub(crate) runtime: Arc<Runtime>,
-    pub(crate) workers: Arc<AsyncWorkerMgr>,
 
     pub(crate) blob_compressed_size: u64,
     pub(crate) blob_uncompressed_size: u64,
@@ -458,7 +458,8 @@ impl BlobCache for FileCacheEntry {
             warn!("storage: inaccurate prefetch status");
         }
         if val == 0 || val == 1 {
-            self.workers.flush_pending_prefetch_requests(&self.blob_id);
+            self.prefetch_mgr
+                .flush_pending_prefetch_requests(&self.blob_id);
             return Ok(());
         }
     }
@@ -477,11 +478,12 @@ impl BlobCache for FileCacheEntry {
         // Handle blob prefetch request first, it may help performance.
         for req in prefetches {
             let msg = AsyncPrefetchMessage::new_blob_prefetch(
+                self.prefetch_mgr.clone(),
                 blob_cache.clone(),
                 req.offset as u64,
                 req.len as u64,
             );
-            let _ = self.workers.send_prefetch_message(msg);
+            let _ = self.prefetch_mgr.send_prefetch_message(msg);
         }
 
         // Then handle fs prefetch
@@ -494,8 +496,12 @@ impl BlobCache for FileCacheEntry {
             max_comp_size,
             max_comp_size as u64 >> RAFS_BATCH_SIZE_TO_GAP_SHIFT,
             |req: BlobIoRange| {
-                let msg = AsyncPrefetchMessage::new_fs_prefetch(blob_cache.clone(), req);
-                let _ = self.workers.send_prefetch_message(msg);
+                let msg = AsyncPrefetchMessage::new_fs_prefetch(
+                    self.prefetch_mgr.clone(),
+                    blob_cache.clone(),
+                    req,
+                );
+                let _ = self.prefetch_mgr.send_prefetch_message(msg);
             },
         );
 
@@ -593,7 +599,7 @@ impl BlobCache for FileCacheEntry {
 
     fn read(&self, iovec: &mut BlobIoVec, buffers: &[FileVolatileSlice]) -> Result<usize> {
         self.metrics.total.inc();
-        self.workers.consume_prefetch_budget(iovec.size());
+        self.prefetch_mgr.consume_prefetch_budget(iovec.size());
 
         if iovec.is_empty() {
             Ok(0)
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index ce4a60c358e..50dd46ddfe7 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -19,7 +19,7 @@ use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{
     BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
 };
-use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
+use crate::cache::worker::{AsyncPrefetchConfig, PrefetchMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo};
 use crate::RAFS_DEFAULT_CHUNK_SIZE;
@@ -32,8 +32,8 @@ pub struct FileCacheMgr {
     backend: Arc<dyn BlobBackend>,
     metrics: Arc<BlobcacheMetrics>,
     prefetch_config: Arc<AsyncPrefetchConfig>,
+    prefetch_mgr: Arc<PrefetchMgr>,
     runtime: Arc<Runtime>,
-    worker_mgr: Arc<AsyncWorkerMgr>,
     work_dir: String,
     validate: bool,
     disable_indexed_map: bool,
@@ -53,7 +53,7 @@ impl FileCacheMgr {
         let work_dir = blob_cfg.get_work_dir()?;
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
-        let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let worker_mgr = PrefetchMgr::new(metrics.clone(), prefetch_config.clone())?;
 
         Ok(FileCacheMgr {
             blobs: Arc::new(RwLock::new(HashMap::new())),
@@ -61,7 +61,7 @@ impl FileCacheMgr {
             metrics,
             prefetch_config,
             runtime,
-            worker_mgr: Arc::new(worker_mgr),
+            prefetch_mgr: Arc::new(worker_mgr),
             work_dir: work_dir.to_owned(),
             disable_indexed_map: blob_cfg.disable_indexed_map,
             validate: config.cache_validate,
@@ -87,7 +87,7 @@ impl FileCacheMgr {
             blob.clone(),
             self.prefetch_config.clone(),
             self.runtime.clone(),
-            self.worker_mgr.clone(),
+            self.prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -108,13 +108,12 @@ impl FileCacheMgr {
 
 impl BlobCacheMgr for FileCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        self.prefetch_mgr.setup()
     }
 
     fn destroy(&self) {
         if !self.closed.load(Ordering::Acquire) {
             self.closed.store(true, Ordering::Release);
-            self.worker_mgr.stop();
             self.backend().shutdown();
             self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
         }
@@ -170,7 +169,7 @@ impl FileCacheEntry {
         blob_info: Arc<BlobInfo>,
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
-        workers: Arc<AsyncWorkerMgr>,
+        prefetch_mgr: Arc<PrefetchMgr>,
     ) -> Result<Self> {
         let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
         let is_tarfs = blob_info.features().is_tarfs();
@@ -296,7 +295,7 @@ impl FileCacheEntry {
             prefetch_state: Arc::new(AtomicU32::new(0)),
             reader,
             runtime,
-            workers,
+            prefetch_mgr,
 
             blob_compressed_size,
             blob_uncompressed_size,
diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs
index f8cca7ed108..d64da7ef233 100644
--- a/storage/src/cache/fscache/mod.rs
+++ b/storage/src/cache/fscache/mod.rs
@@ -16,7 +16,7 @@ use tokio::runtime::Runtime;
 use crate::backend::BlobBackend;
 use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
-use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
+use crate::cache::worker::{AsyncPrefetchConfig, PrefetchMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
 use crate::factory::BLOB_FACTORY;
@@ -32,8 +32,8 @@ pub struct FsCacheMgr {
     backend: Arc<dyn BlobBackend>,
     metrics: Arc<BlobcacheMetrics>,
     prefetch_config: Arc<AsyncPrefetchConfig>,
+    prefetch_mgr: Arc<PrefetchMgr>,
     runtime: Arc<Runtime>,
-    worker_mgr: Arc<AsyncWorkerMgr>,
     work_dir: String,
     need_validation: bool,
     blobs_check_count: Arc<AtomicU8>,
@@ -56,7 +56,7 @@ impl FsCacheMgr {
         let work_dir = blob_cfg.get_work_dir()?;
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
-        let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let worker_mgr = PrefetchMgr::new(metrics.clone(), prefetch_config.clone())?;
 
         BLOB_FACTORY.start_mgr_checker();
 
@@ -66,7 +66,7 @@ impl FsCacheMgr {
             metrics,
             prefetch_config,
             runtime,
-            worker_mgr: Arc::new(worker_mgr),
+            prefetch_mgr: Arc::new(worker_mgr),
             work_dir: work_dir.to_owned(),
             need_validation: config.cache_validate,
             blobs_check_count: Arc::new(AtomicU8::new(0)),
@@ -91,7 +91,7 @@ impl FsCacheMgr {
             blob.clone(),
             self.prefetch_config.clone(),
             self.runtime.clone(),
-            self.worker_mgr.clone(),
+            self.prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -112,13 +112,12 @@ impl FsCacheMgr {
 
 impl BlobCacheMgr for FsCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        self.prefetch_mgr.setup()
     }
 
     fn destroy(&self) {
         if !self.closed.load(Ordering::Acquire) {
             self.closed.store(true, Ordering::Release);
-            self.worker_mgr.stop();
             self.backend().shutdown();
             self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
         }
@@ -173,7 +172,6 @@ impl BlobCacheMgr for FsCacheMgr {
         // we should double check blobs stat, in case some blobs hadn't been created when we checked.
         if all_ready {
             if self.blobs_check_count.load(Ordering::Acquire) == FSCACHE_BLOBS_CHECK_NUM {
-                self.worker_mgr.stop();
                 self.metrics.data_all_ready.store(true, Ordering::Release);
             } else {
                 self.blobs_check_count.fetch_add(1, Ordering::Acquire);
             }
@@ -196,7 +194,7 @@ impl FileCacheEntry {
         blob_info: Arc<BlobInfo>,
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
-        workers: Arc<AsyncWorkerMgr>,
+        prefetch_mgr: Arc<PrefetchMgr>,
     ) -> Result<Self> {
         if blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE) {
             return Err(einval!("fscache does not support Rafs v5 blobs"));
         }
@@ -269,7 +267,7 @@ impl FileCacheEntry {
             prefetch_state: Arc::new(AtomicU32::new(0)),
             reader,
             runtime,
-            workers,
+            prefetch_mgr,
 
             blob_compressed_size,
             blob_uncompressed_size: blob_info.uncompressed_size(),
diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs
index 3ee45b44ab2..946c3e11e90 100644
--- a/storage/src/cache/worker.rs
+++ b/storage/src/cache/worker.rs
@@ -4,8 +4,9 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use std::io::Result;
-use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, Once};
+use std::ops::Deref;
+use std::sync::atomic::{AtomicI32, AtomicU32, AtomicU64, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, Once};
 use std::thread;
 use std::time::{Duration, SystemTime};
 
@@ -21,6 +22,8 @@ use crate::cache::{BlobCache, BlobIoRange};
 use crate::factory::ASYNC_RUNTIME;
 use crate::RAFS_MAX_CHUNK_SIZE;
 
+static ASYNC_WORKER_MGR: Mutex<Option<Arc<AsyncWorkerMgr>>> = Mutex::new(None);
+
 /// Configuration information for asynchronous workers.
 pub(crate) struct AsyncPrefetchConfig {
     /// Whether or not to enable prefetch.
@@ -47,9 +50,14 @@ impl From<&PrefetchConfigV2> for AsyncPrefetchConfig {
 
 /// Asynchronous service request message.
 pub(crate) enum AsyncPrefetchMessage {
     /// Asynchronous blob layer prefetch request with (offset, size) of blob on storage backend.
-    BlobPrefetch(Arc<dyn BlobCache>, u64, u64, SystemTime),
+    BlobPrefetch(Arc<PrefetchMgr>, Arc<dyn BlobCache>, u64, u64, SystemTime),
     /// Asynchronous file-system layer prefetch request.
-    FsPrefetch(Arc<dyn BlobCache>, BlobIoRange, SystemTime),
+    FsPrefetch(
+        Arc<PrefetchMgr>,
+        Arc<dyn BlobCache>,
+        BlobIoRange,
+        SystemTime,
+    ),
     #[cfg_attr(not(test), allow(unused))]
     /// Ping for test.
     Ping,
@@ -59,49 +67,47 @@ pub(crate) enum AsyncPrefetchMessage {
 
 impl AsyncPrefetchMessage {
     /// Create a new asynchronous filesystem prefetch request message.
-    pub fn new_fs_prefetch(blob_cache: Arc<dyn BlobCache>, req: BlobIoRange) -> Self {
-        AsyncPrefetchMessage::FsPrefetch(blob_cache, req, SystemTime::now())
+    pub fn new_fs_prefetch(
+        mgr: Arc<PrefetchMgr>,
+        blob_cache: Arc<dyn BlobCache>,
+        req: BlobIoRange,
+    ) -> Self {
+        AsyncPrefetchMessage::FsPrefetch(mgr, blob_cache, req, SystemTime::now())
     }
 
     /// Create a new asynchronous blob prefetch request message.
-    pub fn new_blob_prefetch(blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64) -> Self {
-        AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size, SystemTime::now())
+    pub fn new_blob_prefetch(
+        mgr: Arc<PrefetchMgr>,
+        blob_cache: Arc<dyn BlobCache>,
+        offset: u64,
+        size: u64,
+    ) -> Self {
+        AsyncPrefetchMessage::BlobPrefetch(mgr, blob_cache, offset, size, SystemTime::now())
     }
 }
 
-/// An asynchronous task manager for data prefetching
-pub(crate) struct AsyncWorkerMgr {
-    metrics: Arc<BlobcacheMetrics>,
-    ping_requests: AtomicU32,
+struct AsyncWorkerMgr {
     workers: AtomicU32,
-    active: AtomicBool,
-    begin_timing_once: Once,
-
-    // Limit the total retry times to avoid unnecessary resource consumption.
-    retry_times: AtomicI32,
+    ping_requests: AtomicU32,
 
-    prefetch_sema: Arc<Semaphore>,
-    prefetch_channel: Arc<Channel<AsyncPrefetchMessage>>,
     prefetch_config: Arc<AsyncPrefetchConfig>,
     prefetch_delayed: AtomicU64,
     prefetch_inflight: AtomicU32,
+    prefetch_sema: Arc<Semaphore>,
+    prefetch_channel: Arc<Channel<AsyncPrefetchMessage>>,
     prefetch_consumed: AtomicUsize,
     prefetch_limiter: Option<Arc<RateLimiter>>,
 }
 
 impl AsyncWorkerMgr {
-    /// Create a new instance of `AsyncWorkerMgr`.
-    pub fn new(
-        metrics: Arc<BlobcacheMetrics>,
-        prefetch_config: Arc<AsyncPrefetchConfig>,
-    ) -> Result<Self> {
+    fn new(prefetch_config: Arc<AsyncPrefetchConfig>) -> Self {
         let prefetch_limiter = match prefetch_config.bandwidth_rate {
             0 => None,
             v => {
                 // If the given value is less than maximum blob chunk size, it exceeds burst size of the
                 // limiter ending up with throttling all throughput, so ensure bandwidth is bigger than
                 // the maximum chunk size.
-                let limit = std::cmp::max(RAFS_MAX_CHUNK_SIZE as usize, v as usize);
+                let limit = std::cmp::max(RAFS_MAX_CHUNK_SIZE as usize * 2, v as usize);
                 let limiter = RateLimiter::builder()
                     .initial(limit)
                     .refill(limit / 10)
@@ -111,14 +117,9 @@ impl AsyncWorkerMgr {
             }
         };
 
-        Ok(AsyncWorkerMgr {
-            metrics,
-            ping_requests: AtomicU32::new(0),
+        AsyncWorkerMgr {
             workers: AtomicU32::new(0),
-            active: AtomicBool::new(false),
-            begin_timing_once: Once::new(),
-
-            retry_times: AtomicI32::new(32),
+            ping_requests: AtomicU32::new(0),
 
             prefetch_sema: Arc::new(Semaphore::new(0)),
             prefetch_channel: Arc::new(Channel::new()),
@@ -127,27 +128,38 @@ impl AsyncWorkerMgr {
             prefetch_inflight: AtomicU32::new(0),
             prefetch_consumed: AtomicUsize::new(0),
             prefetch_limiter,
-        })
+        }
     }
 
-    /// Create working threads and start the event loop.
-    pub fn start(mgr: Arc<AsyncWorkerMgr>) -> Result<()> {
-        if mgr.prefetch_config.enable {
-            Self::start_prefetch_workers(mgr)?;
-        }
+    fn start(self: Arc<Self>) -> Result<()> {
+        // Hold the request queue to barrier all working threads.
+        let guard = self.prefetch_channel.lock_channel();
+        for num in 0..self.prefetch_config.threads_count {
+            let mgr2 = self.clone();
+            let res = thread::Builder::new()
+                .name(format!("nydus_storage_worker_{}", num))
+                .spawn(move || {
+                    mgr2.grow_n(1);
+                    with_runtime(|rt| {
+                        rt.block_on(Self::handle_prefetch_requests(mgr2.clone(), rt));
+                    });
+                    mgr2.shrink_n(1);
+                    info!("storage: worker thread {} exits.", num)
+                });
+            if let Err(e) = res {
+                error!("storage: failed to create worker thread, {:?}", e);
+                self.prefetch_channel.close();
+                drop(guard);
+                self.stop();
+                return Err(e);
+            }
+        }
         Ok(())
     }
 
     /// Stop all working threads.
-    pub fn stop(&self) {
-        if self
-            .active
-            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
-            .is_err()
-        {
-            return;
-        }
+    fn stop(&self) {
         self.prefetch_channel.close();
 
         while self.workers.load(Ordering::Relaxed) > 0 {
@@ -156,138 +168,75 @@ impl AsyncWorkerMgr {
         }
     }
 
-    /// Send an asynchronous service request message to the workers.
-    pub fn send_prefetch_message(
+    fn send_prefetch_message(
         &self,
         msg: AsyncPrefetchMessage,
     ) -> std::result::Result<(), AsyncPrefetchMessage> {
-        if !self.prefetch_config.enable {
-            Err(msg)
-        } else {
-            self.prefetch_inflight.fetch_add(1, Ordering::Relaxed);
-            self.prefetch_channel.send(msg)
-        }
+        self.prefetch_inflight.fetch_add(1, Ordering::Relaxed);
+        self.prefetch_channel.send(msg)
     }
 
-    /// Flush pending prefetch requests associated with `blob_id`.
-    pub fn flush_pending_prefetch_requests(&self, blob_id: &str) {
+    fn flush_pending_prefetch_requests(&self, blob_id: &str) {
         self.prefetch_channel
             .flush_pending_prefetch_requests(|t| match t {
-                AsyncPrefetchMessage::BlobPrefetch(blob, _, _, _) => {
+                AsyncPrefetchMessage::BlobPrefetch(_, blob, _, _, _) => {
                     blob_id == blob.blob_id() && !blob.is_prefetch_active()
                 }
-                AsyncPrefetchMessage::FsPrefetch(blob, _, _) => {
+                AsyncPrefetchMessage::FsPrefetch(_, blob, _, _) => {
                     blob_id == blob.blob_id() && !blob.is_prefetch_active()
                 }
                 _ => false,
             });
     }
 
-    /// Consume network bandwidth budget for prefetching.
-    pub fn consume_prefetch_budget(&self, size: u32) {
+    fn consume_prefetch_budget(&self, size: u32) {
         if self.prefetch_inflight.load(Ordering::Relaxed) > 0 {
             self.prefetch_consumed
                 .fetch_add(size as usize, Ordering::AcqRel);
         }
     }
 
-    fn start_prefetch_workers(mgr: Arc<AsyncWorkerMgr>) -> Result<()> {
-        // Hold the request queue to barrier all working threads.
-        let guard = mgr.prefetch_channel.lock_channel();
-        for num in 0..mgr.prefetch_config.threads_count {
-            let mgr2 = mgr.clone();
-            let res = thread::Builder::new()
-                .name(format!("nydus_storage_worker_{}", num))
-                .spawn(move || {
-                    mgr2.grow_n(1);
-                    mgr2.metrics
-                        .prefetch_workers
-                        .fetch_add(1, Ordering::Relaxed);
-
-                    with_runtime(|rt| {
-                        rt.block_on(Self::handle_prefetch_requests(mgr2.clone(), rt));
-                    });
-
-                    mgr2.metrics
-                        .prefetch_workers
-                        .fetch_sub(1, Ordering::Relaxed);
-                    mgr2.shrink_n(1);
-                    info!("storage: worker thread {} exits.", num)
-                });
-
-            if let Err(e) = res {
-                error!("storage: failed to create worker thread, {:?}", e);
-                mgr.prefetch_channel.close();
-                drop(guard);
-                mgr.stop();
-                return Err(e);
-            }
-        }
-        mgr.active.store(true, Ordering::Release);
-        Ok(())
-    }
-
-    async fn handle_prefetch_requests(mgr: Arc<AsyncWorkerMgr>, rt: &Runtime) {
-        mgr.begin_timing_once.call_once(|| {
-            let now = SystemTime::now()
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .unwrap();
-
-            mgr.metrics.prefetch_begin_time_secs.set(now.as_secs());
-            mgr.metrics
-                .prefetch_begin_time_millis
-                .set(now.subsec_millis() as u64);
-        });
-
+    async fn handle_prefetch_requests(self: Arc<Self>, rt: &Runtime) {
         // Max 1 active request per thread.
-        mgr.prefetch_sema.add_permits(1);
+        self.prefetch_sema.add_permits(1);
 
-        while let Ok(msg) = mgr.prefetch_channel.recv().await {
-            mgr.handle_prefetch_rate_limit(&msg).await;
-            let mgr2 = mgr.clone();
+        while let Ok(msg) = self.prefetch_channel.recv().await {
+            self.handle_prefetch_rate_limit(&msg).await;
             match msg {
-                AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size, begin_time) => {
-                    let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
+                AsyncPrefetchMessage::BlobPrefetch(mgr, blob_cache, offset, size, begin_time) => {
+                    let token = Semaphore::acquire_owned(self.prefetch_sema.clone())
                         .await
                         .unwrap();
                     if blob_cache.is_prefetch_active() {
                         rt.spawn_blocking(move || {
                             let _ = Self::handle_blob_prefetch_request(
-                                mgr2.clone(),
-                                blob_cache,
-                                offset,
-                                size,
-                                begin_time,
+                                mgr, blob_cache, offset, size, begin_time,
                             );
                             drop(token);
                         });
                     }
                 }
-                AsyncPrefetchMessage::FsPrefetch(blob_cache, req, begin_time) => {
-                    let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
+                AsyncPrefetchMessage::FsPrefetch(mgr, blob_cache, req, begin_time) => {
+                    let token = Semaphore::acquire_owned(self.prefetch_sema.clone())
                         .await
                         .unwrap();
                     if blob_cache.is_prefetch_active() {
                         rt.spawn_blocking(move || {
-                            let _ = Self::handle_fs_prefetch_request(
-                                mgr2.clone(),
-                                blob_cache,
-                                req,
-                                begin_time,
-                            );
+                            let _ =
+                                Self::handle_fs_prefetch_request(mgr, blob_cache, req, begin_time);
                             drop(token)
                         });
                     }
                 }
                 AsyncPrefetchMessage::Ping => {
-                    let _ = mgr.ping_requests.fetch_add(1, Ordering::Relaxed);
+                    let _ = self.ping_requests.fetch_add(1, Ordering::Relaxed);
                 }
                 AsyncPrefetchMessage::RateLimiter(_size) => {}
             }
-            mgr.prefetch_inflight.fetch_sub(1, Ordering::Relaxed);
+            self.prefetch_inflight.fetch_sub(1, Ordering::Relaxed);
         }
     }
 
@@ -295,14 +244,14 @@ impl AsyncWorkerMgr {
         // Allocate network bandwidth budget
         if let Some(limiter) = &self.prefetch_limiter {
             let size = match msg {
-                AsyncPrefetchMessage::BlobPrefetch(blob_cache, _offset, size, _) => {
+                AsyncPrefetchMessage::BlobPrefetch(_, blob_cache, _offset, size, _) => {
                     if blob_cache.is_prefetch_active() {
                         *size
                     } else {
                         0
                     }
                 }
-                AsyncPrefetchMessage::FsPrefetch(blob_cache, req, _) => {
+                AsyncPrefetchMessage::FsPrefetch(_, blob_cache, req, _) => {
                     if blob_cache.is_prefetch_active() {
                         req.blob_size
                     } else {
@@ -328,7 +277,7 @@ impl AsyncWorkerMgr {
     }
 
     fn handle_blob_prefetch_request(
-        mgr: Arc<AsyncWorkerMgr>,
+        mgr: Arc<PrefetchMgr>,
         cache: Arc<dyn BlobCache>,
         offset: u64,
         size: u64,
@@ -344,32 +293,38 @@ impl AsyncWorkerMgr {
             return Ok(());
         }
 
-        // Record how much prefetch data is requested from storage backend.
-        // So the average backend merged request size will be prefetch_data_amount/prefetch_requests_count.
-        // We can measure merging possibility by this.
-        let metrics = mgr.metrics.clone();
-        metrics.prefetch_requests_count.inc();
-        metrics.prefetch_data_amount.add(size);
-
         if let Some(obj) = cache.get_blob_object() {
-            if let Err(_e) = obj.fetch_range_compressed(offset, size, true) {
+            if let Err(e) = obj.fetch_range_compressed(offset, size, true) {
                 if mgr.retry_times.load(Ordering::Relaxed) > 0 {
                     mgr.retry_times.fetch_sub(1, Ordering::Relaxed);
                     ASYNC_RUNTIME.spawn(async move {
                         tokio::time::sleep(Duration::from_secs(1)).await;
-                        let msg =
-                            AsyncPrefetchMessage::new_blob_prefetch(cache.clone(), offset, size);
+                        let msg = AsyncPrefetchMessage::new_blob_prefetch(
+                            mgr.clone(),
+                            cache.clone(),
+                            offset,
+                            size,
+                        );
                         let _ = mgr.send_prefetch_message(msg);
                     });
+                    Ok(())
+                } else {
+                    warn!("storage: failed to prefetch data from blob {}, offset {}, size {}, {}",
+                          cache.blob_id(), offset, size, e);
+                    Err(e)
                 }
+            } else {
+                // Record how much prefetch data is requested from storage backend.
+                // So the average backend merged request size will be prefetch_data_amount/prefetch_requests_count.
+                // We can measure merging possibility by this.
+                mgr.metrics.prefetch_requests_count.inc();
+                mgr.metrics.prefetch_data_amount.add(size);
+                mgr.metrics.calculate_prefetch_metrics(begin_time);
+                Ok(())
             }
         } else {
-            warn!("prefetch blob range is not supported");
+            Err(eother!("prefetch blob range is not supported"))
         }
-
-        metrics.calculate_prefetch_metrics(begin_time);
-
-        Ok(())
     }
 
     // TODO: Nydus plans to switch backend storage IO stack to full asynchronous mode.
@@ -378,7 +333,7 @@ impl AsyncWorkerMgr {
     // threads always panic in debug program profile. We can achieve the goal when
     // backend/registry also switches to async IO.
     fn handle_fs_prefetch_request(
-        mgr: Arc<AsyncWorkerMgr>,
+        mgr: Arc<PrefetchMgr>,
         cache: Arc<dyn BlobCache>,
         req: BlobIoRange,
         begin_time: SystemTime,
@@ -395,18 +350,17 @@ impl AsyncWorkerMgr {
             return Ok(());
         }
 
-        // Record how much prefetch data is requested from storage backend.
-        // So the average backend merged request size will be prefetch_data_amount/prefetch_requests_count.
-        // We can measure merging possibility by this.
-        mgr.metrics.prefetch_requests_count.inc();
-        mgr.metrics.prefetch_data_amount.add(blob_size);
-
         if let Some(obj) = cache.get_blob_object() {
             obj.prefetch_chunks(&req)?;
         } else {
             cache.prefetch_range(&req)?;
         }
 
+        // Record how much prefetch data is requested from storage backend.
+        // So the average backend merged request size will be prefetch_data_amount/prefetch_requests_count.
+        // We can measure merging possibility by this.
+        mgr.metrics.prefetch_requests_count.inc();
+        mgr.metrics.prefetch_data_amount.add(blob_size);
         mgr.metrics.calculate_prefetch_metrics(begin_time);
 
         Ok(())
@@ -421,25 +375,108 @@ impl AsyncWorkerMgr {
         }
     }
 }
+
+/// An asynchronous task manager for data prefetching.
+pub(crate) struct PrefetchMgr {
+    enabled: bool,
+    metrics: Arc<BlobcacheMetrics>,
+    begin_timing_once: Once,
+    retry_times: AtomicI32,
+    worker_mgr: Arc<AsyncWorkerMgr>,
+}
+
+impl PrefetchMgr {
+    /// Create a new instance of `PrefetchMgr`.
+    pub fn new(
+        metrics: Arc<BlobcacheMetrics>,
+        prefetch_config: Arc<AsyncPrefetchConfig>,
+    ) -> Result<Self> {
+        let enabled = prefetch_config.enable;
+        let mut guard = ASYNC_WORKER_MGR.lock().unwrap();
+        let worker_mgr = match guard.deref() {
+            Some(v) => v.clone(),
+            None => {
+                let mgr = Arc::new(AsyncWorkerMgr::new(prefetch_config));
+                mgr.clone().start()?;
+                *guard = Some(mgr.clone());
+                mgr
+            }
+        };
+
+        Ok(PrefetchMgr {
+            enabled,
+            metrics,
+            begin_timing_once: Once::new(),
+            retry_times: AtomicI32::new(32),
+            worker_mgr,
+        })
+    }
+
+    /// Finish initialization, recording the prefetch begin time and worker count.
+    pub fn setup(&self) -> Result<()> {
+        self.begin_timing_once.call_once(|| {
+            let now = SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .unwrap();
+            self.metrics.prefetch_begin_time_secs.set(now.as_secs());
+            self.metrics
+                .prefetch_begin_time_millis
+                .set(now.subsec_millis() as u64);
+            self.metrics.prefetch_workers.store(
+                self.worker_mgr.workers.load(Ordering::Relaxed) as usize,
+                Ordering::Relaxed,
+            );
+        });
+
+        Ok(())
+    }
+
+    /// Send an asynchronous service request message to the workers.
+    pub fn send_prefetch_message(
+        &self,
+        msg: AsyncPrefetchMessage,
+    ) -> std::result::Result<(), AsyncPrefetchMessage> {
+        if self.enabled {
+            self.worker_mgr.send_prefetch_message(msg)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Flush pending prefetch requests associated with `blob_id`.
+    pub fn flush_pending_prefetch_requests(&self, blob_id: &str) {
+        if self.enabled {
+            self.worker_mgr.flush_pending_prefetch_requests(blob_id);
+        }
+    }
+
+    /// Consume network bandwidth budget for prefetching.
+    pub fn consume_prefetch_budget(&self, size: u32) {
+        self.worker_mgr.consume_prefetch_budget(size);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use vmm_sys_util::tempdir::TempDir;
 
     #[test]
-    fn test_worker_mgr_new() {
+    fn test_worker_mgr_rate_limiter() {
         let tmpdir = TempDir::new().unwrap();
         let metrics = BlobcacheMetrics::new("test1", tmpdir.as_path().to_str().unwrap());
         let config = Arc::new(AsyncPrefetchConfig {
             enable: true,
-            threads_count: 2,
-            merging_size: 0x100000,
-            bandwidth_rate: 0x100000,
+            threads_count: 4,
+            merging_size: 0x1000000,
+            bandwidth_rate: 0x1000000,
         });
 
-        let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap());
-        AsyncWorkerMgr::start(mgr.clone()).unwrap();
-        assert_eq!(mgr.ping_requests.load(Ordering::Acquire), 0);
+        let mgr = Arc::new(PrefetchMgr::new(metrics, config).unwrap());
+        mgr.setup().unwrap();
+        thread::sleep(Duration::from_secs(1));
+        assert_eq!(mgr.worker_mgr.workers.load(Ordering::Acquire), 4);
+
+        assert_eq!(mgr.worker_mgr.ping_requests.load(Ordering::Acquire), 0);
         assert!(mgr
             .send_prefetch_message(AsyncPrefetchMessage::Ping)
             .is_ok());
@@ -456,33 +493,10 @@ mod tests {
             .send_prefetch_message(AsyncPrefetchMessage::Ping)
             .is_ok());
         thread::sleep(Duration::from_secs(1));
-        assert_eq!(mgr.ping_requests.load(Ordering::Acquire), 5);
-        assert_eq!(mgr.workers.load(Ordering::Acquire), 2);
-        mgr.stop();
-        assert_eq!(mgr.workers.load(Ordering::Acquire), 0);
-        assert!(mgr
-            .send_prefetch_message(AsyncPrefetchMessage::Ping)
-            .is_err());
-    }
-
-    #[test]
-    fn test_worker_mgr_rate_limiter() {
-        let tmpdir = TempDir::new().unwrap();
-        let metrics = BlobcacheMetrics::new("test1", tmpdir.as_path().to_str().unwrap());
-        let config = Arc::new(AsyncPrefetchConfig {
-            enable: true,
-            threads_count: 4,
-            merging_size: 0x1000000,
-            bandwidth_rate: 0x1000000,
-        });
-
-        let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap());
-        AsyncWorkerMgr::start(mgr.clone()).unwrap();
-
-        assert_eq!(mgr.prefetch_delayed.load(Ordering::Acquire), 0);
-        assert_eq!(mgr.prefetch_inflight.load(Ordering::Acquire), 0);
+        assert_eq!(mgr.worker_mgr.ping_requests.load(Ordering::Acquire), 5);
 
-        thread::sleep(Duration::from_secs(1));
+        assert_eq!(mgr.worker_mgr.prefetch_delayed.load(Ordering::Acquire), 0);
+        assert_eq!(mgr.worker_mgr.prefetch_inflight.load(Ordering::Acquire), 0);
         assert!(mgr
             .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(1))
             .is_ok());
@@ -490,27 +504,28 @@ mod tests {
             .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(1))
             .is_ok());
         thread::sleep(Duration::from_secs(1));
-        assert_eq!(mgr.prefetch_delayed.load(Ordering::Acquire), 0);
-        assert_eq!(mgr.prefetch_inflight.load(Ordering::Acquire), 0);
+        assert_eq!(mgr.worker_mgr.prefetch_delayed.load(Ordering::Acquire), 0);
+        assert_eq!(mgr.worker_mgr.prefetch_inflight.load(Ordering::Acquire), 0);
         assert!(mgr
-            .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(0x1000000))
+            .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(0x300_0000))
            .is_ok());
         assert!(mgr
-            .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(0x1000000))
+            .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(0x100_0000))
             .is_ok());
         assert!(mgr
            .send_prefetch_message(AsyncPrefetchMessage::RateLimiter(u64::MAX))
            .is_ok());
-        assert_eq!(mgr.prefetch_inflight.load(Ordering::Acquire), 3);
-        thread::sleep(Duration::from_secs(1));
-        assert!(mgr.prefetch_inflight.load(Ordering::Acquire) <= 2);
-        assert!(mgr.prefetch_inflight.load(Ordering::Acquire) >= 1);
+        assert!(mgr.worker_mgr.prefetch_inflight.load(Ordering::Acquire) >= 2);
+        thread::sleep(Duration::from_secs(2));
+        assert!(mgr.worker_mgr.prefetch_inflight.load(Ordering::Acquire) <= 2);
+        assert!(mgr.worker_mgr.prefetch_inflight.load(Ordering::Acquire) >= 1);
         thread::sleep(Duration::from_secs(3));
-        assert!(mgr.prefetch_inflight.load(Ordering::Acquire) >= 1);
-        assert!(mgr.prefetch_delayed.load(Ordering::Acquire) >= 1);
+        assert!(mgr.worker_mgr.prefetch_inflight.load(Ordering::Acquire) >= 1);
+        assert!(mgr.worker_mgr.prefetch_delayed.load(Ordering::Acquire) >= 1);
 
-        mgr.stop();
-        assert_eq!(mgr.workers.load(Ordering::Acquire), 0);
+        assert_eq!(mgr.worker_mgr.workers.load(Ordering::Acquire), 4);
+        mgr.worker_mgr.stop();
+        assert_eq!(mgr.worker_mgr.workers.load(Ordering::Acquire), 0);
     }
 }
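
Note: the core of this change is that PrefetchMgr::new() lazily creates a
single process-wide AsyncWorkerMgr and caches it in the ASYNC_WORKER_MGR
slot, so the first backend to come up creates the thread pool and every
later backend shares it. One consequence worth noting: the first caller's
prefetch_config decides the thread count and bandwidth limit, since later
configs are ignored. A minimal, self-contained sketch of this lazy-singleton
shape, using hypothetical names (WorkerPool, GLOBAL_POOL, global_pool)
rather than the nydus types:

    use std::sync::{Arc, Mutex};

    // Hypothetical stand-in for the shared worker pool.
    struct WorkerPool {
        threads: usize,
    }

    // Process-wide slot, filled on first use. `Mutex::new` is a `const fn`
    // since Rust 1.63, so this works in a `static` without lazy_static.
    static GLOBAL_POOL: Mutex<Option<Arc<WorkerPool>>> = Mutex::new(None);

    fn global_pool(threads: usize) -> Arc<WorkerPool> {
        let mut guard = GLOBAL_POOL.lock().unwrap();
        match guard.as_ref() {
            // Later callers share whatever the first caller created.
            Some(pool) => pool.clone(),
            None => {
                let pool = Arc::new(WorkerPool { threads });
                *guard = Some(pool.clone());
                pool
            }
        }
    }

    fn main() {
        let a = global_pool(4);
        let b = global_pool(8); // the first caller's configuration wins
        assert!(Arc::ptr_eq(&a, &b));
        println!("shared pool with {} threads", a.threads);
    }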
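Each worker thread still handles at most one request at a time:
handle_prefetch_requests() adds a single semaphore permit per thread, and a
permit is acquired before each request is handed to spawn_blocking, so a
worker only pulls the next message once the previous blocking I/O has
dropped its permit. A runnable sketch of that shape, assuming tokio 1.x
with the "full" feature set; the channel, `sema`, and the u64 request type
are illustrative, not the nydus code:

    use std::sync::Arc;
    use tokio::sync::{mpsc, Semaphore};

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
        // One permit: at most one blocking request in flight at a time.
        let sema = Arc::new(Semaphore::new(1));

        for req in 0..4u64 {
            tx.send(req).unwrap();
        }
        drop(tx); // close the channel so the receive loop terminates

        let mut handles = Vec::new();
        while let Some(req) = rx.recv().await {
            // Wait until the previous request has released its permit.
            let permit = sema.clone().acquire_owned().await.unwrap();
            handles.push(tokio::task::spawn_blocking(move || {
                // Simulate synchronous backend I/O for request `req`.
                std::thread::sleep(std::time::Duration::from_millis(10));
                println!("prefetched request {}", req);
                drop(permit); // free the slot only after the work completes
            }));
        }
        for h in handles {
            h.await.unwrap();
        }
    }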