Skip to content

Commit

Permalink
Merge pull request #30 from slawlor/macros
Browse files Browse the repository at this point in the history
Small macro cleanups + big concurrency cleanup
  • Loading branch information
slawlor authored Jan 20, 2023
2 parents 183339b + 57514c5 commit 59e6ff8
Show file tree
Hide file tree
Showing 22 changed files with 341 additions and 200 deletions.
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ A pure-Rust actor framework. Inspired from [Erlang's `gen_server`](https://www.e

`ractor` tries to solve the problem of building and maintaing an Erlang-like actor framework in Rust. It gives
a set of generic primitives and helps automate the supervision tree and management of our actors along with the traditional actor message processing logic. It's built *heavily* on `tokio` which is a
hard requirement for `ractor`.
hard requirement for `ractor`.

`ractor` is a modern actor framework written in 100% rust with NO `unsafe` code.
`ractor` is a modern actor framework written in 100% rust with NO `unsafe` code.

### Why ractor?

Expand All @@ -27,7 +27,7 @@ Ractor tries to be different my modelling more on a pure Erlang `gen_server`. Th

Additionally we wrote `ractor` without building on some kind of "Runtime" or "System" which needs to be spawned. Actors can be run independently, in conjunction with other basic `tokio` runtimes with little additional overhead.

We currently have full support for
We currently have full support for:

1. Single-threaded message processing
2. Actor supervision tree
Expand All @@ -36,7 +36,6 @@ We currently have full support for
5. Named actor registry (`ractor::registry`) from [Erlang's `Registered processes`](https://www.erlang.org/doc/reference_manual/processes.html)
6. Process groups (`ractor::pg`) from [Erlang's `pg` module](https://www.erlang.org/doc/man/pg.html)


On our roadmap is to add more of the Erlang functionality including potentially a distributed actor cluster.

## Installation
Expand All @@ -45,7 +44,7 @@ Install `ractor` by adding the following to your Cargo.toml dependencies

```toml
[dependencies]
ractor = "0.3"
ractor = "0.4"
```

## Working with Actors
Expand Down Expand Up @@ -150,7 +149,7 @@ work will complete, and on the next message processing iteration Stop will take
currently executing work, regardless of the provided reason.
3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events
are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them.
4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first
4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first
3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do!

## Contributors
Expand All @@ -159,4 +158,4 @@ The original authors of `ractor` are Sean Lawlor (@slawlor), Dillon George (@dil

## License

This project is licensed under [MIT](https://github.com/slawlor/ractor/blob/main/LICENSE).
This project is licensed under [MIT](https://github.com/slawlor/ractor/blob/main/LICENSE).
13 changes: 11 additions & 2 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.4.1"
version = "0.4.2"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -12,12 +12,21 @@ readme = "../README.md"
homepage = "https://github.com/slawlor/ractor"
categories = ["actor", "erlang"]

# WIP
# [features]
# tokio_runtime = ["tokio/time"]
# async_std_runtime = ["async-std"]

# default = ["tokio_runtime"]
# default = ["async_std_runtime"]

[dependencies]
async-std = { version = "1", optional = true }
async-trait = "0.1"
dashmap = "5"
futures = "0.3"
once_cell = "1"
tokio = { version = "1", features = ["rt", "time", "sync", "macros"]}
tokio = { version = "1", features = ["sync", "time"] }

[dev-dependencies]
criterion = "0.3"
Expand Down
25 changes: 11 additions & 14 deletions ractor/benches/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,19 @@ fn schedule_work(c: &mut Criterion) {
b.iter_batched(
|| {
runtime.block_on(async move {
let mut handles = vec![];
let mut join_set = tokio::task::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor)
.await
.expect("Failed to create test agent");
handles.push(handler);
join_set.spawn(handler);
}
handles
join_set
})
},
|handles| {
runtime.block_on(async move {
let _ = futures::future::join_all(handles).await;
})
|mut handles| {
runtime.block_on(async move { while let Some(_) = handles.join_next().await {} })
},
BatchSize::PerIteration,
);
Expand All @@ -108,20 +107,18 @@ fn schedule_work(c: &mut Criterion) {
b.iter_batched(
|| {
runtime.block_on(async move {
let mut handles = vec![];
let mut join_set = tokio::task::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor)
.await
.expect("Failed to create test agent");
handles.push(handler);
join_set.spawn(handler);
}
handles
join_set
})
},
|handles| {
runtime.block_on(async move {
let _ = futures::future::join_all(handles).await;
})
|mut handles| {
runtime.block_on(async move { while let Some(_) = handles.join_next().await {} })
},
BatchSize::PerIteration,
);
Expand Down
8 changes: 4 additions & 4 deletions ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ async fn main() {
];
let mut forks = Vec::with_capacity(philosopher_names.len());
let mut philosophers = Vec::with_capacity(philosopher_names.len());
let mut all_handles = Vec::new();
let mut all_handles = tokio::task::JoinSet::new();

let mut results: HashMap<ActorName, Option<PhilosopherMetrics>> =
HashMap::with_capacity(philosopher_names.len());
Expand All @@ -453,7 +453,7 @@ async fn main() {
.await
.expect("Failed to create fork!");
forks.push(fork);
all_handles.push(handle);
all_handles.spawn(handle);
}

// Spawn the philosopher actors clockwise from top of the table
Expand All @@ -473,7 +473,7 @@ async fn main() {
.expect("Failed to create philosopher!");
results.insert(philosopher_names[left], None);
philosophers.push(philosopher);
all_handles.push(handle);
all_handles.spawn(handle);
}

// wait for the simulation to end
Expand All @@ -494,7 +494,7 @@ async fn main() {
}

// wait for everything to shut down
let _ = futures::future::join_all(all_handles).await;
while let Some(_) = all_handles.join_next().await {}

// print metrics
println!("Simulation results");
Expand Down
15 changes: 9 additions & 6 deletions ractor/src/actor/actor_cell/actor_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

use tokio::sync::mpsc;
use crate::concurrency as mpsc;

use crate::actor::messages::{BoxedMessage, StopMessage};
use crate::actor::supervision::SupervisionTree;
use crate::port::{BoundedInputPort, BoundedInputPortReceiver, InputPort, InputPortReceiver};
use crate::concurrency::{
MpscReceiver as BoundedInputPortReceiver, MpscSender as BoundedInputPort,
MpscUnboundedReceiver as InputPortReceiver, MpscUnboundedSender as InputPort,
};
use crate::{Actor, ActorId, ActorName, ActorStatus, MessagingErr, Signal, SupervisionEvent};

// The inner-properties of an Actor
Expand Down Expand Up @@ -39,10 +42,10 @@ impl ActorProperties {
where
TActor: Actor,
{
let (tx_signal, rx_signal) = mpsc::channel(2);
let (tx_stop, rx_stop) = mpsc::channel(2);
let (tx_supervision, rx_supervision) = mpsc::unbounded_channel();
let (tx_message, rx_message) = mpsc::unbounded_channel();
let (tx_signal, rx_signal) = mpsc::mpsc_bounded(2);
let (tx_stop, rx_stop) = mpsc::mpsc_bounded(2);
let (tx_supervision, rx_supervision) = mpsc::mpsc_unbounded();
let (tx_message, rx_message) = mpsc::mpsc_unbounded();
(
Self {
id: crate::actor_id::get_new_local_id(),
Expand Down
22 changes: 6 additions & 16 deletions ractor/src/actor/actor_cell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use super::errors::MessagingErr;
use super::messages::{BoxedMessage, Signal, StopMessage};

use super::SupervisionEvent;
use crate::port::{BoundedInputPortReceiver, InputPortReceiver};
use crate::concurrency::{
MpscReceiver as BoundedInputPortReceiver, MpscUnboundedReceiver as InputPortReceiver,
};
use crate::{Actor, ActorId, ActorName, SpawnErr};

pub mod actor_ref;
Expand Down Expand Up @@ -75,18 +77,12 @@ impl ActorPortSet {
/// signal interrupts the async work.
pub async fn run_with_signal<TState>(
&mut self,
future: impl futures::Future<Output = TState>,
future: impl std::future::Future<Output = TState>,
) -> Result<TState, Signal>
where
TState: crate::State,
{
tokio::select! {
// Biased ensures that we poll the ports in the order they appear, giving
// priority to our message reception operations. See:
// https://docs.rs/tokio/latest/tokio/macro.select.html#fairness
// for more information
biased;

crate::concurrency::select! {
// supervision or message processing work
// can be interrupted by the signal port receiving
// a kill signal
Expand All @@ -108,13 +104,7 @@ impl ActorPortSet {
/// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr]
/// in the event any of the channels is closed.
pub async fn listen_in_priority(&mut self) -> Result<ActorPortMessage, MessagingErr> {
tokio::select! {
// Biased ensures that we poll the ports in the order they appear, giving
// priority to our message reception operations. See:
// https://docs.rs/tokio/latest/tokio/macro.select.html#fairness
// for more information
biased;

crate::concurrency::select! {
signal = self.signal_rx.recv() => {
signal.map(ActorPortMessage::Signal).ok_or(MessagingErr::ChannelClosed)
}
Expand Down
13 changes: 0 additions & 13 deletions ractor/src/actor/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,6 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for MessagingErr {
Self::ChannelClosed
}
}

impl<T> From<tokio::sync::broadcast::error::SendError<T>> for MessagingErr {
fn from(_: tokio::sync::broadcast::error::SendError<T>) -> Self {
Self::ChannelClosed
}
}

impl<T> From<tokio::sync::watch::error::SendError<T>> for MessagingErr {
fn from(_: tokio::sync::watch::error::SendError<T>) -> Self {
Self::ChannelClosed
}
}

impl<T> From<tokio::sync::mpsc::error::TrySendError<T>> for MessagingErr {
fn from(_: tokio::sync::mpsc::error::TrySendError<T>) -> Self {
Self::ChannelClosed
Expand Down
6 changes: 3 additions & 3 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
use std::{panic::AssertUnwindSafe, sync::Arc};

use crate::concurrency::JoinHandle;
use futures::TryFutureExt;
use tokio::task::JoinHandle;

pub mod messages;
use messages::*;
Expand Down Expand Up @@ -216,7 +216,7 @@ where

/// Start the actor immediately, optionally linking to a parent actor (supervision tree)
///
/// NOTE: This returned [tokio::task::JoinHandle] is guaranteed to not panic (unless the runtime is shutting down perhaps).
/// NOTE: This returned [crate::concurrency::JoinHandle] is guaranteed to not panic (unless the runtime is shutting down perhaps).
/// An inner join handle is capturing panic results from any part of the inner tasks, so therefore
/// we can safely ignore it, or wait on it to block on the actor's progress
///
Expand Down Expand Up @@ -249,7 +249,7 @@ where
// run the processing loop, backgrounding the work
let myself = self.base.clone();
let myself_ret = self.base.clone();
let handle = tokio::spawn(async move {
let handle = crate::concurrency::spawn(async move {
let evt = match Self::processing_loop(
ports,
&mut state,
Expand Down
Loading

0 comments on commit 59e6ff8

Please sign in to comment.