
Commit

Initial sketchings of how distributed nodes might look, based heavily on the Erlang protocol.

This is a collection of TCP-managing actors and session-management support for automated session handling.

Related issue: #16
slawlor committed Jan 15, 2023
1 parent 67ac50c commit ca8c758
Showing 17 changed files with 1,048 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -2,6 +2,7 @@

members = [
"ractor",
"ractor-cluster",
"ractor-playground",
"xtask"
]
36 changes: 36 additions & 0 deletions ractor-cluster/Cargo.toml
@@ -0,0 +1,36 @@
[package]
name = "ractor-cluster"
version = "0.4.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
license = "MIT"
edition = "2018"
keywords = ["actor", "ractor", "cluster"]
repository = "https://github.com/slawlor/ractor"
readme = "../README.md"
homepage = "https://github.com/slawlor/ractor"
categories = ["actor", "erlang"]
build = "src/build.rs"

[build-dependencies]
protobuf-src = "1"
prost-build = { version = "0.11" }

[dependencies]
## Required dependencies
async-trait = "0.1"
bytes = { version = "1" }
# dashmap = "5"
# futures = "0.3"
log = "0.4"
# once_cell = "1"
prost = { version = "0.11" }
ractor = { version = "0.4", features = ["cluster"], path = "../ractor" }
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net"]}

## Optional dependencies
# tokio-rustls = { version = "0.23", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] }
31 changes: 31 additions & 0 deletions ractor-cluster/src/build.rs
@@ -0,0 +1,31 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! This is the pre-compilation build script for the crate `ractor` when running in distributed
//! mode. It's used to compile protobuf into Rust code prior to compilation.
/// The shared path for all protobuf specifications
const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
/// The list of protobuf files to generate inside PROTOBUF_BASE_DIRECTORY
const PROTOBUF_FILES: [&str; 1] = ["meta"]; //"node", "auth",

fn build_protobufs() {
    std::env::set_var("PROTOC", protobuf_src::protoc());

    let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());

    for file in PROTOBUF_FILES.iter() {
        let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file);
        println!("cargo:rerun-if-changed={}", proto_file);
        protobuf_files.push(proto_file);
    }

    prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap();
}

fn main() {
    // compile the spec files into Rust code
    build_protobufs();
}
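
For context, prost_build writes the generated Rust modules to Cargo's OUT_DIR. A minimal sketch of how that output is typically included in the crate follows, assuming `meta.proto` declares `package meta;` (hypothetical; the actual module layout in this commit may differ):

// Sketch only: include the prost-generated code for the `meta` protobuf package.
// Assumes `meta.proto` declares `package meta;`, so prost_build emits `meta.rs`.
pub mod meta {
    include!(concat!(env!("OUT_DIR"), "/meta.rs"));
}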
38 changes: 21 additions & 17 deletions ractor/src/distributed/mod.rs → ractor-cluster/src/lib.rs
@@ -4,33 +4,37 @@
// LICENSE-MIT file in the root directory of this source tree.

//! Support for remote nodes in a distributed cluster.
//!
//!
//! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
//! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes.
//!
//!
//! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool.
//! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect
//! to them as well. They merge registries and pg groups together in order to create larger clusters of services.
//!
//! For messages to be transmittable across [Node] boundaries to other [Node]s in the pool, they need to be
//! serializable to a binary format (e.g. protobuf).
//!
//! We have chosen protobuf for our inter-node protocol definition, however you can choose whatever medium you like
//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it
//! over the wire for you.
use dashmap::DashMap;
#![deny(warnings)]
#![warn(unused_imports)]
#![warn(unsafe_code)]
#![warn(missing_docs)]
#![warn(unused_crate_dependencies)]
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod net;
pub mod node;
pub mod protocol;

/// Nodes are represented by an integer id
pub type NodeId = u64;

/// Represents messages that can cross the node boundary which can be serialized and sent over the wire
pub trait NodeSerializableMessage {
pub trait SerializableMessage {
    /// Serialize the message to binary
    fn serialize(&self) -> &[u8];
    fn serialize(&self) -> Vec<u8>;

    /// Deserialize from binary back into the message type
    fn deserialize(&self, data: &[u8]) -> Self;
}

/// The identifier of a node is a globally unique u64
pub type NodeId = u64;

/// A node in the distributed compute pool.
pub struct Node {
    node_id: u64,
    other_nodes: DashMap<u64, String>,
}
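
A minimal sketch of what implementing the renamed SerializableMessage trait (with the Vec<u8> signature) could look like for a hypothetical Ping message; hand-rolled byte encoding stands in for protobuf here, and Ping is not part of this commit:

// Hypothetical message type, for illustration only.
#[derive(Clone, Default)]
struct Ping {
    counter: u64,
}

impl SerializableMessage for Ping {
    fn serialize(&self) -> Vec<u8> {
        // encode the counter as 8 big-endian bytes
        self.counter.to_be_bytes().to_vec()
    }

    fn deserialize(&self, data: &[u8]) -> Self {
        // assumes `data` holds at least 8 bytes; a real codec would validate this
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&data[..8]);
        Ping {
            counter: u64::from_be_bytes(buf),
        }
    }
}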
190 changes: 190 additions & 0 deletions ractor-cluster/src/net/listener.rs
@@ -0,0 +1,190 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! TCP Server to accept incoming sessions
use std::collections::HashMap;
use std::{collections::hash_map::Entry, marker::PhantomData};

use ractor::rpc::CallResult;
use ractor::{Actor, ActorId, ActorRef, SupervisionEvent};
use tokio::net::TcpListener;
use tokio::time::Duration;

use super::NetworkMessage;

/// A Tcp Socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s
/// which handle the message sending and receiving over the socket.
///
/// The [Listener] supervises all of the TCP [super::session::Session] actors and is responsible for logging
/// connects and disconnects as well as tracking the current open [super::session::Session] actors.
pub struct Listener<TMsg, TSessionManager, TSessionHandler>
where
    TSessionManager: Actor<Msg = super::SessionManagerMessage<TSessionHandler, TMsg>>,
    TMsg: NetworkMessage,
    TSessionHandler: Actor<Msg = TMsg>,
{
    port: super::NetworkPort,
    session_manager: ActorRef<TSessionManager>,
    _phantom: PhantomData<TMsg>,
    _session_handler: PhantomData<TSessionHandler>,
}

impl<TMsg, TSessionManager, TSessionHandler> Listener<TMsg, TSessionManager, TSessionHandler>
where
    TSessionManager: Actor<Msg = super::SessionManagerMessage<TSessionHandler, TMsg>>,
    TMsg: NetworkMessage,
    TSessionHandler: Actor<Msg = TMsg>,
{
    /// Create a new `Listener`
    pub fn new(port: super::NetworkPort, session_manager: ActorRef<TSessionManager>) -> Self {
        Self {
            port,
            session_manager,
            _phantom: PhantomData,
            _session_handler: PhantomData,
        }
    }
}

/// The Node listener's state
pub struct ListenerState {
    listener: Option<TcpListener>,
    nodes: HashMap<ActorId, std::net::SocketAddr>,
}

#[async_trait::async_trait]
impl<TSessionHandler, TMsg, TSessionManager> Actor
    for Listener<TMsg, TSessionManager, TSessionHandler>
where
    TSessionManager: Actor<Msg = super::SessionManagerMessage<TSessionHandler, TMsg>>,
    TMsg: NetworkMessage,
    TSessionHandler: Actor<Msg = TMsg>,
{
    type Msg = ();

    type State = ListenerState;

    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
        let addr = format!("0.0.0.0:{}", self.port);
        let listener = match TcpListener::bind(&addr).await {
            Ok(l) => l,
            Err(err) => {
                panic!("Error listening to socket: {}", err);
            }
        };

        // startup the event processing loop by sending an initial msg
        let _ = myself.cast(());

        // create the initial state
        Self::State {
            listener: Some(listener),
            nodes: HashMap::new(),
        }
    }

    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
        // close the listener properly, in case anyone else has handles to the actor stopping
        // total droppage
        drop(state.listener.take());
    }

    async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, state: &mut Self::State) {
        if let Some(listener) = &mut state.listener {
            match listener.accept().await {
                Ok((stream, addr)) => {
                    // ask the session manager for a new session agent
                    let session = self
                        .session_manager
                        .call(
                            super::SessionManagerMessage::SessionOpened,
                            Some(Duration::from_millis(500)),
                        )
                        .await
                        .unwrap();
                    match session {
                        CallResult::Timeout => {
                            log::warn!("Timeout in trying to open session. Failed to retrieve a new Session handler from the SessionManager in {} ms. Refusing connection", 500);
                        }
                        CallResult::SenderError => {
                            log::error!("Sender error when trying to receive session handler");
                            myself.stop(Some("Session handler retrieval failure".to_string()));
                        }
                        CallResult::Success(session_handler) => {
                            // Spawn off the connection management actor and make me the supervisor of it
                            if let Some(actor) =
                                super::session::Session::<TSessionHandler, TMsg>::spawn_linked(
                                    session_handler,
                                    stream,
                                    myself.get_cell(),
                                )
                                .await
                            {
                                state.nodes.insert(actor.get_id(), addr);
                            }
                        }
                    }
                }
                Err(socket_accept_error) => {
                    log::warn!(
                        "Error accepting socket {} on Node server",
                        socket_accept_error
                    );
                }
            }
        }

        // continue accepting new sockets
        let _ = myself.cast(());
    }

    async fn handle_supervisor_evt(
        &self,
        _myself: ActorRef<Self>,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) {
        // sockets open, they close, the world goes round...
        match message {
            SupervisionEvent::ActorPanicked(actor, msg) => {
                match state.nodes.entry(actor.get_id()) {
                    Entry::Occupied(o) => {
                        log::error!("Connection with {} panicked with message: {}", o.get(), msg);
                        o.remove();
                    }
                    Entry::Vacant(_) => {
                        log::error!("Connection with ([unknown]) panicked with message: {}", msg);
                    }
                }
            }
            SupervisionEvent::ActorTerminated(actor, _, _) => {
                match state.nodes.entry(actor.get_id()) {
                    Entry::Occupied(o) => {
                        log::error!("Connection closed with {}", o.get());
                        o.remove();
                    }
                    Entry::Vacant(_) => {
                        log::error!("Connection with ([unknown]) closed");
                    }
                }
                let _ = self
                    .session_manager
                    .cast(super::SessionManagerMessage::SessionClosed(actor.get_id()));
            }
            SupervisionEvent::ActorStarted(actor) => match state.nodes.entry(actor.get_id()) {
                Entry::Occupied(o) => {
                    log::error!("Connection opened with {}", o.get());
                }
                Entry::Vacant(_) => {
                    log::error!("Connection with ([unknown]) opened");
                }
            },
            _ => {
                //no-op
            }
        }
    }
}
37 changes: 37 additions & 0 deletions ractor-cluster/src/net/mod.rs
@@ -0,0 +1,37 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! TCP server and session actors which transmit [prost::Message] encoded messages
// TODO: we need a way to identify which session messages are coming from + going to. Therefore
// we should actually have a notification when a new session is launched, which can be used
// to match which session is tied to which actor id

use ractor::{Actor, ActorId, ActorRef, RpcReplyPort};

pub mod listener;
pub mod session;

/// Messages to/from the session aggregator
pub enum SessionManagerMessage<TSessionHandler, TMsg>
where
    TSessionHandler: Actor<Msg = TMsg>,
    TMsg: NetworkMessage,
{
    /// Notification when a new session is opened, and the handle to communicate with it.
    /// Returns the actor which will be responsible for handling messages on this session
    SessionOpened(RpcReplyPort<ActorRef<TSessionHandler>>),

    /// Notification when a session is closed, and the id of the actor to cleanup
    SessionClosed(ActorId),
}

/// A trait for types which implement [prost::Message] and [Default] and have a static lifetime,
/// denoting protobuf-encoded messages which can be transmitted over the wire
pub trait NetworkMessage: prost::Message + Default + 'static {}
impl<T: prost::Message + Default + 'static> NetworkMessage for T {}

/// A network port
pub type NetworkPort = u16;
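
Because of the blanket impl, any prost-generated type satisfies NetworkMessage automatically. A small sketch using a hypothetical NodeHandshake message that is not part of this commit:

// Hypothetical prost message, for illustration only.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NodeHandshake {
    #[prost(uint64, tag = "1")]
    pub node_id: u64,
}

// Compile-time check that the blanket impl applies.
fn assert_network_message<T: NetworkMessage>() {}

fn _blanket_impl_check() {
    assert_network_message::<NodeHandshake>();
}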
