Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: nydusd's state machine #439

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use url::Url;
use crate::http_endpoint_v1::{
EventsHandler, ExitHandler, FsBackendInfo, InfoHandler, MetricsBackendHandler,
MetricsBlobcacheHandler, MetricsFilesHandler, MetricsHandler, MetricsInflightHandler,
MetricsPatternHandler, MountHandler, SendFuseFdHandler, TakeoverHandler, HTTP_ROOT_V1,
MetricsPatternHandler, MountHandler, SendFuseFdHandler, StartHandler, TakeoverHandler,
HTTP_ROOT_V1,
};
use crate::http_endpoint_v2::{BlobObjectListHandlerV2, HTTP_ROOT_V2};

Expand Down Expand Up @@ -126,6 +127,7 @@ pub enum ApiRequest {
ExportFilesMetrics(Option<String>, bool),
Exit,
Takeover,
Start,

// Filesystem Related
Mount(String, ApiMountCmd),
Expand Down Expand Up @@ -398,6 +400,8 @@ lazy_static! {
r.routes.insert(endpoint_v2!("/daemon/events"), Box::new(EventsHandler{}));
r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{}));
r.routes.insert(endpoint_v2!("/daemon/exit"), Box::new(ExitHandler{}));
r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we relocate the API path to /daemon/fuse/start since the handler of endpoint /daemon/start just starts fuse server's service loop. For nydusd itself, it is already started, otherwise how it can handle API requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we relocate the API path to /daemon/fuse/start since the handler of endpoint /daemon/start just starts fuse server's service loop. For nydusd itself, it is already started, otherwise how it can handle API requests?

The start endpoint works with status Ready and RUNNING, which are designed for only fuse but also for other implements. Maybe run is more comfortable?

r.routes.insert(endpoint_v2!("/daemon/start"), Box::new(StartHandler{}));
r.routes.insert(endpoint_v1!("/metrics/backend"), Box::new(MetricsBackendHandler{}));
r.routes.insert(endpoint_v2!("/metrics/backend"), Box::new(MetricsBackendHandler{}));
r.routes.insert(endpoint_v1!("/metrics/blobcache"), Box::new(MetricsBlobcacheHandler{}));
Expand Down
17 changes: 17 additions & 0 deletions api/src/http_endpoint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,23 @@ impl EndpointHandler for MountHandler {
}
}

pub struct StartHandler {}
impl EndpointHandler for StartHandler {
fn handle_request(
&self,
req: &Request,
kicker: &dyn Fn(ApiRequest) -> ApiResponse,
) -> HttpResult {
match (req.method(), req.body.as_ref()) {
(Method::Put, None) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PUT method in RESTful API design means that the operation can be executed multiple times and always have an idempotent state, is this what the API expects?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PUT method in RESTful API design means that the operation can be executed multiple times and always have an idempotent state, is this what the API expects?

It's expected that daemon in READY is trigger to RUNNING. Just like takeover endpoint with PUT method, it's required for daemon in certain status.

let r = kicker(ApiRequest::Start);
Ok(convert_to_response(r, HttpError::Upgrade))
}
_ => Err(HttpError::BadRequest),
}
}
}

pub struct MetricsInflightHandler {}
impl EndpointHandler for MetricsInflightHandler {
fn handle_request(
Expand Down
8 changes: 8 additions & 0 deletions src/bin/nydusd/api_server_glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl ApiServer {
ApiRequest::ConfigureDaemon(conf) => self.configure_daemon(conf),
ApiRequest::DaemonInfo => self.daemon_info(true),
ApiRequest::Exit => self.do_exit(),
ApiRequest::Start => self.do_start(),
ApiRequest::Takeover => self.do_takeover(),
ApiRequest::Events => Self::events(),
ApiRequest::ExportGlobalMetrics(id) => Self::export_global_metrics(id),
Expand Down Expand Up @@ -325,6 +326,13 @@ impl ApiServer {
}
}
}

fn do_start(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_start()
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}
}

