Skip to content

Commit

Permalink
service: move DaemonController from nydus into nydus-service
Browse files Browse the repository at this point in the history
Move DaemonController from nydus into nydus-service so it can be
reused.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
  • Loading branch information
jiangliu committed Feb 12, 2023
1 parent 8e8445c commit fa8eafa
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 137 deletions.
129 changes: 128 additions & 1 deletion service/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use std::convert::From;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread::{Builder, JoinHandle};

use mio::{Events, Poll, Token, Waker};
use nydus_api::BuildTimeInfo;
use rust_fsm::*;
use serde::{self, Serialize};
Expand Down Expand Up @@ -337,6 +339,131 @@ pub trait DaemonStateMachineSubscriber {
fn on_event(&self, event: DaemonStateMachineInput) -> Result<()>;
}

/// Controller to manage registered filesystem/blobcache/fscache services.
pub struct DaemonController {
active: AtomicBool,
singleton_mode: AtomicBool,
daemon: Mutex<Option<Arc<dyn NydusDaemon>>>,
blob_cache_mgr: Mutex<Option<Arc<BlobCacheMgr>>>,
// For backward compatibility to support singleton fusedev/virtiofs server.
fs_service: Mutex<Option<Arc<dyn FsService>>>,
waker: Arc<Waker>,
poller: Mutex<Poll>,
}

impl DaemonController {
/// Create a new instance of [DaemonController].
pub fn new() -> Self {
let poller = Poll::new().expect("Failed to create poller for DaemonController");
let waker = Waker::new(poller.registry(), Token(1))
.expect("Failed to create waker for DaemonController");

Self {
active: AtomicBool::new(true),
singleton_mode: AtomicBool::new(true),
daemon: Mutex::new(None),
blob_cache_mgr: Mutex::new(None),
fs_service: Mutex::new(None),
waker: Arc::new(waker),
poller: Mutex::new(poller),
}
}

/// Check whether the service controller is still in active/working state.
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}

/// Allocate a waker to notify stop events.
pub fn alloc_waker(&self) -> Arc<Waker> {
self.waker.clone()
}

/// Enable/disable singleton mode.
pub fn set_singleton_mode(&self, enabled: bool) {
self.singleton_mode.store(enabled, Ordering::Release);
}

/// Set the daemon service object.
pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
self.daemon.lock().unwrap().replace(daemon)
}

/// Get the daemon service object.
///
/// Panic if called before `set_daemon()` has been called.
pub fn get_daemon(&self) -> Arc<dyn NydusDaemon> {
self.daemon.lock().unwrap().clone().unwrap()
}

/// Get the optional blob cache manager.
pub fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().clone()
}

/// Set the optional blob cache manager.
pub fn set_blob_cache_mgr(&self, mgr: Arc<BlobCacheMgr>) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().replace(mgr)
}

/// Set the default fs service object.
pub fn set_fs_service(&self, service: Arc<dyn FsService>) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().replace(service)
}

/// Get the default fs service object.
pub fn get_fs_service(&self) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().clone()
}

/// Shutdown all services managed by the controller.
pub fn shutdown(&self) {
// Marking exiting state.
self.active.store(false, Ordering::Release);
// Signal the `run_loop()` working thread to exit.
let _ = self.waker.wake();

let daemon = self.daemon.lock().unwrap().take();
if let Some(d) = daemon {
if let Err(e) = d.trigger_stop() {
error!("failed to stop daemon: {}", e);
}
if let Err(e) = d.wait() {
error!("failed to wait daemon: {}", e)
}
}
}

