// Patch summary (unified diff over api/src/http.rs, api/src/http_endpoint_v1.rs,
// src/bin/nydusd/api_server_glue.rs, src/bin/nydusd/daemon.rs and src/bin/nydusd/fusedev.rs).
//
// What the hunks below do:
// - Add `ApiRequest::Start` and register `StartHandler` on the v1 route "/daemon/start".
//   The handler accepts only `PUT` without a body; every other method/body combination
//   yields `HttpError::BadRequest`.
// - `ApiServer::do_start` forwards to `NydusDaemon::trigger_start`, which posts
//   `DaemonStateMachineInput::Start`; failures are mapped to `ApiError::DaemonAbnormal`.
// - `DaemonState` loses `UPGRADING` and `INTERRUPTED`, gains `READY = 3`, and renumbers
//   `STOPPED` to 4 and `UNKNOWN` to 5 (the `From` conversion and the unit test follow suit).
// - State machine transitions after this patch:
//     Init    --Mount-->    Ready
//     Init    --Takeover--> Ready   [Restore]
//     Init    --Stop-->     Die     [StopStateMachine]
//     Ready   --Start-->    Running [StartService]
//     Ready   --Stop-->     Die     [Umount]
//     Ready   --Exit-->     Die     [StopStateMachine]
//     Running --Stop-->     Ready   [TerminateFuseService]
// - `Running` only defines `Stop`, so `stop()` and `trigger_exit()` first send `Stop`
//   when the daemon is RUNNING and then send the final `Stop`/`Exit` from `Ready`.
//   Both return `Ok(())` immediately when the state is already STOPPED.
// - `trigger_takeover` now posts only `Takeover`; the follow-up `Successful` event is removed.
// - `NydusDaemon` gains `wait_service` and `wait_state_machine`, both defaulting to `Ok(())`.
// - `FusedevDaemon::threads` is split into `state_machine_thread` (an optional single handle)
//   and `fuse_service_threads`; `wait()` joins the state machine thread first and then the
//   fuse service threads, while `wait_service()` joins only the latter.
// - The `fuse_server` thread spawns an `exit_thread` running `exit_daemon` only when
//   `svc_loop` returns an error; previously `exit_daemon` was called on every exit.
// - `create_nydus_daemon` posts `Start` right after `Mount` on the mount path.
//
// NOTE(review): in the `Umount` action, `interrupt()`, `wait_service()` and
//   `set_state(STOPPED)` live inside `disconnect().map(..)`, so they are skipped when
//   `disconnect()` returns an error - confirm that is intended.
// NOTE(review): `Restore` and `TerminateFuseService` set READY only on success; whether the
//   state machine itself rolls back on failure is not visible here - verify.
// NOTE(review): the context comment "Quit state machine thread if interrupted or stopped"
//   is stale; the check that follows it now only tests `DaemonState::STOPPED`.
// NOTE(review): `wait_state_machine` uses `is_some()` followed by `take().unwrap()`;
//   `if let Some(handle) = guard.take()` would drop the unwrap.
// NOTE(review): generic parameters look stripped from this text (e.g. `downcast::()`,
//   `impl From for DaemonState`, `Mutex>>>`) and the patch is collapsed onto a few long
//   lines - presumably an extraction artifact; restore from the real patch before applying.
diff --git a/api/src/http.rs b/api/src/http.rs index 159bae9ce37..ceaea78a184 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -24,7 +24,8 @@ use url::Url; use crate::http_endpoint_v1::{ EventsHandler, ExitHandler, FsBackendInfo, InfoHandler, MetricsBackendHandler, MetricsBlobcacheHandler, MetricsFilesHandler, MetricsHandler, MetricsInflightHandler, - MetricsPatternHandler, MountHandler, SendFuseFdHandler, TakeoverHandler, HTTP_ROOT_V1, + MetricsPatternHandler, MountHandler, SendFuseFdHandler, StartHandler, TakeoverHandler, + HTTP_ROOT_V1, }; const EXIT_TOKEN: Token = Token(usize::MAX); @@ -72,6 +73,7 @@ pub enum ApiRequest { SendFuseFd, Takeover, Exit, + Start, } #[derive(Debug)] @@ -289,6 +291,7 @@ lazy_static! { r.routes.insert(endpoint_v1!("/daemon/events"), Box::new(EventsHandler{})); r.routes.insert(endpoint_v1!("/daemon/backend"), Box::new(FsBackendInfo{})); r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{})); + r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{})); r.routes.insert(endpoint_v1!("/daemon/fuse/sendfd"), Box::new(SendFuseFdHandler{})); r.routes.insert(endpoint_v1!("/daemon/fuse/takeover"), Box::new(TakeoverHandler{})); r.routes.insert(endpoint_v1!("/mount"), Box::new(MountHandler{})); diff --git a/api/src/http_endpoint_v1.rs b/api/src/http_endpoint_v1.rs index e8ce0d3706f..5fc68c8e0db 100644 --- a/api/src/http_endpoint_v1.rs +++ b/api/src/http_endpoint_v1.rs @@ -257,6 +257,23 @@ impl EndpointHandler for TakeoverHandler { } } +pub struct StartHandler {} +impl EndpointHandler for StartHandler { + fn handle_request( + &self, + req: &Request, + kicker: &dyn Fn(ApiRequest) -> ApiResponse, + ) -> HttpResult { + match (req.method(), req.body.as_ref()) { + (Method::Put, None) => { + let r = kicker(ApiRequest::Start); + Ok(convert_to_response(r, HttpError::Upgrade)) + } + _ => Err(HttpError::BadRequest), + } + } +} + pub struct ExitHandler {} impl EndpointHandler for ExitHandler { fn handle_request( diff --git 
a/src/bin/nydusd/api_server_glue.rs b/src/bin/nydusd/api_server_glue.rs index cac4080f244..2fde88d7281 100644 --- a/src/bin/nydusd/api_server_glue.rs +++ b/src/bin/nydusd/api_server_glue.rs @@ -83,6 +83,7 @@ impl ApiServer { ApiRequest::SendFuseFd => self.send_fuse_fd(), ApiRequest::Takeover => self.do_takeover(), + ApiRequest::Start => self.do_start(), }; self.respond(resp); @@ -290,6 +291,13 @@ impl ApiServer { .map(|_| ApiResponsePayload::Empty) .map_err(|e| ApiError::DaemonAbnormal(e.into())) } + + fn do_start(&self) -> ApiResponse { + let d = self.daemon.as_ref(); + d.trigger_start() + .map(|_| ApiResponsePayload::Empty) + .map_err(|e| ApiError::DaemonAbnormal(e.into())) + } } pub struct ApiSeverSubscriber { diff --git a/src/bin/nydusd/daemon.rs b/src/bin/nydusd/daemon.rs index 0af622679a6..5bbb40c6dad 100644 --- a/src/bin/nydusd/daemon.rs +++ b/src/bin/nydusd/daemon.rs @@ -48,10 +48,9 @@ type BackFileSystem = Box + Sen pub enum DaemonState { INIT = 1, RUNNING = 2, - UPGRADING = 3, - INTERRUPTED = 4, - STOPPED = 5, - UNKNOWN = 6, + READY = 3, + STOPPED = 4, + UNKNOWN = 5, } impl Display for DaemonState { @@ -65,9 +64,8 @@ impl From for DaemonState { match i { 1 => DaemonState::INIT, 2 => DaemonState::RUNNING, - 3 => DaemonState::UPGRADING, - 4 => DaemonState::INTERRUPTED, - 5 => DaemonState::STOPPED, + 3 => DaemonState::READY, + 4 => DaemonState::STOPPED, _ => DaemonState::UNKNOWN, } } @@ -230,12 +228,24 @@ impl FsBackendCollection { pub trait NydusDaemon: DaemonStateMachineSubscriber { fn start(&self) -> DaemonResult<()>; fn wait(&self) -> DaemonResult<()>; + fn wait_service(&self) -> DaemonResult<()> { + Ok(()) + } + fn wait_state_machine(&self) -> DaemonResult<()> { + Ok(()) + } fn stop(&self) -> DaemonResult<()> { let s = self.get_state(); - if s != DaemonState::INTERRUPTED && s != DaemonState::STOPPED { - return self.on_event(DaemonStateMachineInput::Stop); + + if s == DaemonState::STOPPED { + return Ok(()); } - Ok(()) + + if s == DaemonState::RUNNING { 
self.on_event(DaemonStateMachineInput::Stop)?; + } + + self.on_event(DaemonStateMachineInput::Stop) } /// close the current FUSE connection to properly shutdown /// the FUSE server daemon. @@ -248,12 +258,27 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber { fn get_state(&self) -> DaemonState; fn set_state(&self, s: DaemonState); fn trigger_exit(&self) -> DaemonResult<()> { + let s = self.get_state(); + + if s == DaemonState::STOPPED { + return Ok(()); + } + + if s == DaemonState::INIT { + return self.on_event(DaemonStateMachineInput::Stop); + } + + if s == DaemonState::RUNNING { + self.on_event(DaemonStateMachineInput::Stop)?; + } + self.on_event(DaemonStateMachineInput::Exit) } fn trigger_takeover(&self) -> DaemonResult<()> { - self.on_event(DaemonStateMachineInput::Takeover)?; - self.on_event(DaemonStateMachineInput::Successful)?; - Ok(()) + self.on_event(DaemonStateMachineInput::Takeover) + } + fn trigger_start(&self) -> DaemonResult<()> { + self.on_event(DaemonStateMachineInput::Start) } fn id(&self) -> Option; fn supervisor(&self) -> Option; @@ -429,35 +454,28 @@ fn fs_backend_factory(cmd: &FsBackendMountCmd) -> DaemonResult { // - `Init` means nydusd is just started and potentially configured well but not // yet negotiate with kernel the capabilities of both sides. It even does not try // to set up fuse session by mounting `/fuse/dev`(in case of `fusedev` backend). +// - `Ready` means nydusd is ready for start or die. Fuse session is created. // - `Running` means nydusd has successfully prepared all the stuff needed to work as a // user-space fuse filesystem, however, the essential capabilities negotiation might not be // done yet. It relies on `fuse-rs` to tell if capability negotiation is done. -// - `Upgrading` state means the nydus daemon is being live-upgraded. There's no need -// to do kernel mount again to set up a session but try to reuse a fuse fd from somewhere else. 
-// In this state, we try to push `Successful` event to state machine to trigger state transition. -// - `Interrupt` state means nydusd has shutdown fuse server, which means no more message will -// be read from kernel and handled and no pending and in-flight fuse message exists. But the -// nydusd daemon should be alive and wait for coming events. // - `Die` state means the whole nydusd process is going to die. state_machine! { derive(Debug, Clone) pub DaemonStateMachine(Init) - // FIXME: It's possible that failover does not succeed or resource is not capable to - // be passed. To handle event `Stop` when being `Init`. Init => { - Mount => Running [StartService], - Takeover => Upgrading [Restore], - Exit => Die[StopStateMachine], + Mount => Ready, + Takeover => Ready[Restore], + Stop => Die[StopStateMachine], + }, + Ready => { + Start => Running[StartService], Stop => Die[Umount], + Exit => Die[StopStateMachine], }, Running => { - Exit => Interrupted [TerminateFuseService], - Stop => Die[Umount], + Stop => Ready [TerminateFuseService], }, - Upgrading(Successful) => Running [StartService], - // Quit from daemon but not disconnect from fuse front-end. - Interrupted(Stop) => Die[StopStateMachine], } pub struct DaemonStateMachineContext { @@ -524,20 +542,30 @@ impl DaemonStateMachineContext { }), TerminateFuseService => { d.interrupt(); - d.set_state(DaemonState::INTERRUPTED); - Ok(()) + let res = d.wait_service(); + if res.is_ok() { + d.set_state(DaemonState::READY); + } + + res } Umount => d.disconnect().map(|r| { // Always interrupt fuse service loop after shutdown connection to kernel. // In case that kernel does not really shutdown the session due to some reasons // causing service loop keep waiting of `/dev/fuse`. 
d.interrupt(); + d.wait_service() + .unwrap_or_else(|e| error!("failed to wait service {}", e)); + // at least all fuse thread stopped, no matter what error each thread got d.set_state(DaemonState::STOPPED); r }), Restore => { - d.set_state(DaemonState::UPGRADING); - d.restore() + let res = d.restore(); + if res.is_ok() { + d.set_state(DaemonState::READY); + } + res } StopStateMachine => { d.set_state(DaemonState::STOPPED); @@ -550,9 +578,7 @@ impl DaemonStateMachineContext { // Safe to unwrap because channel is never closed self.result_sender.send(r).unwrap(); // Quit state machine thread if interrupted or stopped - if d.get_state() == DaemonState::INTERRUPTED - || d.get_state() == DaemonState::STOPPED - { + if d.get_state() == DaemonState::STOPPED { break; } } @@ -582,10 +608,19 @@ mod tests { let stat = DaemonState::from(1); assert_eq!(stat, DaemonState::INIT); - let stat = DaemonState::from(6); + let stat = DaemonState::from(2); + assert_eq!(stat, DaemonState::RUNNING); + + let stat = DaemonState::from(3); + assert_eq!(stat, DaemonState::READY); + + let stat = DaemonState::from(4); + assert_eq!(stat, DaemonState::STOPPED); + + let stat = DaemonState::from(5); assert_eq!(stat, DaemonState::UNKNOWN); - let stat = DaemonState::from(7); + let stat = DaemonState::from(8); assert_eq!(stat, DaemonState::UNKNOWN); } diff --git a/src/bin/nydusd/fusedev.rs b/src/bin/nydusd/fusedev.rs index d2c2e492c91..cbdfb1c9971 100644 --- a/src/bin/nydusd/fusedev.rs +++ b/src/bin/nydusd/fusedev.rs @@ -164,7 +164,8 @@ pub struct FusedevDaemon { inflight_ops: Mutex>, result_receiver: Mutex>>, trigger: Arc>, - threads: Mutex>>>, + state_machine_thread: Mutex>>>, + fuse_service_threads: Mutex>>>, } impl FusedevDaemon { @@ -175,14 +176,20 @@ impl FusedevDaemon { let thread = thread::Builder::new() .name("fuse_server".to_string()) .spawn(move || { - let _ = s.svc_loop(&inflight_op); - // quit the daemon if any fuse server thread exits - exit_daemon(); + if let Err(err) = 
s.svc_loop(&inflight_op) { + warn!("fuse server exits with err: {:?}, exiting daemon", err); + if let Err(e) = thread::Builder::new() + .name("exit_thread".to_string()) + .spawn(exit_daemon) + { + error!("failed to exit daemon, err {}", e); + } + } Ok(()) }) .map_err(DaemonError::ThreadSpawn)?; - self.threads.lock().unwrap().push(thread); + self.fuse_service_threads.lock().unwrap().push(thread); Ok(()) } @@ -230,8 +237,32 @@ impl NydusDaemon for FusedevDaemon { } fn wait(&self) -> DaemonResult<()> { + self.wait_state_machine()?; + self.wait_service() + } + + fn wait_state_machine(&self) -> DaemonResult<()> { + let mut guard = self.state_machine_thread.lock().unwrap(); + if guard.is_some() { + guard + .take() + .unwrap() + .join() + .map_err(|e| { + DaemonError::WaitDaemon( + *e.downcast::() + .unwrap_or_else(|e| Box::new(eother!(e))), + ) + })? + .map_err(DaemonError::WaitDaemon)?; + } + + Ok(()) + } + + fn wait_service(&self) -> DaemonResult<()> { loop { - let handle = self.threads.lock().unwrap().pop(); + let handle = self.fuse_service_threads.lock().unwrap().pop(); if let Some(handle) = handle { handle .join() @@ -477,12 +508,13 @@ pub fn create_nydus_daemon( inflight_ops: Mutex::new(Vec::new()), result_receiver: Mutex::new(result_receiver), trigger: Arc::new(Mutex::new(trigger)), - threads: Mutex::new(Vec::new()), + state_machine_thread: Mutex::new(None), + fuse_service_threads: Mutex::new(Vec::new()), }); let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender); let machine_thread = machine.kick_state_machine()?; - daemon.threads.lock().unwrap().push(machine_thread); + *daemon.state_machine_thread.lock().unwrap() = Some(machine_thread); // Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper // finding a victim is not necessary. 
@@ -496,6 +528,9 @@ pub fn create_nydus_daemon( daemon .on_event(DaemonStateMachineInput::Mount) .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Start) + .map_err(|e| eother!(e))?; daemon.conn.store(calc_fuse_conn(mnt)?, Ordering::Relaxed); }