Skip to content

Commit

Permalink
Improve docs
Browse files Browse the repository at this point in the history
  • Loading branch information
OscartGiles committed Jul 24, 2024
1 parent c951a3b commit 6ecdb38
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 40 deletions.
97 changes: 66 additions & 31 deletions rivelin_actors/src/actors/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
//!
//! The [Topic] trait can be implemented to define a topic.
//!
//! ```
//! ```rust
//! # use tokio_test;
//! use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic};
//! use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic, Filter};
//! use rivelin_actors::Actor;
//!
//! #[derive(Hash)]
//! struct HelloTopic;
//!
//! impl Topic for HelloTopic {
Expand All @@ -22,80 +21,117 @@
//! let addr = EventBusAddr(addr);
//!
//! // Subscribe to the topic
//! let mut consumer = addr.consumer(HelloTopic).await.unwrap();
//! let mut consumer = addr
//! .consumer::<HelloTopic, _>(Filter::all_subtopics)
//! .await
//! .unwrap();
//!
//! // Create a producer for the topic and send a message
//! let producer = addr.producer(HelloTopic);
//! producer.send("Hello from topic a".to_string()).await.unwrap();
//! producer
//! .send("Hello from topic a".to_string())
//! .await
//! .unwrap();
//!
//! // Receive a message from the topic
//! let res = consumer.recv().await.unwrap();
//! let value = res.as_ref();
//!
//! assert_eq!(value, "Hello from topic a");
//!
//! assert_eq!(value, &"Hello from topic a".to_string());
//! // Stop the actor
//! handle.abort();
//! handle.graceful_shutdown().await.unwrap();
//!
//! # })
//! ```
//! ## [Topic] fields
//! ## Subtopics
//!
//! Structs that implement [Topic] can have fields which define unique topics.
//! Structs that implement [Topic] can have fields. The value of these fields define `subtopics`
//! This is similar to topics [RabbitMQ](https://www.rabbitmq.com/tutorials/tutorial-five-python),
//!
//! ```
//! except that subtopics
//! are defined by fields rather than a string (e.g. quick.orange.rabbit). This makes them type safe.
//!
//!
//! ```rust
//! # use tokio_test;
//! # use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic};
//! # use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic, Filter};
//! # use rivelin_actors::Actor;
//! #[derive(Hash, Copy, Clone)]
//! struct TopicWithValues {
//! struct SubTopics {
//! a: u32,
//! b: u64,
//! b: u32,
//! };
//!
//! impl Topic for TopicWithValues {
//! type MessageType = String;
//! impl Topic for SubTopics {
//! type MessageType = &'static str;
//! }
//! # tokio_test::block_on(async {
//! # let (addr, handle) = Actor::spawn(EventBus::new(100), EventSinkState::new());
//! # let addr = EventBusAddr(addr);
//! let (addr, handle) = Actor::spawn(EventBus::new(100), EventSinkState::new());
//! let addr = EventBusAddr(addr);
//!
//! // These are two different topics
//! let topic_a = TopicWithValues { a: 1, b: 1 };
//! let topic_b = TopicWithValues { a: 1, b: 2 };
//! let topic_a = SubTopics { a: 1, b: 1 };
//! let topic_b = SubTopics { a: 1, b: 2 };
//!
//! let mut consumer_a = addr.consumer(topic_a).await.unwrap();
//! let mut consumer_b = addr.consumer(topic_b).await.unwrap();
//! # });
//! let producer_1 = addr.producer(topic_a);
//! let producer_2 = addr.producer(topic_b);
//!
//! // consumer_a will receive all messages
//! let mut consumer_a = addr
//! .consumer::<SubTopics, _>(Filter::all_subtopics)
//! .await
//! .unwrap();
//!
//! // consumer_b will receive all messages where the topic.b == 2
//! let mut consumer_b = addr.consumer(|t: &SubTopics| t.b == 2).await.unwrap();
//!
//! producer_1.send("This is subtopic_a").await.unwrap();
//! producer_2.send("This is subtopic_b").await.unwrap();
//!
//! assert_eq!(
//! *consumer_a.recv().await.unwrap().as_ref(),
//! "This is subtopic_a"
//! );
//! assert_eq!(
//! *consumer_a.recv().await.unwrap().as_ref(),
//! "This is subtopic_b"
//! );
//!
//! assert_eq!(
//! *consumer_b.recv().await.unwrap().as_ref(),
//! "This is subtopic_b"
//! );
//!
//! handle.graceful_shutdown().await.unwrap();
//! # })
//! ```
//!
//! ## [Stream] [Consumer]
//!
//! [Consumer] can be converted to a [Stream] to receive messages.
//! ```
//! # use tokio_test;
//! # use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic};
//! # use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic, Filter};
//! # use rivelin_actors::Actor;
//! use futures_util::StreamExt;
//!
//! # tokio_test::block_on(async {
//! # let (addr, handle) = Actor::spawn(EventBus::new(100), EventSinkState::new());
//! # let addr = EventBusAddr(addr);
//! # #[derive(Hash)]
//!
//! # struct HelloTopic;
//! # impl Topic for HelloTopic {
//! # type MessageType = String;
//! # }
//! let expected_values = vec!["First Message", "Second Message"];
//!
//! // Create a consumer and convert it to a stream
//! let mut consumer = addr.consumer(HelloTopic).await.unwrap().to_stream().enumerate();
//! let mut consumer = addr.consumer::<HelloTopic, _>(Filter::all_subtopics).await.unwrap().to_stream().enumerate();
//! # let producer = addr.producer(HelloTopic);
//! # producer.send("First Message".to_string()).await.unwrap();
//! # producer.send("Second Message".to_string()).await.unwrap();
//!
//! let mut recovered_values = vec![];
//! while let Some((i, Ok(value))) = consumer.next().await {
//! while let Some((i, value)) = consumer.next().await {
//! recovered_values.push(value.as_ref().to_owned());
//! if i == expected_values.len() - 1 {
//! break;
Expand All @@ -112,9 +148,8 @@
//!
//! ```
//! # use tokio_test;
//! # use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic};
//! # use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Topic, Filter};
//! # use rivelin_actors::Actor;
//! #[derive(Hash)]
//! struct TopicA;
//!
//! impl Topic for TopicA {
Expand All @@ -125,7 +160,7 @@
//! # let addr = EventBusAddr(addr);
//!
//! let producer = addr.producer(TopicA);
//! let mut consumer = addr.consumer(TopicA).await.unwrap();
//! let mut consumer = addr.consumer::<TopicA, _>(Filter::all_subtopics).await.unwrap();
//!
//! // This is Ok
//! producer.send(42).await.unwrap();
Expand Down
17 changes: 13 additions & 4 deletions rivelin_actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ where
async {}
}

