Skip to content

Commit

Permalink
refact: daemon state machine context
Browse files Browse the repository at this point in the history
`UPGRADING` and `INTERRUPTED` are designed for hot-upgrading, which is a use case instead of daemon's state. As a result, they are weird in lack of upgrading function.

Merge `UPGRADING` and `INTERRUPTED` to `READY`. `READY` means service is well-configured, but waiting for trigger. For fuse-based daemon, it means fuse device is mounted, while fuse mountpoint may be not attached. The daemon lives in `INIT` -> `READY` -> `RUNNING` -> `READY` -> `DIE`.

1. Daemon context with new status.
2. Async `TerminateFuseService`, `Umount` actions.
3. `Start` endpoint, which is usually used after `takeover` endpoint.
4. Async exit after fuse device unmounted abnormally.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
  • Loading branch information
泰友 committed May 23, 2022
1 parent 027b011 commit da31b99
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 47 deletions.
5 changes: 4 additions & 1 deletion api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use url::Url;
use crate::http_endpoint_v1::{
EventsHandler, ExitHandler, FsBackendInfo, InfoHandler, MetricsBackendHandler,
MetricsBlobcacheHandler, MetricsFilesHandler, MetricsHandler, MetricsInflightHandler,
MetricsPatternHandler, MountHandler, SendFuseFdHandler, TakeoverHandler, HTTP_ROOT_V1,
MetricsPatternHandler, MountHandler, SendFuseFdHandler, StartHandler, TakeoverHandler,
HTTP_ROOT_V1,
};

const EXIT_TOKEN: Token = Token(usize::MAX);
Expand Down Expand Up @@ -72,6 +73,7 @@ pub enum ApiRequest {
SendFuseFd,
Takeover,
Exit,
Start,
}

#[derive(Debug)]
Expand Down Expand Up @@ -289,6 +291,7 @@ lazy_static! {
r.routes.insert(endpoint_v1!("/daemon/events"), Box::new(EventsHandler{}));
r.routes.insert(endpoint_v1!("/daemon/backend"), Box::new(FsBackendInfo{}));
r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{}));
r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{}));
r.routes.insert(endpoint_v1!("/daemon/fuse/sendfd"), Box::new(SendFuseFdHandler{}));
r.routes.insert(endpoint_v1!("/daemon/fuse/takeover"), Box::new(TakeoverHandler{}));
r.routes.insert(endpoint_v1!("/mount"), Box::new(MountHandler{}));
Expand Down
17 changes: 17 additions & 0 deletions api/src/http_endpoint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,23 @@ impl EndpointHandler for TakeoverHandler {
}
}

pub struct StartHandler {}
impl EndpointHandler for StartHandler {
fn handle_request(
&self,
req: &Request,
kicker: &dyn Fn(ApiRequest) -> ApiResponse,
) -> HttpResult {
match (req.method(), req.body.as_ref()) {
(Method::Put, None) => {
let r = kicker(ApiRequest::Start);
Ok(convert_to_response(r, HttpError::Upgrade))
}
_ => Err(HttpError::BadRequest),
}
}
}

