Skip to content

Commit

Permalink
Merge pull request #439 from ccx1024cc/upmaster
Browse files Browse the repository at this point in the history
refact: nydusd's state machine
  • Loading branch information
imeoer authored May 31, 2022
2 parents a0fc4cb + 8a7b4ce commit 6be13a7
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 47 deletions.
6 changes: 5 additions & 1 deletion api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use url::Url;
use crate::http_endpoint_v1::{
EventsHandler, ExitHandler, FsBackendInfo, InfoHandler, MetricsBackendHandler,
MetricsBlobcacheHandler, MetricsFilesHandler, MetricsHandler, MetricsInflightHandler,
MetricsPatternHandler, MountHandler, SendFuseFdHandler, TakeoverHandler, HTTP_ROOT_V1,
MetricsPatternHandler, MountHandler, SendFuseFdHandler, StartHandler, TakeoverHandler,
HTTP_ROOT_V1,
};
use crate::http_endpoint_v2::{BlobObjectListHandlerV2, HTTP_ROOT_V2};

Expand Down Expand Up @@ -126,6 +127,7 @@ pub enum ApiRequest {
ExportFilesMetrics(Option<String>, bool),
Exit,
Takeover,
Start,

// Filesystem Related
Mount(String, ApiMountCmd),
Expand Down Expand Up @@ -398,6 +400,8 @@ lazy_static! {
r.routes.insert(endpoint_v2!("/daemon/events"), Box::new(EventsHandler{}));
r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{}));
r.routes.insert(endpoint_v2!("/daemon/exit"), Box::new(ExitHandler{}));
r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{}));
r.routes.insert(endpoint_v2!("/daemon/start"), Box::new(StartHandler{}));
r.routes.insert(endpoint_v1!("/metrics/backend"), Box::new(MetricsBackendHandler{}));
r.routes.insert(endpoint_v2!("/metrics/backend"), Box::new(MetricsBackendHandler{}));
r.routes.insert(endpoint_v1!("/metrics/blobcache"), Box::new(MetricsBlobcacheHandler{}));
Expand Down
17 changes: 17 additions & 0 deletions api/src/http_endpoint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,23 @@ impl EndpointHandler for MountHandler {
}
}

pub struct StartHandler {}
impl EndpointHandler for StartHandler {
fn handle_request(
&self,
req: &Request,
kicker: &dyn Fn(ApiRequest) -> ApiResponse,
) -> HttpResult {
match (req.method(), req.body.as_ref()) {
(Method::Put, None) => {
let r = kicker(ApiRequest::Start);
Ok(convert_to_response(r, HttpError::Upgrade))
}
_ => Err(HttpError::BadRequest),
}
}
}

pub struct MetricsInflightHandler {}
impl EndpointHandler for MetricsInflightHandler {
fn handle_request(
Expand Down
8 changes: 8 additions & 0 deletions src/bin/nydusd/api_server_glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl ApiServer {
ApiRequest::ConfigureDaemon(conf) => self.configure_daemon(conf),
ApiRequest::DaemonInfo => self.daemon_info(true),
ApiRequest::Exit => self.do_exit(),
ApiRequest::Start => self.do_start(),
ApiRequest::Takeover => self.do_takeover(),
ApiRequest::Events => Self::events(),
ApiRequest::ExportGlobalMetrics(id) => Self::export_global_metrics(id),
Expand Down Expand Up @@ -325,6 +326,13 @@ impl ApiServer {
}
}
}

fn do_start(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_start()
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}
}

