Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service: move DaemonController from nydus into nydus-service #1074

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 134 additions & 1 deletion service/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use std::convert::From;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread::{Builder, JoinHandle};

use mio::{Events, Poll, Token, Waker};
use nydus_api::BuildTimeInfo;
use rust_fsm::*;
use serde::{self, Serialize};
Expand Down Expand Up @@ -337,6 +339,137 @@ pub trait DaemonStateMachineSubscriber {
fn on_event(&self, event: DaemonStateMachineInput) -> Result<()>;
}

/// Controller to manage registered filesystem/blobcache/fscache services.
pub struct DaemonController {
active: AtomicBool,
singleton_mode: AtomicBool,
daemon: Mutex<Option<Arc<dyn NydusDaemon>>>,
blob_cache_mgr: Mutex<Option<Arc<BlobCacheMgr>>>,
// For backward compatibility to support singleton fusedev/virtiofs server.
fs_service: Mutex<Option<Arc<dyn FsService>>>,
waker: Arc<Waker>,
poller: Mutex<Poll>,
}

impl DaemonController {
/// Create a new instance of [DaemonController].
pub fn new() -> Self {
let poller = Poll::new().expect("Failed to create poller for DaemonController");
let waker = Waker::new(poller.registry(), Token(1))
.expect("Failed to create waker for DaemonController");

Self {
active: AtomicBool::new(true),
singleton_mode: AtomicBool::new(true),
daemon: Mutex::new(None),
blob_cache_mgr: Mutex::new(None),
fs_service: Mutex::new(None),
waker: Arc::new(waker),
poller: Mutex::new(poller),
}
}

/// Check whether the service controller is still in active/working state.
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}

/// Allocate a waker to notify stop events.
pub fn alloc_waker(&self) -> Arc<Waker> {
self.waker.clone()
}

/// Enable/disable singleton mode.
pub fn set_singleton_mode(&self, enabled: bool) {
self.singleton_mode.store(enabled, Ordering::Release);
}

/// Set the daemon service object.
pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
self.daemon.lock().unwrap().replace(daemon)
}

/// Get the daemon service object.
///
/// Panic if called before `set_daemon()` has been called.
pub fn get_daemon(&self) -> Arc<dyn NydusDaemon> {
self.daemon.lock().unwrap().clone().unwrap()
}

/// Get the optional blob cache manager.
pub fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().clone()
}

/// Set the optional blob cache manager.
pub fn set_blob_cache_mgr(&self, mgr: Arc<BlobCacheMgr>) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().replace(mgr)
}

/// Set the default fs service object.
pub fn set_fs_service(&self, service: Arc<dyn FsService>) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().replace(service)
}

/// Get the default fs service object.
pub fn get_fs_service(&self) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().clone()
}

/// Shutdown all services managed by the controller.
pub fn shutdown(&self) {
// Marking exiting state.
self.active.store(false, Ordering::Release);
// Signal the `run_loop()` working thread to exit.
let _ = self.waker.wake();

let daemon = self.daemon.lock().unwrap().take();
if let Some(d) = daemon {
if let Err(e) = d.trigger_stop() {
error!("failed to stop daemon: {}", e);
}
if let Err(e) = d.wait() {
error!("failed to wait daemon: {}", e)
}
}
}

/// Run the event loop to handle service management events.
pub fn run_loop(&self) {
let mut events = Events::with_capacity(8);

loop {
match self.poller.lock().unwrap().poll(&mut events, None) {
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => error!("failed to receive notification from waker: {}", e),
Ok(_) => {}
}

for event in events.iter() {
if event.is_error() {
error!("Got error on the monitored event.");
continue;
}

if event.is_readable() && event.token() == Token(1) {
if self.active.load(Ordering::Acquire) {
return;
} else if !self.singleton_mode.load(Ordering::Acquire) {
self.active.store(false, Ordering::Relaxed);
return;
}
}
}
}
}
}

