diff --git a/src/bin/nydusd/fs_cache.rs b/src/bin/nydusd/fs_cache.rs index af32e097cef..7f82768da64 100644 --- a/src/bin/nydusd/fs_cache.rs +++ b/src/bin/nydusd/fs_cache.rs @@ -223,6 +223,7 @@ struct FsCacheState { pub struct FsCacheHandler { active: AtomicBool, barrier: Barrier, + threads: usize, file: File, state: Arc>, poller: Mutex, @@ -236,6 +237,7 @@ impl FsCacheHandler { dir: &str, tag: Option<&str>, blob_cache_mgr: Arc, + threads: usize, ) -> Result { info!( "fscache: create FsCacheHandler with dir {}, tag {}", @@ -279,7 +281,8 @@ impl FsCacheHandler { Ok(FsCacheHandler { active: AtomicBool::new(true), - barrier: Barrier::new(2), + barrier: Barrier::new(threads + 1), + threads, file, state: Arc::new(Mutex::new(state)), poller: Mutex::new(poller), @@ -287,6 +290,11 @@ impl FsCacheHandler { }) } + /// Get number of working threads to service fscache requests. + pub fn working_threads(&self) -> usize { + self.threads + } + /// Stop worker threads for the fscache service. pub fn stop(&self) { self.active.store(false, Ordering::Release); @@ -327,6 +335,8 @@ impl FsCacheHandler { && event.token() == Token(TOKEN_EVENT_WAKER) && !self.active.load(Ordering::Acquire) { + // Notify next worker to exit. + let _ = self.waker.wake(); self.barrier.wait(); return Ok(()); } diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs index 6e7d62bfbf1..9d04881f80b 100644 --- a/src/bin/nydusd/main.rs +++ b/src/bin/nydusd/main.rs @@ -343,6 +343,26 @@ fn append_services_subcmd_options(app: App<'static, 'static>) -> App<'static, 's .help("Tag to identify the fscache daemon instance") .takes_value(true) .requires("fscache"), + ) + .arg( + Arg::with_name("fscache-threads") + .long("fscache-threads") + .default_value("1") + .help("Number of working threads to serve fscache requests") + .takes_value(true) + .required(false) + .validator(|v| { + if let Ok(t) = v.parse::() { + if t > 0 && t <= 1024 { + Ok(()) + } else { + Err("Invalid working thread number {}, valid values: [1-1024]" + .to_string()) + } + } else { + Err("Input thread number is invalid".to_string()) + } + }), ); app.subcommand(subcmd) diff --git a/src/bin/nydusd/service_controller.rs b/src/bin/nydusd/service_controller.rs index 05ff949357b..9aa4a1ccc31 100644 --- a/src/bin/nydusd/service_controller.rs +++ b/src/bin/nydusd/service_controller.rs @@ -43,15 +43,18 @@ impl ServiceController { #[cfg(target_os = "linux")] if self.fscache_enabled.load(Ordering::Acquire) { if let Some(fscache) = self.fscache.lock().unwrap().clone() { - std::thread::spawn(move || { - if let Err(e) = fscache.run_loop() { - error!("Failed to run fscache service loop, {}", e); - } - // Notify the global service controller that one working thread is exiting. - if let Err(e) = crate::DAEMON_CONTROLLER.waker.wake() { - error!("Failed to notify the global service controller, {}", e); - } - }); + for _ in 0..fscache.working_threads() { + let fscache2 = fscache.clone(); + std::thread::spawn(move || { + if let Err(e) = fscache2.run_loop() { + error!("Failed to run fscache service loop, {}", e); + } + // Notify the global service controller that one working thread is exiting. + if let Err(e) = crate::DAEMON_CONTROLLER.waker.wake() { + error!("Failed to notify the global service controller, {}", e); + } + }); + } } } @@ -117,16 +120,33 @@ impl ServiceController { }; let tag = subargs.value_of("fscache-tag"); + let mut threads = 1usize; + if let Some(threads_value) = subargs.value_of("fscache-threads") { + if let Ok(t) = threads_value.parse::() { + if t > 0 && t <= 1024 { + threads = t as usize; + } else { + return Err(einval!( + "Invalid working thread number {}, valid values: [1-1024]" + )); + } + } else { + return Err(einval!("Input thread number is invalid".to_string())); + } + } + info!( - "Create fscache instance at {} with tag {}", + "Create fscache instance at {} with tag {}, {} working threads", p, - tag.unwrap_or("") + tag.unwrap_or(""), + threads ); let fscache = crate::fs_cache::FsCacheHandler::new( "/dev/cachefiles", p, tag, self.blob_cache_mgr.clone(), + threads, )?; *self.fscache.lock().unwrap() = Some(Arc::new(fscache)); self.fscache_enabled.store(true, Ordering::Release);