Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): introduce status_kind in the Context #133

Merged
merged 7 commits into from
Jul 17, 2024
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate
### Added
- core: `Context::status_kind` API, now actors can read `ActorStatusKind` from the context ([#133]).
- core: `is_*` methods on `ActorStatusKind` for each variant ([#133]).
- Specify MSRV as 1.76.
- logger: log truncation up to the `max_line_size` configuration parameter ([#128]).
- core: directly accept never returning functions in `ActorGroup::exec()` ([#127]).
Expand Down Expand Up @@ -36,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#52]: https://github.com/elfo-rs/elfo/issues/52
[#127]: https://github.com/elfo-rs/elfo/pull/127
[#128]: https://github.com/elfo-rs/elfo/pull/128
[#133]: https://github.com/elfo-rs/elfo/pull/133

## [0.2.0-alpha.15] - 2024-05-13
### Added
Expand Down
128 changes: 19 additions & 109 deletions elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fmt, mem, sync::Arc};
use std::{
mem,
sync::{atomic, Arc},
};

use futures_intrusive::sync::ManualResetEvent;
use metrics::{decrement_gauge, increment_counter, increment_gauge};
Expand All @@ -7,6 +10,7 @@ use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};

use crate::{
actor_status::{ActorStatus, ActorStatusKind, AtomicActorStatusKind},
envelope::Envelope,
errors::{SendError, TrySendError},
group::TerminationPolicy,
Expand All @@ -29,95 +33,6 @@ pub struct ActorMeta {
pub key: String,
}

// === ActorStatus ===

/// Represents the current status of an actor.
/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorStatus {
kind: ActorStatusKind,
details: Option<String>,
}

impl ActorStatus {
pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);

const fn new(kind: ActorStatusKind) -> Self {
Self {
kind,
details: None,
}
}

/// Creates a new status with the same kind and provided details.
pub fn with_details(&self, details: impl fmt::Display) -> Self {
ActorStatus {
kind: self.kind,
details: Some(details.to_string()),
}
}

/// Returns the corresponding [`ActorStatusKind`] for this status.
pub fn kind(&self) -> ActorStatusKind {
self.kind
}

/// Returns details for this status, if provided.
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}

pub(crate) fn is_failed(&self) -> bool {
self.kind == ActorStatusKind::Failed
}

fn is_finished(&self) -> bool {
use ActorStatusKind::*;
matches!(self.kind, Failed | Terminated)
}
}

impl fmt::Display for ActorStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.details {
Some(details) => write!(f, "{:?}: {}", self.kind, details),
None => write!(f, "{:?}", self.kind),
}
}
}

// === ActorStatusKind ===

/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ActorStatusKind {
Normal,
Initializing,
Terminating,
Terminated,
Alarming,
Failed,
}

impl ActorStatusKind {
fn as_str(&self) -> &'static str {
match self {
ActorStatusKind::Normal => "Normal",
ActorStatusKind::Initializing => "Initializing",
ActorStatusKind::Terminating => "Terminating",
ActorStatusKind::Terminated => "Terminated",
ActorStatusKind::Alarming => "Alarming",
ActorStatusKind::Failed => "Failed",
}
}
}

// === ActorStartInfo ===

/// A struct holding information related to an actor start.
Expand Down Expand Up @@ -182,6 +97,7 @@ pub(crate) struct Actor {
termination_policy: TerminationPolicy,
mailbox: Mailbox,
request_table: RequestTable,
status_kind: AtomicActorStatusKind,
control: RwLock<Control>,
finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
status_subscription: Arc<SubscriptionManager>,
Expand All @@ -206,6 +122,7 @@ impl Actor {
status_subscription: Arc<SubscriptionManager>,
) -> Self {
Actor {
status_kind: AtomicActorStatusKind::from(ActorStatusKind::Initializing),
meta,
termination_policy,
mailbox: Mailbox::new(mailbox_config),
Expand Down Expand Up @@ -306,8 +223,15 @@ impl Actor {
self.control.write().restart_policy = policy;
}

pub(crate) fn status_kind(&self) -> ActorStatusKind {
nerodono marked this conversation as resolved.
Show resolved Hide resolved
self.status_kind.load(atomic::Ordering::Acquire)
}

// Note that this method should be called inside a right scope.
pub(crate) fn set_status(&self, status: ActorStatus) {
self.status_kind
.store(status.kind(), atomic::Ordering::Release);

let mut control = self.control.write();
let prev_status = mem::replace(&mut control.status, status.clone());

Expand All @@ -318,7 +242,7 @@ impl Actor {
self.send_status_to_subscribers(&control);
drop(control);

if status.is_finished() {
if status.kind().is_finished() {
self.close();
// Drop all messages to release requests immediately.
self.mailbox.drop_all();
Expand All @@ -328,10 +252,10 @@ impl Actor {
log_status(&status);

if status.kind != prev_status.kind {
if !prev_status.is_finished() {
if !prev_status.kind().is_finished() {
decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
}
if !status.is_finished() {
if !status.kind().is_finished() {
increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
}

Expand All @@ -348,20 +272,6 @@ impl Actor {
self.mailbox.close(scope::trace_id())
}

pub(crate) fn is_initializing(&self) -> bool {
matches!(
self.control.read().status.kind,
ActorStatusKind::Initializing
)
}

pub(crate) fn is_terminating(&self) -> bool {
matches!(
self.control.read().status.kind,
ActorStatusKind::Terminating
)
}

pub(crate) async fn finished(&self) {
self.finished.wait().await
}
Expand Down Expand Up @@ -415,8 +325,8 @@ mod tests {
let fut = actor.finished();
actor.set_status(ActorStatus::TERMINATED);
fut.await;
assert!(actor.control.read().status.is_finished());
assert!(actor.status_kind().is_finished());
actor.finished().await;
assert!(actor.control.read().status.is_finished());
assert!(actor.status_kind().is_finished());
}
}
149 changes: 149 additions & 0 deletions elfo-core/src/actor_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::sync::atomic::{self, AtomicU8};
use std::{fmt, mem};

use serde::{Deserialize, Serialize};

// === ActorStatus ===

/// Represents the current status of an actor.
/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorStatus {
pub(crate) kind: ActorStatusKind,
pub(crate) details: Option<String>,
}

impl ActorStatus {
pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);

const fn new(kind: ActorStatusKind) -> Self {
Self {
kind,
details: None,
}
}

/// Creates a new status with the same kind and provided details.
pub fn with_details(&self, details: impl fmt::Display) -> Self {
ActorStatus {
kind: self.kind,
details: Some(details.to_string()),
}
}

/// Returns the corresponding [`ActorStatusKind`] for this status.
pub fn kind(&self) -> ActorStatusKind {
self.kind
}

/// Returns details for this status, if provided.
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}
}

impl fmt::Display for ActorStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.details {
Some(details) => write!(f, "{:?}: {}", self.kind, details),
None => write!(f, "{:?}", self.kind),
}
}
}

// === ActorStatusKind ===

/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[repr(u8)]
pub enum ActorStatusKind {
Normal,
Initializing,
Terminating,
Terminated,
Alarming,
Failed,
}

impl ActorStatusKind {
#[inline]
pub const fn is_normal(&self) -> bool {
matches!(self, Self::Normal)
}

#[inline]
pub const fn is_initializing(&self) -> bool {
matches!(self, Self::Initializing)
}

#[inline]
pub const fn is_terminating(&self) -> bool {
matches!(self, Self::Terminating)
}

#[inline]
pub const fn is_terminated(&self) -> bool {
matches!(self, Self::Terminated)
}

#[inline]
pub const fn is_alarming(&self) -> bool {
matches!(self, Self::Alarming)
}

#[inline]
pub const fn is_failed(&self) -> bool {
matches!(self, Self::Failed)
}

#[inline]
pub const fn is_finished(&self) -> bool {
self.is_failed() || self.is_terminated()
}
}

impl ActorStatusKind {
pub(crate) fn as_str(&self) -> &'static str {
match self {
ActorStatusKind::Normal => "Normal",
ActorStatusKind::Initializing => "Initializing",
ActorStatusKind::Terminating => "Terminating",
ActorStatusKind::Terminated => "Terminated",
ActorStatusKind::Alarming => "Alarming",
ActorStatusKind::Failed => "Failed",
}
}
}

// === AtomicActorStatusKind ===

#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct AtomicActorStatusKind(AtomicU8);

impl From<ActorStatusKind> for AtomicActorStatusKind {
fn from(value: ActorStatusKind) -> Self {
Self(AtomicU8::new(value as _))
}
}

impl AtomicActorStatusKind {
pub(crate) fn store(&self, kind: ActorStatusKind, ordering: atomic::Ordering) {
self.0.store(kind as u8, ordering);
}

pub(crate) fn load(&self, ordering: atomic::Ordering) -> ActorStatusKind {
let result = self.0.load(ordering);

// SAFETY: `ActorStatusKind` has `#[repr(u8)]` annotation. The only
// place where value may be changed is `Self::store`, which consumes `ActorStatusKind`, thus,
// guarantees that possibly invalid value cannot be stored
unsafe { mem::transmute::<u8, ActorStatusKind>(result) }
}
}
Loading