From cf5fdc4878f04f2984d427615eb0df171f448a75 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Sun, 6 Oct 2024 21:15:48 +0800 Subject: [PATCH] docs: improve code docs and examples with all tests passing (#54) --- Cargo.toml | 1 + README.md | 14 +- src/actor.rs | 153 +++++++++------ src/actor/actor_ref.rs | 416 +++++++++++++++++++++++++++++++---------- src/actor/pool.rs | 74 +++++--- src/actor/pubsub.rs | 119 +++++++++++- src/actor/spawn.rs | 68 +++++-- src/message.rs | 13 +- src/remote.rs | 20 +- src/remote/swarm.rs | 22 ++- 10 files changed, 679 insertions(+), 221 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c51b8a2..7a3192e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ tokio = { version = "1", features = [ "time", ] } tokio-stream = { version = "0.1.15", features = ["time"] } +tokio-test = "0.4.4" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [[bench]] diff --git a/README.md b/README.md index d8bbba7..5be69ee 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ cargo add kameo ## Example: Defining an Actor -```rust +```rust,ignore use kameo::Actor; use kameo::message::{Context, Message}; use kameo::request::MessageSend; @@ -76,7 +76,7 @@ impl Message for Counter { Spawn and message the actor. -```rust +```rust,ignore // Spawn the actor and get a reference to it let actor_ref = kameo::spawn(Counter { count: 0 }); @@ -95,7 +95,7 @@ Distributed actors in Kameo are designed to interact with each other as if they 1. **Actor Registration:** Actors can be registered under a unique name using `ActorRef::register`. Once registered, other actors or systems can look up this actor from a remote node. Under the hood, Kameo uses a **Kademlia Distributed Hash Table (DHT)**, provided by libp2p, to handle actor registration and lookup across nodes in a distributed manner. - ```rust + ```rust,ignore // On the host node, register the actor let actor_ref = kameo::spawn(MyActor::default()); actor_ref.register("my_actor").await?; @@ -103,14 +103,14 @@ Distributed actors in Kameo are designed to interact with each other as if they 2. **Actor Lookup:** On remote nodes, actors can look up registered actors using `RemoteActorRef::lookup`. This returns a reference to the remote actor that can be messaged. - ```rust + ```rust,ignore // On a guest node, lookup the remote actor let remote_actor_ref = RemoteActorRef::::lookup("my_actor").await?; ``` 3. **Message Passing:** Once the remote actor reference is obtained, you can send messages to it just like with local actors. The message is serialized, sent over the network, deserialized on the remote node, and handled by the actor. - ```rust + ```rust,ignore if let Some(remote_actor_ref) = remote_actor_ref { let count = remote_actor_ref.ask(&Inc { amount: 10 }).send().await?; println!("Incremented! Count is {count}"); @@ -119,7 +119,7 @@ Distributed actors in Kameo are designed to interact with each other as if they 4. **Message Registration:** In order to send messages between nodes, the message type must implement `Serialize` and `Deserialize`. Additionally, it needs to be annotated with the `#[remote_message("uuid")]` macro, where the `uuid` is a unique identifier for the message type. This UUID helps identify which message implementation to use when sending and receiving messages over the network. It's important to ensure that the UUID does not conflict with other registered messages in your crate. - ```rust + ```rust,ignore #[remote_message("3b9128f1-0593-44a0-b83a-f4188baa05bf")] impl Message for MyActor { type Reply = i64; @@ -138,7 +138,7 @@ Distributed actors in Kameo are designed to interact with each other as if they Kameo actors use these multiaddresses when communicating across nodes. The `ActorSwarm` component handles networking, allowing actors to register, look up, and send messages to remote actors, abstracting away the underlying complexity. - ```rust + ```rust,ignore // Bootstrap the actor swarm and listen on a UDP port 8020 ActorSwarm::bootstrap()? .listen_on("/ip4/0.0.0.0/udp/8020/quic-v1".parse()?) diff --git a/src/actor.rs b/src/actor.rs index 2250702..ccb5065 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,25 +1,25 @@ -//! Actor abstractions and utilities for building concurrent, asynchronous systems. +//! Core functionality for defining and managing actors in Kameo. //! -//! This module provides the core abstractions for spawning and managing actors in an -//! asynchronous, concurrent application. Actors in kameo are independent units of -//! computation that communicate through message passing, encapsulating state and behavior. +//! Actors are independent units of computation that run asynchronously, sending and receiving messages. +//! Each actor operates within its own task, and its lifecycle is managed by hooks such as `on_start`, `on_panic`, and `on_stop`. //! -//! Central to this module is the [Actor] trait, which defines the lifecycle hooks and -//! functionalities every actor must implement. These hooks include initialization ([`on_start`](Actor::on_start)), -//! cleanup ([`on_stop`](Actor::on_stop)), error handling ([`on_panic`](Actor::on_panic)), and managing relationships with other -//! actors ([`on_link_died`](Actor::on_link_died)). Additionally, the [`name`](Actor::name) method provides a means to identify -//! actors, facilitating debugging and tracing. +//! The actor trait is designed to support fault tolerance, recovery from panics, and clean termination. +//! Lifecycle hooks allow customization of actor behavior when starting, encountering errors, or shutting down. //! -//! To interact with and manage actors, this module introduces two key structures: -//! - [ActorRef]: A strong reference to an actor, containing all necessary information for -//! sending messages, stopping the actor, and managing actor links. It serves as the primary -//! interface for external interactions with an actor. -//! - [WeakActorRef]: Similar to `ActorRef`, but does not prevent the actor from being stopped. +//! This module contains the primary `Actor` trait, which all actors must implement, +//! as well as types for managing message queues (mailboxes) and actor references ([`ActorRef`]). //! -//! The design of this module emphasizes loose coupling and high cohesion among actors, promoting -//! a scalable and maintainable architecture. By leveraging asynchronous message passing and -//! lifecycle management, developers can create complex, responsive systems with high degrees -//! of concurrency and parallelism. +//! # Features +//! - **Asynchronous Message Handling**: Each actor processes messages asynchronously within its own task. +//! - **Lifecycle Hooks**: Customizable hooks ([`on_start`], [`on_stop`], [`on_panic`]) for managing the actor's lifecycle. +//! - **Backpressure**: Mailboxes can be bounded or unbounded, controlling the flow of messages. +//! - **Supervision**: Actors can be linked, enabling robust supervision and error recovery systems. +//! +//! This module allows building resilient, fault-tolerant, distributed systems with flexible control over the actor lifecycle. +//! +//! [`on_start`]: Actor::on_start +//! [`on_stop`]: Actor::on_stop +//! [`on_panic`]: Actor::on_panic mod actor_ref; mod id; @@ -43,65 +43,106 @@ pub use pool::*; pub use pubsub::*; pub use spawn::*; -/// Functionality for an actor including lifecycle hooks. +/// Core behavior of an actor, including its lifecycle events and how it processes messages. +/// +/// Every actor must implement this trait, which provides hooks +/// for the actor's initialization ([`on_start`]), handling errors ([`on_panic`]), and cleanup ([`on_stop`]). /// -/// Methods in this trait that return [`BoxError`] will stop the actor with the reason -/// [`ActorStopReason::Panicked`] containing the error. +/// The actor runs within its own task and processes messages asynchronously from a mailbox. +/// Each actor can be linked to others, allowing for robust supervision and failure recovery mechanisms. /// -/// # Example +/// Methods in this trait that return [`BoxError`] will cause the actor to stop with the reason +/// [`ActorStopReason::Panicked`] if an error occurs. This enables graceful handling of actor panics +/// or errors. +/// +/// # Example with Derive /// /// ``` /// use kameo::Actor; -/// use kameo::error::{ActorStopReason, BoxError, PanicError}; +/// +/// #[derive(Actor)] +/// struct MyActor; +/// ``` +/// +/// # Example Override Behaviour +/// +/// ``` +/// use kameo::actor::{Actor, ActorRef, WeakActorRef}; +/// use kameo::error::{ActorStopReason, BoxError}; +/// use kameo::mailbox::unbounded::UnboundedMailbox; /// /// struct MyActor; /// /// impl Actor for MyActor { -/// async fn on_start(&mut self) -> Result<(), BoxError> { +/// type Mailbox = UnboundedMailbox; +/// +/// async fn on_start(&mut self, actor_ref: ActorRef) -> Result<(), BoxError> { /// println!("actor started"); /// Ok(()) /// } /// -/// async fn on_panic(&mut self, err: PanicError) -> Result, BoxError> { -/// println!("actor panicked"); -/// Ok(Some(ActorStopReason::Panicked(err))) // Return `Some` to stop the actor -/// } -/// -/// async fn on_stop(&mut self, reason: ActorStopReason) -> Result<(), BoxError> { +/// async fn on_stop( +/// self, +/// actor_ref: WeakActorRef, +/// reason: ActorStopReason, +/// ) -> Result<(), BoxError> { /// println!("actor stopped"); /// Ok(()) /// } /// } /// ``` +/// +/// # Lifecycle Hooks +/// - `on_start`: Called when the actor starts. This is where initialization happens. +/// - `on_panic`: Called when the actor encounters a panic or an error while processing a "tell" message. +/// - `on_stop`: Called before the actor is stopped. This allows for cleanup tasks. +/// - `on_link_died`: Hook that is invoked when a linked actor dies. +/// +/// # Mailboxes +/// Actors use a mailbox to queue incoming messages. You can choose between: +/// - **Bounded Mailbox**: Limits the number of messages that can be queued, providing backpressure. +/// - **Unbounded Mailbox**: Allows an infinite number of messages, but can lead to high memory usage. +/// +/// Mailboxes enable efficient asynchronous message passing with support for both backpressure and +/// unbounded queueing depending on system requirements. +/// +/// [`on_start`]: Actor::on_start +/// [`on_stop`]: Actor::on_stop +/// [`on_panic`]: Actor::on_panic pub trait Actor: Sized + Send + 'static { - /// The mailbox used for the actor. + /// The mailbox type used for the actor. /// - /// This can either be `BoundedMailbox` or `UnboundedMailbox.` + /// This can either be a `BoundedMailbox` or an `UnboundedMailbox`, depending on + /// whether you want to enforce backpressure or allow infinite message queueing. /// - /// Bounded mailboxes enable backpressure to prevent the queued messages from growing indefinitely, - /// whilst unbounded mailboxes have no backpressure and can grow infinitely until the system runs out of memory. + /// - **Bounded Mailbox**: Prevents unlimited message growth, enforcing backpressure. + /// - **Unbounded Mailbox**: Allows an infinite number of messages, but can consume large amounts of memory. type Mailbox: Mailbox; - /// Actor name, useful for logging. + /// The name of the actor, which can be useful for logging or debugging. + /// + /// # Default Implementation + /// By default, this returns the type name of the actor. fn name() -> &'static str { any::type_name::() } - /// Creates a new mailbox. + /// Creates a new mailbox for the actor. This sets up the message queue and receiver for the actor. + /// + /// # Returns + /// A tuple containing: + /// - The created mailbox for sending messages. + /// - The receiver for processing messages. fn new_mailbox() -> (Self::Mailbox, >::Receiver) { Self::Mailbox::default_mailbox() } - /// Hook that is called before the actor starts processing messages. + /// Called when the actor starts, before it processes any messages. /// - /// # Guarantees - /// Messages sent internally by the actor during `on_start` are prioritized and processed before any externally - /// sent messages, even if external messages are received before `on_start` completes. - /// This is ensured by an internal buffering mechanism that holds external messages until after `on_start` has - /// finished executing. + /// Messages sent internally by the actor during `on_start` are prioritized and processed + /// before any externally sent messages, even if external messages are received first. /// - /// # Returns - /// A result indicating successful initialization or an error if initialization fails. + /// This ensures that the actor can properly initialize before handling external messages. #[allow(unused_variables)] fn on_start( &mut self, @@ -110,16 +151,17 @@ pub trait Actor: Sized + Send + 'static { async { Ok(()) } } - /// Hook that is called when an actor panicked or returns an error during "tell" message. + /// Called when the actor encounters a panic or an error during "tell" message handling. /// - /// This method provides an opportunity to clean up or reset state. - /// It can also determine whether the actor should be killed or if it should continue processing messages by returning `None`. + /// This method gives the actor an opportunity to clean up or reset its state and determine + /// whether it should be stopped or continue processing messages. /// /// # Parameters - /// - `err`: The error that occurred. + /// - `err`: The panic or error that occurred. /// /// # Returns - /// Whether the actor should continue processing, or be stopped by returning a stop reason. + /// - `Some(ActorStopReason)`: Stops the actor. + /// - `None`: Allows the actor to continue processing messages. #[allow(unused_variables)] fn on_panic( &mut self, @@ -129,12 +171,13 @@ pub trait Actor: Sized + Send + 'static { async move { Ok(Some(ActorStopReason::Panicked(err))) } } - /// Hook that is called when a linked actor dies. + /// Called when a linked actor dies. /// - /// By default, the current actor will be stopped if the reason is anything other than normal. + /// By default, the actor will stop if the reason for the linked actor's death is anything other + /// than `Normal`. You can customize this behavior in the implementation. /// /// # Returns - /// Whether the actor should continue processing, or be stopped by returning a stop reason. + /// Whether the actor should stop or continue processing messages. #[allow(unused_variables)] fn on_link_died( &mut self, @@ -155,11 +198,9 @@ pub trait Actor: Sized + Send + 'static { } } - /// Hook that is called before the actor is stopped. + /// Called before the actor stops. /// - /// This method allows for cleanup and finalization tasks to be performed before the - /// actor is fully stopped. It can be used to release resources, notify other actors, - /// or complete any final tasks. + /// This allows the actor to perform any necessary cleanup or release resources before being fully stopped. /// /// # Parameters /// - `reason`: The reason why the actor is being stopped. diff --git a/src/actor/actor_ref.rs b/src/actor/actor_ref.rs index f15a5eb..c261371 100644 --- a/src/actor/actor_ref.rs +++ b/src/actor/actor_ref.rs @@ -26,7 +26,11 @@ thread_local! { pub(crate) static CURRENT_THREAD_ACTOR_ID: Cell> = Cell::new(None); } -/// A reference to an actor for sending messages and managing the actor. +/// A reference to an actor, used for sending messages and managing its lifecycle. +/// +/// An `ActorRef` allows interaction with an actor through message passing, both for asking (waiting for a reply) +/// and telling (without waiting for a reply). It also provides utilities for managing the actor's state, +/// such as checking if the actor is alive, registering the actor under a name, and stopping the actor gracefully. pub struct ActorRef { id: ActorID, mailbox: A::Mailbox, @@ -48,7 +52,7 @@ where } } - /// Returns the actor identifier. + /// Returns the unique identifier of the actor. #[inline] pub fn id(&self) -> ActorID { self.id @@ -61,6 +65,8 @@ where } /// Registers the actor under a given name within the actor swarm. + /// + /// This makes the actor discoverable by other nodes in the distributed system. pub async fn register(&self, name: &str) -> Result<(), RegistrationError> where A: RemoteActor + 'static, @@ -71,7 +77,9 @@ where .await } - /// Looks up an actor registered locally. + /// Looks up an actor registered locally by its name. + /// + /// Returns `Some` if the actor exists, or `None` if no actor with the given name is registered. pub async fn lookup(name: &str) -> Result, RegistrationError> where A: RemoteActor + 'static, @@ -109,7 +117,9 @@ where self.mailbox.weak_count() } - /// Returns true if called from within the actor. + /// Returns `true` if the current task is the actor itself. + /// + /// This is useful when checking if certain code is being executed from within the actor's own context. #[inline] pub fn is_current(&self) -> bool { CURRENT_ACTOR_ID @@ -120,10 +130,8 @@ where /// Signals the actor to stop after processing all messages currently in its mailbox. /// - /// This method sends a special stop message to the end of the actor's mailbox, ensuring - /// that the actor will process all preceding messages before stopping. Any messages sent - /// after this stop signal will be ignored and dropped. This approach allows for a graceful - /// shutdown of the actor, ensuring all pending work is completed before termination. + /// This method ensures that the actor finishes processing any messages that were already in the queue + /// before it shuts down. Any new messages sent after the stop signal will be ignored. #[inline] pub async fn stop_gracefully(&self) -> Result<(), SendError> { self.mailbox.signal_stop().await @@ -151,26 +159,38 @@ where /// Note: This method does not initiate the stop process; it only waits for the actor to /// stop. You should signal the actor to stop using [`stop_gracefully`](ActorRef::stop_gracefully) or [`kill`](ActorRef::kill) /// before calling this method. - /// - /// # Examples - /// - /// ```ignore - /// // Assuming `actor.stop_gracefully().await` has been called earlier - /// actor.wait_for_stop().await; - /// ``` #[inline] pub async fn wait_for_stop(&self) { self.mailbox.closed().await } - /// Sends a message to the actor, waiting for a reply. + /// Sends a message to the actor and waits for a reply. + /// + /// The `ask` pattern is used when you expect a response from the actor. This method returns + /// an `AskRequest`, which can be awaited asynchronously, or sent in a blocking manner using one of the [`request`](crate::request) traits. /// /// # Example /// /// ``` - /// actor_ref.ask(msg).send().await?; // Receive reply asyncronously - /// actor_ref.ask(msg).blocking_send()?; // Receive reply blocking - /// actor_ref.ask(msg).mailbox_timeout(Duration::from_secs(1)).send().await?; // Timeout after 1 second + /// use kameo::actor::ActorRef; + /// use kameo::request::MessageSend; + /// + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// # struct Msg; + /// # + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let actor_ref = kameo::spawn(MyActor); + /// # let msg = Msg; + /// let reply = actor_ref.ask(msg).send().await?; + /// # Ok::<(), Box>(()) + /// # }); /// ``` #[inline] pub fn ask( @@ -190,13 +210,33 @@ where AskRequest::new(self, msg) } - /// Sends a message to the actor asyncronously without waiting for a reply. + /// Sends a message to the actor without waiting for a reply. + /// + /// The `tell` pattern is used for one-way communication, where no response is expected from the actor. This method + /// returns a `TellRequest`, which can be awaited asynchronously, or configured using one of the [`request`](crate::request) traits. /// /// # Example /// /// ``` - /// actor_ref.tell(msg).send().await?; // Send message - /// actor_ref.tell(msg).timeout(Duration::from_secs(1)).send().await?; // Timeout after 1 second + /// use kameo::actor::ActorRef; + /// use kameo::request::MessageSend; + /// + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// # struct Msg; + /// # + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let actor_ref = kameo::spawn(MyActor); + /// # let msg = Msg; + /// actor_ref.tell(msg).send().await?; + /// # Ok::<(), Box>(()) + /// # }); /// ``` #[inline] pub fn tell( @@ -210,70 +250,30 @@ where TellRequest::new(self, msg) } - /// Attaches a stream of messages to the actor. + /// Links this actor with a child actor, establishing a parent-child relationship. /// - /// This spawns a tokio task which forwards the stream to the actor. - /// The returned `JoinHandle` can be aborted to stop the messages from being forwarded to the actor. + /// If the parent dies, the child actor will be notified with a "link died" signal. /// - /// The `start_value` and `finish_value` can be provided to pass additional context when attaching the stream. - /// If there's no data to be sent, these can be set to `()`. - pub fn attach_stream( - &self, - mut stream: S, - start_value: T, - finish_value: F, - ) -> JoinHandle, ::Error>>> - where - A: Message>, - S: Stream + Send + Unpin + 'static, - M: Send + 'static, - T: Send + 'static, - F: Send + 'static, - for<'a> TellRequest< - LocalTellRequest<'a, A, A::Mailbox>, - A::Mailbox, - StreamMessage, - WithoutRequestTimeout, - >: MessageSend< - Ok = (), - Error = SendError, ::Error>, - >, - { - let actor_ref = self.clone(); - tokio::spawn(async move { - actor_ref - .tell(StreamMessage::Started(start_value)) - .send() - .await?; - - loop { - tokio::select! { - msg = stream.next() => { - match msg { - Some(msg) => { - actor_ref.tell(StreamMessage::Next(msg)).send().await?; - } - None => break, - } - } - _ = actor_ref.wait_for_stop() => { - return Ok(stream); - } - } - } - - actor_ref - .tell(StreamMessage::Finished(finish_value)) - .send() - .await?; - - Ok(stream) - }) - } - - /// Links this actor with a child, making this one the parent. + /// # Example /// - /// If the parent dies, then the child will be notified with a link died signal. + /// ``` + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// # struct Msg; + /// # + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let actor_ref = kameo::spawn(MyActor); + /// let child_ref = kameo::spawn(MyActor); + /// actor_ref.link_child(&child_ref).await; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` #[inline] pub async fn link_child(&self, child: &ActorRef) where @@ -289,6 +289,28 @@ where } /// Unlinks a previously linked child actor. + /// + /// # Example + /// + /// ``` + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// # struct Msg; + /// # + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let actor_ref = kameo::spawn(MyActor); + /// let child_ref = kameo::spawn(MyActor); + /// actor_ref.link_child(&child_ref).await; + /// actor_ref.unlink_child(&child_ref).await; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` #[inline] pub async fn unlink_child(&self, child: &ActorRef) where @@ -301,23 +323,66 @@ where self.links.lock().await.remove(&child.id()); } - /// Links this actor with a sibbling, notifying eachother if either one dies. + /// Links two actors as siblings, ensuring they notify each other if either one dies. + /// + /// # Example + /// + /// ``` + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// # struct Msg; + /// # + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let actor_ref = kameo::spawn(MyActor); + /// let sibbling_ref = kameo::spawn(MyActor); + /// actor_ref.link_together(&sibbling_ref).await; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` #[inline] - pub async fn link_together(&self, sibbling: &ActorRef) + pub async fn link_together(&self, sibbling_ref: &ActorRef) where B: Actor, { - if self.id == sibbling.id() { + if self.id == sibbling_ref.id() { return; } let (mut this_links, mut sibbling_links) = - tokio::join!(self.links.lock(), sibbling.links.lock()); - this_links.insert(sibbling.id(), sibbling.weak_signal_mailbox()); + tokio::join!(self.links.lock(), sibbling_ref.links.lock()); + this_links.insert(sibbling_ref.id(), sibbling_ref.weak_signal_mailbox()); sibbling_links.insert(self.id, self.weak_signal_mailbox()); } - /// Unlinks previously linked processes from eachother. + /// Unlinks two previously linked sibling actors. + /// + /// # Example + /// + /// ``` + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// # struct Msg; + /// # + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let actor_ref = kameo::spawn(MyActor); + /// let sibbling_ref = kameo::spawn(MyActor); + /// actor_ref.link_together(&sibbling_ref).await; + /// actor_ref.unlink_together(&sibbling_ref).await; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` #[inline] pub async fn unlink_together(&self, sibbling: &ActorRef) where @@ -333,6 +398,101 @@ where sibbling_links.remove(&self.id); } + /// Attaches a stream of messages to the actor, forwarding each item in the stream. + /// + /// The stream will continue until it is completed or the actor is stopped. A `JoinHandle` is returned, + /// which can be used to cancel the stream. The `start_value` and `finish_value` can provide additional + /// context for the stream but are optional. + /// + /// # Example + /// + /// ``` + /// use kameo::Actor; + /// use kameo::message::{Context, Message, StreamMessage}; + /// + /// #[derive(kameo::Actor)] + /// struct MyActor; + /// + /// impl Message> for MyActor { + /// type Reply = (); + /// + /// async fn handle(&mut self, msg: StreamMessage, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { + /// match msg { + /// StreamMessage::Next(num) => { + /// println!("Received item: {num}"); + /// } + /// StreamMessage::Started(()) => { + /// println!("Stream attached!"); + /// } + /// StreamMessage::Finished(()) => { + /// println!("Stream finished!"); + /// } + /// } + /// } + /// } + /// # + /// # tokio_test::block_on(async { + /// let stream = futures::stream::iter(vec![17, 19, 24]); + /// + /// let actor_ref = kameo::spawn(MyActor); + /// actor_ref.attach_stream(stream, (), ()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` + pub fn attach_stream( + &self, + mut stream: S, + start_value: T, + finish_value: F, + ) -> JoinHandle, ::Error>>> + where + A: Message>, + S: Stream + Send + Unpin + 'static, + M: Send + 'static, + T: Send + 'static, + F: Send + 'static, + for<'a> TellRequest< + LocalTellRequest<'a, A, A::Mailbox>, + A::Mailbox, + StreamMessage, + WithoutRequestTimeout, + >: MessageSend< + Ok = (), + Error = SendError, ::Error>, + >, + { + let actor_ref = self.clone(); + tokio::spawn(async move { + actor_ref + .tell(StreamMessage::Started(start_value)) + .send() + .await?; + + loop { + tokio::select! { + msg = stream.next() => { + match msg { + Some(msg) => { + actor_ref.tell(StreamMessage::Next(msg)).send().await?; + } + None => break, + } + } + _ = actor_ref.wait_for_stop() => { + return Ok(stream); + } + } + } + + actor_ref + .tell(StreamMessage::Finished(finish_value)) + .send() + .await?; + + Ok(stream) + }) + } + #[inline] pub(crate) fn mailbox(&self) -> &A::Mailbox { &self.mailbox @@ -378,6 +538,9 @@ impl AsRef for ActorRef { } /// A reference to an actor running remotely. +/// +/// `RemoteActorRef` allows sending messages to actors on different nodes in a distributed system. +/// It supports the same messaging patterns as `ActorRef` for local actors, including `ask` and `tell` messaging. pub struct RemoteActorRef { id: ActorID, swarm_tx: SwarmSender, @@ -393,12 +556,14 @@ impl RemoteActorRef { } } - /// Returns the actor identifier. + /// Returns the unique identifier of the remote actor. pub fn id(&self) -> ActorID { self.id } - /// Looks up an actor registered locally. + /// Looks up an actor registered by name across the distributed network. + /// + /// Returns `Some` if the actor is found, or `None` if no actor with the given name is registered. pub async fn lookup(name: &str) -> Result, RegistrationError> where A: RemoteActor + 'static, @@ -409,7 +574,37 @@ impl RemoteActorRef { .await } - /// Sends a message to the remote actor, waiting for a reply. + /// Sends a message to the remote actor and waits for a reply. + /// + /// The `ask` pattern is used when a response is expected from the remote actor. This method + /// returns an `AskRequest`, which can be awaited asynchronously, or sent in a blocking manner using one of the [`request`](crate::request) traits. + /// + /// # Example + /// + /// ```no_run + /// use kameo::actor::RemoteActorRef; + /// use kameo::request::MessageSend; + /// + /// # #[derive(kameo::Actor, kameo::RemoteActor)] + /// # #[actor(mailbox = bounded)] + /// # struct MyActor; + /// # + /// # #[derive(serde::Serialize, serde::Deserialize)] + /// # struct Msg; + /// # + /// # #[kameo::remote_message("id")] + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let remote_actor_ref = RemoteActorRef::::lookup("my_actor").await?.unwrap(); + /// # let msg = Msg; + /// let reply = remote_actor_ref.ask(&msg).send().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` #[inline] pub fn ask<'a, M>( &'a self, @@ -429,8 +624,37 @@ impl RemoteActorRef { AskRequest::new_remote(self, msg) } - /// Sends a message to the remote actor asyncronously without waiting for a reply from the actor. - /// This still waits for an acknowledgement from the remote node. + /// Sends a message to the remote actor without waiting for a reply. + /// + /// The `tell` pattern is used when no response is expected from the remote actor. This method + /// returns a `TellRequest`, which can be awaited asynchronously, or configured using one of the [`request`](crate::request) traits. + /// + /// # Example + /// + /// ```no_run + /// use kameo::actor::RemoteActorRef; + /// use kameo::request::MessageSend; + /// + /// # #[derive(kameo::Actor, kameo::RemoteActor)] + /// # #[actor(mailbox = bounded)] + /// # struct MyActor; + /// # + /// # #[derive(serde::Serialize, serde::Deserialize)] + /// # struct Msg; + /// # + /// # #[kameo::remote_message("id")] + /// # impl kameo::message::Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: kameo::message::Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// # tokio_test::block_on(async { + /// let remote_actor_ref = RemoteActorRef::::lookup("my_actor").await?.unwrap(); + /// # let msg = Msg; + /// remote_actor_ref.tell(&msg).send().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// ``` #[inline] pub fn tell<'a, M>( &'a self, @@ -538,7 +762,9 @@ impl fmt::Debug for WeakActorRef { } } -/// A hashmap of linked actors to be notified when the actor dies. +/// A collection of links to other actors that are notified when the actor dies. +/// +/// Links are used for parent-child or sibling relationships, allowing actors to observe each other's lifecycle. #[derive(Clone, Default)] #[allow(missing_debug_implementations)] pub struct Links(Arc>>>); diff --git a/src/actor/pool.rs b/src/actor/pool.rs index 6257051..e6738b7 100644 --- a/src/actor/pool.rs +++ b/src/actor/pool.rs @@ -1,3 +1,46 @@ +//! Provides a pool of actors for task distribution and load balancing. +//! +//! The `pool` module offers the ability to manage a group of actors that work together to process tasks. +//! It enables the creation of an `ActorPool`, which distributes incoming messages to a fixed set of worker actors +//! in a round-robin fashion. This ensures that tasks are evenly distributed across workers, improving +//! resource utilization and overall performance. +//! +//! `ActorPool` must be spawned as an actor, and tasks can be sent to it using the `WorkerMsg` message +//! for individual workers or the `BroadcastMsg` to send a message to all workers in the pool. +//! +//! # Features +//! - **Load Balancing**: Messages are distributed among a fixed set of actors in a round-robin manner. +//! - **Resilience**: Workers that stop or fail are automatically replaced to ensure continued operation. +//! - **Flexible Actor Management**: The pool can manage any type of actor that implements the [Actor] trait, +//! allowing it to be used for various tasks. +//! +//! # Example +//! +//! ``` +//! use kameo::Actor; +//! use kameo::actor::{ActorPool, WorkerMsg, BroadcastMsg}; +//! # use kameo::message::{Context, Message}; +//! use kameo::request::MessageSend; +//! +//! #[derive(Actor)] +//! struct MyWorker; +//! # +//! # impl Message<&'static str> for MyWorker { +//! # type Reply = (); +//! # async fn handle(&mut self, msg: &'static str, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { } +//! # } +//! +//! # tokio_test::block_on(async { +//! // Spawn the actor pool with 4 workers +//! let pool_actor = kameo::spawn(ActorPool::new(4, || kameo::spawn(MyWorker))); +//! +//! // Send tasks to the pool +//! pool_actor.tell(WorkerMsg("Hello worker!")).send().await?; +//! pool_actor.tell(BroadcastMsg("Hello all workers!")).send().await?; +//! # Ok::<(), Box>(()) +//! # }); +//! ``` + use std::fmt; use futures::{ @@ -33,35 +76,8 @@ enum Factory { /// and resource utilization. Additionally, it handles the dynamic replacement of workers /// that stop due to errors or other reasons, maintaining the pool's resilience and reliability. /// -/// The pool is generic over the type of actors it contains, allowing it to manage any actor -/// that implements the [Actor] trait. This design provides flexibility in using the pool -/// with different types of actors for various tasks. -/// -/// `ActorPool` can handle any message that the worker actors can handle. -/// -/// # Examples -/// -/// ```no_run -/// use kameo::Actor; -/// use kameo::actor::{ActorPool, ActorRef}; -/// use kameo::message::Message; -/// use kameo::request::MessageSend; -/// -/// #[derive(Actor)] -/// struct MyActor; -/// -/// struct MyMessage; -/// impl Message for MyActor { -/// // ... -/// } -/// -/// // Create a pool with 5 workers. -/// let pool = ActorPool::new(5, || { -/// kameo::spawn(MyActor) -/// }); -/// -/// pool.ask(WorkerMsg(MyMessage)).send().await?; -/// ``` +/// The pool can be used either as a standalone object or spawned as an actor. When spawned, tasks can be +/// sent using the `WorkerMsg` and `BroadcastMsg` messages for individual or broadcast communication with workers. pub struct ActorPool { workers: Vec>, size: usize, diff --git a/src/actor/pubsub.rs b/src/actor/pubsub.rs index 93116dd..500bc3e 100644 --- a/src/actor/pubsub.rs +++ b/src/actor/pubsub.rs @@ -1,3 +1,50 @@ +//! Provides a publish-subscribe (pubsub) mechanism for actors. +//! +//! The `pubsub` module allows actors to broadcast messages to multiple subscribers. It offers +//! a lightweight pubsub actor that can manage multiple subscriptions and publish messages to +//! all subscribed actors simultaneously. This is useful in scenarios where multiple actors need +//! to react to the same event or data. +//! +//! `PubSub` can be used either as a standalone object or as a spawned actor. When spawned as an actor, +//! the `Publish(msg)` and `Subscribe(actor_ref)` messages are used to interact with it. +//! +//! # Features +//! - **Publish-Subscribe Pattern**: Actors can subscribe to the `PubSub` actor to receive broadcast messages. +//! - **Message Broadcasting**: Messages published to the `PubSub` actor are sent to all subscribed actors. +//! - **Subscriber Management**: Actors can subscribe and unsubscribe dynamically, allowing flexible message routing. +//! +//! # Example +//! +//! ``` +//! use kameo::Actor; +//! use kameo::actor::{PubSub, Publish, Subscribe}; +//! # use kameo::message::{Context, Message}; +//! use kameo::request::MessageSend; +//! +//! #[derive(Actor)] +//! struct MyActor; +//! # +//! # impl Message<&'static str> for MyActor { +//! # type Reply = (); +//! # async fn handle(&mut self, msg: &'static str, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { } +//! # } +//! +//! # tokio_test::block_on(async { +//! let mut pubsub = PubSub::new(); +//! let actor_ref = kameo::spawn(MyActor); +//! +//! // Use PubSub as a standalone object +//! pubsub.subscribe(actor_ref.clone()); +//! pubsub.publish("Hello, World!").await; +//! +//! // Or spawn PubSub as an actor and use messages +//! let pubsub_actor_ref = kameo::spawn(PubSub::new()); +//! pubsub_actor_ref.tell(Subscribe(actor_ref)).send().await?; +//! pubsub_actor_ref.tell(Publish("Hello, spawned world!")).send().await?; +//! # Ok::<(), Box>(()) +//! # }); +//! ``` + use std::collections::HashMap; use futures::future::{join_all, BoxFuture}; @@ -12,7 +59,12 @@ use crate::{ use super::{ActorID, ActorRef}; -/// A mpsc-like pubsub actor. +/// A publish-subscribe (pubsub) actor that allows message broadcasting to multiple subscribers. +/// +/// `PubSub` can be used as a standalone object or spawned as an actor. When spawned, messages can +/// be sent using the `Publish(msg)` and `Subscribe(actor_ref)` messages to publish data and manage subscribers. +/// This provides flexibility in how you interact with the pubsub system, depending on whether you want +/// to manage it directly or interact with it via messages. #[allow(missing_debug_implementations)] pub struct PubSub { subscribers: HashMap + Send + Sync>>, @@ -20,13 +72,35 @@ pub struct PubSub { impl PubSub { /// Creates a new pubsub instance. + /// + /// This initializes the pubsub actor with an empty list of subscribers. pub fn new() -> Self { PubSub { subscribers: HashMap::new(), } } - /// Publishes a message to all subscribers. + /// Publishes a message to all subscribed actors. + /// + /// The message is cloned and sent to each subscriber. Any actor subscribed to the `PubSub` actor + /// will receive a copy of the message. + /// + /// # Example + /// + /// ``` + /// use kameo::actor::PubSub; + /// + /// #[derive(Clone)] + /// struct Msg(String); + /// + /// # tokio_test::block_on(async { + /// let mut pubsub = PubSub::new(); + /// pubsub.publish(Msg("Hello!".to_string())).await; + /// # }) + /// ``` + /// + /// # Requirements + /// The message type `M` must implement `Clone` and `Send`, since it needs to be duplicated for each subscriber. pub async fn publish(&mut self, msg: M) where M: Clone + Send + 'static, @@ -49,7 +123,36 @@ impl PubSub { } } - /// Subscribes an actor receive all messages published. + /// Subscribes an actor to receive all messages published by the pubsub actor. + /// + /// Once subscribed, the actor will receive all messages sent to the pubsub actor via the `publish` method. + /// The actor reference is stored in the list of subscribers, and messages are sent to the actor asynchronously. + /// + /// # Example + /// + /// ``` + /// # use kameo::Actor; + /// use kameo::actor::PubSub; + /// # use kameo::message::{Context, Message}; + /// + /// # #[derive(Actor)] + /// # struct MyActor; + /// # + /// # impl Message for MyActor { + /// # type Reply = (); + /// # async fn handle(&mut self, msg: Msg, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { } + /// # } + /// # + /// #[derive(Clone)] + /// struct Msg(String); + /// + /// # tokio_test::block_on(async { + /// let mut pubsub = PubSub::new(); + /// + /// let actor_ref = kameo::spawn(MyActor); + /// pubsub.subscribe(actor_ref); + /// # }) + /// ``` #[inline] pub fn subscribe(&mut self, actor_ref: ActorRef) where @@ -72,7 +175,10 @@ impl Default for PubSub { } } -/// Publishes a message to a pubsub actor. +/// A message used to publish data to a `PubSub` actor. +/// +/// This struct wraps a message of type `M` and is used when sending a message to a pubsub actor. +/// When this message is received, it is broadcast to all subscribers. #[derive(Clone, Debug)] pub struct Publish(pub M); @@ -91,7 +197,10 @@ where } } -/// Subscribes an actor to a pubsub actor. +/// A message used to subscribe an actor to a `PubSub` actor. +/// +/// This struct wraps an `ActorRef` and is used to subscribe an actor to a pubsub actor. Once subscribed, +/// the actor will receive all published messages from the pubsub actor. #[derive(Clone, Debug)] pub struct Subscribe(pub ActorRef); diff --git a/src/actor/spawn.rs b/src/actor/spawn.rs index fdd06c3..f80a70f 100644 --- a/src/actor/spawn.rs +++ b/src/actor/spawn.rs @@ -18,18 +18,26 @@ use crate::{ use super::ActorID; -/// Spawns an actor in a tokio task. +/// Spawns an actor in a Tokio task, running asynchronously. +/// +/// This function spawns the actor in a non-blocking Tokio task, making it suitable for actors that need to +/// perform asynchronous operations. The actor runs in the background and can be interacted with through +/// the returned [`ActorRef`]. /// /// # Example /// -/// ```no_run +/// ``` /// use kameo::Actor; /// /// #[derive(Actor)] /// struct MyActor; /// -/// kameo::spawn(MyActor); +/// # tokio_test::block_on(async { +/// let actor_ref = kameo::spawn(MyActor); +/// # }) /// ``` +/// +/// The actor will continue running in the background, and messages can be sent to it via `actor_ref`. pub fn spawn(actor: A) -> ActorRef where A: Actor, @@ -39,18 +47,33 @@ where .unwrap() } -/// Spawns an actor in a tokio task. +/// Spawns an actor in a Tokio task, using a factory function that provides access to the [`ActorRef`]. +/// +/// This function is useful when the actor requires access to its own reference during initialization. The +/// factory function is provided with the actor reference and is responsible for returning the initialized actor. /// /// # Example /// -/// ```no_run +/// ``` /// use kameo::Actor; +/// use kameo::actor::ActorRef; /// /// #[derive(Actor)] -/// struct MyActor; +/// struct MyActor { actor_ref: ActorRef } /// -/// kameo::spawn(MyActor); +/// # tokio_test::block_on(async { +/// let actor_ref = kameo::actor::spawn_with(|actor_ref| { +/// let actor = MyActor { +/// actor_ref: actor_ref.clone(), +/// }; +/// async { actor } +/// }) +/// .await; +/// # }) /// ``` +/// +/// This allows the actor to have access to its own `ActorRef` during creation, which can be useful for actors +/// that need to communicate with themselves or manage internal state more effectively. pub async fn spawn_with(f: F) -> ActorRef where A: Actor, @@ -60,34 +83,43 @@ where spawn_inner::, _, _>(f).await } -/// Spawns an actor in its own dedicated thread where blocking is acceptable. +/// Spawns an actor in its own dedicated thread, allowing for blocking operations. /// -/// This is useful for actors which require or may benefit from using blocking operations rather than async, -/// whilst still enabling asyncronous functionality. +/// This function spawns the actor in a separate thread, making it suitable for actors that perform blocking +/// operations, such as file I/O or other tasks that cannot be efficiently executed in an asynchronous context. +/// Despite running in a blocking thread, the actor can still communicate asynchronously with other actors. /// /// # Example /// /// ```no_run +/// use std::io::{self, Write}; +/// use std::fs::File; +/// /// use kameo::Actor; -/// use kameo::request::MessageSend; +/// use kameo::message::{Context, Message}; +/// use kameo::request::MessageSendSync; /// /// #[derive(Actor)] /// struct MyActor { -/// file: std::fs::File, +/// file: File, /// } /// /// struct Flush; -/// impl Message for Actor { -/// type Reply = std::io::Result<()>; +/// impl Message for MyActor { +/// type Reply = io::Result<()>; /// -/// fn handle(&mut self, _: Flush, _ctx: Context) -> Self::Reply { -/// self.file.flush() // This operation is blocking, but acceptable since we're spawning in a thread +/// async fn handle(&mut self, _: Flush, _ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { +/// self.file.flush() // This blocking operation is handled in its own thread /// } /// } /// -/// let actor_ref = kameo::actor::spawn_in_thread(MyActor { ... }); -/// actor_ref.tell(Flush).send()?; +/// let actor_ref = kameo::actor::spawn_in_thread(MyActor { file: File::create("output.txt").unwrap() }); +/// actor_ref.tell(Flush).send_sync()?; +/// # Ok::<(), kameo::error::SendError>(()) /// ``` +/// +/// This function is useful for actors that require or benefit from running blocking operations while still +/// enabling asynchronous functionality. pub fn spawn_in_thread(actor: A) -> ActorRef where A: Actor, diff --git a/src/message.rs b/src/message.rs index f83bf4b..329a574 100644 --- a/src/message.rs +++ b/src/message.rs @@ -110,12 +110,19 @@ where /// - The [ReplySender], if not `None`, should be used by the delegated responder to send the actual reply. /// /// ``` - /// use kameo::message::{Context, DelegatedReply, Message}; + /// use kameo::message::{Context, Message}; + /// use kameo::reply::DelegatedReply; /// - /// impl Message for MyActor { + /// # #[derive(kameo::Actor)] + /// # struct MyActor; + /// # + /// + /// struct Msg; + /// + /// impl Message for MyActor { /// type Reply = DelegatedReply; /// - /// async fn handle(&mut self, msg: MyMsg, mut ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { + /// async fn handle(&mut self, msg: Msg, mut ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { /// let (delegated_reply, reply_sender) = ctx.reply_sender(); /// /// if let Some(tx) = reply_sender { diff --git a/src/remote.rs b/src/remote.rs index c997307..d82aaf2 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -6,6 +6,7 @@ //! names and send messages between actors on different nodes as though they were local. //! //! ## Key Features +//! //! - **Swarm Management**: The `ActorSwarm` struct handles a distributed swarm of nodes, //! managing peer discovery and communication. //! - **Actor Registration**: Actors can be registered under a unique name and looked up across @@ -14,27 +15,34 @@ //! of Kademlia DHT and libp2p's networking capabilities. //! //! ## Getting Started +//! //! To use remote actors, you must first initialize an `ActorSwarm`, which will set up the necessary //! networking components to allow remote actors to communicate across nodes. //! -//! ```rust +//! ``` //! use kameo::remote::ActorSwarm; //! +//! # tokio_test::block_on(async { //! // Initialize the actor swarm -//! let actor_swarm = ActorSwarm::bootstrap(); -//! actor_swarm.listen_on("/ip4/0.0.0.0/udp/8020/quic-v1".parse()?); +//! ActorSwarm::bootstrap()? +//! .listen_on("/ip4/0.0.0.0/udp/8020/quic-v1".parse()?).await?; +//! # Ok::<(), Box>(()) +//! # }); //! ``` //! //! ## Example Use Case +//! //! - A distributed chat system where actors represent individual users, and messages are sent between them across multiple nodes. //! //! ## Types in the Module +//! //! - [`ActorSwarm`]: The core struct for managing the distributed swarm of nodes and coordinating actor registration and messaging. //! - [`SwarmFuture`]: A future that holds the response from the actor swarm. //! - [`RemoteActor`]: A trait for identifying remote actors via a unique ID. //! - [`RemoteMessage`]: A trait for identifying remote messages via a unique ID. //! //! ### Re-exports +//! //! - `Keypair`, `PeerId`, `dial_opts`: Re-exported from the libp2p library to assist with handling peer identities and dialing options. use std::{ @@ -83,7 +91,9 @@ static REMOTE_MESSAGES_MAP: Lazy, R /// /// ## Example with Derive /// -/// ```rust +/// ``` +/// use kameo::RemoteActor; +/// /// #[derive(RemoteActor)] /// pub struct MyActor; /// ``` @@ -91,6 +101,8 @@ static REMOTE_MESSAGES_MAP: Lazy, R /// ## Example Manual Implementation /// /// ``` +/// use kameo::remote::RemoteActor; +/// /// pub struct MyActor; /// /// impl RemoteActor for MyActor { diff --git a/src/remote/swarm.rs b/src/remote/swarm.rs index bb16843..28a4daa 100644 --- a/src/remote/swarm.rs +++ b/src/remote/swarm.rs @@ -39,18 +39,26 @@ static ACTOR_SWARM: OnceCell = OnceCell::new(); /// enabling peer discovery, actor registration, and remote message routing. /// /// ## Key Features +/// /// - **Swarm Management**: Initializes and manages the libp2p swarm, allowing nodes to discover /// and communicate in a peer-to-peer network. /// - **Actor Registration**: Actors can be registered under a unique name, making them discoverable /// and accessible across the network. /// - **Message Routing**: Handles reliable message delivery to remote actors using Kademlia DHT. /// -/// ### Example -/// ```rust +/// # Example +/// +/// ``` +/// use kameo::remote::ActorSwarm; +/// +/// # tokio_test::block_on(async { /// // Initialize the actor swarm -/// let actor_swarm = ActorSwarm::bootstrap(); +/// let actor_swarm = ActorSwarm::bootstrap()?; +/// /// // Set up the swarm to listen on a specific address /// actor_swarm.listen_on("/ip4/0.0.0.0/udp/8020/quic-v1".parse()?); +/// # Ok::<(), Box>(()) +/// # }); /// ``` /// /// The `ActorSwarm` is the essential component for enabling distributed actor communication @@ -144,10 +152,16 @@ impl ActorSwarm { /// For more information on multiaddresses, see [libp2p addressing](https://docs.libp2p.io/concepts/fundamentals/addressing/). /// /// ## Example - /// ```rust + /// + /// ``` + /// use kameo::remote::ActorSwarm; + /// + /// # tokio_test::block_on(async { /// ActorSwarm::bootstrap()? /// .listen_on("/ip4/0.0.0.0/udp/8020/quic-v1".parse()?) /// .await?; + /// # Ok::<(), Box>(()) + /// # }); /// ``` /// /// ## Returns