/// Runs when the actor is gracefully stopped.
/// Runs when the actor attempts gracefully shutdown.
fn on_stop(self, _state: &mut Self::State) -> impl std::future::Future<Output = ()> + Send {
async {}
}

/// Runs the actor. The default implementation simple iterates over a stream of messages and calls [`Actor::handle`] for each message.
/// Override this method if you need to handle messages in a different way.
/// The [Actor]'s event loop.
/// The default implementation simple iterates over a stream of messages and calls [`Actor::handle`] for each message.
/// Override this method if the actor needs to handle events other than messages.
fn run(
self,
mut message_stream: impl Stream<Item = Self::Message> + Send + 'static + std::marker::Unpin,
Expand Down Expand Up @@ -108,7 +109,9 @@ where
state: &mut Self::State,
) -> impl std::future::Future<Output = ()> + Send;

/// Create an Actor instance.
/// Spawn an [Actor] with an initial state.
/// The [Actor] will be moved into a tokio task and started.
/// Returns a tuple containing an [Addr] to send messages to the actor and an [ActorHandle] to gracefully shutdown the actor.
fn spawn<K>(actor: Self, state: Self::State) -> (K, ActorHandle)
where
K: From<Addr<Self>>,
Expand Down Expand Up @@ -138,15 +141,21 @@ where
}
}

/// A handle to an actor that can be used to gracefully shutdown the actor or abort it.
pub struct ActorHandle {
task_handle: tokio::task::JoinHandle<()>,
cancellation_token: CancellationToken,
}

impl ActorHandle {
/// Abort the actor.
/// Internally this calls [`tokio::task::JoinHandle::abort`].
pub fn abort(&self) {
self.task_handle.abort();
}

/// Gracefully shutdown the actor.
/// This cancel's the actor's cancellation token which should call [`Actor::on_stop`] and wait for the actor to finish processing messages.
pub async fn graceful_shutdown(self) -> Result<(), tokio::task::JoinError> {
self.cancellation_token.cancel();
self.task_handle.await
Expand Down
20 changes: 15 additions & 5 deletions rivelin_actors/tests/event_bus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::hash::Hash;
use std::time::Duration;

use futures_util::StreamExt;
use rivelin_actors::event_bus::{EventBus, EventBusAddr, EventSinkState, Filter, Topic};
Expand All @@ -21,7 +22,10 @@ async fn event_bus_string() -> anyhow::Result<()> {
.await?;

let producer = addr.producer(HelloTopic);
producer.send("Hello from topic a".to_string()).await?;
producer
.send("Hello from topic a".to_string())
.await
.unwrap();

let res = consumer.recv().await.unwrap();
let value = res.as_ref();
Expand Down Expand Up @@ -188,8 +192,9 @@ async fn event_bus_subtopics() -> anyhow::Result<()> {
// Should only receive the last value, even though sent last
assert_eq!(*res.as_ref(), 3);

// No more values should be available
assert!(consumer.try_recv().is_err());
// No more values should be available. If we haven't received on in 500ms will assume there are no more values
let res = tokio::time::timeout(Duration::from_millis(500), consumer.recv()).await;
assert!(res.is_err());

Ok(())
}
Expand Down Expand Up @@ -245,12 +250,17 @@ async fn event_bus_multiple_subtopics() -> anyhow::Result<()> {
assert_eq!(*res.as_ref(), 1);
let res = consumer_topic_a.recv().await.unwrap();
assert_eq!(*res.as_ref(), 2);
assert!(consumer_topic_a.try_recv().is_err());

// No more values should be available. If we haven't received on in 500ms will assume there are no more values
let res = tokio::time::timeout(Duration::from_millis(500), consumer_topic_a.recv()).await;
assert!(res.is_err());

// Consumer for topic b can get the value from SubtopicB
let res = consumer_topic_b.recv().await.unwrap();
assert_eq!(*res.as_ref(), "hello");
assert!(consumer_topic_b.try_recv().is_err());

let res = tokio::time::timeout(Duration::from_millis(500), consumer_topic_b.recv()).await;
assert!(res.is_err());

Ok(())
}

0 comments on commit 6ecdb38

Please sign in to comment.