Skip to content

Commit

Permalink
feat: add pubsub actor (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
tqwewe committed Jun 25, 2024
1 parent 4d66865 commit 2753384
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 0 deletions.
57 changes: 57 additions & 0 deletions kameo/examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use kameo::{
actor::{PubSub, Publish, Subscribe},
message::{Context, Message},
Actor,
};
use tracing_subscriber::EnvFilter;

#[derive(Clone)]
struct PrintActorID;

#[derive(Actor, Default)]
struct ActorA;

impl Message<PrintActorID> for ActorA {
type Reply = ();

async fn handle(
&mut self,
_: PrintActorID,
ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
println!("ActorA: {}", ctx.actor_ref().id());
}
}

#[derive(Actor, Default)]
struct ActorB;

impl Message<PrintActorID> for ActorB {
type Reply = ();

async fn handle(
&mut self,
_: PrintActorID,
ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
println!("ActorB: {}", ctx.actor_ref().id());
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("warn".parse::<EnvFilter>().unwrap())
.without_time()
.with_target(false)
.init();

let pubsub = kameo::spawn(PubSub::<PrintActorID>::new());
let actor_a = kameo::spawn(ActorA);
let actor_b = kameo::spawn(ActorB);
pubsub.ask(Subscribe(actor_a)).send().await?;
pubsub.ask(Subscribe(actor_b)).send().await?;
pubsub.ask(Publish(PrintActorID)).send().await?;

Ok(())
}
2 changes: 2 additions & 0 deletions kameo/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod actor_ref;
mod kind;
mod mailbox;
mod pool;
mod pubsub;
mod spawn;

use std::any;
Expand All @@ -36,6 +37,7 @@ use crate::error::{ActorStopReason, BoxError, PanicError};
pub use actor_ref::*;
pub use mailbox::*;
pub use pool::*;
pub use pubsub::*;
pub use spawn::*;

/// Functionality for an actor including lifecycle hooks.
Expand Down
132 changes: 132 additions & 0 deletions kameo/src/actor/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::collections::HashMap;

use futures::future::{join_all, BoxFuture};

use crate::{
error::SendError,
message::{Context, Message},
request::Request,
Actor,
};

use super::{ActorRef, BoundedMailbox};

/// A mpsc-like pubsub actor.
#[allow(missing_debug_implementations)]
pub struct PubSub<M> {
subscribers: HashMap<u64, Box<dyn MessageSubscriber<M> + Send + Sync>>,
}

impl<M> PubSub<M> {
/// Creates a new pubsub instance.
pub fn new() -> Self {
PubSub {
subscribers: HashMap::new(),
}
}

/// Publishes a message to all subscribers.
pub async fn publish(&mut self, msg: M)
where
M: Clone + Send + 'static,
{
let results = join_all(self.subscribers.iter().map(|(id, subscriber)| {
let msg = msg.clone();
async move { (*id, subscriber.tell(msg).await) }
}))
.await;
for (id, result) in results.into_iter() {
match result {
Ok(_) => {}
Err(SendError::ActorNotRunning(_)) | Err(SendError::ActorStopped) => {
self.subscribers.remove(&id);
}
Err(SendError::MailboxFull(_))
| Err(SendError::HandlerError(_))
| Err(SendError::Timeout(_))
| Err(SendError::QueriesNotSupported) => {}
}
}
}

/// Subscribes an actor receive all messages published.
#[inline]
pub fn subscribe<A>(&mut self, actor_ref: ActorRef<A>)
where
A: Actor + Message<M>,
M: Send + 'static,
ActorRef<A>: Request<A, M, A::Mailbox>,
{
self.subscribers.insert(actor_ref.id(), Box::new(actor_ref));
}
}

impl<M> Actor for PubSub<M> {
type Mailbox = BoundedMailbox<Self>;
}

impl<M> Default for PubSub<M> {
fn default() -> Self {
PubSub::new()
}
}

/// Publishes a message to a pubsub actor.
#[derive(Clone, Debug)]
pub struct Publish<M>(pub M);

impl<M> Message<Publish<M>> for PubSub<M>
where
M: Clone + Send + 'static,
{
type Reply = ();

async fn handle(
&mut self,
Publish(msg): Publish<M>,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.publish(msg).await
}
}

/// Subscribes an actor to a pubsub actor.
#[derive(Clone, Debug)]
pub struct Subscribe<A: Actor>(pub ActorRef<A>);

impl<A, M> Message<Subscribe<A>> for PubSub<M>
where
A: Actor + Message<M>,
M: Send + 'static,
ActorRef<A>: Request<A, M, A::Mailbox>,
{
type Reply = ();

async fn handle(
&mut self,
Subscribe(actor_ref): Subscribe<A>,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.subscribe(actor_ref)
}
}

trait MessageSubscriber<M> {
fn tell(&self, msg: M) -> BoxFuture<'_, Result<(), SendError<M, ()>>>;
}

impl<A, M, Mb> MessageSubscriber<M> for ActorRef<A>
where
A: Actor<Mailbox = Mb> + Message<M>,
M: Send + 'static,
Mb: Sync,
ActorRef<A>: Request<A, M, Mb>,
{
fn tell(&self, msg: M) -> BoxFuture<'_, Result<(), SendError<M, ()>>> {
Box::pin(async move {
Request::tell(self, msg)
.await
.map_err(|err| err.map_err(|_| ()))
})
}
}

0 comments on commit 2753384

Please sign in to comment.