Skip to content

Commit

Permalink
Merge pull request #340 from ccx1024cc/morgan/mio
Browse files Browse the repository at this point in the history
refact: use mio for daemon exit signal monitoring
  • Loading branch information
bergwolf authored Mar 22, 2022
2 parents 208e3cd + 9db8871 commit cc87025
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 49 deletions.
27 changes: 24 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ nydus-utils = { path = "utils" }
rafs = { path = "rafs", features = ["backend-registry", "backend-oss"] }
storage = { path = "storage" }
blobfs = { path = "blobfs", features = ["virtiofs"], optional = true }
mio = { version = "0.8", features = ["os-poll"] }

[dev-dependencies]
sendfd = "0.3.3"
Expand Down
77 changes: 40 additions & 37 deletions src/bin/nydusd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ use std::sync::{
use std::thread;
use std::{error, fmt, io};

use event_manager::{EventOps, EventSubscriber, Events};
use fuse_backend_rs::api::{vfs::VfsError, BackendFileSystem, Vfs};
use fuse_backend_rs::passthrough::{Config, PassthroughFs};
use fuse_backend_rs::transport::Error as FuseTransportError;
use fuse_backend_rs::Error as FuseError;
use mio::{Events, Poll, Token, Waker};
use rust_fsm::*;
use serde::{self, Deserialize, Serialize};
use serde_json::Error as SerdeError;
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};

use nydus::{FsBackendDesc, FsBackendType};
use nydus_app::BuildTimeInfo;
Expand Down Expand Up @@ -413,54 +412,58 @@ fn fs_backend_factory(cmd: &FsBackendMountCmd) -> DaemonResult<BackFileSystem> {
}
}

const EXIT_TOKEN: Token = Token(1);

pub struct NydusDaemonSubscriber {
event_fd: EventFd,
exit_notifier: Arc<Waker>,
exit_receiver: Poll,
}

impl NydusDaemonSubscriber {
pub fn new() -> Result<Self> {
match EventFd::new(0) {
Ok(fd) => Ok(Self { event_fd: fd }),
Err(e) => {
error!("Creating event fd failed. {}", e);
Err(e)
}
}
let exit_receiver = Poll::new().map_err(|e| {
error!("Creating exit receiver failed. {}", e);
e
})?;

let exit_notifier = Waker::new(exit_receiver.registry(), EXIT_TOKEN).map_err(|e| {
error!("Creating exit waker failed. {}", e);
e
})?;

let subscriber = Self {
exit_notifier: Arc::new(exit_notifier),
exit_receiver,
};

Ok(subscriber)
}

pub fn get_event_fd(&self) -> Result<EventFd> {
self.event_fd.try_clone()
pub fn get_notifier(&self) -> Arc<Waker> {
self.exit_notifier.clone()
}
}

impl EventSubscriber for NydusDaemonSubscriber {
fn process(&self, events: Events, event_ops: &mut EventOps) {
self.event_fd
.read()
.map(|_| ())
.map_err(|e| last_error!(e))
.unwrap_or_else(|_| {});
pub fn listen(&mut self) {
let mut events = Events::with_capacity(8);

match events.event_set() {
EventSet::IN => {
EVENT_MANAGER_RUN.store(false, Ordering::Relaxed);
}
EventSet::ERROR => {
error!("Got error on the monitored event.");
}
EventSet::HANG_UP => {
event_ops
.remove(events)
.unwrap_or_else(|e| error!("Encountered error during cleanup, {}", e));
loop {
self.exit_receiver
.poll(&mut events, None)
.unwrap_or_else(|e| error!("failed to listen on daemon: {}", e));

for event in events.iter() {
if event.is_error() {
error!("Got error on the monitored event.");
continue;
}

if event.is_readable() && event.token() == EXIT_TOKEN {
EVENT_MANAGER_RUN.store(false, Ordering::Relaxed);
return;
}
}
_ => {}
}
}

fn init(&self, ops: &mut EventOps) {
ops.add(Events::new(&self.event_fd, EventSet::IN))
.expect("Cannot register event")
}
}

// State machine for Nydus daemon workflow.
Expand Down
26 changes: 17 additions & 9 deletions src/bin/nydusd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use std::sync::{
mpsc::channel,
Arc, Mutex,
};
use std::thread;
use std::thread::{self, spawn};
use std::{io, process};

use clap::{App, Arg};
use event_manager::{EventManager, EventSubscriber, SubscriberOps};
use fuse_backend_rs::api::{Vfs, VfsOptions};
use mio::Waker;
use nix::sys::signal;
use rlimit::{rlim, Resource};
use vmm_sys_util::eventfd::EventFd;
Expand All @@ -57,7 +58,7 @@ mod upgrade;

lazy_static! {
static ref EVENT_MANAGER_RUN: AtomicBool = AtomicBool::new(true);
static ref EXIT_EVTFD: Mutex::<Option<EventFd>> = Mutex::<Option<EventFd>>::default();
static ref EXIT_NOTIFIER: Mutex<Option<Arc<Waker>>> = Mutex::default();
}

fn get_default_rlimit_nofile() -> Result<rlim> {
Expand Down Expand Up @@ -91,12 +92,12 @@ fn get_default_rlimit_nofile() -> Result<rlim> {
}

pub fn exit_event_manager() {
EXIT_EVTFD
EXIT_NOTIFIER
.lock()
.expect("Not poisoned lock!")
.as_ref()
.unwrap()
.write(1)
.wake()
.unwrap_or_else(|e| error!("Write event fd failed when exiting event manager, {}", e))
}

Expand Down Expand Up @@ -375,10 +376,6 @@ fn main() -> Result<()> {
let vfs = Vfs::new(opts);

let mut event_manager = EventManager::<Arc<dyn EventSubscriber>>::new().unwrap();
let daemon_subscriber = Arc::new(NydusDaemonSubscriber::new()?);
// Send an event to exit from Event Manager so as to exit from nydusd
let exit_evtfd = daemon_subscriber.get_event_fd()?;
event_manager.add_subscriber(daemon_subscriber);

let vfs = Arc::new(vfs);
// Basically, below two arguments are essential for live-upgrade/failover/ and external management.
Expand Down Expand Up @@ -462,7 +459,7 @@ fn main() -> Result<()> {
info!("api server running at {}", apisock);
}

*EXIT_EVTFD.lock().unwrap().deref_mut() = Some(exit_evtfd);
start_daemon_monitor().unwrap();
nydus_app::signal::register_signal_handler(signal::SIGINT, sig_exit);
nydus_app::signal::register_signal_handler(signal::SIGTERM, sig_exit);

Expand Down Expand Up @@ -491,3 +488,14 @@ fn main() -> Result<()> {

Ok(())
}

fn start_daemon_monitor() -> Result<()> {
let mut monitor = NydusDaemonSubscriber::new()?;
*EXIT_NOTIFIER.lock().unwrap().deref_mut() = Some(monitor.get_notifier());

spawn(move || {
monitor.listen();
});

Ok(())
}

0 comments on commit cc87025

Please sign in to comment.