struct ApiServerHandler {
Expand Down
112 changes: 73 additions & 39 deletions src/bin/nydusd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ use crate::upgrade::UpgradeMgrError;
pub enum DaemonState {
INIT = 1,
RUNNING = 2,
UPGRADING = 3,
INTERRUPTED = 4,
STOPPED = 5,
UNKNOWN = 6,
READY = 3,
STOPPED = 4,
UNKNOWN = 5,
}

impl Display for DaemonState {
Expand All @@ -52,9 +51,8 @@ impl From<i32> for DaemonState {
match i {
1 => DaemonState::INIT,
2 => DaemonState::RUNNING,
3 => DaemonState::UPGRADING,
4 => DaemonState::INTERRUPTED,
5 => DaemonState::STOPPED,
3 => DaemonState::READY,
4 => DaemonState::STOPPED,
_ => DaemonState::UNKNOWN,
}
}
Expand Down Expand Up @@ -196,23 +194,49 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
fn interrupt(&self) {}
fn stop(&self) -> DaemonResult<()> {
let s = self.get_state();
if s != DaemonState::INTERRUPTED && s != DaemonState::STOPPED {
return self.on_event(DaemonStateMachineInput::Stop);

if s == DaemonState::STOPPED {
return Ok(());
}
Ok(())

if s == DaemonState::RUNNING {
self.on_event(DaemonStateMachineInput::Stop)?;
}

self.on_event(DaemonStateMachineInput::Stop)
}
fn wait(&self) -> DaemonResult<()>;
fn wait_service(&self) -> DaemonResult<()> {
Ok(())
}
fn wait_state_machine(&self) -> DaemonResult<()> {
Ok(())
}
fn trigger_exit(&self) -> DaemonResult<()> {
let s = self.get_state();

if s == DaemonState::STOPPED {
return Ok(());
}

if s == DaemonState::INIT {
return self.on_event(DaemonStateMachineInput::Stop);
}

if s == DaemonState::RUNNING {
self.on_event(DaemonStateMachineInput::Stop)?;
}

self.on_event(DaemonStateMachineInput::Exit)
}

fn supervisor(&self) -> Option<String>;
fn save(&self) -> DaemonResult<()>;
fn restore(&self) -> DaemonResult<()>;
fn trigger_takeover(&self) -> DaemonResult<()> {
self.on_event(DaemonStateMachineInput::Takeover)?;
self.on_event(DaemonStateMachineInput::Successful)?;
Ok(())
self.on_event(DaemonStateMachineInput::Takeover)
}
fn trigger_start(&self) -> DaemonResult<()> {
self.on_event(DaemonStateMachineInput::Start)
}

// For backward compatibility.
Expand All @@ -225,35 +249,28 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
// - `Init` means nydusd is just started and potentially configured well but not
// yet negotiate with kernel the capabilities of both sides. It even does not try
// to set up fuse session by mounting `/fuse/dev`(in case of `fusedev` backend).
// - `Ready` means nydusd is ready for start or die. Fuse session is created.
// - `Running` means nydusd has successfully prepared all the stuff needed to work as a
// user-space fuse filesystem, however, the essential capabilities negotiation might not be
// done yet. It relies on `fuse-rs` to tell if capability negotiation is done.
// - `Upgrading` state means the nydus daemon is being live-upgraded. There's no need
// to do kernel mount again to set up a session but try to reuse a fuse fd from somewhere else.
// In this state, we try to push `Successful` event to state machine to trigger state transition.
// - `Interrupted` state means nydusd has shutdown fuse server, which means no more message will
// be read from kernel and handled and no pending and in-flight fuse message exists. But the
// nydusd daemon should be alive and wait for coming events.
// - `Die` state means the whole nydusd process is going to die.
state_machine! {
derive(Debug, Clone)
pub DaemonStateMachine(Init)

// FIXME: It's possible that failover does not succeed or resource is not capable to
// be passed. To handle event `Stop` when being `Init`.
Init => {
Start => Running [StartService],
Takeover => Upgrading [Restore],
Exit => Die[StopStateMachine],
Mount => Ready,
Takeover => Ready[Restore],
Stop => Die[StopStateMachine],
},
Ready => {
Start => Running[StartService],
Stop => Die[Umount],
Exit => Die[StopStateMachine],
},
Running => {
Exit => Interrupted [TerminateService],
Stop => Die[Umount],
Stop => Ready [TerminateService],
},
Upgrading(Successful) => Running [StartService],
// Quit from daemon but not disconnect from fuse front-end.
Interrupted(Stop) => Die[StopStateMachine],
}

/// Implementation of the state machine defined by `DaemonStateMachine`.
Expand Down Expand Up @@ -322,20 +339,30 @@ impl DaemonStateMachineContext {
}),
TerminateService => {
d.interrupt();
d.set_state(DaemonState::INTERRUPTED);
Ok(())
let res = d.wait_service();
if res.is_ok() {
d.set_state(DaemonState::READY);
}

res
}
Umount => d.disconnect().map(|r| {
// Always interrupt fuse service loop after shutdown connection to kernel.
// In case that kernel does not really shutdown the session due to some reasons
// causing service loop keep waiting of `/dev/fuse`.
d.interrupt();
d.wait_service()
.unwrap_or_else(|e| error!("failed to wait service {}", e));
// at least all fuse thread stopped, no matter what error each thread got
d.set_state(DaemonState::STOPPED);
r
}),
Restore => {
d.set_state(DaemonState::UPGRADING);
d.restore()
let res = d.restore();
if res.is_ok() {
d.set_state(DaemonState::READY);
}
res
}
StopStateMachine => {
d.set_state(DaemonState::STOPPED);
Expand All @@ -348,9 +375,7 @@ impl DaemonStateMachineContext {
// Safe to unwrap because channel is never closed
self.result_sender.send(r).unwrap();
// Quit state machine thread if interrupted or stopped
if d.get_state() == DaemonState::INTERRUPTED
|| d.get_state() == DaemonState::STOPPED
{
if d.get_state() == DaemonState::STOPPED {
break;
}
}
Expand Down Expand Up @@ -380,10 +405,19 @@ mod tests {
let stat = DaemonState::from(1);
assert_eq!(stat, DaemonState::INIT);

let stat = DaemonState::from(6);
let stat = DaemonState::from(2);
assert_eq!(stat, DaemonState::RUNNING);

let stat = DaemonState::from(3);
assert_eq!(stat, DaemonState::READY);

let stat = DaemonState::from(4);
assert_eq!(stat, DaemonState::STOPPED);

let stat = DaemonState::from(5);
assert_eq!(stat, DaemonState::UNKNOWN);

let stat = DaemonState::from(7);
let stat = DaemonState::from(8);
assert_eq!(stat, DaemonState::UNKNOWN);
}

Expand Down
48 changes: 41 additions & 7 deletions src/bin/nydusd/fusedev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ pub struct FusedevDaemon {
state: AtomicI32,
supervisor: Option<String>,
threads_cnt: u32,
threads: Mutex<Vec<JoinHandle<Result<()>>>>,
state_machine_thread: Mutex<Option<JoinHandle<Result<()>>>>,
fuse_service_threads: Mutex<Vec<JoinHandle<Result<()>>>>,
}

impl FusedevDaemon {
Expand All @@ -256,14 +257,19 @@ impl FusedevDaemon {
let thread = thread::Builder::new()
.name("fuse_server".to_string())
.spawn(move || {
let _ = s.svc_loop(&inflight_op);
if let Err(err) = s.svc_loop(&inflight_op) {
warn!("fuse server exits with err: {:?}, exiting daemon", err);
if let Err(err) = waker.wake() {
error!("fail to exit daemon, error: {:?}", err);
}
}
// Notify the daemon controller that one working thread has exited.
let _ = waker.wake();

Ok(())
})
.map_err(DaemonError::ThreadSpawn)?;

self.threads.lock().unwrap().push(thread);
self.fuse_service_threads.lock().unwrap().push(thread);

Ok(())
}
Expand Down Expand Up @@ -338,8 +344,32 @@ impl NydusDaemon for FusedevDaemon {
}

fn wait(&self) -> DaemonResult<()> {
self.wait_state_machine()?;
self.wait_service()
}

fn wait_state_machine(&self) -> DaemonResult<()> {
let mut guard = self.state_machine_thread.lock().unwrap();
if guard.is_some() {
guard
.take()
.unwrap()
.join()
.map_err(|e| {
DaemonError::WaitDaemon(
*e.downcast::<Error>()
.unwrap_or_else(|e| Box::new(eother!(e))),
)
})?
.map_err(DaemonError::WaitDaemon)?;
}

Ok(())
}

fn wait_service(&self) -> DaemonResult<()> {
loop {
let handle = self.threads.lock().unwrap().pop();
let handle = self.fuse_service_threads.lock().unwrap().pop();
if let Some(handle) = handle {
handle
.join()
Expand Down Expand Up @@ -508,11 +538,12 @@ pub fn create_fuse_daemon(
result_receiver: Mutex::new(result_receiver),
request_sender: Arc::new(Mutex::new(trigger)),
service: Arc::new(service),
threads: Mutex::new(Vec::new()),
state_machine_thread: Mutex::new(None),
fuse_service_threads: Mutex::new(Vec::new()),
});
let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender);
let machine_thread = machine.kick_state_machine()?;
daemon.threads.lock().unwrap().push(machine_thread);
*daemon.state_machine_thread.lock().unwrap() = Some(machine_thread);

// Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper
// finding a victim is not necessary.
Expand All @@ -523,6 +554,9 @@ pub fn create_fuse_daemon(
daemon.service.mount(cmd)?;
}
daemon.service.session.lock().unwrap().mount()?;
daemon
.on_event(DaemonStateMachineInput::Mount)
.map_err(|e| eother!(e))?;
daemon
.on_event(DaemonStateMachineInput::Start)
.map_err(|e| eother!(e))?;
Expand Down

0 comments on commit 6be13a7

Please sign in to comment.