From 27533843726f787c042425bacc2306a28e3f96b6 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Tue, 25 Jun 2024 16:24:06 +1000 Subject: [PATCH] feat: add pubsub actor (#31) --- kameo/examples/pubsub.rs | 57 ++++++++++++++++ kameo/src/actor.rs | 2 + kameo/src/actor/pubsub.rs | 132 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 kameo/examples/pubsub.rs create mode 100644 kameo/src/actor/pubsub.rs diff --git a/kameo/examples/pubsub.rs b/kameo/examples/pubsub.rs new file mode 100644 index 0000000..b04c249 --- /dev/null +++ b/kameo/examples/pubsub.rs @@ -0,0 +1,57 @@ +use kameo::{ + actor::{PubSub, Publish, Subscribe}, + message::{Context, Message}, + Actor, +}; +use tracing_subscriber::EnvFilter; + +#[derive(Clone)] +struct PrintActorID; + +#[derive(Actor, Default)] +struct ActorA; + +impl Message for ActorA { + type Reply = (); + + async fn handle( + &mut self, + _: PrintActorID, + ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + println!("ActorA: {}", ctx.actor_ref().id()); + } +} + +#[derive(Actor, Default)] +struct ActorB; + +impl Message for ActorB { + type Reply = (); + + async fn handle( + &mut self, + _: PrintActorID, + ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + println!("ActorB: {}", ctx.actor_ref().id()); + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter("warn".parse::().unwrap()) + .without_time() + .with_target(false) + .init(); + + let pubsub = kameo::spawn(PubSub::::new()); + let actor_a = kameo::spawn(ActorA); + let actor_b = kameo::spawn(ActorB); + pubsub.ask(Subscribe(actor_a)).send().await?; + pubsub.ask(Subscribe(actor_b)).send().await?; + pubsub.ask(Publish(PrintActorID)).send().await?; + + Ok(()) +} diff --git a/kameo/src/actor.rs b/kameo/src/actor.rs index 8b4ea6e..27320c9 100644 --- a/kameo/src/actor.rs +++ b/kameo/src/actor.rs @@ -25,6 +25,7 @@ mod actor_ref; mod kind; mod mailbox; mod pool; +mod pubsub; mod spawn; use std::any; @@ -36,6 +37,7 @@ use crate::error::{ActorStopReason, BoxError, PanicError}; pub use actor_ref::*; pub use mailbox::*; pub use pool::*; +pub use pubsub::*; pub use spawn::*; /// Functionality for an actor including lifecycle hooks. diff --git a/kameo/src/actor/pubsub.rs b/kameo/src/actor/pubsub.rs new file mode 100644 index 0000000..6b530cf --- /dev/null +++ b/kameo/src/actor/pubsub.rs @@ -0,0 +1,132 @@ +use std::collections::HashMap; + +use futures::future::{join_all, BoxFuture}; + +use crate::{ + error::SendError, + message::{Context, Message}, + request::Request, + Actor, +}; + +use super::{ActorRef, BoundedMailbox}; + +/// A mpsc-like pubsub actor. +#[allow(missing_debug_implementations)] +pub struct PubSub { + subscribers: HashMap + Send + Sync>>, +} + +impl PubSub { + /// Creates a new pubsub instance. + pub fn new() -> Self { + PubSub { + subscribers: HashMap::new(), + } + } + + /// Publishes a message to all subscribers. + pub async fn publish(&mut self, msg: M) + where + M: Clone + Send + 'static, + { + let results = join_all(self.subscribers.iter().map(|(id, subscriber)| { + let msg = msg.clone(); + async move { (*id, subscriber.tell(msg).await) } + })) + .await; + for (id, result) in results.into_iter() { + match result { + Ok(_) => {} + Err(SendError::ActorNotRunning(_)) | Err(SendError::ActorStopped) => { + self.subscribers.remove(&id); + } + Err(SendError::MailboxFull(_)) + | Err(SendError::HandlerError(_)) + | Err(SendError::Timeout(_)) + | Err(SendError::QueriesNotSupported) => {} + } + } + } + + /// Subscribes an actor receive all messages published. + #[inline] + pub fn subscribe(&mut self, actor_ref: ActorRef) + where + A: Actor + Message, + M: Send + 'static, + ActorRef: Request, + { + self.subscribers.insert(actor_ref.id(), Box::new(actor_ref)); + } +} + +impl Actor for PubSub { + type Mailbox = BoundedMailbox; +} + +impl Default for PubSub { + fn default() -> Self { + PubSub::new() + } +} + +/// Publishes a message to a pubsub actor. +#[derive(Clone, Debug)] +pub struct Publish(pub M); + +impl Message> for PubSub +where + M: Clone + Send + 'static, +{ + type Reply = (); + + async fn handle( + &mut self, + Publish(msg): Publish, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.publish(msg).await + } +} + +/// Subscribes an actor to a pubsub actor. +#[derive(Clone, Debug)] +pub struct Subscribe(pub ActorRef); + +impl Message> for PubSub +where + A: Actor + Message, + M: Send + 'static, + ActorRef: Request, +{ + type Reply = (); + + async fn handle( + &mut self, + Subscribe(actor_ref): Subscribe, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.subscribe(actor_ref) + } +} + +trait MessageSubscriber { + fn tell(&self, msg: M) -> BoxFuture<'_, Result<(), SendError>>; +} + +impl MessageSubscriber for ActorRef +where + A: Actor + Message, + M: Send + 'static, + Mb: Sync, + ActorRef: Request, +{ + fn tell(&self, msg: M) -> BoxFuture<'_, Result<(), SendError>> { + Box::pin(async move { + Request::tell(self, msg) + .await + .map_err(|err| err.map_err(|_| ())) + }) + } +}