Skip to content

Commit

Permalink
Actor Registry (#1969)
Browse files Browse the repository at this point in the history
* Added Actor Registry

Closes #1893

* CR Comments
  • Loading branch information
fulmicoton authored Oct 6, 2022
1 parent 94bb6ce commit acb249b
Show file tree
Hide file tree
Showing 23 changed files with 370 additions and 25 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/quickwit-actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ futures = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

quickwit-common = { workspace = true }
quickwit-proto = { workspace = true }
Expand Down
17 changes: 15 additions & 2 deletions quickwit/quickwit-actors/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use tracing::{debug, error};

use crate::actor_state::{ActorState, AtomicState};
use crate::progress::{Progress, ProtectedZoneGuard};
use crate::registry::ActorRegistry;
use crate::scheduler::{Callback, ScheduleEvent, Scheduler};
use crate::spawn_builder::SpawnBuilder;
#[cfg(any(test, feature = "testsuite"))]
Expand Down Expand Up @@ -119,7 +120,7 @@ impl From<SendError> for ActorExitStatus {
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
/// Piece of state that can be copied for assert in unit test, admin, etc.
type ObservableState: Send + Sync + Clone + fmt::Debug;
type ObservableState: Send + Sync + Clone + serde::Serialize + fmt::Debug;
/// A name identifying the type of actor.
///
/// Ideally respect the `CamelCase` convention.
Expand Down Expand Up @@ -231,6 +232,7 @@ pub struct ActorContextInner<A: Actor> {
progress: Progress,
kill_switch: KillSwitch,
scheduler_mailbox: Mailbox<Scheduler>,
registry: ActorRegistry,
actor_state: AtomicState,
// Count the number of times the actor has slept.
// This counter is useful to unsure that obsolete WakeUp
Expand Down Expand Up @@ -270,6 +272,7 @@ impl<A: Actor> ActorContext<A> {
self_mailbox: Mailbox<A>,
kill_switch: KillSwitch,
scheduler_mailbox: Mailbox<Scheduler>,
registry: ActorRegistry,
observable_state_tx: watch::Sender<A::ObservableState>,
) -> Self {
ActorContext {
Expand All @@ -278,6 +281,7 @@ impl<A: Actor> ActorContext<A> {
progress: Progress::default(),
kill_switch,
scheduler_mailbox,
registry,
actor_state: AtomicState::default(),
sleep_count: AtomicUsize::default(),
observable_state_tx: Mutex::new(observable_state_tx),
Expand All @@ -296,6 +300,7 @@ impl<A: Actor> ActorContext<A> {
actor_mailbox,
universe.kill_switch.clone(),
universe.scheduler_mailbox.clone(),
universe.registry.clone(),
observable_state_tx,
)
}
Expand All @@ -308,6 +313,10 @@ impl<A: Actor> ActorContext<A> {
&self.self_mailbox
}

pub(crate) fn registry(&self) -> &ActorRegistry {
&self.registry
}

pub fn actor_instance_id(&self) -> &str {
self.mailbox().actor_instance_id()
}
Expand Down Expand Up @@ -344,7 +353,11 @@ impl<A: Actor> ActorContext<A> {
}

pub fn spawn_actor<SpawnedActor: Actor>(&self) -> SpawnBuilder<SpawnedActor> {
SpawnBuilder::new(self.scheduler_mailbox.clone(), self.kill_switch.child())
SpawnBuilder::new(
self.scheduler_mailbox.clone(),
self.kill_switch.child(),
self.registry.clone(),
)
}

/// Records some progress.
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-actors/src/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::any::Any;
use std::borrow::Borrow;
use std::fmt;

use serde::Serialize;
use tokio::sync::{oneshot, watch};
use tokio::task::JoinHandle;
use tokio::time::timeout;
Expand All @@ -38,7 +39,7 @@ pub struct ActorHandle<A: Actor> {
}

/// Describes the health of a given actor.
#[derive(Clone, Eq, PartialEq, Debug, Hash)]
#[derive(Clone, Eq, PartialEq, Debug, Hash, Serialize)]
pub enum Health {
/// The actor is running and behaving as expected.
Healthy,
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-actors/src/channel_with_priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ pub struct Sender<T> {
}

impl<T> Sender<T> {
pub fn is_disconnected(&self) -> bool {
self.low_priority_tx.is_disconnected()
}

pub async fn send_low_priority(&self, msg: T) -> Result<(), SendError> {
self.low_priority_tx.send_async(msg).await?;
Ok(())
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ mod kill_switch;
mod mailbox;
mod observation;
mod progress;
mod registry;
mod scheduler;
mod spawn_builder;
mod supervisor;

#[cfg(test)]
mod tests;
pub(crate) mod tests;
mod universe;

pub use actor::{Actor, ActorExitStatus, Handler};
Expand All @@ -62,6 +63,7 @@ pub use self::actor::ActorContext;
pub use self::actor_state::ActorState;
pub use self::channel_with_priority::{QueueCapacity, RecvError, SendError};
pub use self::mailbox::{create_mailbox, create_test_mailbox, Inbox, Mailbox};
pub use self::registry::ActorObservation;
pub use self::supervisor::Supervisor;

/// Heartbeat used to verify that actors are progressing.
Expand Down
49 changes: 48 additions & 1 deletion quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::Any;
use std::convert::Infallible;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -56,6 +56,15 @@ pub struct Mailbox<A: Actor> {
ref_count: Arc<AtomicUsize>,
}

impl<A: Actor> Mailbox<A> {
pub fn downgrade(&self) -> WeakMailbox<A> {
WeakMailbox {
inner: Arc::downgrade(&self.inner),
ref_count: Arc::downgrade(&self.ref_count),
}
}
}

impl<A: Actor> Drop for Mailbox<A> {
fn drop(&mut self) {
let old_val = self.ref_count.fetch_sub(1, Ordering::SeqCst);
Expand Down Expand Up @@ -135,6 +144,10 @@ impl<A: Actor> Mailbox<A> {
&self.inner.instance_id
}

pub fn is_disconnected(&self) -> bool {
self.inner.tx.is_disconnected()
}

/// Sends a message to the actor owning the associated inbox.
///
/// From an actor context, use the `ActorContext::send_message` method instead.
Expand Down Expand Up @@ -274,3 +287,37 @@ pub fn create_mailbox<A: Actor>(
pub fn create_test_mailbox<A: Actor>() -> (Mailbox<A>, Inbox<A>) {
create_mailbox("test-mailbox".to_string(), QueueCapacity::Unbounded)
}

pub struct WeakMailbox<A: Actor> {
inner: Weak<Inner<A>>,
ref_count: Weak<AtomicUsize>,
}

impl<A: Actor> WeakMailbox<A> {
pub fn upgrade(&self) -> Option<Mailbox<A>> {
let inner = self.inner.upgrade()?;
let ref_count = self.ref_count.upgrade()?;
Some(Mailbox { inner, ref_count })
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::PingReceiverActor;

#[test]
fn test_weak_mailbox_downgrade_upgrade() {
let (mailbox, _inbox) = create_test_mailbox::<PingReceiverActor>();
let weak_mailbox = mailbox.downgrade();
assert!(weak_mailbox.upgrade().is_some());
}

#[test]
fn test_weak_mailbox_failing_upgrade() {
let (mailbox, _inbox) = create_test_mailbox::<PingReceiverActor>();
let weak_mailbox = mailbox.downgrade();
drop(mailbox);
assert!(weak_mailbox.upgrade().is_none());
}
}
Loading

0 comments on commit acb249b

Please sign in to comment.