From 9db88715fbc31b0195fa5a96d1a7b57c679d3690 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B3=B0=E5=8F=8B?= Date: Fri, 18 Mar 2022 17:04:30 +0800 Subject: [PATCH] refact: use mio for daemon exit signal monitoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 泰友 fix: chore Signed-off-by: 泰友 --- Cargo.lock | 27 ++++++++++++-- Cargo.toml | 1 + src/bin/nydusd/daemon.rs | 77 +++++++++++++++++++++------------------- src/bin/nydusd/main.rs | 26 +++++++++----- 4 files changed, 82 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92bc09fe425..5a6b98093a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -624,7 +624,7 @@ checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", ] [[package]] @@ -1016,6 +1016,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "mio" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "miow" version = "0.3.7" @@ -1185,6 +1199,7 @@ dependencies = [ "lazy_static", "libc", "log", + "mio 0.8.2", "nix 0.23.1", "nydus-api", "nydus-app", @@ -1397,7 +1412,7 @@ dependencies = [ "mach", "once_cell", "raw-cpuid", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", "web-sys", "winapi", ] @@ -1939,7 +1954,7 @@ dependencies = [ "bytes 1.1.0", "libc", "memchr", - "mio", + "mio 0.7.13", "num_cpus", "pin-project-lite", "tokio-macros", @@ -2169,6 +2184,12 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.78" diff --git a/Cargo.toml b/Cargo.toml index ac0406d41df..9abd2c4fbff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ nydus-utils = { path = "utils" } rafs = { path = "rafs", features = ["backend-registry", "backend-oss"] } storage = { path = "storage" } blobfs = { path = "blobfs", features = ["virtiofs"], optional = true } +mio = { version = "0.8", features = ["os-poll"] } [dev-dependencies] sendfd = "0.3.3" diff --git a/src/bin/nydusd/daemon.rs b/src/bin/nydusd/daemon.rs index 50127493992..8f252a22354 100644 --- a/src/bin/nydusd/daemon.rs +++ b/src/bin/nydusd/daemon.rs @@ -22,15 +22,14 @@ use std::sync::{ use std::thread; use std::{error, fmt, io}; -use event_manager::{EventOps, EventSubscriber, Events}; use fuse_backend_rs::api::{vfs::VfsError, BackendFileSystem, Vfs}; use fuse_backend_rs::passthrough::{Config, PassthroughFs}; use fuse_backend_rs::transport::Error as FuseTransportError; use fuse_backend_rs::Error as FuseError; +use mio::{Events, Poll, Token, Waker}; use rust_fsm::*; use serde::{self, Deserialize, Serialize}; use serde_json::Error as SerdeError; -use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; use nydus::{FsBackendDesc, FsBackendType}; use nydus_app::BuildTimeInfo; @@ -413,54 +412,58 @@ fn fs_backend_factory(cmd: &FsBackendMountCmd) -> DaemonResult { } } +const EXIT_TOKEN: Token = Token(1); + pub struct NydusDaemonSubscriber { - event_fd: EventFd, + exit_notifier: Arc, + exit_receiver: Poll, } impl NydusDaemonSubscriber { pub fn new() -> Result { - match EventFd::new(0) { - Ok(fd) => Ok(Self { event_fd: fd }), - Err(e) => { - error!("Creating event fd failed. {}", e); - Err(e) - } - } + let exit_receiver = Poll::new().map_err(|e| { + error!("Creating exit receiver failed. {}", e); + e + })?; + + let exit_notifier = Waker::new(exit_receiver.registry(), EXIT_TOKEN).map_err(|e| { + error!("Creating exit waker failed. {}", e); + e + })?; + + let subscriber = Self { + exit_notifier: Arc::new(exit_notifier), + exit_receiver, + }; + + Ok(subscriber) } - pub fn get_event_fd(&self) -> Result { - self.event_fd.try_clone() + pub fn get_notifier(&self) -> Arc { + self.exit_notifier.clone() } -} -impl EventSubscriber for NydusDaemonSubscriber { - fn process(&self, events: Events, event_ops: &mut EventOps) { - self.event_fd - .read() - .map(|_| ()) - .map_err(|e| last_error!(e)) - .unwrap_or_else(|_| {}); + pub fn listen(&mut self) { + let mut events = Events::with_capacity(8); - match events.event_set() { - EventSet::IN => { - EVENT_MANAGER_RUN.store(false, Ordering::Relaxed); - } - EventSet::ERROR => { - error!("Got error on the monitored event."); - } - EventSet::HANG_UP => { - event_ops - .remove(events) - .unwrap_or_else(|e| error!("Encountered error during cleanup, {}", e)); + loop { + self.exit_receiver + .poll(&mut events, None) + .unwrap_or_else(|e| error!("failed to listen on daemon: {}", e)); + + for event in events.iter() { + if event.is_error() { + error!("Got error on the monitored event."); + continue; + } + + if event.is_readable() && event.token() == EXIT_TOKEN { + EVENT_MANAGER_RUN.store(false, Ordering::Relaxed); + return; + } } - _ => {} } } - - fn init(&self, ops: &mut EventOps) { - ops.add(Events::new(&self.event_fd, EventSet::IN)) - .expect("Cannot register event") - } } // State machine for Nydus daemon workflow. diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs index 3f308bc4bc1..fe8f67cab0c 100644 --- a/src/bin/nydusd/main.rs +++ b/src/bin/nydusd/main.rs @@ -25,12 +25,13 @@ use std::sync::{ mpsc::channel, Arc, Mutex, }; -use std::thread; +use std::thread::{self, spawn}; use std::{io, process}; use clap::{App, Arg}; use event_manager::{EventManager, EventSubscriber, SubscriberOps}; use fuse_backend_rs::api::{Vfs, VfsOptions}; +use mio::Waker; use nix::sys::signal; use rlimit::{rlim, Resource}; use vmm_sys_util::eventfd::EventFd; @@ -57,7 +58,7 @@ mod upgrade; lazy_static! { static ref EVENT_MANAGER_RUN: AtomicBool = AtomicBool::new(true); - static ref EXIT_EVTFD: Mutex::> = Mutex::>::default(); + static ref EXIT_NOTIFIER: Mutex>> = Mutex::default(); } fn get_default_rlimit_nofile() -> Result { @@ -91,12 +92,12 @@ fn get_default_rlimit_nofile() -> Result { } pub fn exit_event_manager() { - EXIT_EVTFD + EXIT_NOTIFIER .lock() .expect("Not poisoned lock!") .as_ref() .unwrap() - .write(1) + .wake() .unwrap_or_else(|e| error!("Write event fd failed when exiting event manager, {}", e)) } @@ -375,10 +376,6 @@ fn main() -> Result<()> { let vfs = Vfs::new(opts); let mut event_manager = EventManager::>::new().unwrap(); - let daemon_subscriber = Arc::new(NydusDaemonSubscriber::new()?); - // Send an event to exit from Event Manager so as to exit from nydusd - let exit_evtfd = daemon_subscriber.get_event_fd()?; - event_manager.add_subscriber(daemon_subscriber); let vfs = Arc::new(vfs); // Basically, below two arguments are essential for live-upgrade/failover/ and external management. @@ -462,7 +459,7 @@ fn main() -> Result<()> { info!("api server running at {}", apisock); } - *EXIT_EVTFD.lock().unwrap().deref_mut() = Some(exit_evtfd); + start_daemon_monitor().unwrap(); nydus_app::signal::register_signal_handler(signal::SIGINT, sig_exit); nydus_app::signal::register_signal_handler(signal::SIGTERM, sig_exit); @@ -491,3 +488,14 @@ fn main() -> Result<()> { Ok(()) } + +fn start_daemon_monitor() -> Result<()> { + let mut monitor = NydusDaemonSubscriber::new()?; + *EXIT_NOTIFIER.lock().unwrap().deref_mut() = Some(monitor.get_notifier()); + + spawn(move || { + monitor.listen(); + }); + + Ok(()) +}