pub struct ExitHandler {}
impl EndpointHandler for ExitHandler {
fn handle_request(
Expand Down
8 changes: 8 additions & 0 deletions src/bin/nydusd/api_server_glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl ApiServer {

ApiRequest::SendFuseFd => self.send_fuse_fd(),
ApiRequest::Takeover => self.do_takeover(),
ApiRequest::Start => self.do_start(),
};

self.respond(resp);
Expand Down Expand Up @@ -290,6 +291,13 @@ impl ApiServer {
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}

fn do_start(&self) -> ApiResponse {
let d = self.daemon.as_ref();
d.trigger_start()
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}
}

pub struct ApiSeverSubscriber {
Expand Down
111 changes: 73 additions & 38 deletions src/bin/nydusd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ type BackFileSystem = Box<dyn BackendFileSystem<Inode = u64, Handle = u64> + Sen
pub enum DaemonState {
INIT = 1,
RUNNING = 2,
UPGRADING = 3,
INTERRUPTED = 4,
STOPPED = 5,
UNKNOWN = 6,
READY = 3,
STOPPED = 4,
UNKNOWN = 5,
}

impl Display for DaemonState {
Expand All @@ -65,9 +64,8 @@ impl From<i32> for DaemonState {
match i {
1 => DaemonState::INIT,
2 => DaemonState::RUNNING,
3 => DaemonState::UPGRADING,
4 => DaemonState::INTERRUPTED,
5 => DaemonState::STOPPED,
3 => DaemonState::READY,
4 => DaemonState::STOPPED,
_ => DaemonState::UNKNOWN,
}
}
Expand Down Expand Up @@ -230,12 +228,24 @@ impl FsBackendCollection {
pub trait NydusDaemon: DaemonStateMachineSubscriber {
fn start(&self) -> DaemonResult<()>;
fn wait(&self) -> DaemonResult<()>;
fn wait_service(&self) -> DaemonResult<()> {
Ok(())
}
fn wait_state_machine(&self) -> DaemonResult<()> {
Ok(())
}
fn stop(&self) -> DaemonResult<()> {
let s = self.get_state();
if s != DaemonState::INTERRUPTED && s != DaemonState::STOPPED {
return self.on_event(DaemonStateMachineInput::Stop);

if s == DaemonState::STOPPED {
return Ok(());
}
Ok(())

if s == DaemonState::RUNNING {
self.on_event(DaemonStateMachineInput::Stop)?;
}

self.on_event(DaemonStateMachineInput::Stop)
}
/// close the current FUSE connection to properly shutdown
/// the FUSE server daemon.
Expand All @@ -248,12 +258,27 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber {
fn get_state(&self) -> DaemonState;
fn set_state(&self, s: DaemonState);
fn trigger_exit(&self) -> DaemonResult<()> {
let s = self.get_state();

if s == DaemonState::STOPPED {
return Ok(());
}

if s == DaemonState::INIT {
return self.on_event(DaemonStateMachineInput::Stop);
}

if s == DaemonState::RUNNING {
self.on_event(DaemonStateMachineInput::Stop)?;
}

self.on_event(DaemonStateMachineInput::Exit)
}
fn trigger_takeover(&self) -> DaemonResult<()> {
self.on_event(DaemonStateMachineInput::Takeover)?;
self.on_event(DaemonStateMachineInput::Successful)?;
Ok(())
self.on_event(DaemonStateMachineInput::Takeover)
}
fn trigger_start(&self) -> DaemonResult<()> {
self.on_event(DaemonStateMachineInput::Start)
}
fn id(&self) -> Option<String>;
fn supervisor(&self) -> Option<String>;
Expand Down Expand Up @@ -429,35 +454,28 @@ fn fs_backend_factory(cmd: &FsBackendMountCmd) -> DaemonResult<BackFileSystem> {
// - `Init` means nydusd is just started and potentially configured well but not
// yet negotiate with kernel the capabilities of both sides. It even does not try
// to set up fuse session by mounting `/fuse/dev`(in case of `fusedev` backend).
// - `Ready` means nydusd is ready for start or die. Fuse session is created.
// - `Running` means nydusd has successfully prepared all the stuff needed to work as a
// user-space fuse filesystem, however, the essential capabilities negotiation might not be
// done yet. It relies on `fuse-rs` to tell if capability negotiation is done.
// - `Upgrading` state means the nydus daemon is being live-upgraded. There's no need
// to do kernel mount again to set up a session but try to reuse a fuse fd from somewhere else.
// In this state, we try to push `Successful` event to state machine to trigger state transition.
// - `Interrupt` state means nydusd has shutdown fuse server, which means no more message will
// be read from kernel and handled and no pending and in-flight fuse message exists. But the
// nydusd daemon should be alive and wait for coming events.
// - `Die` state means the whole nydusd process is going to die.
state_machine! {
derive(Debug, Clone)
pub DaemonStateMachine(Init)

// FIXME: It's possible that failover does not succeed or resource is not capable to
// be passed. To handle event `Stop` when being `Init`.
Init => {
Mount => Running [StartService],
Takeover => Upgrading [Restore],
Exit => Die[StopStateMachine],
Mount => Ready,
Takeover => Ready[Restore],
Stop => Die[StopStateMachine],
},
Ready => {
Start => Running[StartService],
Stop => Die[Umount],
Exit => Die[StopStateMachine],
},
Running => {
Exit => Interrupted [TerminateFuseService],
Stop => Die[Umount],
Stop => Ready [TerminateFuseService],
},
Upgrading(Successful) => Running [StartService],
// Quit from daemon but not disconnect from fuse front-end.
Interrupted(Stop) => Die[StopStateMachine],
}

pub struct DaemonStateMachineContext {
Expand Down Expand Up @@ -524,20 +542,30 @@ impl DaemonStateMachineContext {
}),
TerminateFuseService => {
d.interrupt();
d.set_state(DaemonState::INTERRUPTED);
Ok(())
let res = d.wait_service();
if res.is_ok() {
d.set_state(DaemonState::READY);
}

res
}
Umount => d.disconnect().map(|r| {
// Always interrupt fuse service loop after shutdown connection to kernel.
// In case that kernel does not really shutdown the session due to some reasons
// causing service loop keep waiting of `/dev/fuse`.
d.interrupt();
d.wait_service()
.unwrap_or_else(|e| error!("failed to wait service {}", e));
// at least all fuse thread stopped, no matter what error each thread got
d.set_state(DaemonState::STOPPED);
r
}),
Restore => {
d.set_state(DaemonState::UPGRADING);
d.restore()
let res = d.restore();
if res.is_ok() {
d.set_state(DaemonState::READY);
}
res
}
StopStateMachine => {
d.set_state(DaemonState::STOPPED);
Expand All @@ -550,9 +578,7 @@ impl DaemonStateMachineContext {
// Safe to unwrap because channel is never closed
self.result_sender.send(r).unwrap();
// Quit state machine thread if interrupted or stopped
if d.get_state() == DaemonState::INTERRUPTED
|| d.get_state() == DaemonState::STOPPED
{
if d.get_state() == DaemonState::STOPPED {
break;
}
}
Expand Down Expand Up @@ -582,10 +608,19 @@ mod tests {
let stat = DaemonState::from(1);
assert_eq!(stat, DaemonState::INIT);

let stat = DaemonState::from(6);
let stat = DaemonState::from(2);
assert_eq!(stat, DaemonState::RUNNING);

let stat = DaemonState::from(3);
assert_eq!(stat, DaemonState::READY);

let stat = DaemonState::from(4);
assert_eq!(stat, DaemonState::STOPPED);

let stat = DaemonState::from(5);
assert_eq!(stat, DaemonState::UNKNOWN);

let stat = DaemonState::from(7);
let stat = DaemonState::from(8);
assert_eq!(stat, DaemonState::UNKNOWN);
}

Expand Down
51 changes: 43 additions & 8 deletions src/bin/nydusd/fusedev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ pub struct FusedevDaemon {
inflight_ops: Mutex<Vec<FuseOpWrapper>>,
result_receiver: Mutex<Receiver<DaemonResult<()>>>,
trigger: Arc<Mutex<Trigger>>,
threads: Mutex<Vec<JoinHandle<Result<()>>>>,
state_machine_thread: Mutex<Option<JoinHandle<Result<()>>>>,
fuse_service_threads: Mutex<Vec<JoinHandle<Result<()>>>>,
}

impl FusedevDaemon {
Expand All @@ -175,14 +176,20 @@ impl FusedevDaemon {
let thread = thread::Builder::new()
.name("fuse_server".to_string())
.spawn(move || {
let _ = s.svc_loop(&inflight_op);
// quit the daemon if any fuse server thread exits
exit_daemon();
if let Err(err) = s.svc_loop(&inflight_op) {
warn!("fuse server exits with err: {:?}, exiting daemon", err);
if let Err(e) = thread::Builder::new()
.name("exit_thread".to_string())
.spawn(exit_daemon)
{
error!("failed to exit daemon, err {}", e);
}
}
Ok(())
})
.map_err(DaemonError::ThreadSpawn)?;

self.threads.lock().unwrap().push(thread);
self.fuse_service_threads.lock().unwrap().push(thread);

Ok(())
}
Expand Down Expand Up @@ -230,8 +237,32 @@ impl NydusDaemon for FusedevDaemon {
}

fn wait(&self) -> DaemonResult<()> {
self.wait_state_machine()?;
self.wait_service()
}

fn wait_state_machine(&self) -> DaemonResult<()> {
let mut guard = self.state_machine_thread.lock().unwrap();
if guard.is_some() {
guard
.take()
.unwrap()
.join()
.map_err(|e| {
DaemonError::WaitDaemon(
*e.downcast::<Error>()
.unwrap_or_else(|e| Box::new(eother!(e))),
)
})?
.map_err(DaemonError::WaitDaemon)?;
}

Ok(())
}

fn wait_service(&self) -> DaemonResult<()> {
loop {
let handle = self.threads.lock().unwrap().pop();
let handle = self.fuse_service_threads.lock().unwrap().pop();
if let Some(handle) = handle {
handle
.join()
Expand Down Expand Up @@ -477,12 +508,13 @@ pub fn create_nydus_daemon(
inflight_ops: Mutex::new(Vec::new()),
result_receiver: Mutex::new(result_receiver),
trigger: Arc::new(Mutex::new(trigger)),
threads: Mutex::new(Vec::new()),
state_machine_thread: Mutex::new(None),
fuse_service_threads: Mutex::new(Vec::new()),
});

let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender);
let machine_thread = machine.kick_state_machine()?;
daemon.threads.lock().unwrap().push(machine_thread);
*daemon.state_machine_thread.lock().unwrap() = Some(machine_thread);

// Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper
// finding a victim is not necessary.
Expand All @@ -496,6 +528,9 @@ pub fn create_nydus_daemon(
daemon
.on_event(DaemonStateMachineInput::Mount)
.map_err(|e| eother!(e))?;
daemon
.on_event(DaemonStateMachineInput::Start)
.map_err(|e| eother!(e))?;
daemon.conn.store(calc_fuse_conn(mnt)?, Ordering::Relaxed);
}

Expand Down

0 comments on commit da31b99

Please sign in to comment.