Skip to content

Commit

Permalink
Merge pull request #133 from nerodono/feat/status_kind-in-context
Browse files Browse the repository at this point in the history
feat(core/context): add `Context::status_kind()`
  • Loading branch information
loyd authored Jul 17, 2024
2 parents 9bd5bb8 + de165a3 commit e498d53
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 121 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate
### Added
- core: `Context::status_kind` API, now actors can read `ActorStatusKind` from the context ([#133]).
- core: `is_*` methods on `ActorStatusKind` for each variant ([#133]).
- Specify MSRV as 1.76.
- logger: log truncation up to the `max_line_size` configuration parameter ([#128]).
- core: directly accept never returning functions in `ActorGroup::exec()` ([#127]).
Expand Down Expand Up @@ -36,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#52]: https://github.com/elfo-rs/elfo/issues/52
[#127]: https://github.com/elfo-rs/elfo/pull/127
[#128]: https://github.com/elfo-rs/elfo/pull/128
[#133]: https://github.com/elfo-rs/elfo/pull/133

## [0.2.0-alpha.15] - 2024-05-13
### Added
Expand Down
128 changes: 19 additions & 109 deletions elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fmt, mem, sync::Arc};
use std::{
mem,
sync::{atomic, Arc},
};

use futures_intrusive::sync::ManualResetEvent;
use metrics::{decrement_gauge, increment_counter, increment_gauge};
Expand All @@ -7,6 +10,7 @@ use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};

use crate::{
actor_status::{ActorStatus, ActorStatusKind, AtomicActorStatusKind},
envelope::Envelope,
errors::{SendError, TrySendError},
group::TerminationPolicy,
Expand All @@ -29,95 +33,6 @@ pub struct ActorMeta {
pub key: String,
}

// === ActorStatus ===

/// Represents the current status of an actor.
/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorStatus {
kind: ActorStatusKind,
details: Option<String>,
}

impl ActorStatus {
pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);

const fn new(kind: ActorStatusKind) -> Self {
Self {
kind,
details: None,
}
}

/// Creates a new status with the same kind and provided details.
pub fn with_details(&self, details: impl fmt::Display) -> Self {
ActorStatus {
kind: self.kind,
details: Some(details.to_string()),
}
}

/// Returns the corresponding [`ActorStatusKind`] for this status.
pub fn kind(&self) -> ActorStatusKind {
self.kind
}

/// Returns details for this status, if provided.
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}

pub(crate) fn is_failed(&self) -> bool {
self.kind == ActorStatusKind::Failed
}

fn is_finished(&self) -> bool {
use ActorStatusKind::*;
matches!(self.kind, Failed | Terminated)
}
}

impl fmt::Display for ActorStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.details {
Some(details) => write!(f, "{:?}: {}", self.kind, details),
None => write!(f, "{:?}", self.kind),
}
}
}

// === ActorStatusKind ===

/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ActorStatusKind {
Normal,
Initializing,
Terminating,
Terminated,
Alarming,
Failed,
}

impl ActorStatusKind {
fn as_str(&self) -> &'static str {
match self {
ActorStatusKind::Normal => "Normal",
ActorStatusKind::Initializing => "Initializing",
ActorStatusKind::Terminating => "Terminating",
ActorStatusKind::Terminated => "Terminated",
ActorStatusKind::Alarming => "Alarming",
ActorStatusKind::Failed => "Failed",
}
}
}

// === ActorStartInfo ===

/// A struct holding information related to an actor start.
Expand Down Expand Up @@ -182,6 +97,7 @@ pub(crate) struct Actor {
termination_policy: TerminationPolicy,
mailbox: Mailbox,
request_table: RequestTable,
status_kind: AtomicActorStatusKind,
control: RwLock<Control>,
finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
status_subscription: Arc<SubscriptionManager>,
Expand All @@ -206,6 +122,7 @@ impl Actor {
status_subscription: Arc<SubscriptionManager>,
) -> Self {
Actor {
status_kind: AtomicActorStatusKind::from(ActorStatusKind::Initializing),
meta,
termination_policy,
mailbox: Mailbox::new(mailbox_config),
Expand Down Expand Up @@ -306,8 +223,15 @@ impl Actor {
self.control.write().restart_policy = policy;
}

pub(crate) fn status_kind(&self) -> ActorStatusKind {
self.status_kind.load(atomic::Ordering::Acquire)
}

// Note that this method should be called inside a right scope.
pub(crate) fn set_status(&self, status: ActorStatus) {
self.status_kind
.store(status.kind(), atomic::Ordering::Release);

let mut control = self.control.write();
let prev_status = mem::replace(&mut control.status, status.clone());

Expand All @@ -318,7 +242,7 @@ impl Actor {
self.send_status_to_subscribers(&control);
drop(control);

if status.is_finished() {
if status.kind().is_finished() {
self.close();
// Drop all messages to release requests immediately.
self.mailbox.drop_all();
Expand All @@ -328,10 +252,10 @@ impl Actor {
log_status(&status);

if status.kind != prev_status.kind {
if !prev_status.is_finished() {
if !prev_status.kind().is_finished() {
decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
}
if !status.is_finished() {
if !status.kind().is_finished() {
increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
}

Expand All @@ -348,20 +272,6 @@ impl Actor {
self.mailbox.close(scope::trace_id())
}

pub(crate) fn is_initializing(&self) -> bool {
matches!(
self.control.read().status.kind,
ActorStatusKind::Initializing
)
}

pub(crate) fn is_terminating(&self) -> bool {
matches!(
self.control.read().status.kind,
ActorStatusKind::Terminating
)
}

pub(crate) async fn finished(&self) {
self.finished.wait().await
}
Expand Down Expand Up @@ -415,8 +325,8 @@ mod tests {
let fut = actor.finished();
actor.set_status(ActorStatus::TERMINATED);
fut.await;
assert!(actor.control.read().status.is_finished());
assert!(actor.status_kind().is_finished());
actor.finished().await;
assert!(actor.control.read().status.is_finished());
assert!(actor.status_kind().is_finished());
}
}
149 changes: 149 additions & 0 deletions elfo-core/src/actor_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::sync::atomic::{self, AtomicU8};
use std::{fmt, mem};

use serde::{Deserialize, Serialize};

// === ActorStatus ===

/// Represents the current status of an actor.
/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorStatus {
pub(crate) kind: ActorStatusKind,
pub(crate) details: Option<String>,
}

impl ActorStatus {
pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);

const fn new(kind: ActorStatusKind) -> Self {
Self {
kind,
details: None,
}
}

/// Creates a new status with the same kind and provided details.
pub fn with_details(&self, details: impl fmt::Display) -> Self {
ActorStatus {
kind: self.kind,
details: Some(details.to_string()),
}
}

/// Returns the corresponding [`ActorStatusKind`] for this status.
pub fn kind(&self) -> ActorStatusKind {
self.kind
}

/// Returns details for this status, if provided.
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}
}

impl fmt::Display for ActorStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.details {
Some(details) => write!(f, "{:?}: {}", self.kind, details),
None => write!(f, "{:?}", self.kind),
}
}
}

// === ActorStatusKind ===

/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[repr(u8)]
pub enum ActorStatusKind {
Normal,
Initializing,
Terminating,
Terminated,
Alarming,
Failed,
}

impl ActorStatusKind {
#[inline]
pub const fn is_normal(&self) -> bool {
matches!(self, Self::Normal)
}

#[inline]
pub const fn is_initializing(&self) -> bool {
matches!(self, Self::Initializing)
}

#[inline]
pub const fn is_terminating(&self) -> bool {
matches!(self, Self::Terminating)
}

#[inline]
pub const fn is_terminated(&self) -> bool {
matches!(self, Self::Terminated)
}

#[inline]
pub const fn is_alarming(&self) -> bool {
matches!(self, Self::Alarming)
}

#[inline]
pub const fn is_failed(&self) -> bool {
matches!(self, Self::Failed)
}

#[inline]
pub const fn is_finished(&self) -> bool {
self.is_failed() || self.is_terminated()
}
}

impl ActorStatusKind {
pub(crate) fn as_str(&self) -> &'static str {
match self {
ActorStatusKind::Normal => "Normal",
ActorStatusKind::Initializing => "Initializing",
ActorStatusKind::Terminating => "Terminating",
ActorStatusKind::Terminated => "Terminated",
ActorStatusKind::Alarming => "Alarming",
ActorStatusKind::Failed => "Failed",
}
}
}

// === AtomicActorStatusKind ===

#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct AtomicActorStatusKind(AtomicU8);

impl From<ActorStatusKind> for AtomicActorStatusKind {
fn from(value: ActorStatusKind) -> Self {
Self(AtomicU8::new(value as _))
}
}

impl AtomicActorStatusKind {
pub(crate) fn store(&self, kind: ActorStatusKind, ordering: atomic::Ordering) {
self.0.store(kind as u8, ordering);
}

pub(crate) fn load(&self, ordering: atomic::Ordering) -> ActorStatusKind {
let result = self.0.load(ordering);

// SAFETY: `ActorStatusKind` has `#[repr(u8)]` annotation. The only
// place where value may be changed is `Self::store`, which consumes `ActorStatusKind`, thus,
// guarantees that possibly invalid value cannot be stored
unsafe { mem::transmute::<u8, ActorStatusKind>(result) }
}
}
Loading

0 comments on commit e498d53

Please sign in to comment.