diff --git a/service/src/daemon.rs b/service/src/daemon.rs index 1134da0c055..9c3fbadeb50 100644 --- a/service/src/daemon.rs +++ b/service/src/daemon.rs @@ -12,10 +12,12 @@ use std::convert::From; use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::process; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread::{Builder, JoinHandle}; +use mio::{Events, Poll, Token, Waker}; use nydus_api::BuildTimeInfo; use rust_fsm::*; use serde::{self, Serialize}; @@ -337,6 +339,131 @@ pub trait DaemonStateMachineSubscriber { fn on_event(&self, event: DaemonStateMachineInput) -> Result<()>; } +/// Controller to manage registered filesystem/blobcache/fscache services. +pub struct DaemonController { + active: AtomicBool, + singleton_mode: AtomicBool, + daemon: Mutex>>, + blob_cache_mgr: Mutex>>, + // For backward compatibility to support singleton fusedev/virtiofs server. + fs_service: Mutex>>, + waker: Arc, + poller: Mutex, +} + +impl DaemonController { + /// Create a new instance of [DaemonController]. + pub fn new() -> Self { + let poller = Poll::new().expect("Failed to create poller for DaemonController"); + let waker = Waker::new(poller.registry(), Token(1)) + .expect("Failed to create waker for DaemonController"); + + Self { + active: AtomicBool::new(true), + singleton_mode: AtomicBool::new(true), + daemon: Mutex::new(None), + blob_cache_mgr: Mutex::new(None), + fs_service: Mutex::new(None), + waker: Arc::new(waker), + poller: Mutex::new(poller), + } + } + + /// Check whether the service controller is still in active/working state. + pub fn is_active(&self) -> bool { + self.active.load(Ordering::Acquire) + } + + /// Allocate a waker to notify stop events. + pub fn alloc_waker(&self) -> Arc { + self.waker.clone() + } + + /// Enable/disable singleton mode. + pub fn set_singleton_mode(&self, enabled: bool) { + self.singleton_mode.store(enabled, Ordering::Release); + } + + /// Set the daemon service object. + pub fn set_daemon(&self, daemon: Arc) -> Option> { + self.daemon.lock().unwrap().replace(daemon) + } + + /// Get the daemon service object. + /// + /// Panic if called before `set_daemon()` has been called. + pub fn get_daemon(&self) -> Arc { + self.daemon.lock().unwrap().clone().unwrap() + } + + /// Get the optional blob cache manager. + pub fn get_blob_cache_mgr(&self) -> Option> { + self.blob_cache_mgr.lock().unwrap().clone() + } + + /// Set the optional blob cache manager. + pub fn set_blob_cache_mgr(&self, mgr: Arc) -> Option> { + self.blob_cache_mgr.lock().unwrap().replace(mgr) + } + + /// Set the default fs service object. + pub fn set_fs_service(&self, service: Arc) -> Option> { + self.fs_service.lock().unwrap().replace(service) + } + + /// Get the default fs service object. + pub fn get_fs_service(&self) -> Option> { + self.fs_service.lock().unwrap().clone() + } + + /// Shutdown all services managed by the controller. + pub fn shutdown(&self) { + // Marking exiting state. + self.active.store(false, Ordering::Release); + // Signal the `run_loop()` working thread to exit. + let _ = self.waker.wake(); + + let daemon = self.daemon.lock().unwrap().take(); + if let Some(d) = daemon { + if let Err(e) = d.trigger_stop() { + error!("failed to stop daemon: {}", e); + } + if let Err(e) = d.wait() { + error!("failed to wait daemon: {}", e) + } + } + } + + /// Run the event loop to handle service management events. + pub fn run_loop(&self) { + let mut events = Events::with_capacity(8); + + loop { + match self.poller.lock().unwrap().poll(&mut events, None) { + Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue, + Err(e) => error!("failed to receive notification from waker: {}", e), + Ok(_) => {} + } + + for event in events.iter() { + if event.is_error() { + error!("Got error on the monitored event."); + continue; + } + + if event.is_readable() && event.token() == Token(1) { + if self.active.load(Ordering::Acquire) { + return; + } else if !self.singleton_mode.load(Ordering::Acquire) { + self.active.store(false, Ordering::Relaxed); + return; + } + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/bin/nydusd/api_server_glue.rs b/src/bin/nydusd/api_server_glue.rs index dd6c4ef6019..533e95e2b94 100644 --- a/src/bin/nydusd/api_server_glue.rs +++ b/src/bin/nydusd/api_server_glue.rs @@ -384,7 +384,7 @@ impl ApiServerController { let api_server = ApiServer::new(to_router)?; let api_handler = ApiServerHandler::new(api_server, from_router)?; let (router_thread, waker) = start_http_thread(apisock, None, to_handler, from_handler)?; - let daemon_waker = DAEMON_CONTROLLER.waker.clone(); + let daemon_waker = DAEMON_CONTROLLER.alloc_waker(); info!("HTTP API server running at {}", apisock); let handler_thread = std::thread::Builder::new() diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs index 98bc572fa52..8032e719818 100644 --- a/src/bin/nydusd/main.rs +++ b/src/bin/nydusd/main.rs @@ -14,22 +14,20 @@ extern crate nydus_error; use std::convert::TryInto; use std::io::{Error, ErrorKind, Result}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use clap::{Arg, ArgAction, ArgMatches, Command}; -use mio::{Events, Poll, Token, Waker}; use nix::sys::signal; use rlimit::Resource; -use nydus::blob_cache::BlobCacheMgr; -use nydus::daemon::NydusDaemon; -use nydus::{ - create_daemon, get_build_time_info, validate_threads_configuration, Error as NydusError, - FsBackendMountCmd, FsService, ServiceArgs, SubCmdArgs, -}; +use nydus::{get_build_time_info, SubCmdArgs}; use nydus_api::BuildTimeInfo; use nydus_app::{dump_program_info, setup_logging}; +use nydus_service::daemon::DaemonController; +use nydus_service::{ + create_daemon, create_fuse_daemon, validate_threads_configuration, Error as NydusError, + FsBackendMountCmd, FsBackendType, ServiceArgs, +}; use crate::api_server_glue::ApiServerController; @@ -49,129 +47,6 @@ lazy_static! { static ref BTI: BuildTimeInfo = get_build_time_info().1; } -/// Controller to manage registered filesystem/blobcache/fscache services. -pub struct DaemonController { - active: AtomicBool, - singleton_mode: AtomicBool, - daemon: Mutex>>, - blob_cache_mgr: Mutex>>, - // For backward compatibility to support singleton fusedev/virtiofs server. - fs_service: Mutex>>, - waker: Arc, - poller: Mutex, -} - -impl DaemonController { - fn new() -> Self { - let poller = Poll::new().expect("Failed to create poller for DaemonController"); - let waker = Waker::new(poller.registry(), Token(1)) - .expect("Failed to create waker for DaemonController"); - - Self { - active: AtomicBool::new(true), - singleton_mode: AtomicBool::new(true), - daemon: Mutex::new(None), - blob_cache_mgr: Mutex::new(None), - fs_service: Mutex::new(None), - waker: Arc::new(waker), - poller: Mutex::new(poller), - } - } - - /// Check whether the service controller is still in active/working state. - pub fn is_active(&self) -> bool { - self.active.load(Ordering::Acquire) - } - - /// Allocate a waker to notify stop events. - pub fn alloc_waker(&self) -> Arc { - self.waker.clone() - } - - /// Enable/disable singleton mode. - pub fn set_singleton_mode(&self, enabled: bool) { - self.singleton_mode.store(enabled, Ordering::Release); - } - - /// Set the daemon service object. - pub fn set_daemon(&self, daemon: Arc) -> Option> { - self.daemon.lock().unwrap().replace(daemon) - } - - /// Get the daemon service object. - /// - /// Panic if called before `set_daemon()` has been called. - pub fn get_daemon(&self) -> Arc { - self.daemon.lock().unwrap().clone().unwrap() - } - - /// Get the optional blob cache manager. - pub fn get_blob_cache_mgr(&self) -> Option> { - self.blob_cache_mgr.lock().unwrap().clone() - } - - /// Set the optional blob cache manager. - pub fn set_blob_cache_mgr(&self, mgr: Arc) -> Option> { - self.blob_cache_mgr.lock().unwrap().replace(mgr) - } - - /// Set the default fs service object. - pub fn set_fs_service(&self, service: Arc) -> Option> { - self.fs_service.lock().unwrap().replace(service) - } - - /// Get the default fs service object. - pub fn get_fs_service(&self) -> Option> { - self.fs_service.lock().unwrap().clone() - } - - fn shutdown(&self) { - // Marking exiting state. - self.active.store(false, Ordering::Release); - DAEMON_CONTROLLER.set_singleton_mode(false); - // Signal the `run_loop()` working thread to exit. - let _ = self.waker.wake(); - - let daemon = self.daemon.lock().unwrap().take(); - if let Some(d) = daemon { - if let Err(e) = d.trigger_stop() { - error!("failed to stop daemon: {}", e); - } - if let Err(e) = d.wait() { - error!("failed to wait daemon: {}", e) - } - } - } - - fn run_loop(&self) { - let mut events = Events::with_capacity(8); - - loop { - match self.poller.lock().unwrap().poll(&mut events, None) { - Err(e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => error!("failed to receive notification from waker: {}", e), - Ok(_) => {} - } - - for event in events.iter() { - if event.is_error() { - error!("Got error on the monitored event."); - continue; - } - - if event.is_readable() && event.token() == Token(1) { - if self.active.load(Ordering::Acquire) { - return; - } else if !self.singleton_mode.load(Ordering::Acquire) { - self.active.store(false, Ordering::Relaxed); - return; - } - } - } - } - } -} - fn thread_validator(v: &str) -> std::result::Result { validate_threads_configuration(v).map(|s| s.to_string()) } @@ -504,7 +379,7 @@ fn process_fs_service( let mut opts = fuse_backend_rs::api::VfsOptions::default(); let mount_cmd = if let Some(shared_dir) = shared_dir { let cmd = FsBackendMountCmd { - fs_type: nydus::FsBackendType::PassthroughFs, + fs_type: FsBackendType::PassthroughFs, source: shared_dir.to_string(), config: "".to_string(), mountpoint: virtual_mnt.to_string(), @@ -583,7 +458,7 @@ fn process_fs_service( }; let cmd = FsBackendMountCmd { - fs_type: nydus::FsBackendType::Rafs, + fs_type: FsBackendType::Rafs, source: b.to_string(), config, mountpoint: virtual_mnt.to_string(), @@ -633,7 +508,7 @@ fn process_fs_service( })?; let daemon = { - nydus::create_fuse_daemon( + create_fuse_daemon( mountpoint, vfs, supervisor, @@ -764,6 +639,7 @@ fn main() -> Result<()> { // Gracefully shutdown system. info!("nydusd quits"); api_controller.stop(); + DAEMON_CONTROLLER.set_singleton_mode(false); DAEMON_CONTROLLER.shutdown(); Ok(())