/// Run the event loop to handle service management events.
pub fn run_loop(&self) {
let mut events = Events::with_capacity(8);

loop {
match self.poller.lock().unwrap().poll(&mut events, None) {
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => error!("failed to receive notification from waker: {}", e),
Ok(_) => {}
}

for event in events.iter() {
if event.is_error() {
error!("Got error on the monitored event.");
continue;
}

if event.is_readable() && event.token() == Token(1) {
if self.active.load(Ordering::Acquire) {
return;
} else if !self.singleton_mode.load(Ordering::Acquire) {
self.active.store(false, Ordering::Relaxed);
return;
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/nydusd/api_server_glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl ApiServerController {
let api_server = ApiServer::new(to_router)?;
let api_handler = ApiServerHandler::new(api_server, from_router)?;
let (router_thread, waker) = start_http_thread(apisock, None, to_handler, from_handler)?;
let daemon_waker = DAEMON_CONTROLLER.waker.clone();
let daemon_waker = DAEMON_CONTROLLER.alloc_waker();

info!("HTTP API server running at {}", apisock);
let handler_thread = std::thread::Builder::new()
Expand Down
146 changes: 11 additions & 135 deletions src/bin/nydusd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,20 @@ extern crate nydus_error;

use std::convert::TryInto;
use std::io::{Error, ErrorKind, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use clap::{Arg, ArgAction, ArgMatches, Command};
use mio::{Events, Poll, Token, Waker};
use nix::sys::signal;
use rlimit::Resource;

use nydus::blob_cache::BlobCacheMgr;
use nydus::daemon::NydusDaemon;
use nydus::{
create_daemon, get_build_time_info, validate_threads_configuration, Error as NydusError,
FsBackendMountCmd, FsService, ServiceArgs, SubCmdArgs,
};
use nydus::{get_build_time_info, SubCmdArgs};
use nydus_api::BuildTimeInfo;
use nydus_app::{dump_program_info, setup_logging};
use nydus_service::daemon::DaemonController;
use nydus_service::{
create_daemon, create_fuse_daemon, validate_threads_configuration, Error as NydusError,
FsBackendMountCmd, FsBackendType, ServiceArgs,
};

use crate::api_server_glue::ApiServerController;

Expand All @@ -49,129 +47,6 @@ lazy_static! {
static ref BTI: BuildTimeInfo = get_build_time_info().1;
}

/// Controller to manage registered filesystem/blobcache/fscache services.
pub struct DaemonController {
active: AtomicBool,
singleton_mode: AtomicBool,
daemon: Mutex<Option<Arc<dyn NydusDaemon>>>,
blob_cache_mgr: Mutex<Option<Arc<BlobCacheMgr>>>,
// For backward compatibility to support singleton fusedev/virtiofs server.
fs_service: Mutex<Option<Arc<dyn FsService>>>,
waker: Arc<Waker>,
poller: Mutex<Poll>,
}

impl DaemonController {
fn new() -> Self {
let poller = Poll::new().expect("Failed to create poller for DaemonController");
let waker = Waker::new(poller.registry(), Token(1))
.expect("Failed to create waker for DaemonController");

Self {
active: AtomicBool::new(true),
singleton_mode: AtomicBool::new(true),
daemon: Mutex::new(None),
blob_cache_mgr: Mutex::new(None),
fs_service: Mutex::new(None),
waker: Arc::new(waker),
poller: Mutex::new(poller),
}
}

/// Check whether the service controller is still in active/working state.
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}

/// Allocate a waker to notify stop events.
pub fn alloc_waker(&self) -> Arc<Waker> {
self.waker.clone()
}

/// Enable/disable singleton mode.
pub fn set_singleton_mode(&self, enabled: bool) {
self.singleton_mode.store(enabled, Ordering::Release);
}

/// Set the daemon service object.
pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
self.daemon.lock().unwrap().replace(daemon)
}

/// Get the daemon service object.
///
/// Panic if called before `set_daemon()` has been called.
pub fn get_daemon(&self) -> Arc<dyn NydusDaemon> {
self.daemon.lock().unwrap().clone().unwrap()
}

/// Get the optional blob cache manager.
pub fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().clone()
}

/// Set the optional blob cache manager.
pub fn set_blob_cache_mgr(&self, mgr: Arc<BlobCacheMgr>) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().replace(mgr)
}

/// Set the default fs service object.
pub fn set_fs_service(&self, service: Arc<dyn FsService>) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().replace(service)
}

/// Get the default fs service object.
pub fn get_fs_service(&self) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().clone()
}

fn shutdown(&self) {
// Marking exiting state.
self.active.store(false, Ordering::Release);
DAEMON_CONTROLLER.set_singleton_mode(false);
// Signal the `run_loop()` working thread to exit.
let _ = self.waker.wake();

let daemon = self.daemon.lock().unwrap().take();
if let Some(d) = daemon {
if let Err(e) = d.trigger_stop() {
error!("failed to stop daemon: {}", e);
}
if let Err(e) = d.wait() {
error!("failed to wait daemon: {}", e)
}
}
}

fn run_loop(&self) {
let mut events = Events::with_capacity(8);

loop {
match self.poller.lock().unwrap().poll(&mut events, None) {
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => error!("failed to receive notification from waker: {}", e),
Ok(_) => {}
}

for event in events.iter() {
if event.is_error() {
error!("Got error on the monitored event.");
continue;
}

if event.is_readable() && event.token() == Token(1) {
if self.active.load(Ordering::Acquire) {
return;
} else if !self.singleton_mode.load(Ordering::Acquire) {
self.active.store(false, Ordering::Relaxed);
return;
}
}
}
}
}
}

fn thread_validator(v: &str) -> std::result::Result<String, String> {
validate_threads_configuration(v).map(|s| s.to_string())
}
Expand Down Expand Up @@ -504,7 +379,7 @@ fn process_fs_service(
let mut opts = fuse_backend_rs::api::VfsOptions::default();
let mount_cmd = if let Some(shared_dir) = shared_dir {
let cmd = FsBackendMountCmd {
fs_type: nydus::FsBackendType::PassthroughFs,
fs_type: FsBackendType::PassthroughFs,
source: shared_dir.to_string(),
config: "".to_string(),
mountpoint: virtual_mnt.to_string(),
Expand Down Expand Up @@ -583,7 +458,7 @@ fn process_fs_service(
};

let cmd = FsBackendMountCmd {
fs_type: nydus::FsBackendType::Rafs,
fs_type: FsBackendType::Rafs,
source: b.to_string(),
config,
mountpoint: virtual_mnt.to_string(),
Expand Down Expand Up @@ -633,7 +508,7 @@ fn process_fs_service(
})?;

let daemon = {
nydus::create_fuse_daemon(
create_fuse_daemon(
mountpoint,
vfs,
supervisor,
Expand Down Expand Up @@ -764,6 +639,7 @@ fn main() -> Result<()> {
// Gracefully shutdown system.
info!("nydusd quits");
api_controller.stop();
DAEMON_CONTROLLER.set_singleton_mode(false);
DAEMON_CONTROLLER.shutdown();

Ok(())
Expand Down

0 comments on commit fa8eafa

Please sign in to comment.