diff --git a/api/src/http.rs b/api/src/http.rs index 0e5f9a7b48c..7a7791837ed 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -25,7 +25,8 @@ use url::Url; use crate::http_endpoint_v1::{ EventsHandler, ExitHandler, FsBackendInfo, InfoHandler, MetricsBackendHandler, MetricsBlobcacheHandler, MetricsFilesHandler, MetricsHandler, MetricsInflightHandler, - MetricsPatternHandler, MountHandler, SendFuseFdHandler, TakeoverHandler, HTTP_ROOT_V1, + MetricsPatternHandler, MountHandler, SendFuseFdHandler, StartHandler, TakeoverHandler, + HTTP_ROOT_V1, }; use crate::http_endpoint_v2::{BlobObjectListHandlerV2, HTTP_ROOT_V2}; @@ -126,6 +127,7 @@ pub enum ApiRequest { ExportFilesMetrics(Option, bool), Exit, Takeover, + Start, // Filesystem Related Mount(String, ApiMountCmd), @@ -398,6 +400,8 @@ lazy_static! { r.routes.insert(endpoint_v2!("/daemon/events"), Box::new(EventsHandler{})); r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{})); r.routes.insert(endpoint_v2!("/daemon/exit"), Box::new(ExitHandler{})); + r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{})); + r.routes.insert(endpoint_v2!("/daemon/start"), Box::new(StartHandler{})); r.routes.insert(endpoint_v1!("/metrics/backend"), Box::new(MetricsBackendHandler{})); r.routes.insert(endpoint_v2!("/metrics/backend"), Box::new(MetricsBackendHandler{})); r.routes.insert(endpoint_v1!("/metrics/blobcache"), Box::new(MetricsBlobcacheHandler{})); diff --git a/api/src/http_endpoint_v1.rs b/api/src/http_endpoint_v1.rs index 47e99c8e02c..4eb86aa5b80 100644 --- a/api/src/http_endpoint_v1.rs +++ b/api/src/http_endpoint_v1.rs @@ -245,6 +245,23 @@ impl EndpointHandler for MountHandler { } } +pub struct StartHandler {} +impl EndpointHandler for StartHandler { + fn handle_request( + &self, + req: &Request, + kicker: &dyn Fn(ApiRequest) -> ApiResponse, + ) -> HttpResult { + match (req.method(), req.body.as_ref()) { + (Method::Put, None) => { + let r = kicker(ApiRequest::Start); + Ok(convert_to_response(r, HttpError::Upgrade)) + } + _ => Err(HttpError::BadRequest), + } + } +} + pub struct MetricsInflightHandler {} impl EndpointHandler for MetricsInflightHandler { fn handle_request( diff --git a/src/bin/nydusd/api_server_glue.rs b/src/bin/nydusd/api_server_glue.rs index 3744be0f07a..89c11bb256e 100644 --- a/src/bin/nydusd/api_server_glue.rs +++ b/src/bin/nydusd/api_server_glue.rs @@ -63,6 +63,7 @@ impl ApiServer { ApiRequest::ConfigureDaemon(conf) => self.configure_daemon(conf), ApiRequest::DaemonInfo => self.daemon_info(true), ApiRequest::Exit => self.do_exit(), + ApiRequest::Start => self.do_start(), ApiRequest::Takeover => self.do_takeover(), ApiRequest::Events => Self::events(), ApiRequest::ExportGlobalMetrics(id) => Self::export_global_metrics(id), @@ -325,6 +326,13 @@ impl ApiServer { } } } + + fn do_start(&self) -> ApiResponse { + let d = self.get_daemon_object()?; + d.trigger_start() + .map(|_| ApiResponsePayload::Empty) + .map_err(|e| ApiError::DaemonAbnormal(e.into())) + } } struct ApiServerHandler { diff --git a/src/bin/nydusd/daemon.rs b/src/bin/nydusd/daemon.rs index bf18c800ed0..4b532da9693 100644 --- a/src/bin/nydusd/daemon.rs +++ b/src/bin/nydusd/daemon.rs @@ -35,10 +35,9 @@ use crate::upgrade::UpgradeMgrError; pub enum DaemonState { INIT = 1, RUNNING = 2, - UPGRADING = 3, - INTERRUPTED = 4, - STOPPED = 5, - UNKNOWN = 6, + READY = 3, + STOPPED = 4, + UNKNOWN = 5, } impl Display for DaemonState { @@ -52,9 +51,8 @@ impl From for DaemonState { match i { 1 => DaemonState::INIT, 2 => DaemonState::RUNNING, - 3 => DaemonState::UPGRADING, - 4 => DaemonState::INTERRUPTED, - 5 => DaemonState::STOPPED, + 3 => DaemonState::READY, + 4 => DaemonState::STOPPED, _ => DaemonState::UNKNOWN, } } @@ -196,23 +194,49 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync { fn interrupt(&self) {} fn stop(&self) -> DaemonResult<()> { let s = self.get_state(); - if s != DaemonState::INTERRUPTED && s != DaemonState::STOPPED { - return self.on_event(DaemonStateMachineInput::Stop); + + if s == DaemonState::STOPPED { + return Ok(()); } - Ok(()) + + if s == DaemonState::RUNNING { + self.on_event(DaemonStateMachineInput::Stop)?; + } + + self.on_event(DaemonStateMachineInput::Stop) } fn wait(&self) -> DaemonResult<()>; + fn wait_service(&self) -> DaemonResult<()> { + Ok(()) + } + fn wait_state_machine(&self) -> DaemonResult<()> { + Ok(()) + } fn trigger_exit(&self) -> DaemonResult<()> { + let s = self.get_state(); + + if s == DaemonState::STOPPED { + return Ok(()); + } + + if s == DaemonState::INIT { + return self.on_event(DaemonStateMachineInput::Stop); + } + + if s == DaemonState::RUNNING { + self.on_event(DaemonStateMachineInput::Stop)?; + } + self.on_event(DaemonStateMachineInput::Exit) } - fn supervisor(&self) -> Option; fn save(&self) -> DaemonResult<()>; fn restore(&self) -> DaemonResult<()>; fn trigger_takeover(&self) -> DaemonResult<()> { - self.on_event(DaemonStateMachineInput::Takeover)?; - self.on_event(DaemonStateMachineInput::Successful)?; - Ok(()) + self.on_event(DaemonStateMachineInput::Takeover) + } + fn trigger_start(&self) -> DaemonResult<()> { + self.on_event(DaemonStateMachineInput::Start) } // For backward compatibility. @@ -225,35 +249,28 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync { // - `Init` means nydusd is just started and potentially configured well but not // yet negotiate with kernel the capabilities of both sides. It even does not try // to set up fuse session by mounting `/fuse/dev`(in case of `fusedev` backend). +// - `Ready` means nydusd is ready for start or die. Fuse session is created. // - `Running` means nydusd has successfully prepared all the stuff needed to work as a // user-space fuse filesystem, however, the essential capabilities negotiation might not be // done yet. It relies on `fuse-rs` to tell if capability negotiation is done. -// - `Upgrading` state means the nydus daemon is being live-upgraded. There's no need -// to do kernel mount again to set up a session but try to reuse a fuse fd from somewhere else. -// In this state, we try to push `Successful` event to state machine to trigger state transition. -// - `Interrupted` state means nydusd has shutdown fuse server, which means no more message will -// be read from kernel and handled and no pending and in-flight fuse message exists. But the -// nydusd daemon should be alive and wait for coming events. // - `Die` state means the whole nydusd process is going to die. state_machine! { derive(Debug, Clone) pub DaemonStateMachine(Init) - // FIXME: It's possible that failover does not succeed or resource is not capable to - // be passed. To handle event `Stop` when being `Init`. Init => { - Start => Running [StartService], - Takeover => Upgrading [Restore], - Exit => Die[StopStateMachine], + Mount => Ready, + Takeover => Ready[Restore], + Stop => Die[StopStateMachine], + }, + Ready => { + Start => Running[StartService], Stop => Die[Umount], + Exit => Die[StopStateMachine], }, Running => { - Exit => Interrupted [TerminateService], - Stop => Die[Umount], + Stop => Ready [TerminateService], }, - Upgrading(Successful) => Running [StartService], - // Quit from daemon but not disconnect from fuse front-end. - Interrupted(Stop) => Die[StopStateMachine], } /// Implementation of the state machine defined by `DaemonStateMachine`. @@ -322,20 +339,30 @@ impl DaemonStateMachineContext { }), TerminateService => { d.interrupt(); - d.set_state(DaemonState::INTERRUPTED); - Ok(()) + let res = d.wait_service(); + if res.is_ok() { + d.set_state(DaemonState::READY); + } + + res } Umount => d.disconnect().map(|r| { // Always interrupt fuse service loop after shutdown connection to kernel. // In case that kernel does not really shutdown the session due to some reasons // causing service loop keep waiting of `/dev/fuse`. d.interrupt(); + d.wait_service() + .unwrap_or_else(|e| error!("failed to wait service {}", e)); + // at least all fuse thread stopped, no matter what error each thread got d.set_state(DaemonState::STOPPED); r }), Restore => { - d.set_state(DaemonState::UPGRADING); - d.restore() + let res = d.restore(); + if res.is_ok() { + d.set_state(DaemonState::READY); + } + res } StopStateMachine => { d.set_state(DaemonState::STOPPED); @@ -348,9 +375,7 @@ impl DaemonStateMachineContext { // Safe to unwrap because channel is never closed self.result_sender.send(r).unwrap(); // Quit state machine thread if interrupted or stopped - if d.get_state() == DaemonState::INTERRUPTED - || d.get_state() == DaemonState::STOPPED - { + if d.get_state() == DaemonState::STOPPED { break; } } @@ -380,10 +405,19 @@ mod tests { let stat = DaemonState::from(1); assert_eq!(stat, DaemonState::INIT); - let stat = DaemonState::from(6); + let stat = DaemonState::from(2); + assert_eq!(stat, DaemonState::RUNNING); + + let stat = DaemonState::from(3); + assert_eq!(stat, DaemonState::READY); + + let stat = DaemonState::from(4); + assert_eq!(stat, DaemonState::STOPPED); + + let stat = DaemonState::from(5); assert_eq!(stat, DaemonState::UNKNOWN); - let stat = DaemonState::from(7); + let stat = DaemonState::from(8); assert_eq!(stat, DaemonState::UNKNOWN); } diff --git a/src/bin/nydusd/fusedev.rs b/src/bin/nydusd/fusedev.rs index d785f7e9e6e..b759cc5e1ea 100644 --- a/src/bin/nydusd/fusedev.rs +++ b/src/bin/nydusd/fusedev.rs @@ -246,7 +246,8 @@ pub struct FusedevDaemon { state: AtomicI32, supervisor: Option, threads_cnt: u32, - threads: Mutex>>>, + state_machine_thread: Mutex>>>, + fuse_service_threads: Mutex>>>, } impl FusedevDaemon { @@ -256,14 +257,19 @@ impl FusedevDaemon { let thread = thread::Builder::new() .name("fuse_server".to_string()) .spawn(move || { - let _ = s.svc_loop(&inflight_op); + if let Err(err) = s.svc_loop(&inflight_op) { + warn!("fuse server exits with err: {:?}, exiting daemon", err); + if let Err(err) = waker.wake() { + error!("fail to exit daemon, error: {:?}", err); + } + } // Notify the daemon controller that one working thread has exited. - let _ = waker.wake(); + Ok(()) }) .map_err(DaemonError::ThreadSpawn)?; - self.threads.lock().unwrap().push(thread); + self.fuse_service_threads.lock().unwrap().push(thread); Ok(()) } @@ -338,8 +344,32 @@ impl NydusDaemon for FusedevDaemon { } fn wait(&self) -> DaemonResult<()> { + self.wait_state_machine()?; + self.wait_service() + } + + fn wait_state_machine(&self) -> DaemonResult<()> { + let mut guard = self.state_machine_thread.lock().unwrap(); + if guard.is_some() { + guard + .take() + .unwrap() + .join() + .map_err(|e| { + DaemonError::WaitDaemon( + *e.downcast::() + .unwrap_or_else(|e| Box::new(eother!(e))), + ) + })? + .map_err(DaemonError::WaitDaemon)?; + } + + Ok(()) + } + + fn wait_service(&self) -> DaemonResult<()> { loop { - let handle = self.threads.lock().unwrap().pop(); + let handle = self.fuse_service_threads.lock().unwrap().pop(); if let Some(handle) = handle { handle .join() @@ -508,11 +538,12 @@ pub fn create_fuse_daemon( result_receiver: Mutex::new(result_receiver), request_sender: Arc::new(Mutex::new(trigger)), service: Arc::new(service), - threads: Mutex::new(Vec::new()), + state_machine_thread: Mutex::new(None), + fuse_service_threads: Mutex::new(Vec::new()), }); let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender); let machine_thread = machine.kick_state_machine()?; - daemon.threads.lock().unwrap().push(machine_thread); + *daemon.state_machine_thread.lock().unwrap() = Some(machine_thread); // Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper // finding a victim is not necessary. @@ -523,6 +554,9 @@ pub fn create_fuse_daemon( daemon.service.mount(cmd)?; } daemon.service.session.lock().unwrap().mount()?; + daemon + .on_event(DaemonStateMachineInput::Mount) + .map_err(|e| eother!(e))?; daemon .on_event(DaemonStateMachineInput::Start) .map_err(|e| eother!(e))?;