impl Default for DaemonController {
fn default() -> Self {
DaemonController::new()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/nydusd/api_server_glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl ApiServerController {
let api_server = ApiServer::new(to_router)?;
let api_handler = ApiServerHandler::new(api_server, from_router)?;
let (router_thread, waker) = start_http_thread(apisock, None, to_handler, from_handler)?;
let daemon_waker = DAEMON_CONTROLLER.waker.clone();
let daemon_waker = DAEMON_CONTROLLER.alloc_waker();

info!("HTTP API server running at {}", apisock);
let handler_thread = std::thread::Builder::new()
Expand Down
146 changes: 11 additions & 135 deletions src/bin/nydusd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,20 @@ extern crate nydus_error;

use std::convert::TryInto;
use std::io::{Error, ErrorKind, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use clap::{Arg, ArgAction, ArgMatches, Command};
use mio::{Events, Poll, Token, Waker};
use nix::sys::signal;
use rlimit::Resource;

use nydus::blob_cache::BlobCacheMgr;
use nydus::daemon::NydusDaemon;
use nydus::{
create_daemon, get_build_time_info, validate_threads_configuration, Error as NydusError,
FsBackendMountCmd, FsService, ServiceArgs, SubCmdArgs,
};
use nydus::{get_build_time_info, SubCmdArgs};
use nydus_api::BuildTimeInfo;
use nydus_app::{dump_program_info, setup_logging};
use nydus_service::daemon::DaemonController;
use nydus_service::{
create_daemon, create_fuse_daemon, validate_threads_configuration, Error as NydusError,
FsBackendMountCmd, FsBackendType, ServiceArgs,
};

use crate::api_server_glue::ApiServerController;

Expand All @@ -49,129 +47,6 @@ lazy_static! {
static ref BTI: BuildTimeInfo = get_build_time_info().1;
}

/// Controller to manage registered filesystem/blobcache/fscache services.
pub struct DaemonController {
active: AtomicBool,
singleton_mode: AtomicBool,
daemon: Mutex<Option<Arc<dyn NydusDaemon>>>,
blob_cache_mgr: Mutex<Option<Arc<BlobCacheMgr>>>,
// For backward compatibility to support singleton fusedev/virtiofs server.
fs_service: Mutex<Option<Arc<dyn FsService>>>,
waker: Arc<Waker>,
poller: Mutex<Poll>,
}

impl DaemonController {
fn new() -> Self {
let poller = Poll::new().expect("Failed to create poller for DaemonController");
let waker = Waker::new(poller.registry(), Token(1))
.expect("Failed to create waker for DaemonController");

Self {
active: AtomicBool::new(true),
singleton_mode: AtomicBool::new(true),
daemon: Mutex::new(None),
blob_cache_mgr: Mutex::new(None),
fs_service: Mutex::new(None),
waker: Arc::new(waker),
poller: Mutex::new(poller),
}
}

/// Check whether the service controller is still in active/working state.
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}

/// Allocate a waker to notify stop events.
pub fn alloc_waker(&self) -> Arc<Waker> {
self.waker.clone()
}

/// Enable/disable singleton mode.
pub fn set_singleton_mode(&self, enabled: bool) {
self.singleton_mode.store(enabled, Ordering::Release);
}

/// Set the daemon service object.
pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
self.daemon.lock().unwrap().replace(daemon)
}

/// Get the daemon service object.
///
/// Panic if called before `set_daemon()` has been called.
pub fn get_daemon(&self) -> Arc<dyn NydusDaemon> {
self.daemon.lock().unwrap().clone().unwrap()
}

/// Get the optional blob cache manager.
pub fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().clone()
}

/// Set the optional blob cache manager.
pub fn set_blob_cache_mgr(&self, mgr: Arc<BlobCacheMgr>) -> Option<Arc<BlobCacheMgr>> {
self.blob_cache_mgr.lock().unwrap().replace(mgr)
}

/// Set the default fs service object.
pub fn set_fs_service(&self, service: Arc<dyn FsService>) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().replace(service)
}

/// Get the default fs service object.
pub fn get_fs_service(&self) -> Option<Arc<dyn FsService>> {
self.fs_service.lock().unwrap().clone()
}

fn shutdown(&self) {
// Marking exiting state.
self.active.store(false, Ordering::Release);
DAEMON_CONTROLLER.set_singleton_mode(false);
// Signal the `run_loop()` working thread to exit.
let _ = self.waker.wake();

let daemon = self.daemon.lock().unwrap().take();
if let Some(d) = daemon {
if let Err(e) = d.trigger_stop() {
error!("failed to stop daemon: {}", e);
}
if let Err(e) = d.wait() {
error!("failed to wait daemon: {}", e)
}
}
}

fn run_loop(&self) {
let mut events = Events::with_capacity(8);

loop {
match self.poller.lock().unwrap().poll(&mut events, None) {
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => error!("failed to receive notification from waker: {}", e),
Ok(_) => {}
}

for event in events.iter() {
if event.is_error() {
error!("Got error on the monitored event.");
continue;
}

if event.is_readable() && event.token() == Token(1) {
if self.active.load(Ordering::Acquire) {
return;
} else if !self.singleton_mode.load(Ordering::Acquire) {
self.active.store(false, Ordering::Relaxed);
return;
}
}
}
}
}
}

fn thread_validator(v: &str) -> std::result::Result<String, String> {
validate_threads_configuration(v).map(|s| s.to_string())
}
Expand Down Expand Up @@ -504,7 +379,7 @@ fn process_fs_service(
let mut opts = fuse_backend_rs::api::VfsOptions::default();
let mount_cmd = if let Some(shared_dir) = shared_dir {
let cmd = FsBackendMountCmd {
fs_type: nydus::FsBackendType::PassthroughFs,
fs_type: FsBackendType::PassthroughFs,
source: shared_dir.to_string(),
config: "".to_string(),
mountpoint: virtual_mnt.to_string(),
Expand Down Expand Up @@ -583,7 +458,7 @@ fn process_fs_service(
};

let cmd = FsBackendMountCmd {
fs_type: nydus::FsBackendType::Rafs,
fs_type: FsBackendType::Rafs,
source: b.to_string(),
config,
mountpoint: virtual_mnt.to_string(),
Expand Down Expand Up @@ -633,7 +508,7 @@ fn process_fs_service(
})?;

let daemon = {
nydus::create_fuse_daemon(
create_fuse_daemon(
mountpoint,
vfs,
supervisor,
Expand Down Expand Up @@ -764,6 +639,7 @@ fn main() -> Result<()> {
// Gracefully shutdown system.
info!("nydusd quits");
api_controller.stop();
DAEMON_CONTROLLER.set_singleton_mode(false);
DAEMON_CONTROLLER.shutdown();

Ok(())
Expand Down