struct ApiServerHandler {
Expand Down
112 changes: 73 additions & 39 deletions src/bin/nydusd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ use crate::upgrade::UpgradeMgrError;
pub enum DaemonState {
INIT = 1,
RUNNING = 2,
UPGRADING = 3,
INTERRUPTED = 4,
STOPPED = 5,
UNKNOWN = 6,
READY = 3,
STOPPED = 4,
UNKNOWN = 5,
}

impl Display for DaemonState {
Expand All @@ -52,9 +51,8 @@ impl From<i32> for DaemonState {
match i {
1 => DaemonState::INIT,
2 => DaemonState::RUNNING,
3 => DaemonState::UPGRADING,
4 => DaemonState::INTERRUPTED,
5 => DaemonState::STOPPED,
3 => DaemonState::READY,
4 => DaemonState::STOPPED,
_ => DaemonState::UNKNOWN,
}
}
Expand Down Expand Up @@ -196,23 +194,49 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
fn interrupt(&self) {}
fn stop(&self) -> DaemonResult<()> {
let s = self.get_state();
if s != DaemonState::INTERRUPTED && s != DaemonState::STOPPED {
return self.on_event(DaemonStateMachineInput::Stop);

if s == DaemonState::STOPPED {
return Ok(());
}
Ok(())

if s == DaemonState::RUNNING {
self.on_event(DaemonStateMachineInput::Stop)?;
}

self.on_event(DaemonStateMachineInput::Stop)
}
fn wait(&self) -> DaemonResult<()>;
fn wait_service(&self) -> DaemonResult<()> {
Ok(())
}
fn wait_state_machine(&self) -> DaemonResult<()> {
Ok(())
}
fn trigger_exit(&self) -> DaemonResult<()> {
let s = self.get_state();

if s == DaemonState::STOPPED {
return Ok(());
}

if s == DaemonState::INIT {
return self.on_event(DaemonStateMachineInput::Stop);
}

if s == DaemonState::RUNNING {
self.on_event(DaemonStateMachineInput::Stop)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not merge the two events handling if blocks.
They both react on Stop event

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not merge the two events handling if blocks. They both react on Stop event

The INIT can transfer to DIE by stop directly. While the RUNNING is transferred to DIE via READY. For INIT, there is only one stop event, instead of two stop events for RUNNING.

}

self.on_event(DaemonStateMachineInput::Exit)
}

fn supervisor(&self) -> Option<String>;
fn save(&self) -> DaemonResult<()>;
fn restore(&self) -> DaemonResult<()>;
fn trigger_takeover(&self) -> DaemonResult<()> {
self.on_event(DaemonStateMachineInput::Takeover)?;
self.on_event(DaemonStateMachineInput::Successful)?;
Ok(())
self.on_event(DaemonStateMachineInput::Takeover)
}
fn trigger_start(&self) -> DaemonResult<()> {
self.on_event(DaemonStateMachineInput::Start)
}

// For backward compatibility.
Expand All @@ -225,35 +249,28 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
// - `Init` means nydusd is just started and potentially configured well but not
// yet negotiate with kernel the capabilities of both sides. It even does not try
// to set up fuse session by mounting `/fuse/dev`(in case of `fusedev` backend).
// - `Ready` means nydusd is ready for start or die. Fuse session is created.
// - `Running` means nydusd has successfully prepared all the stuff needed to work as a
// user-space fuse filesystem, however, the essential capabilities negotiation might not be
// done yet. It relies on `fuse-rs` to tell if capability negotiation is done.
// - `Upgrading` state means the nydus daemon is being live-upgraded. There's no need
// to do kernel mount again to set up a session but try to reuse a fuse fd from somewhere else.
// In this state, we try to push `Successful` event to state machine to trigger state transition.
// - `Interrupted` state means nydusd has shutdown fuse server, which means no more message will
// be read from kernel and handled and no pending and in-flight fuse message exists. But the
// nydusd daemon should be alive and wait for coming events.
// - `Die` state means the whole nydusd process is going to die.
state_machine! {
derive(Debug, Clone)
pub DaemonStateMachine(Init)

// FIXME: It's possible that failover does not succeed or resource is not capable to
// be passed. To handle event `Stop` when being `Init`.
Init => {
Start => Running [StartService],
Takeover => Upgrading [Restore],
Exit => Die[StopStateMachine],
Mount => Ready,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we no longer need Mount event since nothing is done when migrating state to Ready

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we no longer need Mount event since nothing is done when migrating state to Ready

In fact, we shall do Mount action. However, it's a bit risky now. Because the Mount content are different in different cases, such as upgrade startup, normal startup, endpoint startup. In total, You're right. There will be abstraction for Mount action in future.

Takeover => Ready[Restore],
Stop => Die[StopStateMachine],
},
Ready => {
Start => Running[StartService],
Stop => Die[Umount],
Exit => Die[StopStateMachine],
},
Running => {
Exit => Interrupted [TerminateService],
Stop => Die[Umount],
Stop => Ready [TerminateService],
},
Upgrading(Successful) => Running [StartService],
// Quit from daemon but not disconnect from fuse front-end.
Interrupted(Stop) => Die[StopStateMachine],
}

/// Implementation of the state machine defined by `DaemonStateMachine`.
Expand Down Expand Up @@ -322,20 +339,30 @@ impl DaemonStateMachineContext {
}),
TerminateService => {
d.interrupt();
d.set_state(DaemonState::INTERRUPTED);
Ok(())
let res = d.wait_service();
if res.is_ok() {
d.set_state(DaemonState::READY);
}

res
}
Umount => d.disconnect().map(|r| {
// Always interrupt fuse service loop after shutdown connection to kernel.
// In case that kernel does not really shutdown the session due to some reasons
// causing service loop keep waiting of `/dev/fuse`.
d.interrupt();
d.wait_service()
.unwrap_or_else(|e| error!("failed to wait service {}", e));
// at least all fuse thread stopped, no matter what error each thread got
d.set_state(DaemonState::STOPPED);
r
}),
Restore => {
d.set_state(DaemonState::UPGRADING);
d.restore()
let res = d.restore();
if res.is_ok() {
d.set_state(DaemonState::READY);
}
res
}
StopStateMachine => {
d.set_state(DaemonState::STOPPED);
Expand All @@ -348,9 +375,7 @@ impl DaemonStateMachineContext {
// Safe to unwrap because channel is never closed
self.result_sender.send(r).unwrap();
// Quit state machine thread if interrupted or stopped
if d.get_state() == DaemonState::INTERRUPTED
|| d.get_state() == DaemonState::STOPPED
{
if d.get_state() == DaemonState::STOPPED {
break;
}
}
Expand Down Expand Up @@ -380,10 +405,19 @@ mod tests {
let stat = DaemonState::from(1);
assert_eq!(stat, DaemonState::INIT);

let stat = DaemonState::from(6);
let stat = DaemonState::from(2);
assert_eq!(stat, DaemonState::RUNNING);

let stat = DaemonState::from(3);
assert_eq!(stat, DaemonState::READY);

let stat = DaemonState::from(4);
assert_eq!(stat, DaemonState::STOPPED);

let stat = DaemonState::from(5);
assert_eq!(stat, DaemonState::UNKNOWN);

let stat = DaemonState::from(7);
let stat = DaemonState::from(8);
assert_eq!(stat, DaemonState::UNKNOWN);
}

Expand Down
48 changes: 41 additions & 7 deletions src/bin/nydusd/fusedev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ pub struct FusedevDaemon {
state: AtomicI32,
supervisor: Option<String>,
threads_cnt: u32,
threads: Mutex<Vec<JoinHandle<Result<()>>>>,
state_machine_thread: Mutex<Option<JoinHandle<Result<()>>>>,
fuse_service_threads: Mutex<Vec<JoinHandle<Result<()>>>>,
}

impl FusedevDaemon {
Expand All @@ -256,14 +257,19 @@ impl FusedevDaemon {
let thread = thread::Builder::new()
.name("fuse_server".to_string())
.spawn(move || {
let _ = s.svc_loop(&inflight_op);
if let Err(err) = s.svc_loop(&inflight_op) {
warn!("fuse server exits with err: {:?}, exiting daemon", err);
if let Err(err) = waker.wake() {
error!("fail to exit daemon, error: {:?}", err);
}
}
// Notify the daemon controller that one working thread has exited.
let _ = waker.wake();

Ok(())
})
.map_err(DaemonError::ThreadSpawn)?;

self.threads.lock().unwrap().push(thread);
self.fuse_service_threads.lock().unwrap().push(thread);

Ok(())
}
Expand Down Expand Up @@ -338,8 +344,32 @@ impl NydusDaemon for FusedevDaemon {
}

fn wait(&self) -> DaemonResult<()> {
self.wait_state_machine()?;
self.wait_service()
}

fn wait_state_machine(&self) -> DaemonResult<()> {
let mut guard = self.state_machine_thread.lock().unwrap();
if guard.is_some() {
guard
.take()
.unwrap()
.join()
.map_err(|e| {
DaemonError::WaitDaemon(
*e.downcast::<Error>()
.unwrap_or_else(|e| Box::new(eother!(e))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Um. Hard error conversion path to io::Error, maybe let WaitDaemon wrap another type of error message is an easier way. But has nothing to do with this PR.

)
})?
.map_err(DaemonError::WaitDaemon)?;
}

Ok(())
}

fn wait_service(&self) -> DaemonResult<()> {
loop {
let handle = self.threads.lock().unwrap().pop();
let handle = self.fuse_service_threads.lock().unwrap().pop();
if let Some(handle) = handle {
handle
.join()
Expand Down Expand Up @@ -508,11 +538,12 @@ pub fn create_fuse_daemon(
result_receiver: Mutex::new(result_receiver),
request_sender: Arc::new(Mutex::new(trigger)),
service: Arc::new(service),
threads: Mutex::new(Vec::new()),
state_machine_thread: Mutex::new(None),
fuse_service_threads: Mutex::new(Vec::new()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is reaping fuse threads important?
Only wait for state machine termination?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is reaping fuse threads important? Only wait for state machine termination?

Yep, seeking for waiting state machine and fuse service thread termination separately. Because daemon status is only updated after successful action. For example, interrupt action is executed, then wait fuse service workers done, finally the daemon status is changed from RUNNING to READY.

});
let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender);
let machine_thread = machine.kick_state_machine()?;
daemon.threads.lock().unwrap().push(machine_thread);
*daemon.state_machine_thread.lock().unwrap() = Some(machine_thread);

// Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper
// finding a victim is not necessary.
Expand All @@ -523,6 +554,9 @@ pub fn create_fuse_daemon(
daemon.service.mount(cmd)?;
}
daemon.service.session.lock().unwrap().mount()?;
daemon
.on_event(DaemonStateMachineInput::Mount)
.map_err(|e| eother!(e))?;
daemon
.on_event(DaemonStateMachineInput::Start)
.map_err(|e| eother!(e))?;
Expand Down