Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fscache: enable multi-threading to process fscache requests #536

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/bin/nydusd/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct FsCacheState {
pub struct FsCacheHandler {
active: AtomicBool,
barrier: Barrier,
threads: usize,
file: File,
state: Arc<Mutex<FsCacheState>>,
poller: Mutex<Poll>,
Expand All @@ -236,6 +237,7 @@ impl FsCacheHandler {
dir: &str,
tag: Option<&str>,
blob_cache_mgr: Arc<BlobCacheMgr>,
threads: usize,
) -> Result<Self> {
info!(
"fscache: create FsCacheHandler with dir {}, tag {}",
Expand Down Expand Up @@ -279,14 +281,20 @@ impl FsCacheHandler {

Ok(FsCacheHandler {
active: AtomicBool::new(true),
barrier: Barrier::new(2),
barrier: Barrier::new(threads + 1),
threads,
file,
state: Arc::new(Mutex::new(state)),
poller: Mutex::new(poller),
waker: Arc::new(waker),
})
}

/// Get number of working threads to service fscache requests.
pub fn working_threads(&self) -> usize {
self.threads
}

/// Stop worker threads for the fscache service.
pub fn stop(&self) {
self.active.store(false, Ordering::Release);
Expand Down Expand Up @@ -327,6 +335,8 @@ impl FsCacheHandler {
&& event.token() == Token(TOKEN_EVENT_WAKER)
&& !self.active.load(Ordering::Acquire)
{
// Notify next worker to exit.
let _ = self.waker.wake();
self.barrier.wait();
return Ok(());
}
Expand Down
20 changes: 20 additions & 0 deletions src/bin/nydusd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,26 @@ fn append_services_subcmd_options(app: App<'static, 'static>) -> App<'static, 's
.help("Tag to identify the fscache daemon instance")
.takes_value(true)
.requires("fscache"),
)
.arg(
Arg::with_name("fscache-threads")
.long("fscache-threads")
.default_value("1")
imeoer marked this conversation as resolved.
Show resolved Hide resolved
.help("Number of working threads to serve fscache requests")
.takes_value(true)
.required(false)
.validator(|v| {
if let Ok(t) = v.parse::<i32>() {
if t > 0 && t <= 1024 {
Ok(())
} else {
Err("Invalid working thread number {}, valid values: [1-1024]"
.to_string())
}
} else {
Err("Input thread number is invalid".to_string())
}
}),
);

app.subcommand(subcmd)
Expand Down
42 changes: 31 additions & 11 deletions src/bin/nydusd/service_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ impl ServiceController {
#[cfg(target_os = "linux")]
if self.fscache_enabled.load(Ordering::Acquire) {
if let Some(fscache) = self.fscache.lock().unwrap().clone() {
std::thread::spawn(move || {
if let Err(e) = fscache.run_loop() {
error!("Failed to run fscache service loop, {}", e);
}
// Notify the global service controller that one working thread is exiting.
if let Err(e) = crate::DAEMON_CONTROLLER.waker.wake() {
error!("Failed to notify the global service controller, {}", e);
}
});
for _ in 0..fscache.working_threads() {
let fscache2 = fscache.clone();
std::thread::spawn(move || {
if let Err(e) = fscache2.run_loop() {
error!("Failed to run fscache service loop, {}", e);
}
// Notify the global service controller that one working thread is exiting.
if let Err(e) = crate::DAEMON_CONTROLLER.waker.wake() {
error!("Failed to notify the global service controller, {}", e);
}
});
}
}
}

Expand Down Expand Up @@ -117,16 +120,33 @@ impl ServiceController {
};
let tag = subargs.value_of("fscache-tag");

let mut threads = 1usize;
if let Some(threads_value) = subargs.value_of("fscache-threads") {
if let Ok(t) = threads_value.parse::<i32>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can reuse the validator code above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please help to give more hints about the way to reuse code? I haven't found similar pieces of code yet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is like this :)

imeoer@30fa3f6

if t > 0 && t <= 1024 {
threads = t as usize;
} else {
return Err(einval!(
"Invalid working thread number {}, valid values: [1-1024]"
));
}
} else {
return Err(einval!("Input thread number is invalid".to_string()));
}
}

info!(
"Create fscache instance at {} with tag {}",
"Create fscache instance at {} with tag {}, {} working threads",
p,
tag.unwrap_or("<none>")
tag.unwrap_or("<none>"),
threads
);
let fscache = crate::fs_cache::FsCacheHandler::new(
"/dev/cachefiles",
p,
tag,
self.blob_cache_mgr.clone(),
threads,
)?;
*self.fscache.lock().unwrap() = Some(Arc::new(fscache));
self.fscache_enabled.store(true, Ordering::Release);
Expand Down