diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f4d1fd1..aed09999 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate ### Added +- core: `Context::status_kind` API, now actors can read `ActorStatusKind` from the context ([#133]). +- core: `is_*` methods on `ActorStatusKind` for each variant ([#133]). - Specify MSRV as 1.76. - logger: log truncation up to the `max_line_size` configuration parameter ([#128]). - core: directly accept never returning functions in `ActorGroup::exec()` ([#127]). @@ -36,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#52]: https://github.com/elfo-rs/elfo/issues/52 [#127]: https://github.com/elfo-rs/elfo/pull/127 [#128]: https://github.com/elfo-rs/elfo/pull/128 +[#133]: https://github.com/elfo-rs/elfo/pull/133 ## [0.2.0-alpha.15] - 2024-05-13 ### Added diff --git a/elfo-core/src/actor.rs b/elfo-core/src/actor.rs index 06182af8..7764852e 100644 --- a/elfo-core/src/actor.rs +++ b/elfo-core/src/actor.rs @@ -1,4 +1,7 @@ -use std::{fmt, mem, sync::Arc}; +use std::{ + mem, + sync::{atomic, Arc}, +}; use futures_intrusive::sync::ManualResetEvent; use metrics::{decrement_gauge, increment_counter, increment_gauge}; @@ -7,6 +10,7 @@ use serde::{Deserialize, Serialize}; use tracing::{error, info, warn}; use crate::{ + actor_status::{ActorStatus, ActorStatusKind, AtomicActorStatusKind}, envelope::Envelope, errors::{SendError, TrySendError}, group::TerminationPolicy, @@ -29,95 +33,6 @@ pub struct ActorMeta { pub key: String, } -// === ActorStatus === - -/// Represents the current status of an actor. -/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ActorStatus { - kind: ActorStatusKind, - details: Option, -} - -impl ActorStatus { - pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming); - pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed); - pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing); - pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal); - pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated); - pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating); - - const fn new(kind: ActorStatusKind) -> Self { - Self { - kind, - details: None, - } - } - - /// Creates a new status with the same kind and provided details. - pub fn with_details(&self, details: impl fmt::Display) -> Self { - ActorStatus { - kind: self.kind, - details: Some(details.to_string()), - } - } - - /// Returns the corresponding [`ActorStatusKind`] for this status. - pub fn kind(&self) -> ActorStatusKind { - self.kind - } - - /// Returns details for this status, if provided. - pub fn details(&self) -> Option<&str> { - self.details.as_deref() - } - - pub(crate) fn is_failed(&self) -> bool { - self.kind == ActorStatusKind::Failed - } - - fn is_finished(&self) -> bool { - use ActorStatusKind::*; - matches!(self.kind, Failed | Terminated) - } -} - -impl fmt::Display for ActorStatus { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.details { - Some(details) => write!(f, "{:?}: {}", self.kind, details), - None => write!(f, "{:?}", self.kind), - } - } -} - -// === ActorStatusKind === - -/// A list specifying statuses of actors. It's used with the [`ActorStatus`]. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[non_exhaustive] -pub enum ActorStatusKind { - Normal, - Initializing, - Terminating, - Terminated, - Alarming, - Failed, -} - -impl ActorStatusKind { - fn as_str(&self) -> &'static str { - match self { - ActorStatusKind::Normal => "Normal", - ActorStatusKind::Initializing => "Initializing", - ActorStatusKind::Terminating => "Terminating", - ActorStatusKind::Terminated => "Terminated", - ActorStatusKind::Alarming => "Alarming", - ActorStatusKind::Failed => "Failed", - } - } -} - // === ActorStartInfo === /// A struct holding information related to an actor start. @@ -182,6 +97,7 @@ pub(crate) struct Actor { termination_policy: TerminationPolicy, mailbox: Mailbox, request_table: RequestTable, + status_kind: AtomicActorStatusKind, control: RwLock, finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`? status_subscription: Arc, @@ -206,6 +122,7 @@ impl Actor { status_subscription: Arc, ) -> Self { Actor { + status_kind: AtomicActorStatusKind::from(ActorStatusKind::Initializing), meta, termination_policy, mailbox: Mailbox::new(mailbox_config), @@ -306,8 +223,15 @@ impl Actor { self.control.write().restart_policy = policy; } + pub(crate) fn status_kind(&self) -> ActorStatusKind { + self.status_kind.load(atomic::Ordering::Acquire) + } + // Note that this method should be called inside a right scope. pub(crate) fn set_status(&self, status: ActorStatus) { + self.status_kind + .store(status.kind(), atomic::Ordering::Release); + let mut control = self.control.write(); let prev_status = mem::replace(&mut control.status, status.clone()); @@ -318,7 +242,7 @@ impl Actor { self.send_status_to_subscribers(&control); drop(control); - if status.is_finished() { + if status.kind().is_finished() { self.close(); // Drop all messages to release requests immediately. self.mailbox.drop_all(); @@ -328,10 +252,10 @@ impl Actor { log_status(&status); if status.kind != prev_status.kind { - if !prev_status.is_finished() { + if !prev_status.kind().is_finished() { decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str()); } - if !status.is_finished() { + if !status.kind().is_finished() { increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str()); } @@ -348,20 +272,6 @@ impl Actor { self.mailbox.close(scope::trace_id()) } - pub(crate) fn is_initializing(&self) -> bool { - matches!( - self.control.read().status.kind, - ActorStatusKind::Initializing - ) - } - - pub(crate) fn is_terminating(&self) -> bool { - matches!( - self.control.read().status.kind, - ActorStatusKind::Terminating - ) - } - pub(crate) async fn finished(&self) { self.finished.wait().await } @@ -415,8 +325,8 @@ mod tests { let fut = actor.finished(); actor.set_status(ActorStatus::TERMINATED); fut.await; - assert!(actor.control.read().status.is_finished()); + assert!(actor.status_kind().is_finished()); actor.finished().await; - assert!(actor.control.read().status.is_finished()); + assert!(actor.status_kind().is_finished()); } } diff --git a/elfo-core/src/actor_status.rs b/elfo-core/src/actor_status.rs new file mode 100644 index 00000000..9264d0bc --- /dev/null +++ b/elfo-core/src/actor_status.rs @@ -0,0 +1,149 @@ +use std::sync::atomic::{self, AtomicU8}; +use std::{fmt, mem}; + +use serde::{Deserialize, Serialize}; + +// === ActorStatus === + +/// Represents the current status of an actor. +/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ActorStatus { + pub(crate) kind: ActorStatusKind, + pub(crate) details: Option, +} + +impl ActorStatus { + pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming); + pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed); + pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing); + pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal); + pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated); + pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating); + + const fn new(kind: ActorStatusKind) -> Self { + Self { + kind, + details: None, + } + } + + /// Creates a new status with the same kind and provided details. + pub fn with_details(&self, details: impl fmt::Display) -> Self { + ActorStatus { + kind: self.kind, + details: Some(details.to_string()), + } + } + + /// Returns the corresponding [`ActorStatusKind`] for this status. + pub fn kind(&self) -> ActorStatusKind { + self.kind + } + + /// Returns details for this status, if provided. + pub fn details(&self) -> Option<&str> { + self.details.as_deref() + } +} + +impl fmt::Display for ActorStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.details { + Some(details) => write!(f, "{:?}: {}", self.kind, details), + None => write!(f, "{:?}", self.kind), + } + } +} + +// === ActorStatusKind === + +/// A list specifying statuses of actors. It's used with the [`ActorStatus`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[non_exhaustive] +#[repr(u8)] +pub enum ActorStatusKind { + Normal, + Initializing, + Terminating, + Terminated, + Alarming, + Failed, +} + +impl ActorStatusKind { + #[inline] + pub const fn is_normal(&self) -> bool { + matches!(self, Self::Normal) + } + + #[inline] + pub const fn is_initializing(&self) -> bool { + matches!(self, Self::Initializing) + } + + #[inline] + pub const fn is_terminating(&self) -> bool { + matches!(self, Self::Terminating) + } + + #[inline] + pub const fn is_terminated(&self) -> bool { + matches!(self, Self::Terminated) + } + + #[inline] + pub const fn is_alarming(&self) -> bool { + matches!(self, Self::Alarming) + } + + #[inline] + pub const fn is_failed(&self) -> bool { + matches!(self, Self::Failed) + } + + #[inline] + pub const fn is_finished(&self) -> bool { + self.is_failed() || self.is_terminated() + } +} + +impl ActorStatusKind { + pub(crate) fn as_str(&self) -> &'static str { + match self { + ActorStatusKind::Normal => "Normal", + ActorStatusKind::Initializing => "Initializing", + ActorStatusKind::Terminating => "Terminating", + ActorStatusKind::Terminated => "Terminated", + ActorStatusKind::Alarming => "Alarming", + ActorStatusKind::Failed => "Failed", + } + } +} + +// === AtomicActorStatusKind === + +#[derive(Debug)] +#[repr(transparent)] +pub(crate) struct AtomicActorStatusKind(AtomicU8); + +impl From for AtomicActorStatusKind { + fn from(value: ActorStatusKind) -> Self { + Self(AtomicU8::new(value as _)) + } +} + +impl AtomicActorStatusKind { + pub(crate) fn store(&self, kind: ActorStatusKind, ordering: atomic::Ordering) { + self.0.store(kind as u8, ordering); + } + + pub(crate) fn load(&self, ordering: atomic::Ordering) -> ActorStatusKind { + let result = self.0.load(ordering); + + // SAFETY: `ActorStatusKind` has `#[repr(u8)]` annotation. The only + // place where value may be changed is `Self::store`, which consumes `ActorStatusKind`, thus, + // guarantees that possibly invalid value cannot be stored + unsafe { mem::transmute::(result) } + } +} diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index 4ae91449..ef8d870d 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -8,7 +8,8 @@ use tracing::{info, trace}; use elfo_utils::unlikely; use crate::{ - actor::{Actor, ActorStartInfo, ActorStatus}, + actor::{Actor, ActorStartInfo}, + actor_status::ActorStatus, addr::Addr, address_book::AddressBook, config::AnyConfig, @@ -26,6 +27,7 @@ use crate::{ routers::Singleton, scope, source::{SourceHandle, Sources, UnattachedSource}, + ActorStatusKind, }; use self::stats::Stats; @@ -105,6 +107,31 @@ impl Context { ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_status(status); } + /// Gets the actor's status kind. + /// + /// # Example + /// ``` + /// # use elfo_core as elfo; + /// # fn exec(ctx: elfo::Context) { + /// // if actor is terminating. + /// assert!(ctx.status_kind().is_terminating()); + /// // if actor is alarming. + /// assert!(ctx.status_kind().is_alarming()); + /// // and so on... + /// # } + /// ``` + /// # Panics + /// + /// Panics when called on pruned context. + pub fn status_kind(&self) -> ActorStatusKind { + self.actor + .as_ref() + .expect("called `status_kind()` on pruned context") + .as_actor() + .expect("invariant") + .status_kind() + } + /// Overrides the group's default mailbox capacity, which set in the config. /// /// Note: after restart the actor will be created from scratch, so this @@ -850,7 +877,7 @@ impl Context { if unlikely(self.stage == Stage::PreRecv) { let actor = ward!(self.actor.as_ref().and_then(|o| o.as_actor())); - if actor.is_initializing() { + if actor.status_kind().is_initializing() { actor.set_status(ActorStatus::NORMAL); } self.stage = Stage::Working; @@ -1006,7 +1033,7 @@ fn e2m(envelope: Envelope) -> M { #[cold] fn on_input_closed(stage: &mut Stage, actor: &Actor) { - if !actor.is_terminating() { + if !actor.status_kind().is_terminating() { actor.set_status(ActorStatus::TERMINATING); } *stage = Stage::Closed; diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index 60dc2ddb..cc3ebd0a 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -16,7 +16,8 @@ use crate::{ }; use crate::{ - actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus}, + actor::{Actor, ActorMeta, ActorStartInfo}, + actor_status::ActorStatus, addr::{Addr, GroupNo}, config::SystemConfig, context::Context, diff --git a/elfo-core/src/lib.rs b/elfo-core/src/lib.rs index f1d1d38d..41cc0210 100644 --- a/elfo-core/src/lib.rs +++ b/elfo-core/src/lib.rs @@ -11,7 +11,8 @@ extern crate self as elfo_core; // TODO: revise this list pub use crate::{ - actor::{ActorMeta, ActorStartCause, ActorStartInfo, ActorStatus, ActorStatusKind}, + actor::{ActorMeta, ActorStartCause, ActorStartInfo}, + actor_status::{ActorStatus, ActorStatusKind}, addr::Addr, config::Config, context::{Context, RequestBuilder}, @@ -48,6 +49,7 @@ pub mod topology; pub mod tracing; mod actor; +mod actor_status; mod addr; mod address_book; mod context; diff --git a/elfo-core/src/messages.rs b/elfo-core/src/messages.rs index b40cf836..1a135937 100644 --- a/elfo-core/src/messages.rs +++ b/elfo-core/src/messages.rs @@ -2,11 +2,7 @@ use std::{fmt::Display, sync::Arc}; use derive_more::Constructor; -use crate::{ - actor::{ActorMeta, ActorStatus}, - config::AnyConfig, - message, -}; +use crate::{actor::ActorMeta, actor_status::ActorStatus, config::AnyConfig, message}; /// A helper type for using in generic code (e.g. as an associated type) to /// indicate a message that cannot be constructed. diff --git a/elfo-core/src/restarting/restart_policy.rs b/elfo-core/src/restarting/restart_policy.rs index 531efa8a..ab47a866 100644 --- a/elfo-core/src/restarting/restart_policy.rs +++ b/elfo-core/src/restarting/restart_policy.rs @@ -45,7 +45,7 @@ impl RestartPolicy { pub(crate) fn restarting_allowed(&self, status: &ActorStatus) -> bool { match &self.mode { RestartMode::Always(_) => true, - RestartMode::OnFailure(_) => status.is_failed(), + RestartMode::OnFailure(_) => status.kind().is_failed(), _ => false, } } diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs index ecdee72c..63558262 100644 --- a/elfo-core/src/supervisor.rs +++ b/elfo-core/src/supervisor.rs @@ -11,7 +11,8 @@ use elfo_utils::CachePadded; use self::{error_chain::ErrorChain, measure_poll::MeasurePoll}; use crate::{ - actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus}, + actor::{Actor, ActorMeta, ActorStartInfo}, + actor_status::ActorStatus, config::{AnyConfig, Config, SystemConfig}, context::Context, envelope::Envelope,