diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3ed601d0..f63a645d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,6 +17,12 @@ jobs: - name: Run the default tests package: ractor # flags: + - name: Test ractor with the `cluster` feature + package: ractor + flags: -F cluster + - name: Test ractor-cluster + package: ractor-cluster + # flags: steps: - uses: actions/checkout@main @@ -27,7 +33,7 @@ jobs: toolchain: stable override: true - - name: Run test + - name: ${{matrix.name}} uses: actions-rs/cargo@v1 with: command: test diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 3c6ffbaa..a56879e6 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -15,13 +15,17 @@ jobs: - uses: hecrj/setup-rust-action@v1 with: rust-version: ${{ matrix.rust }} + - uses: actions/checkout@main + - name: Login to crates.io run: cargo login $CRATES_IO_TOKEN env: CRATES_IO_TOKEN: ${{ secrets.crates_io_token }} + - name: Dry run publish ractor run: cargo publish --dry-run --manifest-path Cargo.toml -p ractor + - name: Publish crate ractor run: cargo publish --manifest-path Cargo.toml -p ractor env: diff --git a/Cargo.toml b/Cargo.toml index 0515a6f6..eae5cf38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "ractor", + "ractor-cluster", "ractor-playground", "xtask" ] diff --git a/ractor-cluster/Cargo.toml b/ractor-cluster/Cargo.toml new file mode 100644 index 00000000..2581d618 --- /dev/null +++ b/ractor-cluster/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "ractor-cluster" +version = "0.4.0" +authors = ["Sean Lawlor", "Evan Au", "Dillon George"] +description = "Distributed cluster environment of Ractor actors" +documentation = "https://docs.rs/ractor" +license = "MIT" +edition = "2018" +keywords = ["actor", "ractor", "cluster"] +repository = "https://github.com/slawlor/ractor" +readme = "../README.md" +homepage = "https://github.com/slawlor/ractor" +categories = ["actor", "erlang"] +build = "src/build.rs" + +[build-dependencies] +protobuf-src = "1" +prost-build = { version = "0.11" } + +[dependencies] +## Required dependencies +async-trait = "0.1" +bytes = { version = "1" } +log = "0.4" +prost = { version = "0.11" } +prost-types = { version = "0.11" } +ractor = { version = "0.4", features = ["cluster"], path = "../ractor" } +rand = "0.8" +sha2 = "0.10" +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util"]} + +## Optional dependencies +# tokio-rustls = { version = "0.23", optional = true } + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util", "rt-multi-thread"] } diff --git a/ractor-cluster/src/build.rs b/ractor-cluster/src/build.rs new file mode 100644 index 00000000..d337a3fc --- /dev/null +++ b/ractor-cluster/src/build.rs @@ -0,0 +1,31 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! This is the pre-compilation build script for the crate `ractor` when running in distrubted +//! mode. It's used to compile protobuf into Rust code prior to compilation. + +/// The shared-path for all protobuf specifications +const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol"; +/// The list of protobuf files to generate inside PROBUF_BASE_DIRECTORY +const PROTOBUF_FILES: [&str; 4] = ["meta", "node", "auth", "control"]; + +fn build_protobufs() { + std::env::set_var("PROTOC", protobuf_src::protoc()); + + let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len()); + + for file in PROTOBUF_FILES.iter() { + let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file); + println!("cargo:rerun-if-changed={}", proto_file); + protobuf_files.push(proto_file); + } + + prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap(); +} + +fn main() { + // compile the spec files into Rust code + build_protobufs(); +} diff --git a/ractor-cluster/src/hash.rs b/ractor-cluster/src/hash.rs new file mode 100644 index 00000000..edf81f66 --- /dev/null +++ b/ractor-cluster/src/hash.rs @@ -0,0 +1,25 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Hashing utilities mainly used around challenge computation + +pub(crate) const DIGEST_BYTES: usize = 32; +pub(crate) type Digest = [u8; DIGEST_BYTES]; + +/// Compute a challenge digest +pub(crate) fn challenge_digest(secret: &'_ str, challenge: u32) -> Digest { + use sha2::Digest; + + let secret_bytes = secret.as_bytes(); + let mut data = vec![0u8; secret_bytes.len() + 4]; + + let challenge_bytes = challenge.to_be_bytes(); + data[0..4].copy_from_slice(&challenge_bytes); + data[4..].copy_from_slice(secret_bytes); + + let hash = sha2::Sha256::digest(&data); + + hash.into() +} diff --git a/ractor-cluster/src/lib.rs b/ractor-cluster/src/lib.rs new file mode 100644 index 00000000..effa4bb1 --- /dev/null +++ b/ractor-cluster/src/lib.rs @@ -0,0 +1,53 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Support for remote nodes in a distributed cluster. +//! +//! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html) +//! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes. +//! +//! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool. +//! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect +//! to them as well. They merge registries and pg groups together in order to create larger clusters of services. +//! +//! We have chosen protobuf for our inter-node defined protocol, however you can chose whatever medium you like +//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it +//! over the wire for you +//! +//! ## A note on usage +//! +//! An important note on usage, when utilizing `ractor-cluster` and [ractor] in the cluster configuration +//! (i.e. `ractor/cluster`), you no longer receive the auto-implementation for all types for [ractor::Message]. This +//! is due to specialization (see: https://github.com/rust-lang/rust/issues/31844). Ideally we'd have the trait have a +//! "default" non-serializable implementation for all types that could be messages, and specific implementations for +//! those that can be messages sent over the network. However this is presently a `+nightly` only functionality and +//! has a soundness hole in it's processes. Therefore as a workaround, when the `cluster` feature is enabled on [ractor] +//! the default implementation, specifically `impl Message for T {}` is disabled. +//! +//! This means that you need to specify the implementation of the [ractor::Message] trait on all message types, and when +//! they're not network supported messages, this is just a default empty implementation. When they **are** potentially +//! sent over a network in a dist protocol, then you need to fill out the implementation details for how the message +//! serialization is handled. See the documentation of [crate::serialized_rpc_forward] for an example. + +// #![deny(warnings)] +#![warn(unused_imports)] +#![warn(unsafe_code)] +#![warn(missing_docs)] +#![warn(unused_crate_dependencies)] +#![cfg_attr(docsrs, feature(doc_cfg))] + +mod hash; +mod net; +pub mod node; +pub(crate) mod protocol; +pub(crate) mod remote_actor; + +pub mod macros; + +// Re-exports +pub use node::NodeServer; + +/// Node's are representing by an integer id +pub type NodeId = u64; diff --git a/ractor-cluster/src/macros.rs b/ractor-cluster/src/macros.rs new file mode 100644 index 00000000..8058cca8 --- /dev/null +++ b/ractor-cluster/src/macros.rs @@ -0,0 +1,230 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Macro helpers for remote actors + +/// `serialized_rpc_forward!` converts a traditional RPC port to a port which recieves a serialized +/// [Vec<_>] message and can rebuild the reply. This is necessary for RPCs which can occur over the network. +/// +/// However when defining the serialized logic, the cost will ONLY be incurred for actors which live +/// on another `node()`, never locally. Local actors will always use the local [ractor::message::BoxedMessage] +/// notation. +/// +/// An example usage is +/// ```rust +/// use ractor::concurrency::Duration; +/// use ractor::{RpcReplyPort, Message}; +/// use ractor::message::SerializedMessage; +/// use ractor::message::BoxedDowncastErr; +/// use ractor_cluster::serialized_rpc_forward; +/// +/// enum MessageType { +/// Cast(String), +/// Call(String, RpcReplyPort), +/// } +/// +/// impl Message for MessageType { +/// fn serializable() -> bool { +/// true +/// } +/// +/// fn serialize(self) -> SerializedMessage { +/// match self { +/// Self::Cast(args) => SerializedMessage::Cast(args.into_bytes()), +/// Self::Call(args, reply) => { +/// let tx = serialized_rpc_forward!(reply, |bytes| String::from_utf8(bytes).unwrap()); +/// SerializedMessage::Call(args.into_bytes(), tx.into()) +/// } +/// } +/// } +/// +/// fn deserialize(bytes: SerializedMessage) -> Result { +/// match bytes { +/// SerializedMessage::Call(args, reply) => { +/// let tx = serialized_rpc_forward!(reply, |str: String| str.into_bytes()); +/// Ok(Self::Call(String::from_utf8(args).unwrap(), tx)) +/// } +/// SerializedMessage::Cast(args) => Ok(Self::Cast(String::from_utf8(args).unwrap())), +/// _ => Err(BoxedDowncastErr), +/// } +/// } +/// } +/// ``` +#[macro_export] +macro_rules! serialized_rpc_forward { + ($typed_port:expr, $converter:expr) => {{ + let (tx, rx) = ractor::concurrency::oneshot(); + let timeout = $typed_port.get_timeout(); + ractor::concurrency::spawn(async move { + match $typed_port.get_timeout() { + Some(timeout) => { + if let Ok(Ok(result)) = ractor::concurrency::timeout(timeout, rx).await { + let _ = $typed_port.send($converter(result)); + } + } + None => { + if let Ok(result) = rx.await { + let _ = $typed_port.send($converter(result)); + } + } + } + }); + if let Some(to) = timeout { + RpcReplyPort::<_>::from((tx, to)) + } else { + RpcReplyPort::<_>::from(tx) + } + }}; +} + +#[cfg(test)] +mod tests { + use ractor::concurrency::Duration; + use ractor::message::BoxedDowncastErr; + use ractor::message::SerializedMessage; + use ractor::{Message, RpcReplyPort}; + + enum MessageType { + Cast(String), + Call(String, RpcReplyPort), + } + + impl Message for MessageType { + fn serializable() -> bool { + true + } + + fn serialize(self) -> SerializedMessage { + match self { + Self::Cast(args) => SerializedMessage::Cast(args.into_bytes()), + Self::Call(args, reply) => { + let tx = + serialized_rpc_forward!(reply, |bytes| String::from_utf8(bytes).unwrap()); + SerializedMessage::Call(args.into_bytes(), tx) + } + } + } + + fn deserialize(bytes: SerializedMessage) -> Result { + match bytes { + SerializedMessage::Call(args, reply) => { + let tx = serialized_rpc_forward!(reply, |str: String| str.into_bytes()); + Ok(Self::Call(String::from_utf8(args).unwrap(), tx)) + } + SerializedMessage::Cast(args) => Ok(Self::Cast(String::from_utf8(args).unwrap())), + _ => Err(BoxedDowncastErr), + } + } + } + + #[tokio::test] + async fn no_timeout_rpc() { + let (tx, rx) = ractor::concurrency::oneshot(); + let no_timeout = MessageType::Call("test".to_string(), tx.into()); + let no_timeout_serialized = no_timeout.serialize(); + match no_timeout_serialized { + SerializedMessage::Call(args, reply) => { + let _ = reply.send(args); + } + _ => panic!("Invalid"), + } + + let no_timeout_reply = rx.await.expect("Receive error"); + assert_eq!(no_timeout_reply, "test".to_string()); + } + + #[tokio::test] + async fn with_timeout_rpc() { + let (tx, rx) = ractor::concurrency::oneshot(); + let duration = Duration::from_millis(10); + let with_timeout = MessageType::Call("test".to_string(), (tx, duration).into()); + + let with_timeout_serialized = with_timeout.serialize(); + match with_timeout_serialized { + SerializedMessage::Call(args, reply) => { + let _ = reply.send(args); + } + _ => panic!("Invalid"), + } + + let with_timeout_reply = rx.await.expect("Receive error"); + assert_eq!(with_timeout_reply, "test".to_string()); + } + + #[tokio::test] + async fn timeout_rpc() { + let (tx, rx) = ractor::concurrency::oneshot(); + let duration = Duration::from_millis(10); + let with_timeout = MessageType::Call("test".to_string(), (tx, duration).into()); + + let with_timeout_serialized = with_timeout.serialize(); + match with_timeout_serialized { + SerializedMessage::Call(args, reply) => { + ractor::concurrency::sleep(Duration::from_millis(50)).await; + let _ = reply.send(args); + } + _ => panic!("Invalid"), + } + + let result = rx.await; + assert!(matches!(result, Err(_))); + } + + #[tokio::test] + async fn no_timeout_rpc_decoded_reply() { + let (tx, rx) = ractor::concurrency::oneshot(); + + let no_timeout = MessageType::Call("test".to_string(), tx.into()); + let no_timeout_serialized = no_timeout.serialize(); + let no_timeout_deserialized = + MessageType::deserialize(no_timeout_serialized).expect("Failed to deserialize port"); + if let MessageType::Call(args, reply) = no_timeout_deserialized { + let _ = reply.send(args); + } else { + panic!("Failed to decode with `MessageType`"); + } + + let no_timeout_reply = rx.await.expect("Receive error"); + assert_eq!(no_timeout_reply, "test".to_string()); + } + + #[tokio::test] + async fn with_timeout_rpc_decoded_reply() { + let (tx, rx) = ractor::concurrency::oneshot(); + + let initial_call = + MessageType::Call("test".to_string(), (tx, Duration::from_millis(50)).into()); + let serialized_call = initial_call.serialize(); + let deserialized_call = + MessageType::deserialize(serialized_call).expect("Failed to deserialize port"); + if let MessageType::Call(args, reply) = deserialized_call { + let _ = reply.send(args); + } else { + panic!("Failed to decode with `MessageType`"); + } + + let message_reply = rx.await.expect("Receive error"); + assert_eq!(message_reply, "test".to_string()); + } + + #[tokio::test] + async fn with_timeout_rpc_decoded_timeout() { + let (tx, rx) = ractor::concurrency::oneshot(); + + let initial_call = + MessageType::Call("test".to_string(), (tx, Duration::from_millis(50)).into()); + let serialized_call = initial_call.serialize(); + let deserialized_call = + MessageType::deserialize(serialized_call).expect("Failed to deserialize port"); + if let MessageType::Call(args, reply) = deserialized_call { + ractor::concurrency::sleep(Duration::from_millis(100)).await; + let _ = reply.send(args); + } else { + panic!("Failed to decode with `MessageType`"); + } + + assert!(rx.await.is_err()); + } +} diff --git a/ractor-cluster/src/net/listener.rs b/ractor-cluster/src/net/listener.rs new file mode 100644 index 00000000..dbafa19d --- /dev/null +++ b/ractor-cluster/src/net/listener.rs @@ -0,0 +1,100 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! TCP Server to accept incoming sessions + +use ractor::{cast, Message}; +use ractor::{Actor, ActorRef}; +use tokio::net::TcpListener; + +use crate::node::SessionManagerMessage; + +/// A Tcp Socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s +/// which handle the message sending and receiving over the socket. +/// +/// The [Listener] supervises all of the TCP [super::session::Session] actors and is responsible for logging +/// connects and disconnects as well as tracking the current open [super::session::Session] actors. +pub struct Listener { + port: super::NetworkPort, + session_manager: ActorRef, +} + +impl Listener { + /// Create a new `Listener` + pub fn new( + port: super::NetworkPort, + session_manager: ActorRef, + ) -> Self { + Self { + port, + session_manager, + } + } +} + +/// The Node listener's state +pub struct ListenerState { + listener: Option, +} + +pub struct ListenerMessage; +impl Message for ListenerMessage {} + +#[async_trait::async_trait] +impl Actor for Listener { + type Msg = ListenerMessage; + + type State = ListenerState; + + async fn pre_start(&self, myself: ActorRef) -> Self::State { + let addr = format!("0.0.0.0:{}", self.port); + let listener = match TcpListener::bind(&addr).await { + Ok(l) => l, + Err(err) => { + panic!("Error listening to socket: {}", err); + } + }; + + // startup the event processing loop by sending an initial msg + let _ = myself.cast(ListenerMessage); + + // create the initial state + Self::State { + listener: Some(listener), + } + } + + async fn post_stop(&self, _myself: ActorRef, state: &mut Self::State) { + // close the listener properly, in case anyone else has handles to the actor stopping + // total droppage + drop(state.listener.take()); + } + + async fn handle(&self, myself: ActorRef, _message: Self::Msg, state: &mut Self::State) { + if let Some(listener) = &mut state.listener { + match listener.accept().await { + Ok((stream, addr)) => { + let _ = cast!( + self.session_manager, + SessionManagerMessage::ConnectionOpened { + stream, + is_server: true + } + ); + log::info!("TCP Session opened for {}", addr); + } + Err(socket_accept_error) => { + log::warn!( + "Error accepting socket {} on Node server", + socket_accept_error + ); + } + } + } + + // continue accepting new sockets + let _ = myself.cast(ListenerMessage); + } +} diff --git a/ractor-cluster/src/net/mod.rs b/ractor-cluster/src/net/mod.rs new file mode 100644 index 00000000..33f4c3bb --- /dev/null +++ b/ractor-cluster/src/net/mod.rs @@ -0,0 +1,21 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! TCP server and session actors which transmit [prost::Message] encoded messages + +// TODO: we need a way to identify which session messages are coming from + going to. Therefore +// we should actually have a notification when a new session is launched, which can be used +// to match which session is tied to which actor id + +pub mod listener; +pub mod session; + +/// A trait which implements [prost::Message], [Default], and has a static lifetime +/// denoting protobuf-encoded messages which can be transmitted over the wire +pub trait NetworkMessage: prost::Message + Default + 'static {} +impl NetworkMessage for T {} + +/// A network port +pub type NetworkPort = u16; diff --git a/ractor-cluster/src/net/session.rs b/ractor-cluster/src/net/session.rs new file mode 100644 index 00000000..f842bd67 --- /dev/null +++ b/ractor-cluster/src/net/session.rs @@ -0,0 +1,405 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! TCP session actor which is managing the specific communication to a node + +// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs + +use std::convert::TryInto; +use std::marker::PhantomData; +use std::net::SocketAddr; + +use bytes::Bytes; +use prost::Message; +use ractor::{Actor, ActorCell, ActorRef}; +use ractor::{SpawnErr, SupervisionEvent}; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::ErrorKind; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::net::TcpStream; + +use super::NetworkMessage; + +/// Helper method to read exactly `len` bytes from the stream into a pre-allocated buffer +/// of bytes +async fn read_n_bytes(stream: &mut OwnedReadHalf, len: usize) -> Result, tokio::io::Error> { + let mut buf = vec![0u8; len]; + let mut c_len = 0; + stream.readable().await?; + while c_len < len { + let n = stream.read(&mut buf[c_len..]).await?; + if n == 0 { + // EOF + return Err(tokio::io::Error::new( + tokio::io::ErrorKind::UnexpectedEof, + "EOF", + )); + } + c_len += n; + } + Ok(buf) +} + +// ========================= Node Session actor ========================= // + +/// Represents a bi-directional tcp connection along with send + receive operations +/// +/// The [Session] actor supervises two child actors, [SessionReader] and [SessionWriter]. Should +/// either the reader or writer exit, they will terminate the entire session. +pub struct Session { + pub(crate) handler: ActorRef, + pub(crate) peer_addr: SocketAddr, + pub(crate) local_addr: SocketAddr, +} + +impl Session { + pub(crate) async fn spawn_linked( + handler: ActorRef, + stream: TcpStream, + peer_addr: SocketAddr, + local_addr: SocketAddr, + supervisor: ActorCell, + ) -> Result, SpawnErr> { + match Actor::spawn_linked( + None, + Session { + handler, + peer_addr, + local_addr, + }, + supervisor, + ) + .await + { + Err(err) => { + log::error!("Failed to spawn session writer actor: {}", err); + Err(err) + } + Ok((a, _)) => { + // intiialize this actor & its children + let _ = a.cast(SessionMessage::SetStream(stream)); + // return the actor handle + Ok(a) + } + } + } +} + +/// The node connection messages +pub enum SessionMessage { + /// Set the session's tcp stream, which initializes all underlying states + SetStream(TcpStream), + + /// Send a message over the channel + Send(crate::protocol::NetworkMessage), + + /// An object was received on the channel + ObjectAvailable(crate::protocol::NetworkMessage), +} +impl ractor::Message for SessionMessage {} + +/// The node session's state +pub struct SessionState { + writer: ActorRef>, + reader: ActorRef, +} + +#[async_trait::async_trait] +impl Actor for Session { + type Msg = SessionMessage; + type State = SessionState; + + async fn pre_start(&self, myself: ActorRef) -> Self::State { + // spawn writer + reader child actors + let (writer, _) = Actor::spawn_linked( + None, + SessionWriter:: { + _phantom: PhantomData, + }, + myself.get_cell(), + ) + .await + .expect("Failed to start session writer"); + let (reader, _) = Actor::spawn_linked( + None, + SessionReader { + session: myself.clone(), + }, + myself.get_cell(), + ) + .await + .expect("Failed to start session reader"); + + Self::State { writer, reader } + } + + async fn post_stop(&self, _myself: ActorRef, _state: &mut Self::State) { + log::info!("TCP Session closed for {}", self.peer_addr); + } + + async fn handle(&self, _myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SetStream(stream) => { + let (read, write) = stream.into_split(); + // initialize the writer & reader state's + let _ = state.writer.cast(SessionWriterMessage::SetStream(write)); + let _ = state.reader.cast(SessionReaderMessage::SetStream(read)); + } + Self::Msg::Send(msg) => { + log::debug!( + "SEND: {} -> {} - '{:?}'", + self.local_addr, + self.peer_addr, + msg + ); + let _ = state.writer.cast(SessionWriterMessage::WriteObject(msg)); + } + Self::Msg::ObjectAvailable(msg) => { + log::debug!( + "RECEIVE {} <- {} - '{:?}'", + self.local_addr, + self.peer_addr, + msg + ); + let _ = self + .handler + .cast(crate::node::SessionMessage::MessageReceived(msg)); + } + } + } + + async fn handle_supervisor_evt( + &self, + myself: ActorRef, + message: SupervisionEvent, + state: &mut Self::State, + ) { + // sockets open, they close, the world goes round... If a reader or writer exits for any reason, we'll start the shutdown procedure + // which requires that all actors exit + match message { + SupervisionEvent::ActorPanicked(actor, panic_msg) => { + if actor.get_id() == state.reader.get_id() { + log::error!("TCP Session's reader panicked with '{}'", panic_msg); + } else if actor.get_id() == state.writer.get_id() { + log::error!("TCP Session's writer panicked with '{}'", panic_msg); + } else { + log::error!("TCP Session received a child panic from an unknown child actor ({}) - '{}'", actor.get_id(), panic_msg); + } + myself.stop(Some("child_panic".to_string())); + } + SupervisionEvent::ActorTerminated(actor, _, exit_reason) => { + if actor.get_id() == state.reader.get_id() { + log::debug!("TCP Session's reader exited"); + } else if actor.get_id() == state.writer.get_id() { + log::debug!("TCP Session's writer exited"); + } else { + log::warn!("TCP Session received a child exit from an unknown child actor ({}) - '{:?}'", actor.get_id(), exit_reason); + } + myself.stop(Some("child_terminate".to_string())); + } + _ => { + // all ok + } + } + } +} + +// ========================= Node Session writer ========================= // + +struct SessionWriter +where + TMsg: NetworkMessage, +{ + _phantom: PhantomData, +} + +struct SessionWriterState { + writer: Option, +} + +enum SessionWriterMessage +where + TMsg: NetworkMessage, +{ + /// Set the stream, providing a [TcpStream], which + /// to utilize for this node's connection + SetStream(OwnedWriteHalf), + + /// Write an object over the wire + WriteObject(TMsg), +} +impl ractor::Message for SessionWriterMessage where TMsg: NetworkMessage {} + +#[async_trait::async_trait] +impl Actor for SessionWriter +where + TMsg: NetworkMessage, +{ + type Msg = SessionWriterMessage; + + type State = SessionWriterState; + + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + // OK we've established connection, now we can process requests + + Self::State { writer: None } + } + + async fn post_stop(&self, _myself: ActorRef, state: &mut Self::State) { + // drop the channel to close it should we be exiting + drop(state.writer.take()); + } + + async fn handle(&self, _myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + SessionWriterMessage::SetStream(stream) if state.writer.is_none() => { + state.writer = Some(stream); + } + SessionWriterMessage::WriteObject(msg) if state.writer.is_some() => { + if let Some(stream) = &mut state.writer { + stream.writable().await.unwrap(); + + let encoded_data = msg.encode_length_delimited_to_vec(); + let length = encoded_data.len(); + let length_bytes: [u8; 8] = (length as u64).to_be_bytes(); + log::trace!("Writing 8 length bytes"); + if let Err(write_err) = stream.write_all(&length_bytes).await { + log::warn!("Error writing to the stream '{}'", write_err); + } else { + log::trace!("Wrote length, writing payload (len={})", length); + // now send the object + if let Err(write_err) = stream.write_all(&encoded_data).await { + log::warn!("Error writing to the stream '{}'", write_err); + } + // flush the stream + stream.flush().await.unwrap(); + } + } + } + _ => { + // no-op, wait for next send request + } + } + } +} + +// ========================= Node Session reader ========================= // + +struct SessionReader { + session: ActorRef, +} + +/// The node connection messages +pub enum SessionReaderMessage { + /// Set the stream, providing a [TcpStream], which + /// to utilize for this node's connection + SetStream(OwnedReadHalf), + + /// Wait for an object from the stream + WaitForObject, + + /// Read next object off the stream + ReadObject(u64), +} + +impl ractor::Message for SessionReaderMessage {} + +struct SessionReaderState { + reader: Option, +} + +#[async_trait::async_trait] +impl Actor for SessionReader { + type Msg = SessionReaderMessage; + + type State = SessionReaderState; + + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + Self::State { reader: None } + } + + async fn post_stop(&self, _myself: ActorRef, state: &mut Self::State) { + // drop the channel to close it should we be exiting + drop(state.reader.take()); + } + + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SetStream(stream) if state.reader.is_none() => { + state.reader = Some(stream); + // wait for an incoming object + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + Self::Msg::WaitForObject if state.reader.is_some() => { + if let Some(stream) = &mut state.reader { + match read_n_bytes(stream, 8).await { + Ok(buf) => { + let length = u64::from_be_bytes(buf.try_into().unwrap()); + log::trace!("Payload length message ({}) received", length); + let _ = myself.cast(SessionReaderMessage::ReadObject(length)); + return; + } + Err(err) if err.kind() == ErrorKind::UnexpectedEof => { + log::trace!("Error (EOF) on stream"); + // EOF, close the stream by dropping the stream + drop(state.reader.take()); + myself.stop(Some("channel_closed".to_string())); + } + Err(_other_err) => { + log::trace!("Error ({:?}) on stream", _other_err); + // some other TCP error, more handling necessary + } + } + } + + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + Self::Msg::ReadObject(length) if state.reader.is_some() => { + if let Some(stream) = &mut state.reader { + match read_n_bytes(stream, length as usize).await { + Ok(buf) => { + log::trace!("Payload of length({}) received", buf.len()); + // NOTE: Our implementation writes 2 messages when sending something over the wire, the first + // is exactly 8 bytes which constitute the length of the payload message (u64 in big endian format), + // followed by the payload. This tells our TCP reader how much data to read off the wire + + // [buf] here should contain the exact amount of data to decode an object properly. + let bytes = Bytes::from(buf); + match crate::protocol::NetworkMessage::decode_length_delimited(bytes) { + Ok(msg) => { + // we decoded a message, pass it up the chain + let _ = self.session.cast(SessionMessage::ObjectAvailable(msg)); + } + Err(decode_err) => { + log::error!( + "Error decoding network message: '{}'. Discarding", + decode_err + ); + } + } + } + Err(err) if err.kind() == ErrorKind::UnexpectedEof => { + // EOF, close the stream by dropping the stream + drop(state.reader.take()); + myself.stop(Some("channel_closed".to_string())); + return; + } + Err(_other_err) => { + // TODO: some other TCP error, more handling necessary + } + } + } + + // we've read the object, now wait for next object + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + _ => { + // no stream is available, keep looping until one is available + let _ = myself.cast(SessionReaderMessage::WaitForObject); + } + } + } +} diff --git a/ractor-cluster/src/node/auth.rs b/ractor-cluster/src/node/auth.rs new file mode 100644 index 00000000..5188e4da --- /dev/null +++ b/ractor-cluster/src/node/auth.rs @@ -0,0 +1,170 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Define's a node's authentication process between peers. Definition +//! can be found in [Erlang's handshake](https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html) + +use rand::RngCore; + +use crate::hash::Digest; +use crate::protocol::auth as proto; + +/// Server authentication FSM +#[derive(Debug)] +pub(crate) enum ServerAuthenticationProcess { + /// (1) Client initiates handshake by sending their peer name + WaitingOnPeerName, + + /// (2) We have the peer name, and have replied with our own [proto::ServerStatus] + /// reply + HavePeerName(proto::NameMessage), + + /// (2B) Waiting on the client's status (true/false), if [proto::ClientStatus] was `alive` + WaitingOnClientStatus, + + /// (3) Waiting on the client's reply to the [proto::Challenge] from the server. + /// State is the name message from the client, the challenge, and the expected digest reply + /// from the client + /// + /// Arguments are the challenge to send to the client and the expected digest we should get back + WaitingOnClientChallengeReply(u32, Digest), + + /// (4) We processed the client challenge value, and replied and we're ok with the channel. + /// The client has the final decision after they check our challenge computation which we send + /// with [proto::ChallengeAck] + /// + /// Argument is the digest to send to the client + Ok(Digest), + + /// Close + Close, +} + +impl ServerAuthenticationProcess { + /// Initialize the FSM state + pub fn init() -> Self { + Self::WaitingOnPeerName + } + + pub fn start_challenge(&self, cookie: &'_ str) -> Self { + if matches!(self, Self::WaitingOnClientStatus | Self::HavePeerName(_)) { + let challenge = rand::thread_rng().next_u32(); + let digest = crate::hash::challenge_digest(cookie, challenge); + Self::WaitingOnClientChallengeReply(challenge, digest) + } else { + Self::Close + } + } + + /// Implement the FSM state transitions + pub fn next(&self, auth_message: proto::AuthenticationMessage, cookie: &'_ str) -> Self { + if let Some(msg) = auth_message.msg { + match msg { + proto::authentication_message::Msg::Name(name) => { + if let Self::WaitingOnPeerName = &self { + return Self::HavePeerName(name); + } + } + proto::authentication_message::Msg::ClientStatus(status) => { + if let Self::WaitingOnClientStatus = &self { + // client says to not continue the session + if !status.status { + return Self::Close; + } else { + return self.start_challenge(cookie); + } + } + } + proto::authentication_message::Msg::ClientChallenge(challenge_reply) => { + if let Self::WaitingOnClientChallengeReply(_, digest) = &self { + if digest.to_vec() == challenge_reply.digest { + let reply_digest = + crate::hash::challenge_digest(cookie, challenge_reply.challenge); + return Self::Ok(reply_digest); + } else { + // digest's don't match! + return Self::Close; + } + } + } + _ => {} + } + } + // received either an empty message or an out-of-order message. The node can't be trusted + Self::Close + } +} + +/// Client authentication FSM +#[derive(Debug)] +pub(crate) enum ClientAuthenticationProcess { + /// (1) After the client has sent their peer name + /// they wait for the [proto::ServerStatus] from the server + WaitingForServerStatus, + + /// (2) We've potentially sent our client status. Either way + /// we're waiting for the [proto::Challenge] from the server + WaitingForServerChallenge(proto::ServerStatus), + + /// (3) We've sent our challenge to the server, and we're waiting + /// on the server's calculation to determine if we should open the + /// channel. State is our challenge value and the expected digest + /// + /// Arguments are servers_challenge, server_digest_reply, client_challenge_value, expected_digest + WaitingForServerChallengeAck(proto::Challenge, Digest, u32, Digest), + + /// (4) We've validated the server's challenge digest and agree + /// that the channel is now open for node inter-communication + Ok, + + /// Close + Close, +} + +impl ClientAuthenticationProcess { + /// Initialize the FSM state + pub fn init() -> Self { + Self::WaitingForServerStatus + } + + /// Implement the client FSM transitions + pub fn next(&self, auth_message: proto::AuthenticationMessage, cookie: &'_ str) -> Self { + if let Some(msg) = auth_message.msg { + match msg { + proto::authentication_message::Msg::ServerStatus(status) => { + if let Self::WaitingForServerStatus = &self { + return Self::WaitingForServerChallenge(status); + } + } + proto::authentication_message::Msg::ServerChallenge(challenge_msg) => { + if let Self::WaitingForServerChallenge(_) = &self { + let server_digest = + crate::hash::challenge_digest(cookie, challenge_msg.challenge); + let challenge = rand::thread_rng().next_u32(); + let expected_digest = crate::hash::challenge_digest(cookie, challenge); + return Self::WaitingForServerChallengeAck( + challenge_msg, + server_digest, + challenge, + expected_digest, + ); + } + } + proto::authentication_message::Msg::ServerAck(challenge_ack) => { + if let Self::WaitingForServerChallengeAck(_, _, _, expected_digest) = &self { + if expected_digest.to_vec() == challenge_ack.digest { + return Self::Ok; + } else { + return Self::Close; + } + } + } + _ => {} + } + } + // received either an empty message or an out-of-order message. The node can't be trusted + Self::Close + } +} diff --git a/ractor-cluster/src/node/client.rs b/ractor-cluster/src/node/client.rs new file mode 100644 index 00000000..2d6f8fca --- /dev/null +++ b/ractor-cluster/src/node/client.rs @@ -0,0 +1,82 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! This module contains the logic for initiating client requests to other [super::NodeServer]s + +use std::fmt::Display; + +use ractor::{cast, ActorRef, MessagingErr, SpawnErr}; +use tokio::net::TcpStream; + +/// Client connection error types +#[derive(Debug)] +pub enum ClientConnectError { + /// Socket failed to bind, returning the underlying tokio error + Socket(tokio::io::Error), + /// Error communicating to the [super::NodeServer] actor. Actor receiving port is + /// closed + Messaging(MessagingErr), + /// A timeout in trying to start a new [NodeSession] + Timeout, + /// Error spawning the tcp session actor supervision tree + TcpSpawn(SpawnErr), +} + +impl Display for ClientConnectError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From for ClientConnectError { + fn from(value: tokio::io::Error) -> Self { + Self::Socket(value) + } +} + +impl From for ClientConnectError { + fn from(value: MessagingErr) -> Self { + Self::Messaging(value) + } +} + +impl From for ClientConnectError { + fn from(value: SpawnErr) -> Self { + Self::TcpSpawn(value) + } +} + +/// Connect to another [super::NodeServer] instance +/// +/// * `host` - The hostname to connect to +/// * `port` - The host's port to connect to +/// +/// Returns: [Ok(())] if the connection was successful and the [NodeSession] was started. Handshake will continue +/// automatically. Results in a [Err(ClientConnectError)] if any part of the process failed to initiate +pub async fn connect( + node_server: ActorRef, + host: &'static str, + port: crate::net::NetworkPort, +) -> Result<(), ClientConnectError> { + // connect to the socket + let stream = TcpStream::connect(format!("{host}:{port}")).await?; + + // Startup the TCP handler, linked to the newly created `NodeSession` + let addr = stream.peer_addr()?; + + let _ = cast!( + node_server, + super::SessionManagerMessage::ConnectionOpened { + stream, + is_server: false + } + ); + + // // notify the `NodeSession` about it's tcp connection + // let _ = session_handler.cast(super::SessionMessage::SetTcpSession(tcp_actor)); + log::info!("TCP Session opened for {}", addr); + + Ok(()) +} diff --git a/ractor-cluster/src/node/mod.rs b/ractor-cluster/src/node/mod.rs new file mode 100644 index 00000000..25cbec7f --- /dev/null +++ b/ractor-cluster/src/node/mod.rs @@ -0,0 +1,379 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Erlang `node()` host communication for managing remote actor communication in +//! a cluster +//! +//! The supervision tree is the following +//! +//! [NodeServer] supervises +//! 1. The server-socket TCP [crate::net::listener::Listener] +//! 2. All of the individual [NodeSession]s +//! +//! Each [NodeSession] supervises +//! 1. The TCP [crate::net::session::Session] connection +//! 2. (todo) All of the remote referenced actors. That way if the overall node session closes (due to tcp err for example) will lose connectivity +//! to all of the remote actors +//! +//! Each [crate::net::session::Session] supervises +//! 1. A TCP writer actor (`crate::net::session::SessionWriter`) +//! 2. A TCP reader actor (`crate::net::session::SessionReader`) +//! -> If either child actor closes, then it will terminate the overall [crate::net::session::Session] which in +//! turn will terminate the [NodeSession] and the [NodeServer] will de-register the [NodeSession] from its +//! internal state +//! + +/* +TODO: + +Overview: + +A `NodeServer` handles opening the TCP listener and managing incoming and outgoing `NodeSession` requests. `NodeSession`s +will represent a remote server locally. + +Additionally, you can open a session as a "client" by requesting a new session from the NodeServer +after intially connecting a [TcpStream] to the desired endpoint and then attaching the NodeSession +to the TcpStream (and linking the actor). (See src/node/client.rs) + +What's there to do? +1. The inter-node messaging protocol -> Based heavily on https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#protocol-between-connected-nodes +2. Having a [NodeSession] manage child actors from the remote system +3. Remote-supportive actors, which support serializing their payloads over the wire +4. Populating the global + pg registries with remote registered actors +5. Adjustments in the default Message type which allow messages to be serializable. (with `cluster` feature enabled) +6. Allow actor id's to be set for remote actors, tied to a specific node session + +*/ + +pub mod auth; +pub mod client; +pub mod node_session; +pub use node_session::NodeSession; +use tokio::net::TcpStream; + +use std::collections::HashMap; +use std::{cmp::Ordering, collections::hash_map::Entry}; + +use ractor::{cast, Actor, ActorId, ActorRef, RpcReplyPort, SupervisionEvent}; + +use crate::protocol::auth as auth_protocol; + +const PROTOCOL_VERSION: u32 = 1; + +/// Reply to a [SessionManagerMessage::CheckSession] message +pub enum SessionCheckReply { + /// There is no other connection with this peer + NoOtherConnection, + /// There is another connection with this peer, and it + /// should continue. Shutdown this connection. + OtherConnectionContinues, + /// There is another connection with this peer, but + /// this connection should take over. Terminating the other + /// connection + ThisConnectionContinues, + /// There is another connection with the peer, + /// in the same format as this attempted connection. + /// Perhaps the other connection is dying or the peer is + /// confused + DuplicateConnection, +} + +impl From for auth_protocol::server_status::Status { + fn from(value: SessionCheckReply) -> Self { + match value { + SessionCheckReply::NoOtherConnection => Self::Ok, + SessionCheckReply::ThisConnectionContinues => Self::OkSimultaneous, + SessionCheckReply::OtherConnectionContinues => Self::NotOk, + SessionCheckReply::DuplicateConnection => Self::Alive, + } + } +} + +/// Messages to/from the session aggregator +pub enum SessionManagerMessage { + /// Notifies the session manager that a new incoming (`is_server = true`) or outgoing (`is_server = false`) + /// [TcpStream] was accepted + ConnectionOpened { + /// The [TcpStream] for this network connection + stream: TcpStream, + /// Flag denoting if it's a server (incoming) connection when [true], [false] for outgoing + is_server: bool, + }, + + /// A request to check if a session is currently open, and if it is is the ordering such that we should + /// reject the incoming request + /// + /// i.e. if A is connected to B and A.name > B.name, but then B connects to A, B's request to connect + /// to A should be rejected + CheckSession { + /// The peer's name to investigate + peer_name: auth_protocol::NameMessage, + /// Reply channel for RPC + reply: RpcReplyPort, + }, + + /// A request to update the session mapping with this now known node's name + UpdateSession { + /// The ID of the [NodeSession] actor + actor_id: ActorId, + /// The node's name (now that we've received it) + name: auth_protocol::NameMessage, + }, +} + +impl ractor::Message for SessionManagerMessage {} + +/// Message from the TCP [session::Session] actor and the +/// monitoring Sesson actor +pub enum SessionMessage { + /// The Session actor is setting it's handle + SetTcpStream(TcpStream), + + /// A network message was received from the network + MessageReceived(crate::protocol::NetworkMessage), + + /// Send a message over the node channel to the remote `node()` + SendMessage(crate::protocol::node::NodeMessage), +} +impl ractor::Message for SessionMessage {} + +/// Represents the server which is managing all node session instances +/// +/// The [NodeServer] supervises a single [crate::net::listener::Listener] actor which is +/// responsible for hosting a server port for incoming `node()` connections. It also supervises +/// all of the [NodeSession] actors which are tied to tcp sessions and manage the FSM around `node()`s +/// establishing inter connections. +pub struct NodeServer { + port: crate::net::NetworkPort, + cookie: String, + node_name: String, + hostname: String, +} + +impl NodeServer { + /// Create a new node server instance + pub fn new( + port: crate::net::NetworkPort, + cookie: String, + node_name: String, + hostname: String, + ) -> Self { + Self { + port, + cookie, + node_name, + hostname, + } + } +} + +struct NodeServerSessionInformation { + actor: ActorRef, + peer_name: Option, + is_server: bool, +} + +impl NodeServerSessionInformation { + fn new(actor: ActorRef, is_server: bool) -> Self { + Self { + actor, + peer_name: None, + is_server, + } + } + + fn update(&mut self, peer_name: auth_protocol::NameMessage) { + self.peer_name = Some(peer_name); + } +} + +/// The state of the node server +pub struct NodeServerState { + listener: ActorRef, + node_sessions: HashMap, + node_id_counter: u64, + this_node_name: auth_protocol::NameMessage, +} + +impl NodeServerState { + fn check_peers(&self, new_peer: auth_protocol::NameMessage) -> SessionCheckReply { + for (_key, value) in self.node_sessions.iter() { + if let Some(existing_peer) = &value.peer_name { + if existing_peer.name == new_peer.name { + match ( + existing_peer.name.cmp(&self.this_node_name.name), + value.is_server, + ) { + // the peer's name is > this node's name and they connected to us + // od + // the peer's name is < this node's name and we connected to them + (Ordering::Greater, true) | (Ordering::Less, false) => { + value.actor.stop(Some("duplicate_connection".to_string())); + return SessionCheckReply::OtherConnectionContinues; + } + (Ordering::Greater, false) | (Ordering::Less, true) => { + // the inverse of the first two conditions, terminate the other + // connection and let this one continue + return SessionCheckReply::ThisConnectionContinues; + } + _ => { + // something funky is going on... + return SessionCheckReply::DuplicateConnection; + } + } + } + } + } + SessionCheckReply::NoOtherConnection + } +} + +#[async_trait::async_trait] +impl Actor for NodeServer { + type Msg = SessionManagerMessage; + type State = NodeServerState; + async fn pre_start(&self, myself: ActorRef) -> Self::State { + let listener = crate::net::listener::Listener::new(self.port, myself.clone()); + + let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell()) + .await + .expect("Failed to start listener"); + + Self::State { + node_sessions: HashMap::new(), + listener: actor_ref, + node_id_counter: 0, + this_node_name: auth_protocol::NameMessage { + flags: Some(auth_protocol::NodeFlags { + version: PROTOCOL_VERSION, + }), + name: format!("{}@{}", self.node_name, self.hostname), + }, + } + } + + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::ConnectionOpened { stream, is_server } => { + let node_id = state.node_id_counter; + if let Ok((actor, _)) = Actor::spawn_linked( + None, + NodeSession::new( + node_id, + is_server, + self.cookie.clone(), + myself.clone(), + state.this_node_name.clone(), + ), + myself.get_cell(), + ) + .await + { + let _ = cast!(actor, SessionMessage::SetTcpStream(stream)); + state.node_sessions.insert( + actor.get_id(), + NodeServerSessionInformation::new(actor.clone(), is_server), + ); + state.node_id_counter += 1; + } else { + // failed to startup actor, drop the socket + log::warn!("Failed to startup `NodeSession`, dropping connection"); + drop(stream); + } + } + Self::Msg::UpdateSession { actor_id, name } => { + if let Some(entry) = state.node_sessions.get_mut(&actor_id) { + entry.update(name); + } + } + Self::Msg::CheckSession { peer_name, reply } => { + let _ = reply.send(state.check_peers(peer_name)); + } + } + } + + async fn handle_supervisor_evt( + &self, + myself: ActorRef, + message: SupervisionEvent, + state: &mut Self::State, + ) { + match message { + SupervisionEvent::ActorPanicked(actor, msg) => { + if state.listener.get_id() == actor.get_id() { + log::error!( + "The Node server's TCP listener failed with '{}'. Respawning!", + msg + ); + + // try to re-create the listener. If it's a port-bind issue, we will have already panicked on + // trying to start the NodeServer + let listener = crate::net::listener::Listener::new(self.port, myself.clone()); + + let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell()) + .await + .expect("Failed to start listener"); + state.listener = actor_ref; + } else { + match state.node_sessions.entry(actor.get_id()) { + Entry::Occupied(o) => { + log::warn!( + "Node session {:?} panicked with '{}'", + o.get().peer_name, + msg + ); + o.remove(); + } + Entry::Vacant(_) => { + log::warn!( + "An unknown actor ({:?}) panicked with '{}'", + actor.get_id(), + msg + ); + } + } + } + } + SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => { + if state.listener.get_id() == actor.get_id() { + log::error!( + "The Node server's TCP listener exited with '{:?}'. Respawning!", + maybe_reason + ); + + // try to re-create the listener. If it's a port-bind issue, we will have already panicked on + // trying to start the NodeServer + let listener = crate::net::listener::Listener::new(self.port, myself.clone()); + + let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell()) + .await + .expect("Failed to start listener"); + state.listener = actor_ref; + } else { + match state.node_sessions.entry(actor.get_id()) { + Entry::Occupied(o) => { + log::warn!( + "Node session {:?} exited with '{:?}'", + o.get().peer_name, + maybe_reason + ); + o.remove(); + } + Entry::Vacant(_) => { + log::warn!( + "An unknown actor ({:?}) exited with '{:?}'", + actor.get_id(), + maybe_reason + ); + } + } + } + } + _ => { + //no-op + } + } + } +} diff --git a/ractor-cluster/src/node/node_session.rs b/ractor-cluster/src/node/node_session.rs new file mode 100644 index 00000000..4f0cce65 --- /dev/null +++ b/ractor-cluster/src/node/node_session.rs @@ -0,0 +1,703 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! A [NodeSession] is an individual connection between a specific pair of +//! `node()`s and all of its authentication and communication for that +//! pairing + +use std::collections::HashMap; +use std::convert::TryInto; +use std::net::SocketAddr; + +use ractor::message::SerializedMessage; +use ractor::rpc::CallResult; +use ractor::{Actor, ActorId, ActorRef, SupervisionEvent}; +use rand::Rng; +use tokio::time::Duration; + +use super::{auth, NodeServer}; +use crate::net::session::SessionMessage; +use crate::protocol::auth as auth_protocol; +use crate::protocol::control as control_protocol; +use crate::protocol::node as node_protocol; +use crate::remote_actor::RemoteActor; + +const MIN_PING_LATENCY_MS: u64 = 1000; +const MAX_PING_LATENCY_MS: u64 = 5000; + +enum AuthenticationState { + AsClient(auth::ClientAuthenticationProcess), + AsServer(auth::ServerAuthenticationProcess), +} + +impl AuthenticationState { + fn is_ok(&self) -> bool { + match self { + Self::AsClient(c) => matches!(c, auth::ClientAuthenticationProcess::Ok), + Self::AsServer(s) => matches!(s, auth::ServerAuthenticationProcess::Ok(_)), + } + } + + fn is_close(&self) -> bool { + match self { + Self::AsClient(c) => matches!(c, auth::ClientAuthenticationProcess::Close), + Self::AsServer(s) => matches!(s, auth::ServerAuthenticationProcess::Close), + } + } +} + +/// Represents a session with a specific node +pub struct NodeSession { + node_id: u64, + is_server: bool, + cookie: String, + node_server: ActorRef, + node_name: auth_protocol::NameMessage, +} + +impl NodeSession { + /// Construct a new [NodeSession] with the supplied + /// arguments + pub fn new( + node_id: u64, + is_server: bool, + cookie: String, + node_server: ActorRef, + node_name: auth_protocol::NameMessage, + ) -> Self { + Self { + node_id, + is_server, + cookie, + node_server, + node_name, + } + } +} + +impl NodeSession { + async fn handle_auth( + &self, + state: &mut NodeSessionState, + message: auth_protocol::AuthenticationMessage, + myself: ActorRef, + ) { + if state.auth.is_ok() { + // nothing to do, we're already authenticated + return; + } + if state.auth.is_close() { + log::info!( + "Node Session {} is shutting down due to authentication failure", + self.node_id + ); + // we need to shutdown, the session needs to be terminated + myself.stop(Some("auth_fail".to_string())); + if let Some(tcp) = &state.tcp { + tcp.stop(Some("auth_fail".to_string())); + } + } + + match &state.auth { + AuthenticationState::AsClient(client_auth) => { + let mut next = client_auth.next(message, &self.cookie); + match &next { + auth::ClientAuthenticationProcess::WaitingForServerChallenge(server_status) => { + match server_status.status() { + auth_protocol::server_status::Status::Ok => { + // this handshake will continue + } + auth_protocol::server_status::Status::OkSimultaneous => { + // this handshake will continue, but there is another handshake underway + // that will be shut down (i.e. this was a server connection and we're currently trying + // a client connection) + } + auth_protocol::server_status::Status::NotOk => { + // The handshake will not continue, as there's already another client handshake underway + // which itself initiated (Simultaneous connect where the other connection's name is > this node + // name) + next = auth::ClientAuthenticationProcess::Close; + } + auth_protocol::server_status::Status::NotAllowed => { + // unspecified auth reason + next = auth::ClientAuthenticationProcess::Close; + } + auth_protocol::server_status::Status::Alive => { + // A connection to the node is already alive, which means either the + // node is confused in its connection state or the previous TCP connection is + // breaking down. Send ClientStatus + // TODO: check the status properly + state.tcp_send_auth(auth_protocol::AuthenticationMessage { + msg: Some( + auth_protocol::authentication_message::Msg::ClientStatus( + auth_protocol::ClientStatus { status: true }, + ), + ), + }); + } + } + } + auth::ClientAuthenticationProcess::WaitingForServerChallengeAck( + server_challenge_value, + reply_to_server, + our_challenge, + _expected_digest, + ) => { + // record the name + state.name = Some(auth_protocol::NameMessage { + name: server_challenge_value.name.clone(), + flags: server_challenge_value.flags.clone(), + }); + // tell the node server that we now know this peer's name information + let _ = + self.node_server + .cast(super::SessionManagerMessage::UpdateSession { + actor_id: myself.get_id(), + name: self.node_name.clone(), + }); + // send the client challenge to the server + let reply = auth_protocol::AuthenticationMessage { + msg: Some(auth_protocol::authentication_message::Msg::ClientChallenge( + auth_protocol::ChallengeReply { + digest: reply_to_server.to_vec(), + challenge: *our_challenge, + }, + )), + }; + state.tcp_send_auth(reply); + } + _ => { + // no message to send + } + } + + if let auth::ClientAuthenticationProcess::Close = &next { + log::info!( + "Node Session {} is shutting down due to authentication failure", + self.node_id + ); + myself.stop(Some("auth_fail".to_string())); + } + if let auth::ClientAuthenticationProcess::Ok = &next { + log::info!("Node Session {} is authenticated", self.node_id); + } + log::debug!("Next client auth state: {:?}", next); + state.auth = AuthenticationState::AsClient(next); + } + AuthenticationState::AsServer(server_auth) => { + let mut next = server_auth.next(message, &self.cookie); + + match &next { + auth::ServerAuthenticationProcess::HavePeerName(peer_name) => { + // store the peer node's name in the session state + state.name = Some(peer_name.clone()); + + // send the status message, followed by the server's challenge + let server_status_result = self + .node_server + .call( + |tx| super::SessionManagerMessage::CheckSession { + peer_name: peer_name.clone(), + reply: tx, + }, + Some(Duration::from_millis(500)), + ) + .await; + match server_status_result { + Err(_) | Ok(CallResult::Timeout) | Ok(CallResult::SenderError) => { + next = auth::ServerAuthenticationProcess::Close; + } + Ok(CallResult::Success(reply)) => { + let server_status: auth_protocol::server_status::Status = + reply.into(); + // Send the server's status message + let status_msg = auth_protocol::AuthenticationMessage { + msg: Some( + auth_protocol::authentication_message::Msg::ServerStatus( + auth_protocol::ServerStatus { + status: server_status.into(), + }, + ), + ), + }; + state.tcp_send_auth(status_msg); + + match server_status { + auth_protocol::server_status::Status::Ok + | auth_protocol::server_status::Status::OkSimultaneous => { + // Good to proceed, start a challenge + next = next.start_challenge(&self.cookie); + if let auth::ServerAuthenticationProcess::WaitingOnClientChallengeReply( + challenge, + _digest, + ) = &next + { + let challenge_msg = auth_protocol::AuthenticationMessage { + msg: Some( + auth_protocol::authentication_message::Msg::ServerChallenge( + auth_protocol::Challenge { + name: self.node_name.name.clone(), + flags: self.node_name.flags.clone(), + challenge: *challenge, + }, + ), + ), + }; + state.tcp_send_auth(challenge_msg); + } + } + auth_protocol::server_status::Status::NotOk + | auth_protocol::server_status::Status::NotAllowed => { + next = auth::ServerAuthenticationProcess::Close; + } + auth_protocol::server_status::Status::Alive => { + // we sent the `Alive` status, so we're waiting on the client to confirm their status + // before continuing + next = auth::ServerAuthenticationProcess::WaitingOnClientStatus; + } + } + } + } + } + auth::ServerAuthenticationProcess::Ok(digest) => { + let client_challenge_reply = auth_protocol::AuthenticationMessage { + msg: Some(auth_protocol::authentication_message::Msg::ServerAck( + auth_protocol::ChallengeAck { + digest: digest.to_vec(), + }, + )), + }; + state.tcp_send_auth(client_challenge_reply); + } + _ => { + // no message to send + } + } + + if let auth::ServerAuthenticationProcess::Close = &next { + log::info!( + "Node Session {} is shutting down due to authentication failure", + self.node_id + ); + myself.stop(Some("auth_fail".to_string())); + } + if let auth::ServerAuthenticationProcess::Ok(_) = &next { + log::info!("Node Session {} is authenticated", self.node_id); + } + log::debug!("Next server auth state: {:?}", next); + state.auth = AuthenticationState::AsServer(next); + } + } + } + + fn handle_node( + &self, + state: &mut NodeSessionState, + message: node_protocol::NodeMessage, + myself: ActorRef, + ) { + if let Some(msg) = message.msg { + match msg { + node_protocol::node_message::Msg::Cast(cast_args) => { + if let Some(actor) = ractor::registry::get_pid(ActorId::from_pid(cast_args.to)) + { + let _ = actor.send_serialized(SerializedMessage::Cast(cast_args.what)); + } + } + node_protocol::node_message::Msg::Call(call_args) => { + let to = call_args.to; + let tag = call_args.tag; + if let Some(actor) = ractor::registry::get_pid(ActorId::from_pid(call_args.to)) + { + let (tx, rx) = ractor::concurrency::oneshot(); + + // send off the transmission in the serialized format, letting the message's own deserialization handle + // the conversion + let maybe_timeout = call_args.timeout_ms.map(Duration::from_millis); + if let Some(timeout) = maybe_timeout { + let _ = actor.send_serialized(SerializedMessage::Call( + call_args.what, + (tx, timeout).into(), + )); + } else { + let _ = actor.send_serialized(SerializedMessage::Call( + call_args.what, + tx.into(), + )); + } + + // kick off a background task to reply to the channel request, threading the tag and who to reply to + let _ = ractor::concurrency::spawn(async move { + if let Some(timeout) = maybe_timeout { + if let Ok(Ok(result)) = + ractor::concurrency::timeout(timeout, rx).await + { + let reply = node_protocol::node_message::Msg::Reply( + node_protocol::CallReply { + tag, + to, + what: result, + }, + ); + let _ = ractor::cast!( + myself, + super::SessionMessage::SendMessage( + node_protocol::NodeMessage { msg: Some(reply) } + ) + ); + } + } else if let Ok(result) = rx.await { + let reply = node_protocol::node_message::Msg::Reply( + node_protocol::CallReply { + tag, + to, + what: result, + }, + ); + let _ = ractor::cast!( + myself, + super::SessionMessage::SendMessage( + node_protocol::NodeMessage { msg: Some(reply) } + ) + ); + } + }); + } + } + node_protocol::node_message::Msg::Reply(call_reply_args) => { + if let Some(actor) = state.remote_actors.get(&call_reply_args.to) { + let _ = actor.send_serialized(SerializedMessage::CallReply( + call_reply_args.tag, + call_reply_args.what, + )); + } + } + } + } + } + + async fn handle_control( + &self, + state: &mut NodeSessionState, + message: control_protocol::ControlMessage, + myself: ActorRef, + ) { + if let Some(msg) = message.msg { + match msg { + control_protocol::control_message::Msg::Spawn(spawn_actor) => { + let actor = crate::remote_actor::RemoteActor { + session: myself.clone(), + }; + + match actor + .spawn_linked( + spawn_actor.name, + spawn_actor.id, + self.node_id, + myself.get_cell(), + ) + .await + { + Ok((actor, _)) => { + state.remote_actors.insert(spawn_actor.id, actor); + } + Err(spawn_err) => { + log::error!("Failed to spawn remote actor with {}", spawn_err); + } + } + } + control_protocol::control_message::Msg::Terminate(termination) => { + if let Some(actor) = state.remote_actors.remove(&termination.id) { + actor.stop(Some(format!("Remote: {:?}", termination))); + if termination.is_panic { + log::info!( + "Remote actor {} panicked with {:?}", + actor.get_id(), + termination.panic_reason + ); + } else { + log::debug!( + "Remote actor {} exited with {:?}", + actor.get_id(), + termination.panic_reason + ); + } + } + } + control_protocol::control_message::Msg::Ping(ping) => { + state.tcp_send_control(control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::Pong( + control_protocol::Pong { + timestamp: ping.timestamp, + }, + )), + }); + } + control_protocol::control_message::Msg::Pong(pong) => { + let ts: std::time::SystemTime = pong + .timestamp + .expect("Timestamp missing in Pong") + .try_into() + .expect("Failed to convert Pong(Timestamp) to SystemTime"); + let delta_ms = std::time::SystemTime::now() + .duration_since(ts) + .expect("Time went backwards") + .as_millis(); + log::debug!("Ping -> Pong took {}ms", delta_ms); + if delta_ms > 50 { + let default = || { + SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0) + }; + log::warn!( + "Super long ping detected {} - {} ({}ms)", + state.local_addr.unwrap_or_else(default), + state.peer_addr.unwrap_or_else(default), + delta_ms + ); + } + // schedule next ping + state.schedule_tcp_ping(); + } + } + } + } + + /// Called once the session is authenticated + fn after_authenticated(&self, state: &mut NodeSessionState) { + log::info!( + "Session authenticated on NodeSession {} - ({:?})", + self.node_id, + state.peer_addr + ); + + // startup the ping sending operation + state.schedule_tcp_ping(); + + // TODO: startup control message processing and additionally subscribe to process + // group changes + } +} + +/// The state of the node session +pub struct NodeSessionState { + tcp: Option>, + peer_addr: Option, + local_addr: Option, + name: Option, + auth: AuthenticationState, + remote_actors: HashMap>, +} + +impl NodeSessionState { + fn is_tcp_actor(&self, actor: ActorId) -> bool { + self.tcp + .as_ref() + .map(|t| t.get_id() == actor) + .unwrap_or(false) + } + + fn tcp_send_auth(&self, msg: auth_protocol::AuthenticationMessage) { + if let Some(tcp) = &self.tcp { + let net_msg = crate::protocol::NetworkMessage { + message: Some(crate::protocol::meta::network_message::Message::Auth(msg)), + }; + let _ = ractor::cast!(tcp, SessionMessage::Send(net_msg)); + } + } + + fn tcp_send_node(&self, msg: node_protocol::NodeMessage) { + if let Some(tcp) = &self.tcp { + let net_msg = crate::protocol::NetworkMessage { + message: Some(crate::protocol::meta::network_message::Message::Node(msg)), + }; + let _ = ractor::cast!(tcp, SessionMessage::Send(net_msg)); + } + } + + fn tcp_send_control(&self, msg: control_protocol::ControlMessage) { + if let Some(tcp) = &self.tcp { + let net_msg = crate::protocol::NetworkMessage { + message: Some(crate::protocol::meta::network_message::Message::Control( + msg, + )), + }; + let _ = ractor::cast!(tcp, SessionMessage::Send(net_msg)); + } + } + + fn schedule_tcp_ping(&self) { + if let Some(tcp) = &self.tcp { + let _ = tcp.send_after(Self::get_send_delay(), || { + let ping = control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::Ping( + control_protocol::Ping { + timestamp: Some(prost_types::Timestamp::from( + std::time::SystemTime::now(), + )), + }, + )), + }; + let net_msg = crate::protocol::NetworkMessage { + message: Some(crate::protocol::meta::network_message::Message::Control( + ping, + )), + }; + SessionMessage::Send(net_msg) + }); + } + } + + fn get_send_delay() -> Duration { + Duration::from_millis( + rand::thread_rng().gen_range(MIN_PING_LATENCY_MS..MAX_PING_LATENCY_MS), + ) + } +} + +#[async_trait::async_trait] +impl Actor for NodeSession { + type Msg = super::SessionMessage; + type State = NodeSessionState; + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + Self::State { + tcp: None, + name: None, + auth: if self.is_server { + AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()) + } else { + AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()) + }, + remote_actors: HashMap::new(), + peer_addr: None, + local_addr: None, + } + } + + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + super::SessionMessage::SetTcpStream(stream) if state.tcp.is_none() => { + let peer_addr = stream.peer_addr().expect("Failed to get peer address"); + let my_addr = stream.local_addr().expect("Failed to get local address"); + // startup the TCP socket handler for message write + reading + let actor = crate::net::session::Session::spawn_linked( + myself.clone(), + stream, + peer_addr, + my_addr, + myself.get_cell(), + ) + .await + .expect("Failed to spawn TCP session"); + + state.tcp = Some(actor); + state.peer_addr = Some(peer_addr); + state.local_addr = Some(my_addr); + + // If a client-connection, startup the handshake + if !self.is_server { + state.tcp_send_auth(auth_protocol::AuthenticationMessage { + msg: Some(auth_protocol::authentication_message::Msg::Name( + self.node_name.clone(), + )), + }); + } + } + Self::Msg::MessageReceived(maybe_network_message) if state.tcp.is_some() => { + if let Some(network_message) = maybe_network_message.message { + match network_message { + crate::protocol::meta::network_message::Message::Auth(auth_message) => { + let p_state = state.auth.is_ok(); + self.handle_auth(state, auth_message, myself).await; + if !p_state && state.auth.is_ok() { + self.after_authenticated(state); + } + } + crate::protocol::meta::network_message::Message::Node(node_message) => { + self.handle_node(state, node_message, myself); + } + crate::protocol::meta::network_message::Message::Control( + control_message, + ) => { + self.handle_control(state, control_message, myself).await; + } + } + } + } + Self::Msg::SendMessage(node_message) if state.tcp.is_some() => { + state.tcp_send_node(node_message); + } + _ => { + // no-op, ignore + } + } + } + + async fn handle_supervisor_evt( + &self, + myself: ActorRef, + message: SupervisionEvent, + state: &mut Self::State, + ) { + match message { + SupervisionEvent::ActorPanicked(actor, msg) => { + if state.is_tcp_actor(actor.get_id()) { + log::error!( + "Node session {:?}'s TCP session panicked with '{}'", + state.name, + msg + ); + myself.stop(Some("tcp_session_err".to_string())); + } else if let Some(actor) = state.remote_actors.remove(&actor.get_id().get_pid()) { + log::warn!( + "Node session {:?} had a remote actor ({}) panic with {}", + state.name, + actor.get_id(), + msg + ); + actor.kill(); + + // NOTE: This is a legitimate panic of the `RemoteActor`, not the actor on the remote machine panicking (which + // is handled by the remote actor's supervisor). Therefore we should re-spawn the actor + let pid = actor.get_id().get_pid(); + let (remote_actor, _) = crate::remote_actor::RemoteActor { + session: myself.clone(), + } + .spawn_linked(actor.get_name(), pid, self.node_id, myself.get_cell()) + .await + .expect("Failed to spawn remote actor"); + state.remote_actors.insert(pid, remote_actor); + } else { + log::error!("NodeSesion {:?} received an unknown child panic superivision message from {} - '{}'", + state.name, + actor.get_id(), + msg + ); + } + } + SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => { + if state.is_tcp_actor(actor.get_id()) { + log::info!("NodeSession {:?} connection closed", state.name); + myself.stop(Some("tcp_session_closed".to_string())); + } else if let Some(actor) = state.remote_actors.remove(&actor.get_id().get_pid()) { + log::debug!( + "NodeSession {:?} received a child exit with reason '{:?}'", + state.name, + maybe_reason + ); + actor.kill(); + } else { + log::info!("NodeSession {:?} received an unknown child actor exit event from {} - '{:?}'", + state.name, + actor.get_id(), + maybe_reason, + ); + } + } + _ => { + //no-op + } + } + } +} diff --git a/ractor-cluster/src/protocol/auth.proto b/ractor-cluster/src/protocol/auth.proto new file mode 100644 index 00000000..19e47496 --- /dev/null +++ b/ractor-cluster/src/protocol/auth.proto @@ -0,0 +1,117 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Authentication handshake messages for connecting `node()`s together. +//! The protocol messages defined here roughly follow the Erlang distributed systems guide +//! found at: https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#distribution-handshake + +// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum +// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for +// the syntax and guide + +syntax = "proto3"; + +package auth; + +// Placeholder to represent a node's flags +message NodeFlags { + // The node version + uint32 version = 1; +} + +// A message containing the node's name +message NameMessage { + // The node's full name + // Format: `node_name@hostname` + string name = 1; + + // The node's capability flags + NodeFlags flags = 2; +} + +// Server -> Client: `SendStatus` is the server replying with the handshake status to the client +message ServerStatus { + // Status types + enum Status { + // The handshake will continue + OK = 0; + // The handshake will continue, but A is informed that B has another ongoing + // connection attempt that will be shut down (simultaneous connect where A's + // name is greater than B's name, compared literally). + OK_SIMULTANEOUS = 1; + // The handshake will not continue, as B already has an ongoing handshake, which + // it itself has initiated (simultaneous connect where B's name is greater than A's). + NOT_OK = 2; + // The connection is disallowed for some (unspecified) security reason. + NOT_ALLOWED = 3; + // A connection to the node is already active, which either means that node A is confused + // or that the TCP connection breakdown of a previous node with this name has not yet + // reached node B. + ALIVE = 4; + + // Skipped NAMED = 5; + } + + // The status + Status status = 1; +} + +// The client's status reply if the `ServerStatus` was ALIVE +// +// If status was alive, node A answers with another status message containing either true, +// which means that the connection is to continue (the old connection from this node is +// broken), or false, which means that the connection is to be closed (the connection +// attempt was a mistake. +message ClientStatus { + // The status + bool status = 1; +} + +// The server's initial challenge request +message Challenge { + // The server's name + string name = 1; + // The node's capability flags + NodeFlags flags = 2; + // The challenge value + uint32 challenge = 3; +} + +// The reply to the server's challenge. +message ChallengeReply { + // The client's own challenge for the server to handle + uint32 challenge = 1; + // An MD5 digest that the client constructed from the server's + // challenge value + bytes digest = 2; +} + +// The server's reply to the client about their own +// challenge +message ChallengeAck { + // Another MD5 digest that the server constructed from the + // client's challenge value + bytes digest = 1; +} + +// A authentication message +message AuthenticationMessage { + // The inner message type + oneof msg { + // Send the name + NameMessage name = 1; + // Send the status + ServerStatus server_status = 2; + // Send the client status + ClientStatus client_status = 3; + // Server's challenge to the client + Challenge server_challenge = 4; + // Client's reply to server's challenge and + // client's own challenge to the server + ChallengeReply client_challenge = 5; + // Server's reply to the client's challenge + ChallengeAck server_ack = 6; + } +} diff --git a/ractor-cluster/src/protocol/control.proto b/ractor-cluster/src/protocol/control.proto new file mode 100644 index 00000000..1d585904 --- /dev/null +++ b/ractor-cluster/src/protocol/control.proto @@ -0,0 +1,62 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Control messages for authenticated `NodeSession`s. These manage the synchronization +//! of actors and their lifecycles over the remote link + +// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum +// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for +// the syntax and guide + +syntax = "proto3"; + +package control; + +import "google/protobuf/timestamp.proto"; + +// A heartbeat between actors +message Ping { + // The original time of the ping send, returned in the `Pong` message + // to measure latency + google.protobuf.Timestamp timestamp = 1; +} + +message Pong { + // The original time of the ping send, set by the original sender + google.protobuf.Timestamp timestamp = 1; +} + +// Spawn an actor +message Spawn { + // The actor's Id + uint64 id = 1; + // The actor's name (to be inserted in the global registry) + optional string name = 2; +} + +// An actor termination event +message Terminate { + // The remote actor's PID + uint64 id = 1; + // Flag denoting if the termination was due to panic + bool is_panic = 2; + // The exit reason or panic message + string panic_reason = 3; +} + +// Control messages between authenticated `node()`s which are dist-connected +message ControlMessage { + // The message payload + oneof msg { + // Spawn an actor + Spawn spawn = 1; + // A actor terminated + Terminate terminate = 2; + // A ping + Ping ping = 3; + // A pong + Pong pong = 4; + } +} \ No newline at end of file diff --git a/ractor-cluster/src/protocol/meta.proto b/ractor-cluster/src/protocol/meta.proto new file mode 100644 index 00000000..94d1929e --- /dev/null +++ b/ractor-cluster/src/protocol/meta.proto @@ -0,0 +1,28 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Meta-messages which all bundle together into a single `NetworkMessage` +//! payload + +syntax = "proto3"; + +import "auth.proto"; +import "node.proto"; +import "control.proto"; + +package meta; + +// Represents a message over the network +message NetworkMessage { + // The inner message + oneof message { + // An authentication message + auth.AuthenticationMessage auth = 1; + // An inter-node message + node.NodeMessage node = 2; + // A control message + control.ControlMessage control = 3; + } +} diff --git a/ractor-cluster/src/protocol/mod.rs b/ractor-cluster/src/protocol/mod.rs new file mode 100644 index 00000000..f316ba20 --- /dev/null +++ b/ractor-cluster/src/protocol/mod.rs @@ -0,0 +1,29 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Protobuf specifications for over-the-wire intercommuncation +//! between nodes. Generated via [prost] + +/// Node authentication protocol +pub mod auth { + include!(concat!(env!("OUT_DIR"), "/auth.rs")); +} + +/// Node actor inter-communication protocol +pub mod node { + include!(concat!(env!("OUT_DIR"), "/node.rs")); +} + +/// Control messages between nodes +pub mod control { + include!(concat!(env!("OUT_DIR"), "/control.rs")); +} + +/// Meta types which include all base network protocol message types +pub mod meta { + include!(concat!(env!("OUT_DIR"), "/meta.rs")); +} + +pub use meta::NetworkMessage; diff --git a/ractor-cluster/src/protocol/node.proto b/ractor-cluster/src/protocol/node.proto new file mode 100644 index 00000000..345d7a5a --- /dev/null +++ b/ractor-cluster/src/protocol/node.proto @@ -0,0 +1,55 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum +// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for +// the syntax and guide + +syntax = "proto3"; + +package node; + +// Represents a cast to a remote actor +message Cast { + // `to` is the intended actor + uint64 to = 1; + // `what` is the payload for the cast operation + bytes what = 2; +} + +message Call { + // `to` is the intended actor + uint64 to = 1; + // `what` is the serialized arguments to the call + bytes what = 2; + // `tag` is a unique request tag which the RemoteActor applied in order + // to match requests back up to replies + uint64 tag = 3; + // `timeout_ms` is the timeout in milliseconds for the call to complete + optional uint64 timeout_ms = 4; +} + +message CallReply { + // `to` is the intended RemoteActor + uint64 to = 1; + // `tag` is a unique request tag which the RemoteActor applied in order + // to match requests back up to replies + uint64 tag = 2; + // `what` is the payload for the call reply + bytes what = 3; +} + +// An inter-node message for inter-actor communications +message NodeMessage { + // The message payload + oneof msg { + // A cast to a remote actor + Cast cast = 1; + // A call to a remote actor + Call call = 2; + // A reply to a call from the remote actor + CallReply reply = 3; + } +} diff --git a/ractor-cluster/src/remote_actor/mod.rs b/ractor-cluster/src/remote_actor/mod.rs new file mode 100644 index 00000000..aad31985 --- /dev/null +++ b/ractor-cluster/src/remote_actor/mod.rs @@ -0,0 +1,124 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! A [RemoteActor] is an actor which handles serialized messages, and represents an actor +//! running on a remote `node()` server. See [crate::node::NodeServer] for more on inter-node +//! protocols + +use std::collections::HashMap; + +use ractor::cast; +use ractor::concurrency::JoinHandle; +use ractor::message::SerializedMessage; +use ractor::{Actor, ActorCell, ActorId, ActorName, ActorRef, RpcReplyPort, SpawnErr}; + +use crate::node::SessionMessage; +use crate::NodeId; + +/// A Remote actor is an actor which represents an actor on another node +pub(crate) struct RemoteActor { + /// The owning node session + pub(crate) session: ActorRef, +} + +impl RemoteActor { + /// Spawn an actor of this type with a supervisor, automatically starting the actor + /// + /// * `name`: A name to give the actor. Useful for global referencing or debug printing + /// * `handler`: The implementation of Self + /// * `pid`: The actor's local id on the remote system + /// * `node_id` The actor's node-identifier. Alongside `pid` this makes for a unique actor identifier + /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor + /// + /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference + /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if + /// the actor failed to start + pub(crate) async fn spawn_linked( + self, + name: Option, + pid: u64, + node_id: NodeId, + supervisor: ActorCell, + ) -> Result<(ActorRef, JoinHandle<()>), SpawnErr> { + let actor_id = ActorId::Remote { node_id, pid }; + ractor::ActorRuntime::<_, _, Self>::spawn_linked_remote(name, self, actor_id, supervisor) + .await + } +} + +#[derive(Default)] +pub(crate) struct RemoteActorState { + tag: u64, + pending_requests: HashMap>>, +} + +impl RemoteActorState { + fn next_tag(&mut self) -> u64 { + self.tag += 1; + self.tag + } +} + +// Placeholder message for a remote actor, as we won't actually +// handle anything but serialized messages on this channel +pub(crate) struct RemoteActorMessage; +impl ractor::Message for RemoteActorMessage {} + +#[async_trait::async_trait] +impl Actor for RemoteActor { + type Msg = RemoteActorMessage; + type State = RemoteActorState; + + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + Self::State::default() + } + + async fn handle(&self, _myself: ActorRef, _message: Self::Msg, _state: &mut Self::State) { + panic!("Remote actors cannot handle local messages!"); + } + + async fn handle_serialized( + &self, + myself: ActorRef, + message: SerializedMessage, + state: &mut Self::State, + ) { + // get the local pid on the remote system + let to = myself.get_id().pid(); + // messages should be forwarded over the network link (i.e. sent through the node session) to the intended + // target node's relevant actor. The receiving runtime NodeSession will decode the message and pass it up + // to the parent + match message { + SerializedMessage::Call(args, reply) => { + let tag = state.next_tag(); + let node_msg = crate::protocol::node::NodeMessage { + msg: Some(crate::protocol::node::node_message::Msg::Call( + crate::protocol::node::Call { + to, + tag, + what: args, + timeout_ms: reply.get_timeout().map(|t| t.as_millis() as u64), + }, + )), + }; + state.pending_requests.insert(tag, reply); + let _ = cast!(self.session, SessionMessage::SendMessage(node_msg)); + } + SerializedMessage::Cast(args) => { + let node_msg = crate::protocol::node::NodeMessage { + msg: Some(crate::protocol::node::node_message::Msg::Cast( + crate::protocol::node::Cast { to, what: args }, + )), + }; + let _ = cast!(self.session, SessionMessage::SendMessage(node_msg)); + } + SerializedMessage::CallReply(message_tag, reply_data) => { + if let Some(port) = state.pending_requests.remove(&message_tag) { + let _ = port.send(reply_data); + } + } + } + } +} diff --git a/ractor-playground/Cargo.toml b/ractor-playground/Cargo.toml index 8dc31408..f7b8523d 100644 --- a/ractor-playground/Cargo.toml +++ b/ractor-playground/Cargo.toml @@ -12,6 +12,10 @@ publish = false [dependencies] async-trait = "0.1" -ractor = { path="../ractor" } -rustyrepl = "0.1" -tokio = { version = "1.23", features = ["full"]} +clap = { version = "4", features = ["derive"] } +env_logger = "0.10" +log = "0.4" +ractor = { path="../ractor", features = ["cluster"] } +ractor-cluster = { path = "../ractor-cluster" } +# rustyrepl = "0.1" +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] } diff --git a/ractor-playground/src/distributed.rs b/ractor-playground/src/distributed.rs new file mode 100644 index 00000000..ac1b2f09 --- /dev/null +++ b/ractor-playground/src/distributed.rs @@ -0,0 +1,54 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Distributed cluster playground + +use ractor::{concurrency::Duration, Actor}; + +/// Run test with +/// +/// ```bash +/// RUST_LOG=debug cargo run -p ractor-playground -- cluster-handshake 8198 8199 true +/// ``` +pub(crate) async fn test_auth_handshake(port_a: u16, port_b: u16, valid_cookies: bool) { + let cookie_a = "cookie".to_string(); + let cookie_b = if valid_cookies { + cookie_a.clone() + } else { + "bad cookie".to_string() + }; + let hostname = "localhost".to_string(); + + let server_a = + ractor_cluster::NodeServer::new(port_a, cookie_a, "node_a".to_string(), hostname.clone()); + let server_b = + ractor_cluster::NodeServer::new(port_b, cookie_b, "node_b".to_string(), hostname); + + let (actor_a, handle_a) = Actor::spawn(None, server_a) + .await + .expect("Failed to start NodeServer A"); + let (actor_b, handle_b) = Actor::spawn(None, server_b) + .await + .expect("Failed to start NodeServer B"); + + if let Err(error) = + ractor_cluster::node::client::connect(actor_b.clone(), "127.0.0.1", port_a).await + { + log::error!("Failed to connect with error {error}") + } else { + log::info!("Client connected NodeServer b to NodeServer a"); + } + + ractor::concurrency::sleep(Duration::from_millis(10000)).await; + log::warn!("Terminating test"); + + // cleanup + actor_a.stop(None); + actor_b.stop(None); + handle_a.await.unwrap(); + handle_b.await.unwrap(); +} + +// TODO: protocol integration tests diff --git a/ractor-playground/src/main.rs b/ractor-playground/src/main.rs index 31b28752..ff5443d6 100644 --- a/ractor-playground/src/main.rs +++ b/ractor-playground/src/main.rs @@ -3,15 +3,50 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. +mod distributed; mod ping_pong; -use ractor::Actor; +use std::env; + +use clap::Parser; + +#[derive(Parser, Debug)] +enum Cli { + /// Run the ping-pong actor test + PingPong, + /// Run two dist nodes and try and have them connect together + ClusterHandshake { + /// The server port for the first node + port_a: u16, + /// The server port for the second node + port_b: u16, + /// Flag indicating if the cookies between the hosts match + valid_cookies: Option, + }, +} // MAIN // -#[tokio::main] +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() { - let (_, actor_handle) = Actor::spawn(None, ping_pong::PingPong) - .await - .expect("Failed to start actor"); - actor_handle.await.expect("Actor failed to exit cleanly"); + let cli = Cli::parse(); + + // if it's not set, set the log level to debug + if env::var("RUST_LOG").is_err() { + env::set_var("RUST_LOG", "debug"); + } + env_logger::builder().format_timestamp_millis().init(); + + // parse the CLI and run the correct playground scenario + match cli { + Cli::PingPong => { + ping_pong::run_ping_pong().await; + } + Cli::ClusterHandshake { + port_a, + port_b, + valid_cookies, + } => { + distributed::test_auth_handshake(port_a, port_b, valid_cookies.unwrap_or(true)).await; + } + } } diff --git a/ractor-playground/src/ping_pong.rs b/ractor-playground/src/ping_pong.rs index d7f4f323..6826bd54 100644 --- a/ractor-playground/src/ping_pong.rs +++ b/ractor-playground/src/ping_pong.rs @@ -14,6 +14,7 @@ pub enum Message { Ping, Pong, } +impl ractor::Message for Message {} impl Message { fn next(&self) -> Self { @@ -66,3 +67,15 @@ impl Actor for PingPong { } } } + +/// Run the ping-pong actor test with +/// +/// ```bash +/// cargo run -p ractor-playground -- ping-pong +/// ``` +pub(crate) async fn run_ping_pong() { + let (_, actor_handle) = Actor::spawn(None, PingPong) + .await + .expect("Failed to start actor"); + actor_handle.await.expect("Actor failed to exit cleanly"); +} diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index b44f723b..fa49c0b5 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -12,16 +12,19 @@ readme = "../README.md" homepage = "https://github.com/slawlor/ractor" categories = ["actor", "erlang"] +[features] # WIP -# [features] # tokio_runtime = ["tokio/time"] # async_std_runtime = ["async-std"] # default = ["tokio_runtime"] # default = ["async_std_runtime"] +cluster = [] +default = [] + [dependencies] -async-std = { version = "1", optional = true } +## Required dependencies async-trait = "0.1" dashmap = "5" futures = "0.3" diff --git a/ractor/benches/actor.rs b/ractor/benches/actor.rs index 4dc71e0f..f663c3e8 100644 --- a/ractor/benches/actor.rs +++ b/ractor/benches/actor.rs @@ -7,18 +7,22 @@ extern crate criterion; use criterion::{BatchSize, Criterion}; -use ractor::{Actor, ActorRef}; +use ractor::{Actor, ActorRef, Message}; struct BenchActor; +struct BenchActorMessage; +#[cfg(feature = "cluster")] +impl Message for BenchActorMessage {} + #[async_trait::async_trait] impl Actor for BenchActor { - type Msg = (); + type Msg = BenchActorMessage; type State = (); async fn pre_start(&self, myself: ActorRef) -> Self::State { - let _ = myself.cast(()); + let _ = myself.cast(BenchActorMessage); } async fn handle(&self, myself: ActorRef, _message: Self::Msg, _state: &mut Self::State) { @@ -95,7 +99,7 @@ fn schedule_work(c: &mut Criterion) { }) }, |mut handles| { - runtime.block_on(async move { while let Some(_) = handles.join_next().await {} }) + runtime.block_on(async move { while handles.join_next().await.is_some() {} }) }, BatchSize::PerIteration, ); @@ -118,7 +122,7 @@ fn schedule_work(c: &mut Criterion) { }) }, |mut handles| { - runtime.block_on(async move { while let Some(_) = handles.join_next().await {} }) + runtime.block_on(async move { while handles.join_next().await.is_some() {} }) }, BatchSize::PerIteration, ); @@ -135,12 +139,12 @@ fn process_messages(c: &mut Criterion) { #[async_trait::async_trait] impl Actor for MessagingActor { - type Msg = (); + type Msg = BenchActorMessage; type State = u64; async fn pre_start(&self, myself: ActorRef) -> Self::State { - let _ = myself.cast(()); + let _ = myself.cast(BenchActorMessage); 0u64 } @@ -154,7 +158,7 @@ fn process_messages(c: &mut Criterion) { if *state >= self.num_msgs { myself.stop(None); } else { - let _ = myself.cast(()); + let _ = myself.cast(BenchActorMessage); } } } diff --git a/ractor/examples/counter.rs b/ractor/examples/counter.rs index f98bcbb4..6209e323 100644 --- a/ractor/examples/counter.rs +++ b/ractor/examples/counter.rs @@ -28,6 +28,8 @@ enum CounterMessage { Decrement(i64), Retrieve(RpcReplyPort), } +#[cfg(feature = "cluster")] +impl ractor::Message for CounterMessage {} #[async_trait::async_trait] impl Actor for Counter { diff --git a/ractor/examples/monte_carlo.rs b/ractor/examples/monte_carlo.rs index 2761bce7..b4c09c73 100644 --- a/ractor/examples/monte_carlo.rs +++ b/ractor/examples/monte_carlo.rs @@ -58,10 +58,13 @@ impl GameState { } struct Game; +struct GameMessage(ActorRef); +#[cfg(feature = "cluster")] +impl ractor::Message for GameMessage {} #[async_trait::async_trait] impl Actor for Game { - type Msg = ActorRef; + type Msg = GameMessage; type State = GameState; @@ -82,7 +85,7 @@ impl Actor for Game { // Now that the game is finished, the results of the game need to be reported // to the `GameManager`. cast!( - message, + message.0, GameManagerMessage { id: myself.get_id(), results: state.results_vec.clone(), @@ -105,6 +108,8 @@ struct GameManagerMessage { id: ActorId, results: Vec, } +#[cfg(feature = "cluster")] +impl ractor::Message for GameManagerMessage {} struct GameManagerState { /// The number of games that have been played so far. @@ -145,7 +150,7 @@ impl Actor for GameManager { let (actor, _) = Actor::spawn_linked(None, Game, myself.clone().into()) .await .expect("Failed to start game"); - cast!(actor, myself.clone()).expect("Failed to send message"); + cast!(actor, GameMessage(myself.clone())).expect("Failed to send message"); } GameManagerState::new(self.num_games) diff --git a/ractor/examples/output_port.rs b/ractor/examples/output_port.rs index f950ab78..bbbbb140 100644 --- a/ractor/examples/output_port.rs +++ b/ractor/examples/output_port.rs @@ -21,9 +21,16 @@ use tokio::time::{timeout, Duration}; enum PublisherMessage { Publish(String), } +#[cfg(feature = "cluster")] +impl ractor::Message for PublisherMessage {} + +#[derive(Clone)] +struct Output(String); +#[cfg(feature = "cluster")] +impl ractor::Message for Output {} struct Publisher { - output: Arc>, + output: Arc>, } #[async_trait::async_trait] @@ -38,7 +45,7 @@ impl Actor for Publisher { match message { Self::Msg::Publish(msg) => { println!("Publishing {}", msg); - self.output.send(format!("Published: {}", msg)); + self.output.send(Output(format!("Published: {}", msg))); } } } @@ -49,6 +56,8 @@ struct Subscriber; enum SubscriberMessage { Published(String), } +#[cfg(feature = "cluster")] +impl ractor::Message for SubscriberMessage {} #[async_trait::async_trait] impl Actor for Subscriber { @@ -94,7 +103,7 @@ async fn main() { // TODO: there has to be a better syntax than keeping an arc to the port? port.subscribe(actor_ref.clone(), |msg| { - Some(SubscriberMessage::Published(msg)) + Some(SubscriberMessage::Published(msg.0)) }); subscriber_refs.push(actor_ref); diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs index 1b456a0f..3cc6b46a 100644 --- a/ractor/examples/philosophers.rs +++ b/ractor/examples/philosophers.rs @@ -34,6 +34,8 @@ enum ForkMessage { /// allow the fork to be sent to the next user. PutForkDown(ActorId), } +#[cfg(feature = "cluster")] +impl ractor::Message for ForkMessage {} struct ForkState { /// Flag to identify if the fork is clean or not @@ -232,6 +234,8 @@ enum PhilosopherMessage { ReceiveFork(ActorId), SendMetrics(RpcReplyPort), } +#[cfg(feature = "cluster")] +impl ractor::Message for PhilosopherMessage {} struct Philosopher { time_slice: Duration, @@ -248,10 +252,10 @@ impl Philosopher { state.last_state_change = Instant::now(); // schedule become hungry after the thinking time has elapsed - let _ = myself.send_after( - self.time_slice, - PhilosopherMessage::BecomeHungry(state.metrics.state_change_count), - ); + let metrics_count = state.metrics.state_change_count; + let _ = myself.send_after(self.time_slice, move || { + PhilosopherMessage::BecomeHungry(metrics_count) + }); } /// Helper command to set the internal state to begin eating @@ -273,10 +277,10 @@ impl Philosopher { .cast(ForkMessage::UsingFork(myself.get_id())); // schedule stop eating after the eating time has elapsed - let _ = myself.send_after( - self.time_slice, - PhilosopherMessage::StopEating(state.metrics.state_change_count), - ); + let metrics_count = state.metrics.state_change_count; + let _ = myself.send_after(self.time_slice, move || { + PhilosopherMessage::StopEating(metrics_count) + }); } /// Helper command to request any forks which are missing @@ -468,10 +472,10 @@ async fn main() { left: forks[left].clone(), right: forks[right].clone(), }; - let (philosopher, handle) = Actor::spawn(Some(philosopher_names[left]), p) + let (philosopher, handle) = Actor::spawn(Some(philosopher_names[left].to_string()), p) .await .expect("Failed to create philosopher!"); - results.insert(philosopher_names[left], None); + results.insert(philosopher_names[left].to_string(), None); philosophers.push(philosopher); all_handles.spawn(handle); } @@ -494,7 +498,7 @@ async fn main() { } // wait for everything to shut down - while let Some(_) = all_handles.join_next().await {} + while all_handles.join_next().await.is_some() {} // print metrics println!("Simulation results"); diff --git a/ractor/examples/ping_pong.rs b/ractor/examples/ping_pong.rs index 6a311c0a..e1c990c5 100644 --- a/ractor/examples/ping_pong.rs +++ b/ractor/examples/ping_pong.rs @@ -23,6 +23,8 @@ pub enum Message { Ping, Pong, } +#[cfg(feature = "cluster")] +impl ractor::Message for Message {} impl Message { fn next(&self) -> Self { diff --git a/ractor/examples/supervisor.rs b/ractor/examples/supervisor.rs index 13ce0104..add0efda 100644 --- a/ractor/examples/supervisor.rs +++ b/ractor/examples/supervisor.rs @@ -19,7 +19,7 @@ use tokio::time::Duration; #[tokio::main] async fn main() { - let (root, handle) = Actor::spawn(Some("root"), RootActor) + let (root, handle) = Actor::spawn(Some("root".to_string()), RootActor) .await .expect("Failed to start root actor"); let mid_level = root @@ -65,6 +65,8 @@ enum LeafActorMessage { Boom, NoOp, } +#[cfg(feature = "cluster")] +impl ractor::Message for LeafActorMessage {} #[async_trait::async_trait] impl Actor for LeafActor { @@ -115,6 +117,8 @@ struct MidLevelActorState { enum MidLevelActorMessage { GetLeaf(RpcReplyPort>), } +#[cfg(feature = "cluster")] +impl ractor::Message for MidLevelActorMessage {} #[async_trait::async_trait] impl Actor for MidLevelActor { @@ -122,9 +126,10 @@ impl Actor for MidLevelActor { type State = MidLevelActorState; async fn pre_start(&self, myself: ActorRef) -> Self::State { - let (leaf_actor, _) = Actor::spawn_linked(Some("leaf"), LeafActor, myself.into()) - .await - .expect("Failed to start leaf actor"); + let (leaf_actor, _) = + Actor::spawn_linked(Some("leaf".to_string()), LeafActor, myself.into()) + .await + .expect("Failed to start leaf actor"); MidLevelActorState { leaf_actor } } @@ -187,6 +192,8 @@ struct RootActorState { enum RootActorMessage { GetMidLevel(RpcReplyPort>), } +#[cfg(feature = "cluster")] +impl ractor::Message for RootActorMessage {} #[async_trait::async_trait] impl Actor for RootActor { @@ -196,7 +203,7 @@ impl Actor for RootActor { async fn pre_start(&self, myself: ActorRef) -> Self::State { println!("RootActor: Started {:?}", myself); let (mid_level_actor, _) = - Actor::spawn_linked(Some("mid-level"), MidLevelActor, myself.into()) + Actor::spawn_linked(Some("mid-level".to_string()), MidLevelActor, myself.into()) .await .expect("Failed to spawn mid-level actor"); RootActorState { mid_level_actor } diff --git a/ractor/src/actor/actor_cell/actor_properties.rs b/ractor/src/actor/actor_cell/actor_properties.rs index 50c0830b..9471f860 100644 --- a/ractor/src/actor/actor_cell/actor_properties.rs +++ b/ractor/src/actor/actor_cell/actor_properties.rs @@ -6,14 +6,17 @@ use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; -use crate::concurrency as mpsc; +use crate::{concurrency as mpsc, Message}; -use crate::actor::messages::{BoxedMessage, StopMessage}; +use crate::actor::messages::StopMessage; use crate::actor::supervision::SupervisionTree; use crate::concurrency::{ MpscReceiver as BoundedInputPortReceiver, MpscSender as BoundedInputPort, MpscUnboundedReceiver as InputPortReceiver, MpscUnboundedSender as InputPort, }; +use crate::message::BoxedMessage; +#[cfg(feature = "cluster")] +use crate::message::SerializedMessage; use crate::{Actor, ActorId, ActorName, ActorStatus, MessagingErr, Signal, SupervisionEvent}; // The inner-properties of an Actor @@ -39,6 +42,22 @@ impl ActorProperties { InputPortReceiver, InputPortReceiver, ) + where + TActor: Actor, + { + Self::new_remote::(name, crate::actor_id::get_new_local_id()) + } + + pub fn new_remote( + name: Option, + id: ActorId, + ) -> ( + Self, + BoundedInputPortReceiver, + BoundedInputPortReceiver, + InputPortReceiver, + InputPortReceiver, + ) where TActor: Actor, { @@ -48,7 +67,7 @@ impl ActorProperties { let (tx_message, rx_message) = mpsc::mpsc_unbounded(); ( Self { - id: crate::actor_id::get_new_local_id(), + id, name, status: Arc::new(AtomicU8::new(ActorStatus::Unstarted as u8)), signal: tx_signal, @@ -92,11 +111,22 @@ impl ActorProperties { where TActor: Actor, { - if self.type_id != std::any::TypeId::of::() { + // Only type-check messages of local actors, remote actors send serialized + // payloads + if self.id.is_local() && self.type_id != std::any::TypeId::of::() { return Err(MessagingErr::InvalidActorType); } - let boxed = BoxedMessage::new(message); + let boxed = message.box_message(&self.id); + self.message.send(boxed).map_err(|e| e.into()) + } + + #[cfg(feature = "cluster")] + pub fn send_serialized(&self, message: SerializedMessage) -> Result<(), MessagingErr> { + let boxed = BoxedMessage { + msg: None, + serialized_msg: Some(message), + }; self.message.send(boxed).map_err(|e| e.into()) } diff --git a/ractor/src/actor/actor_cell/mod.rs b/ractor/src/actor/actor_cell/mod.rs index f7266243..e68f993d 100644 --- a/ractor/src/actor/actor_cell/mod.rs +++ b/ractor/src/actor/actor_cell/mod.rs @@ -12,13 +12,16 @@ use std::sync::Arc; use super::errors::MessagingErr; -use super::messages::{BoxedMessage, Signal, StopMessage}; - +use super::messages::{Signal, StopMessage}; use super::SupervisionEvent; use crate::concurrency::{ MpscReceiver as BoundedInputPortReceiver, MpscUnboundedReceiver as InputPortReceiver, }; -use crate::{Actor, ActorId, ActorName, SpawnErr}; +use crate::message::BoxedMessage; +#[cfg(feature = "cluster")] +use crate::message::SerializedMessage; +use crate::ActorId; +use crate::{Actor, ActorName, SpawnErr}; pub mod actor_ref; pub use actor_ref::ActorRef; @@ -52,17 +55,26 @@ pub const ACTIVE_STATES: [ActorStatus; 3] = [ ]; /// The collection of ports an actor needs to listen to -pub(crate) struct ActorPortSet { - pub(crate) signal_rx: BoundedInputPortReceiver, - pub(crate) stop_rx: BoundedInputPortReceiver, - pub(crate) supervisor_rx: InputPortReceiver, - pub(crate) message_rx: InputPortReceiver, +pub struct ActorPortSet { + /// The inner signal port + pub signal_rx: BoundedInputPortReceiver, + /// The inner stop port + pub stop_rx: BoundedInputPortReceiver, + /// The inner supervisor port + pub supervisor_rx: InputPortReceiver, + /// The inner message port + pub message_rx: InputPortReceiver, } -pub(crate) enum ActorPortMessage { +/// Messages that come in off an actor's port, with associated priority +pub enum ActorPortMessage { + /// A signal message Signal(Signal), + /// A stop message Stop(StopMessage), + /// A supervision message Supervision(SupervisionEvent), + /// A regular message Message(BoxedMessage), } @@ -148,10 +160,12 @@ impl ActorCell { where TActor: Actor, { - let (props, rx1, rx2, rx3, rx4) = ActorProperties::new::(name); + let (props, rx1, rx2, rx3, rx4) = ActorProperties::new::(name.clone()); let cell = Self { inner: Arc::new(props), }; + #[cfg(feature = "cluster")] + crate::registry::register_pid(cell.get_id(), cell.clone()); if let Some(r_name) = name { crate::registry::register(r_name, cell.clone())?; } @@ -167,6 +181,37 @@ impl ActorCell { )) } + /// Create a new remote actor, to be called from the `ractor-cluster` crate + #[cfg(feature = "cluster")] + pub(crate) fn new_remote( + name: Option, + id: ActorId, + ) -> Result<(Self, ActorPortSet), SpawnErr> + where + TActor: Actor, + { + if !id.is_local() { + panic!("Cannot create a new remote actor handler without the actor id being marked as a remote actor!"); + } + + let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::(name.clone(), id); + let cell = Self { + inner: Arc::new(props), + }; + if let Some(r_name) = name { + crate::registry::register(r_name, cell.clone())?; + } + Ok(( + cell, + ActorPortSet { + signal_rx: rx1, + stop_rx: rx2, + supervisor_rx: rx3, + message_rx: rx4, + }, + )) + } + /// Retrieve the [super::Actor]'s unique identifier [ActorId] pub fn get_id(&self) -> ActorId { self.inner.id @@ -174,7 +219,7 @@ impl ActorCell { /// Retrieve the [super::Actor]'s name pub fn get_name(&self) -> Option { - self.inner.name + self.inner.name.clone() } /// Retrieve the current status of an [super::Actor] @@ -193,6 +238,8 @@ impl ActorCell { pub(crate) fn set_status(&self, status: ActorStatus) { // The actor is shut down if status == ActorStatus::Stopped || status == ActorStatus::Stopping { + #[cfg(feature = "cluster")] + crate::registry::unregister_pid(self.get_id()); // If it's enrolled in the registry, remove it if let Some(name) = self.get_name() { crate::registry::unregister(name); @@ -279,6 +326,16 @@ impl ActorCell { self.inner.send_message::(message) } + /// Send a sserialized binary message to the actor. + /// + /// * `message` - The message to send + /// + /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise + #[cfg(feature = "cluster")] + pub fn send_serialized(&self, message: SerializedMessage) -> Result<(), MessagingErr> { + self.inner.send_serialized(message) + } + /// Notify the supervisors that a supervision event occurred /// /// * `evt` - The event to send to this [super::Actor]'s supervisors diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs index 79d6372f..0d23b9a5 100644 --- a/ractor/src/actor/messages.rs +++ b/ractor/src/actor/messages.rs @@ -5,56 +5,15 @@ //! Messages which are built-in for `ractor`'s processing routines //! -//! Additionally contains definitions for [BoxedMessage] and [BoxedState] -//! which are used to handle strongly-typed messages and states in a +//! Additionally contains definitions for [BoxedState] +//! which are used to handle strongly-typed states in a //! generic way without having to know the strong type in the underlying framework use std::any::Any; use std::fmt::Debug; -use crate::{Message, State}; - -/// An error downcasting a boxed item to a strong type -#[derive(Debug)] -pub struct BoxedDowncastErr; - -/// A "boxed" message denoting a strong-type message -/// but generic so it can be passed around without type -/// constraints -pub struct BoxedMessage { - /// The message value - pub msg: Option>, -} - -impl BoxedMessage { - /// Create a new [BoxedMessage] from a strongly-typed message - pub fn new(msg: T) -> Self - where - T: Message, - { - Self { - msg: Some(Box::new(msg)), - } - } - - /// Try and take the resulting message as a specific type, consumes - /// the boxed message - pub fn take(&mut self) -> Result - where - T: Message, - { - match self.msg.take() { - Some(m) => { - if m.is::() { - Ok(*m.downcast::().unwrap()) - } else { - Err(BoxedDowncastErr) - } - } - None => Err(BoxedDowncastErr), - } - } -} +use crate::message::BoxedDowncastErr; +use crate::State; /// A "boxed" message denoting a strong-type message /// but generic so it can be passed around without type @@ -95,8 +54,8 @@ impl BoxedState { } /// Messages to stop an actor -pub(crate) enum StopMessage { - // Normal stop +pub enum StopMessage { + /// Normal stop Stop, /// Stop with a reason Reason(String), diff --git a/ractor/src/actor/mod.rs b/ractor/src/actor/mod.rs index 48c1ebad..367a563c 100644 --- a/ractor/src/actor/mod.rs +++ b/ractor/src/actor/mod.rs @@ -11,9 +11,12 @@ use std::{panic::AssertUnwindSafe, sync::Arc}; -use crate::concurrency::JoinHandle; use futures::TryFutureExt; +use crate::concurrency::JoinHandle; +#[cfg(feature = "cluster")] +use crate::ActorId; + pub mod messages; use messages::*; @@ -88,7 +91,7 @@ pub trait Actor: Sized + Sync + Send + 'static { #[allow(unused_variables)] async fn post_stop(&self, myself: ActorRef, state: &mut Self::State) {} - /// Handle the incoming message from the event processing loop. Unhandled panicks will be + /// Handle the incoming message from the event processing loop. Unhandled panickes will be /// captured and sent to the supervisor(s) /// /// * `myself` - A handle to the [ActorCell] representing this actor @@ -97,8 +100,25 @@ pub trait Actor: Sized + Sync + Send + 'static { #[allow(unused_variables)] async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) {} + /// Handle the remote incoming message from the event processing loop. Unhandled panickes will be + /// captured and sent to the supervisor(s) + /// + /// * `myself` - A handle to the [ActorCell] representing this actor + /// * `message` - The serialized messgae to handle + /// * `state` - A mutable reference to the internal actor's state + #[allow(unused_variables)] + #[cfg(feature = "cluster")] + async fn handle_serialized( + &self, + myself: ActorRef, + message: crate::message::SerializedMessage, + state: &mut Self::State, + ) { + } + /// Handle the incoming supervision event. Unhandled panicks will captured and - /// sent the the supervisor(s) + /// sent the the supervisor(s). The default supervision behavior is to ignore all + /// child events. To override this behavior, implement this method. /// /// * `myself` - A handle to the [ActorCell] representing this actor /// * `message` - The message to process @@ -197,6 +217,41 @@ where actor.start(ports, Some(supervisor)).await } + /// Spawn a REMOTE actor with a supervisor, automatically starting the actor. Only for use + /// by `ractor_cluster::node::NodeSession` + /// + /// * `name`: A name to give the actor. Useful for global referencing or debug printing + /// * `handler` The [Actor] defining the logic for this actor + /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor + /// + /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference + /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if + /// the actor failed to start + #[cfg(feature = "cluster")] + pub async fn spawn_linked_remote( + name: Option, + handler: THandler, + id: ActorId, + supervisor: ActorCell, + ) -> Result<(ActorRef, JoinHandle<()>), SpawnErr> { + if !id.is_local() { + Err(SpawnErr::StartupPanic( + "Cannot spawn a remote actor when the identifier is not remote!".to_string(), + )) + } else { + let (actor_cell, ports) = actor_cell::ActorCell::new_remote::(name, id)?; + + let (actor, ports) = ( + Self { + base: actor_cell.into(), + handler: Arc::new(handler), + }, + ports, + ); + actor.start(ports, Some(supervisor)).await + } + } + /// Create a new actor with some handler implementation and initial state /// /// * `name`: A name to give the actor. Useful for global referencing or debug printing @@ -361,7 +416,7 @@ where match ports.listen_in_priority().await { Ok(actor_port_message) => match actor_port_message { actor_cell::ActorPortMessage::Signal(signal) => { - (true, Self::handle_signal(myself, signal).await) + (true, Self::handle_signal(myself, signal)) } actor_cell::ActorPortMessage::Stop(stop_message) => { let exit_reason = match stop_message { @@ -380,7 +435,7 @@ where let new_state = ports.run_with_signal(new_state_future).await; match new_state { Ok(()) => (false, None), - Err(signal) => (true, Self::handle_signal(myself, signal).await), + Err(signal) => (true, Self::handle_signal(myself, signal)), } } actor_cell::ActorPortMessage::Message(msg) => { @@ -389,7 +444,7 @@ where let new_state = ports.run_with_signal(new_state_future).await; match new_state { Ok(()) => (false, None), - Err(signal) => (true, Self::handle_signal(myself, signal).await), + Err(signal) => (true, Self::handle_signal(myself, signal)), } } }, @@ -411,10 +466,28 @@ where myself: ActorRef, state: &mut TState, handler: Arc, - mut msg: BoxedMessage, + msg: crate::message::BoxedMessage, ) { // panic in order to kill the actor - let typed_msg = match msg.take() { + #[cfg(feature = "cluster")] + { + if !myself.get_id().is_local() { + match msg.serialized_msg { + Some(serialized_msg) => { + handler + .handle_serialized(myself, serialized_msg, state) + .await; + return; + } + None => { + panic!("Failed to read serialized message from `BoxedMessage`"); + } + } + } + } + + // panic in order to kill the actor + let typed_msg = match TMsg::from_boxed(msg) { Ok(m) => m, Err(_) => { panic!( @@ -427,7 +500,7 @@ where handler.handle(myself, typed_msg, state).await } - async fn handle_signal(myself: ActorRef, signal: Signal) -> Option { + fn handle_signal(myself: ActorRef, signal: Signal) -> Option { match &signal { Signal::Kill => { myself.terminate(); diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index e25d622d..eff91d51 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -16,13 +16,17 @@ use crate::{Actor, ActorCell, ActorRef, ActorStatus, SpawnErr, SupervisionEvent} mod supervisor; +struct EmptyMessage; +#[cfg(feature = "cluster")] +impl crate::Message for EmptyMessage {} + #[crate::concurrency::test] async fn test_panic_on_start_captured() { struct TestActor; #[async_trait::async_trait] impl Actor for TestActor { - type Msg = (); + type Msg = EmptyMessage; type State = (); @@ -45,7 +49,7 @@ async fn test_stop_higher_priority_over_messages() { #[async_trait::async_trait] impl Actor for TestActor { - type Msg = (); + type Msg = EmptyMessage; type State = (); @@ -74,7 +78,7 @@ async fn test_stop_higher_priority_over_messages() { // pump 10 messages on the queue for _i in 0..10 { actor - .send_message(()) + .send_message(EmptyMessage) .expect("Failed to send message to actor"); } @@ -108,7 +112,7 @@ async fn test_kill_terminates_work() { #[async_trait::async_trait] impl Actor for TestActor { - type Msg = (); + type Msg = EmptyMessage; type State = (); @@ -129,7 +133,7 @@ async fn test_kill_terminates_work() { .expect("Actor failed to start"); actor - .send_message(()) + .send_message(EmptyMessage) .expect("Failed to send message to actor"); crate::concurrency::sleep(Duration::from_millis(10)).await; @@ -146,7 +150,7 @@ async fn test_stop_does_not_terminate_async_work() { #[async_trait::async_trait] impl Actor for TestActor { - type Msg = (); + type Msg = EmptyMessage; type State = (); @@ -168,7 +172,7 @@ async fn test_stop_does_not_terminate_async_work() { // send a work message followed by a stop message actor - .send_message(()) + .send_message(EmptyMessage) .expect("Failed to send message to actor"); crate::concurrency::sleep(Duration::from_millis(2)).await; actor.stop(None); @@ -191,7 +195,7 @@ async fn test_kill_terminates_supervision_work() { #[async_trait::async_trait] impl Actor for TestActor { - type Msg = (); + type Msg = EmptyMessage; type State = (); @@ -229,6 +233,8 @@ async fn test_kill_terminates_supervision_work() { async fn test_sending_message_to_invalid_actor_type() { struct TestActor1; struct TestMessage1; + #[cfg(feature = "cluster")] + impl crate::Message for TestMessage1 {} #[async_trait::async_trait] impl Actor for TestActor1 { type Msg = TestMessage1; @@ -237,6 +243,8 @@ async fn test_sending_message_to_invalid_actor_type() { } struct TestActor2; struct TestMessage2; + #[cfg(feature = "cluster")] + impl crate::Message for TestMessage2 {} #[async_trait::async_trait] impl Actor for TestActor2 { type Msg = TestMessage2; diff --git a/ractor/src/actor/tests/supervisor.rs b/ractor/src/actor/tests/supervisor.rs index 8350a47a..ff9cae92 100644 --- a/ractor/src/actor/tests/supervisor.rs +++ b/ractor/src/actor/tests/supervisor.rs @@ -14,6 +14,9 @@ use crate::concurrency::Duration; use crate::{Actor, ActorCell, ActorRef, ActorStatus, SupervisionEvent}; +#[cfg(feature = "cluster")] +impl crate::Message for () {} + #[crate::concurrency::test] async fn test_supervision_panic_in_post_startup() { struct Child; diff --git a/ractor/src/actor_id.rs b/ractor/src/actor_id.rs index cf386fcd..12b2d1da 100644 --- a/ractor/src/actor_id.rs +++ b/ractor/src/actor_id.rs @@ -15,7 +15,12 @@ pub enum ActorId { Local(u64), /// A remote actor on another system (system, id) - Remote(u64, u64), + Remote { + /// The remote node id + node_id: u64, + /// The local id on the remote system + pid: u64, + }, } impl ActorId { @@ -25,13 +30,27 @@ impl ActorId { pub fn is_local(&self) -> bool { matches!(self, ActorId::Local(_)) } + + /// Retrieve the actor's PID + pub fn pid(&self) -> u64 { + match self { + ActorId::Local(pid) => *pid, + ActorId::Remote { pid, .. } => *pid, + } + } + + /// Build an actor id from just it's pid. Assumes LOCAL only + #[cfg(feature = "cluster")] + pub fn from_pid(pid: u64) -> ActorId { + ActorId::Local(pid) + } } impl Display for ActorId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ActorId::Local(id) => write!(f, "0.{}", id), - ActorId::Remote(system_id, id) => write!(f, "{}.{}", system_id, id), + ActorId::Remote { node_id, pid } => write!(f, "{}.{}", node_id, pid), } } } @@ -39,17 +58,23 @@ impl Display for ActorId { /// The local id allocator for actors static ACTOR_ID_ALLOCATOR: AtomicU64 = AtomicU64::new(0u64); -/// Retreiev a new local id +/// Retrieve a new local id pub(crate) fn get_new_local_id() -> ActorId { ActorId::Local(ACTOR_ID_ALLOCATOR.fetch_add(1, std::sync::atomic::Ordering::AcqRel)) } +/// Create a new actor id for an actor on a remote `node()` +#[cfg(feature = "cluster")] +pub fn new_remote_id(node_id: u64, pid: u64) -> ActorId { + ActorId::Remote { node_id, pid } +} + impl ActorId { /// Retrieve the PID of the actor, ignoring local/remote properties pub fn get_pid(&self) -> u64 { match self { ActorId::Local(pid) => *pid, - ActorId::Remote(_, pid) => *pid, + ActorId::Remote { pid, .. } => *pid, } } } diff --git a/ractor/src/concurrency.rs b/ractor/src/concurrency.rs index 6e08fb30..8a37bc0a 100644 --- a/ractor/src/concurrency.rs +++ b/ractor/src/concurrency.rs @@ -52,6 +52,9 @@ pub mod tokio_primatives { /// A duration of time pub type Duration = tokio::time::Duration; + /// An instant measured on system time + pub type Instant = tokio::time::Instant; + /// Sleep the task for a duration of time pub async fn sleep(dur: super::Duration) { tokio::time::sleep(dur).await; diff --git a/ractor/src/distributed/mod.rs b/ractor/src/distributed/mod.rs deleted file mode 100644 index acdf01ea..00000000 --- a/ractor/src/distributed/mod.rs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) Sean Lawlor -// -// This source code is licensed under both the MIT license found in the -// LICENSE-MIT file in the root directory of this source tree. - -//! Support for remote nodes in a distributed cluster. -//! -//! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html) -//! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes. -//! -//! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool. -//! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect -//! to them as well. They merge registries and pg groups together in order to create larger clusters of services. -//! -//! For messages to be transmittable across the [Node] boundaries to other [Node]s in the pool, they need to be -//! serializable to a binary format (say protobuf) - -use dashmap::DashMap; - -/// Represents messages that can cross the node boundary which can be serialized and sent over the wire -pub trait NodeSerializableMessage { - /// Serialize the message to binary - fn serialize(&self) -> &[u8]; - - /// Deserialize from binary back into the message type - fn deserialize(&self, data: &[u8]) -> Self; -} - -/// The identifier of a node is a globally unique u64 -pub type NodeId = u64; - -/// A node in the distributed compute pool. -pub struct Node { - node_id: u64, - other_nodes: DashMap, -} \ No newline at end of file diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index 08b88ebf..377ebb77 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -34,6 +34,8 @@ //! Ping, //! Pong, //! } +//! #[cfg(feature = "cluster")] +//! impl ractor::Message for Message {} //! //! impl Message { //! // retrieve the next message in the sequence @@ -133,20 +135,19 @@ #![warn(unsafe_code)] #![warn(missing_docs)] #![warn(unused_crate_dependencies)] -#![cfg_attr(docsrs, feature(doc_cfg))] - -use std::any::Any; +// #![cfg_attr(docsrs, feature(doc_cfg))] /// An actor's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) -pub type ActorName = &'static str; +pub type ActorName = String; /// A process group's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) -pub type GroupName = &'static str; +pub type GroupName = String; pub mod actor; pub mod actor_id; pub mod concurrency; pub mod macros; +pub mod message; pub mod pg; pub mod port; pub mod registry; @@ -161,39 +162,19 @@ use criterion as _; #[cfg(test)] use rand as _; -// WIP -// #[cfg(feature = "remote")] -// pub mod distributed; - // re-exports pub use actor::actor_cell::{ActorCell, ActorRef, ActorStatus, ACTIVE_STATES}; pub use actor::errors::{ActorErr, MessagingErr, SpawnErr}; pub use actor::messages::{Signal, SupervisionEvent}; pub use actor::{Actor, ActorRuntime}; pub use actor_id::ActorId; +pub use message::Message; pub use port::{OutputMessage, OutputPort, RpcReplyPort}; -/// Message type for an actor. Generally an enum -/// which muxes the various types of inner-messages the actor -/// supports -/// -/// ## Example -/// -/// ```rust -/// pub enum MyMessage { -/// /// Record the name to the actor state -/// RecordName(String), -/// /// Print the recorded name from the state to command line -/// PrintName, -/// } -/// ``` -pub trait Message: Any + Send + 'static {} -impl Message for T {} - /// Represents the state of an actor. Must be safe /// to send between threads (same bounds as a [Message]) -pub trait State: Message {} -impl State for T {} +pub trait State: std::any::Any + Send + 'static {} +impl State for T {} /// Error types which can result from Ractor processes #[derive(Debug)] diff --git a/ractor/src/macros.rs b/ractor/src/macros.rs index 06a08c22..328b22d1 100644 --- a/ractor/src/macros.rs +++ b/ractor/src/macros.rs @@ -23,13 +23,15 @@ macro_rules! cast { /// constructing the message payload /// /// Returns [Ok(_)] with the result on successful RPC or [Err(crate::RactorErr)] on failure -/// Example usage -/// ```rust +/// Example usage (without the `cluster` feature) +/// ```no_run /// use ractor::{call, Actor, RpcReplyPort, ActorRef}; /// struct TestActor; /// enum MessageFormat { /// TestRpc(String, RpcReplyPort), /// } +/// #[cfg(feature = "cluster")] +/// impl ractor::Message for MessageFormat {} /// /// #[async_trait::async_trait] /// impl Actor for TestActor { @@ -106,6 +108,8 @@ macro_rules! call { /// enum MessageFormat { /// TestRpc(String, RpcReplyPort), /// } +/// #[cfg(feature = "cluster")] +/// impl ractor::Message for MessageFormat {} /// /// #[async_trait::async_trait] /// impl Actor for TestActor { diff --git a/ractor/src/message.rs b/ractor/src/message.rs new file mode 100644 index 00000000..99312640 --- /dev/null +++ b/ractor/src/message.rs @@ -0,0 +1,146 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Message trait definition for inter-actor messaging + +use std::any::Any; + +use crate::ActorId; +#[cfg(feature = "cluster")] +use crate::RpcReplyPort; + +/// An error downcasting a boxed item to a strong type +#[derive(Debug)] +pub struct BoxedDowncastErr; + +/// Represents a serialized call or cast message +#[cfg(feature = "cluster")] +pub enum SerializedMessage { + /// A cast (one-way) with the serialized payload + Cast(Vec), + /// A call (remote procedure call, waiting on a reply) with the + /// serialized arguments and reply channel + Call(Vec, RpcReplyPort>), + /// A serialized reply from a call operation. Format is + /// (`message_tag`, `reply_data`). It should not be the output + /// of [Message::serialize] function, and is only generated + /// from the `NodeSession` + CallReply(u64, Vec), +} + +/// A "boxed" message denoting a strong-type message +/// but generic so it can be passed around without type +/// constraints +pub struct BoxedMessage { + pub(crate) msg: Option>, + /// A serialized message for a remote actor, accessed only by the `RemoteActorRuntime` + #[cfg(feature = "cluster")] + pub serialized_msg: Option, +} + +/// Message type for an actor. Generally an enum +/// which muxes the various types of inner-messages the actor +/// supports +/// +/// ## Example +/// +/// ```rust +/// pub enum MyMessage { +/// /// Record the name to the actor state +/// RecordName(String), +/// /// Print the recorded name from the state to command line +/// PrintName, +/// } +/// ``` +pub trait Message: Any + Send + Sized + 'static { + /// Convert a [BoxedMessage] to this concrete type + #[cfg(feature = "cluster")] + fn from_boxed(mut m: BoxedMessage) -> Result { + if m.msg.is_some() { + match m.msg.take() { + Some(m) => { + if m.is::() { + Ok(*m.downcast::().unwrap()) + } else { + Err(BoxedDowncastErr) + } + } + _ => Err(BoxedDowncastErr), + } + } else if m.serialized_msg.is_some() { + match m.serialized_msg.take() { + Some(m) => Self::deserialize(m), + _ => Err(BoxedDowncastErr), + } + } else { + Err(BoxedDowncastErr) + } + } + + /// Convert a [BoxedMessage] to this concrete type + #[cfg(not(feature = "cluster"))] + fn from_boxed(mut m: BoxedMessage) -> Result { + match m.msg.take() { + Some(m) => { + if m.is::() { + Ok(*m.downcast::().unwrap()) + } else { + Err(BoxedDowncastErr) + } + } + _ => Err(BoxedDowncastErr), + } + } + + /// Convert this message to a [BoxedMessage] + #[cfg(feature = "cluster")] + fn box_message(self, pid: &ActorId) -> BoxedMessage { + if Self::serializable() && !pid.is_local() { + // it's a message to a remote actor, serialize it and send it over the wire! + BoxedMessage { + msg: None, + serialized_msg: Some(self.serialize()), + } + } else if pid.is_local() { + BoxedMessage { + msg: Some(Box::new(self)), + serialized_msg: None, + } + } else { + panic!("Cannot send message to remote actor without the message being serializable"); + } + } + + /// Convert this message to a [BoxedMessage] + #[cfg(not(feature = "cluster"))] + fn box_message(self, _pid: &ActorId) -> BoxedMessage { + BoxedMessage { + msg: Some(Box::new(self)), + } + } + + /// Determines if this type is serializable + #[cfg(feature = "cluster")] + fn serializable() -> bool { + false + } + + /// Serializes this message (if supported) + #[cfg(feature = "cluster")] + fn serialize(self) -> SerializedMessage { + unimplemented!() + } + + /// Deserialize binary data to this message type + #[cfg(feature = "cluster")] + fn deserialize(_bytes: SerializedMessage) -> Result { + unimplemented!() + } +} + +// Auto-Implement the [Message] trait for all types when NOT in the `cluster` configuration +// since there's no need for an override +#[cfg(not(feature = "cluster"))] +impl Message for T {} diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 9d8229ff..5519b3cf 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -3,7 +3,18 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. -//! Process groups (PG) +//! Process groups (PG) are named groups of actors with a friendly name +//! which can be used for retrieval of the process groups. Then within +//! the group, either a random actor (for dispatch) can be selected or +//! the whole group (broadcast), or a subset (partial-broadcast) can have +//! a message sent to them. Common operations are to (a) upcast the group +//! members to a strong-type'd actor then dispatch a message with [crate::call] +//! or [crate::cast]. +//! +//! Process groups can also be monitored for changes with calling [monitor] to +//! subscribe to changes and [demonitor] to unsubscribe. Subscribers will receive +//! process group change notifications via a [SupervisionEvent] called on the +//! supervision port of the [crate::Actor] //! //! Inspired from [Erlang's `pg` module](https://www.erlang.org/doc/man/pg.html) @@ -33,8 +44,8 @@ impl GroupChangeMessage { /// Retrieve the group that changed pub fn get_group(&self) -> GroupName { match self { - Self::Join(name, _) => name, - Self::Leave(name, _) => name, + Self::Join(name, _) => name.clone(), + Self::Leave(name, _) => name.clone(), } } } @@ -60,7 +71,7 @@ fn get_monitor<'a>() -> &'a PgState { pub fn join(group: GroupName, actors: Vec) { let monitor = get_monitor(); // insert into the monitor group - match monitor.map.entry(group) { + match monitor.map.entry(group.clone()) { Occupied(mut occupied) => { let oref = occupied.get_mut(); for actor in actors.iter() { @@ -79,7 +90,7 @@ pub fn join(group: GroupName, actors: Vec) { if let Some(listeners) = monitor.listeners.get(&group) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(group, actors.clone()), + GroupChangeMessage::Join(group.clone(), actors.clone()), )); } } @@ -91,7 +102,7 @@ pub fn join(group: GroupName, actors: Vec) { /// * `actors` - Thie list of actors to remove from the group pub fn leave(group: GroupName, actors: Vec) { let monitor = get_monitor(); - match monitor.map.entry(group) { + match monitor.map.entry(group.clone()) { Vacant(_) => {} Occupied(mut occupied) => { let mut_ref = occupied.get_mut(); @@ -105,7 +116,7 @@ pub fn leave(group: GroupName, actors: Vec) { if let Some(listeners) = monitor.listeners.get(&group) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group, actors.clone()), + GroupChangeMessage::Leave(group.clone(), actors.clone()), )); } } @@ -124,10 +135,10 @@ pub(crate) fn leave_all(actor: ActorId) { for mut kv in map.iter_mut() { if let Some(actor_cell) = kv.value_mut().remove(&actor) { - removal_events.insert(*kv.key(), actor_cell); + removal_events.insert(kv.key().clone(), actor_cell); } if kv.value().is_empty() { - empty_groups.push(*kv.key()); + empty_groups.push(kv.key().clone()); } } @@ -137,7 +148,7 @@ pub(crate) fn leave_all(actor: ActorId) { if let Some(this_listeners) = all_listeners.get(&group) { this_listeners.iter().for_each(|listener| { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group, vec![cell.clone()]), + GroupChangeMessage::Leave(group.clone(), vec![cell.clone()]), )); }); } @@ -145,16 +156,16 @@ pub(crate) fn leave_all(actor: ActorId) { // Cleanup empty groups for group in empty_groups { - map.remove(group); + map.remove(&group); } } /// Returns all the actors running on the local node in the group `group`. /// /// * `group_name` - Either a statically named group or scope -pub fn get_local_members(group_name: GroupName) -> Vec { +pub fn get_local_members(group_name: &GroupName) -> Vec { let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(&group_name) { + if let Some(actors) = monitor.map.get(group_name) { actors .value() .values() @@ -171,9 +182,9 @@ pub fn get_local_members(group_name: GroupName) -> Vec { /// * `group_name` - Either a statically named group or scope /// /// Returns [Vec<_>] with the associated actors -pub fn get_members(group_name: GroupName) -> Vec { +pub fn get_members(group_name: &GroupName) -> Vec { let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(&group_name) { + if let Some(actors) = monitor.map.get(group_name) { actors.value().values().cloned().collect::>() } else { vec![] @@ -188,7 +199,7 @@ pub fn which_groups() -> Vec { monitor .map .iter() - .map(|kvp| *(kvp.key())) + .map(|kvp| kvp.key().clone()) .collect::>() } @@ -231,7 +242,7 @@ pub(crate) fn demonitor_all(actor: ActorId) { let v = kvp.value_mut(); v.retain(|v| v.get_id() != actor); if v.is_empty() { - empty_groups.push(*kvp.key()); + empty_groups.push(kvp.key().clone()); } } diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 9abde186..99fe3446 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -31,12 +31,12 @@ async fn test_basic_group() { .await .expect("Failed to spawn test actor"); - let group = function_name!(); + let group = function_name!().to_string(); // join the group - pg::join(group, vec![actor.clone().into()]); + pg::join(group.clone(), vec![actor.clone().into()]); - let members = pg::get_members(group); + let members = pg::get_members(&group); assert_eq!(1, members.len()); // Cleanup @@ -47,7 +47,7 @@ async fn test_basic_group() { #[named] #[crate::concurrency::test] async fn test_multiple_members_in_group() { - let group = function_name!(); + let group = function_name!().to_string(); let mut actors = vec![]; let mut handles = vec![]; @@ -61,14 +61,14 @@ async fn test_multiple_members_in_group() { // join the group pg::join( - group, + group.clone(), actors .iter() .map(|aref| aref.clone().get_cell()) .collect::>(), ); - let members = pg::get_members(group); + let members = pg::get_members(&group); assert_eq!(10, members.len()); // Cleanup @@ -83,8 +83,8 @@ async fn test_multiple_members_in_group() { #[named] #[crate::concurrency::test] async fn test_multiple_groups() { - let group_a = concat!(function_name!(), "_a"); - let group_b = concat!(function_name!(), "_b"); + let group_a = concat!(function_name!(), "_a").to_string(); + let group_b = concat!(function_name!(), "_b").to_string(); let mut actors = vec![]; let mut handles = vec![]; @@ -101,18 +101,18 @@ async fn test_multiple_groups() { .iter() .map(|a| a.clone().get_cell()) .collect::>(); - pg::join(group_a, these_actors); + pg::join(group_a.clone(), these_actors); let these_actors = actors[5..10] .iter() .map(|a| a.clone().get_cell()) .collect::>(); - pg::join(group_b, these_actors); + pg::join(group_b.clone(), these_actors); - let members = pg::get_members(group_a); + let members = pg::get_members(&group_a); assert_eq!(5, members.len()); - let members = pg::get_members(group_b); + let members = pg::get_members(&group_b); assert_eq!(5, members.len()); // Cleanup @@ -131,12 +131,12 @@ async fn test_actor_leaves_pg_group_on_shutdown() { .await .expect("Failed to spawn test actor"); - let group = function_name!(); + let group = function_name!().to_string(); // join the group - pg::join(group, vec![actor.clone().into()]); + pg::join(group.clone(), vec![actor.clone().into()]); - let members = pg::get_members(group); + let members = pg::get_members(&group); assert_eq!(1, members.len()); // Cleanup @@ -144,38 +144,38 @@ async fn test_actor_leaves_pg_group_on_shutdown() { handle.await.expect("Actor cleanup failed"); drop(actor); - let members = pg::get_members(group); + let members = pg::get_members(&group); assert_eq!(0, members.len()); } #[named] #[crate::concurrency::test] async fn test_actor_leaves_pg_group_manually() { - let group = function_name!(); + let group = function_name!().to_string(); let (actor, handle) = Actor::spawn(None, TestActor) .await .expect("Failed to spawn test actor"); // join the group (create on first use) - pg::join(group, vec![actor.clone().into()]); + pg::join(group.clone(), vec![actor.clone().into()]); // the group was created and is present let groups = pg::which_groups(); assert!(groups.contains(&group)); - let members = pg::get_members(group); + let members = pg::get_members(&group); assert_eq!(1, members.len()); // leave the group - pg::leave(group, vec![actor.clone().into()]); + pg::leave(group.clone(), vec![actor.clone().into()]); // pif-paf-poof the group is gone! let groups = pg::which_groups(); assert!(!groups.contains(&group)); // members comes back empty - let members = pg::get_members(group); + let members = pg::get_members(&group); assert_eq!(0, members.len()); // Cleanup @@ -186,7 +186,7 @@ async fn test_actor_leaves_pg_group_manually() { #[named] #[crate::concurrency::test] async fn test_pg_monitoring() { - let group = function_name!(); + let group = function_name!().to_string(); let counter = Arc::new(AtomicU8::new(0u8)); @@ -201,7 +201,7 @@ async fn test_pg_monitoring() { type State = (); async fn pre_start(&self, myself: crate::ActorRef) -> Self::State { - pg::join(self.pg_group, vec![myself.into()]); + pg::join(self.pg_group.clone(), vec![myself.into()]); } } @@ -217,7 +217,7 @@ async fn test_pg_monitoring() { type State = (); async fn pre_start(&self, myself: crate::ActorRef) -> Self::State { - pg::monitor(self.pg_group, myself.into()); + pg::monitor(self.pg_group.clone(), myself.into()); } async fn handle_supervisor_evt( @@ -241,7 +241,7 @@ async fn test_pg_monitoring() { let (monitor_actor, monitor_handle) = Actor::spawn( None, NotificationMonitor { - pg_group: group, + pg_group: group.clone(), counter: counter.clone(), }, ) diff --git a/ractor/src/port/mod.rs b/ractor/src/port/mod.rs index d26ac051..7516afbb 100644 --- a/ractor/src/port/mod.rs +++ b/ractor/src/port/mod.rs @@ -24,9 +24,17 @@ pub use output::*; /// consistent error type pub struct RpcReplyPort { port: concurrency::OneshotSender, + timeout: Option, } impl RpcReplyPort { + /// Read the timeout of this RPC reply port + /// + /// Returns [Some(concurrency::Duration)] if a timeout is set, [None] otherwise + pub fn get_timeout(&self) -> Option { + self.timeout + } + /// Send a message to the Rpc reply port. This consumes the port /// /// * `msg` - The message to send @@ -48,6 +56,18 @@ impl RpcReplyPort { impl From> for RpcReplyPort { fn from(value: concurrency::OneshotSender) -> Self { - Self { port: value } + Self { + port: value, + timeout: None, + } + } +} + +impl From<(concurrency::OneshotSender, concurrency::Duration)> for RpcReplyPort { + fn from((value, timeout): (concurrency::OneshotSender, concurrency::Duration)) -> Self { + Self { + port: value, + timeout: Some(timeout), + } } } diff --git a/ractor/src/port/output/tests.rs b/ractor/src/port/output/tests.rs index e635c5ee..658108c5 100644 --- a/ractor/src/port/output/tests.rs +++ b/ractor/src/port/output/tests.rs @@ -20,6 +20,8 @@ async fn test_single_forward() { enum TestActorMessage { Stop, } + #[cfg(feature = "cluster")] + impl crate::Message for TestActorMessage {} #[async_trait::async_trait] impl Actor for TestActor { type Msg = TestActorMessage; @@ -76,6 +78,8 @@ async fn test_50_receivers() { enum TestActorMessage { Stop, } + #[cfg(feature = "cluster")] + impl crate::Message for TestActorMessage {} #[async_trait::async_trait] impl Actor for TestActor { type Msg = TestActorMessage; diff --git a/ractor/src/registry/mod.rs b/ractor/src/registry/mod.rs index d4a39506..cb847218 100644 --- a/ractor/src/registry/mod.rs +++ b/ractor/src/registry/mod.rs @@ -5,8 +5,8 @@ //! Represents an actor registry. //! -//! It allows unique naming of actors via `'static &str` (not strings) -//! so it works more like a Erlang `atom()` +//! It allows unique naming of actors via `String` +//! so it works more or less like an Erlang `atom()` //! //! Actors are automatically registered into the global registry, if they //! provide a name, upon construction.Actors are also @@ -24,7 +24,7 @@ //! //! ```rust //! async fn test() { -//! let maybe_actor = ractor::registry::where_is("my_actor"); +//! let maybe_actor = ractor::registry::where_is("my_actor".to_string()); //! if let Some(actor) = maybe_actor { //! // send a message, or interact with the actor //! // but you'll need to know the actor's strong type @@ -38,6 +38,8 @@ use dashmap::mapref::entry::Entry::{Occupied, Vacant}; use dashmap::DashMap; use once_cell::sync::OnceCell; +#[cfg(feature = "cluster")] +use crate::ActorId; use crate::{ActorCell, ActorName}; #[cfg(test)] @@ -51,15 +53,21 @@ pub enum ActorRegistryErr { /// The name'd actor registry static ACTOR_REGISTRY: OnceCell>> = OnceCell::new(); +#[cfg(feature = "cluster")] +static PID_REGISTRY: OnceCell>> = OnceCell::new(); /// Retrieve the named actor registry handle fn get_actor_registry<'a>() -> &'a Arc> { ACTOR_REGISTRY.get_or_init(|| Arc::new(DashMap::new())) } +#[cfg(feature = "cluster")] +fn get_pid_registry<'a>() -> &'a Arc> { + PID_REGISTRY.get_or_init(|| Arc::new(DashMap::new())) +} /// Put an actor into the registry pub(crate) fn register(name: ActorName, actor: ActorCell) -> Result<(), ActorRegistryErr> { - match get_actor_registry().entry(name) { + match get_actor_registry().entry(name.clone()) { Occupied(_) => Err(ActorRegistryErr::AlreadyRegistered(name)), Vacant(vacancy) => { vacancy.insert(actor); @@ -67,6 +75,12 @@ pub(crate) fn register(name: ActorName, actor: ActorCell) -> Result<(), ActorReg } } } +#[cfg(feature = "cluster")] +pub(crate) fn register_pid(id: ActorId, actor: ActorCell) { + if id.is_local() { + get_pid_registry().insert(id, actor); + } +} /// Remove an actor from the registry given it's actor name pub(crate) fn unregister(name: ActorName) { @@ -74,6 +88,12 @@ pub(crate) fn unregister(name: ActorName) { let _ = reg.remove(&name); } } +#[cfg(feature = "cluster")] +pub(crate) fn unregister_pid(id: ActorId) { + if id.is_local() { + let _ = get_pid_registry().remove(&id); + } +} /// Try and retrieve an actor from the registry /// @@ -92,5 +112,19 @@ pub fn where_is(name: ActorName) -> Option { /// currently pub fn registered() -> Vec { let reg = get_actor_registry(); - reg.iter().map(|kvp| *kvp.key()).collect::>() + reg.iter().map(|kvp| kvp.key().clone()).collect::>() +} + +/// Retrieve an actor from the global registery of all local actors +/// +/// * `id` - The **local** id of the actor to retrieve +/// +/// Returns [Some(_)] if the actor exists locally, [None] otherwise +#[cfg(feature = "cluster")] +pub fn get_pid(id: ActorId) -> Option { + if id.is_local() { + get_pid_registry().get(&id).map(|v| v.value().clone()) + } else { + None + } } diff --git a/ractor/src/registry/tests.rs b/ractor/src/registry/tests.rs index 74148807..5d01401f 100644 --- a/ractor/src/registry/tests.rs +++ b/ractor/src/registry/tests.rs @@ -22,11 +22,11 @@ async fn test_basic_registation() { async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} } - let (actor, handle) = Actor::spawn(Some("my_actor"), EmptyActor) + let (actor, handle) = Actor::spawn(Some("my_actor".to_string()), EmptyActor) .await .expect("Actor failed to start"); - assert!(crate::registry::where_is("my_actor").is_some()); + assert!(crate::registry::where_is("my_actor".to_string()).is_some()); actor.stop(None); handle.await.expect("Failed to clean stop the actor"); @@ -45,13 +45,13 @@ async fn test_duplicate_registration() { async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} } - let (actor, handle) = Actor::spawn(Some("my_second_actor"), EmptyActor) + let (actor, handle) = Actor::spawn(Some("my_second_actor".to_string()), EmptyActor) .await .expect("Actor failed to start"); - assert!(crate::registry::where_is("my_second_actor").is_some()); + assert!(crate::registry::where_is("my_second_actor".to_string()).is_some()); - let second_actor = Actor::spawn(Some("my_second_actor"), EmptyActor).await; + let second_actor = Actor::spawn(Some("my_second_actor".to_string()), EmptyActor).await; // fails to spawn the second actor due to name err assert!(matches!( second_actor, @@ -59,7 +59,7 @@ async fn test_duplicate_registration() { )); // make sure the first actor is still registered - assert!(crate::registry::where_is("my_second_actor").is_some()); + assert!(crate::registry::where_is("my_second_actor".to_string()).is_some()); actor.stop(None); handle.await.expect("Failed to clean stop the actor"); @@ -78,11 +78,11 @@ async fn test_actor_registry_unenrollment() { async fn pre_start(&self, _this_actor: crate::ActorRef) -> Self::State {} } - let (actor, handle) = Actor::spawn(Some("unenrollment"), EmptyActor) + let (actor, handle) = Actor::spawn(Some("unenrollment".to_string()), EmptyActor) .await .expect("Actor failed to start"); - assert!(crate::registry::where_is("unenrollment").is_some()); + assert!(crate::registry::where_is("unenrollment".to_string()).is_some()); // stop the actor and wait for its death actor.stop(None); @@ -95,5 +95,5 @@ async fn test_actor_registry_unenrollment() { crate::concurrency::sleep(Duration::from_millis(100)).await; // the actor was automatically removed - assert!(crate::registry::where_is("unenrollment").is_none()); + assert!(crate::registry::where_is("unenrollment".to_string()).is_none()); } diff --git a/ractor/src/rpc/mod.rs b/ractor/src/rpc/mod.rs index f408165f..6cc6a42c 100644 --- a/ractor/src/rpc/mod.rs +++ b/ractor/src/rpc/mod.rs @@ -12,7 +12,7 @@ use crate::concurrency::{self, Duration, JoinHandle}; -use crate::{Actor, ActorCell, ActorRef, Message, MessagingErr, RpcReplyPort}; +use crate::{Actor, ActorCell, ActorRef, MessagingErr, RpcReplyPort}; pub mod call_result; pub use call_result::CallResult; @@ -53,7 +53,11 @@ where TMsgBuilder: FnOnce(RpcReplyPort) -> TActor::Msg, { let (tx, rx) = concurrency::oneshot(); - actor.send_message::(msg_builder(tx.into()))?; + let port: RpcReplyPort = match timeout_option { + Some(duration) => (tx, duration).into(), + None => tx.into(), + }; + actor.send_message::(msg_builder(port))?; // wait for the reply Ok(if let Some(duration) = timeout_option { @@ -95,7 +99,11 @@ where // send to all actors for actor in actors { let (tx, rx) = concurrency::oneshot(); - actor.send_message::(msg_builder(tx.into()))?; + let port: RpcReplyPort = match timeout_option { + Some(duration) => (tx, duration).into(), + None => tx.into(), + }; + actor.send_message::(msg_builder(port))?; rx_ports.push(rx); } @@ -162,13 +170,17 @@ pub fn call_and_forward( ) -> Result>>, MessagingErr> where TActor: Actor, - TReply: Message, + TReply: Send + 'static, TMsgBuilder: FnOnce(RpcReplyPort) -> TActor::Msg, TForwardActor: Actor, FwdMapFn: FnOnce(TReply) -> TForwardActor::Msg + Send + 'static, { let (tx, rx) = concurrency::oneshot(); - actor.send_message::(msg_builder(tx.into()))?; + let port: RpcReplyPort = match timeout_option { + Some(duration) => (tx, duration).into(), + None => tx.into(), + }; + actor.send_message::(msg_builder(port))?; // wait for the reply Ok(crate::concurrency::spawn(async move { @@ -221,7 +233,7 @@ where timeout_option: Option, ) -> Result>>, MessagingErr> where - TReply: Message, + TReply: Send + 'static, TMsgBuilder: FnOnce(RpcReplyPort) -> TActor::Msg, TForwardActor: Actor, TFwdMessageBuilder: FnOnce(TReply) -> TForwardActor::Msg + Send + 'static, diff --git a/ractor/src/rpc/tests.rs b/ractor/src/rpc/tests.rs index 71d2c138..ac3e2084 100644 --- a/ractor/src/rpc/tests.rs +++ b/ractor/src/rpc/tests.rs @@ -71,7 +71,8 @@ async fn test_rpc_call() { Timeout(rpc::RpcReplyPort), MultiArg(String, u32, rpc::RpcReplyPort), } - + #[cfg(feature = "cluster")] + impl crate::Message for MessageFormat {} #[async_trait::async_trait] impl Actor for TestActor { type Msg = MessageFormat; @@ -156,7 +157,8 @@ async fn test_rpc_call_forwarding() { enum WorkerMessage { TestRpc(rpc::RpcReplyPort), } - + #[cfg(feature = "cluster")] + impl crate::Message for WorkerMessage {} #[async_trait::async_trait] impl Actor for Worker { type Msg = WorkerMessage; @@ -191,6 +193,8 @@ async fn test_rpc_call_forwarding() { enum ForwarderMessage { ForwardResult(String), } + #[cfg(feature = "cluster")] + impl crate::Message for ForwarderMessage {} #[async_trait::async_trait] impl Actor for Forwarder { diff --git a/ractor/src/time/mod.rs b/ractor/src/time/mod.rs index 14efa48f..02f2b272 100644 --- a/ractor/src/time/mod.rs +++ b/ractor/src/time/mod.rs @@ -61,17 +61,18 @@ where /// Returns: The [JoinHandle>] which represents the backgrounded work. /// Awaiting the handle will yield the result of the send operation. Can be safely ignored to /// "fire and forget" -pub fn send_after( +pub fn send_after( period: Duration, actor: ActorCell, - msg: TActor::Msg, + msg: F, ) -> JoinHandle> where TActor: Actor, + F: Fn() -> TActor::Msg + Send + 'static, { crate::concurrency::spawn(async move { crate::concurrency::sleep(period).await; - actor.send_message::(msg) + actor.send_message::(msg()) }) } @@ -119,12 +120,11 @@ where } /// Alias of [send_after] - pub fn send_after( - &self, - period: Duration, - msg: TActor::Msg, - ) -> JoinHandle> { - send_after::(period, self.get_cell(), msg) + pub fn send_after(&self, period: Duration, msg: F) -> JoinHandle> + where + F: Fn() -> TActor::Msg + Send + 'static, + { + send_after::(period, self.get_cell(), msg) } /// Alias of [exit_after] diff --git a/ractor/src/time/tests.rs b/ractor/src/time/tests.rs index c84b163a..089d0f2e 100644 --- a/ractor/src/time/tests.rs +++ b/ractor/src/time/tests.rs @@ -55,7 +55,7 @@ async fn test_intervals() { // therefore the counter should be empty assert_eq!(0, counter.load(Ordering::Relaxed)); - crate::concurrency::sleep(Duration::from_millis(120)).await; + crate::concurrency::sleep(Duration::from_millis(150)).await; // kill the actor actor_ref.stop(None); @@ -104,7 +104,7 @@ async fn test_send_after() { .await .expect("Failed to create test actor"); - let send_after_handle = actor_ref.send_after(Duration::from_millis(10), ()); + let send_after_handle = actor_ref.send_after(Duration::from_millis(10), || ()); // even though the timer is started, we should be in a "pause" state for 10ms, // therefore the counter should be empty