Skip to content

Commit

Permalink
Initial sketchings of how distributed node's might look, based heavil…
Browse files Browse the repository at this point in the history
…y on the Erlang

protocol.

This is a collection of tcp managing actors and session management for automated session
handling

Related issue: #16
  • Loading branch information
slawlor committed Jan 17, 2023
1 parent 67ac50c commit 48ddbb3
Show file tree
Hide file tree
Showing 22 changed files with 1,935 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

members = [
"ractor",
"ractor-cluster",
"ractor-playground",
"xtask"
]
38 changes: 38 additions & 0 deletions ractor-cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "ractor-cluster"
version = "0.4.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
license = "MIT"
edition = "2018"
keywords = ["actor", "ractor", "cluster"]
repository = "https://github.com/slawlor/ractor"
readme = "../README.md"
homepage = "https://github.com/slawlor/ractor"
categories = ["actor", "erlang"]
build = "src/build.rs"

[build-dependencies]
protobuf-src = "1"
prost-build = { version = "0.11" }

[dependencies]
## Required dependencies
async-trait = "0.1"
bytes = { version = "1" }
# dashmap = "5"
# futures = "0.3"
log = "0.4"
# once_cell = "1"
prost = { version = "0.11" }
ractor = { version = "0.4", features = ["cluster"], path = "../ractor" }
rand = "0.8"
sha2 = "0.10"
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net"]}

## Optional dependencies
# tokio-rustls = { version = "0.23", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] }
31 changes: 31 additions & 0 deletions ractor-cluster/src/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! This is the pre-compilation build script for the crate `ractor` when running in distrubted
//! mode. It's used to compile protobuf into Rust code prior to compilation.
/// The shared-path for all protobuf specifications
const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
/// The list of protobuf files to generate inside PROBUF_BASE_DIRECTORY
const PROTOBUF_FILES: [&str; 3] = ["meta", "node", "auth"];

fn build_protobufs() {
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());

for file in PROTOBUF_FILES.iter() {
let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file);
println!("cargo:rerun-if-changed={}", proto_file);
protobuf_files.push(proto_file);
}

prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap();
}

fn main() {
// compile the spec files into Rust code
build_protobufs();
}
25 changes: 25 additions & 0 deletions ractor-cluster/src/hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Hashing utilities mainly used around challenge computation
pub(crate) const DIGEST_BYTES: usize = 32;
pub(crate) type Digest = [u8; DIGEST_BYTES];

/// Compute a challenge digest
pub(crate) fn challenge_digest<'a>(secret: &'a str, challenge: u32) -> Digest {
use sha2::Digest;

let secret_bytes = secret.as_bytes();
let mut data = Vec::with_capacity(secret_bytes.len() + 4);

let challenge_bytes = challenge.to_be_bytes();
data.copy_from_slice(&challenge_bytes);
data[4..].copy_from_slice(secret_bytes);

let hash = sha2::Sha256::digest(&data);

hash.into()
}
39 changes: 22 additions & 17 deletions ractor/src/distributed/mod.rs → ractor-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,38 @@
// LICENSE-MIT file in the root directory of this source tree.

//! Support for remote nodes in a distributed cluster.
//!
//!
//! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
//! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes.
//!
//!
//! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool.
//! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect
//! to them as well. They merge registries and pg groups together in order to create larger clusters of services.
//!
//! For messages to be transmittable across the [Node] boundaries to other [Node]s in the pool, they need to be
//! serializable to a binary format (say protobuf)
//!
//! We have chosen protobuf for our inter-node defined protocol, however you can chose whatever medium you like
//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it
//! over the wire for you
use dashmap::DashMap;
// #![deny(warnings)]
#![warn(unused_imports)]
#![warn(unsafe_code)]
#![warn(missing_docs)]
#![warn(unused_crate_dependencies)]
#![cfg_attr(docsrs, feature(doc_cfg))]

mod hash;
pub mod net;
pub mod node;
pub mod protocol;

/// Node's are representing by an integer id
pub type NodeId = u64;

/// Represents messages that can cross the node boundary which can be serialized and sent over the wire
pub trait NodeSerializableMessage {
pub trait SerializableMessage {
/// Serialize the message to binary
fn serialize(&self) -> &[u8];
fn serialize(&self) -> Vec<u8>;

/// Deserialize from binary back into the message type
fn deserialize(&self, data: &[u8]) -> Self;
}

/// The identifier of a node is a globally unique u64
pub type NodeId = u64;

/// A node in the distributed compute pool.
pub struct Node {
node_id: u64,
other_nodes: DashMap<u64, String>,
}
126 changes: 126 additions & 0 deletions ractor-cluster/src/net/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! TCP Server to accept incoming sessions
use ractor::rpc::CallResult;
use ractor::{Actor, ActorRef};
use tokio::net::TcpListener;
use tokio::time::Duration;

use crate::node::{SessionManagerMessage, SessionMessage};

/// A Tcp Socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s
/// which handle the message sending and receiving over the socket.
///
/// The [Listener] supervises all of the TCP [super::session::Session] actors and is responsible for logging
/// connects and disconnects as well as tracking the current open [super::session::Session] actors.
pub struct Listener {
port: super::NetworkPort,
session_manager: ActorRef<crate::node::NodeServer>,
}

impl Listener {
/// Create a new `Listener`
pub fn new(
port: super::NetworkPort,
session_manager: ActorRef<crate::node::NodeServer>,
) -> Self {
Self {
port,
session_manager,
}
}
}

/// The Node listener's state
pub struct ListenerState {
listener: Option<TcpListener>,
}

#[async_trait::async_trait]
impl Actor for Listener {
type Msg = ();

type State = ListenerState;

async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
let addr = format!("0.0.0.0:{}", self.port);
let listener = match TcpListener::bind(&addr).await {
Ok(l) => l,
Err(err) => {
panic!("Error listening to socket: {}", err);
}
};

// startup the event processing loop by sending an initial msg
let _ = myself.cast(());

// create the initial state
Self::State {
listener: Some(listener),
}
}

async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
// close the listener properly, in case anyone else has handles to the actor stopping
// total droppage
drop(state.listener.take());
}

async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, state: &mut Self::State) {
if let Some(listener) = &mut state.listener {
match listener.accept().await {
Ok((stream, addr)) => {
// ask the session manager for a new session agent
let session = self
.session_manager
.call(
|tx| SessionManagerMessage::OpenSession {
is_server: true,
reply: tx,
},
Some(Duration::from_millis(500)),
)
.await
.unwrap();
match session {
CallResult::Timeout => {
log::warn!("Timeout in trying to open session. Failed to retrieve a new Session handler from the SessionManager in {} ms. Refusing connection", 500);
}
CallResult::SenderError => {
log::error!("Sender error when trying to receive session handler");
myself.stop(Some("Session handler retrieval failure".to_string()));
}
CallResult::Success(session_handler) => {
// Spawn off the connection management actor and make me the supervisor of it
let supervisor = session_handler.get_cell();
if let Ok(actor) = super::session::Session::spawn_linked(
session_handler.clone(),
stream,
addr,
supervisor,
)
.await
{
let _ = session_handler.cast(SessionMessage::SetTcpSession(actor));
log::info!("TCP Session opened for {}", addr);
}
}
}
}
Err(socket_accept_error) => {
log::warn!(
"Error accepting socket {} on Node server",
socket_accept_error
);
}
}
}

// continue accepting new sockets
let _ = myself.cast(());
}
}
21 changes: 21 additions & 0 deletions ractor-cluster/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! TCP server and session actors which transmit [prost::Message] encoded messages
// TODO: we need a way to identify which session messages are coming from + going to. Therefore
// we should actually have a notification when a new session is launched, which can be used
// to match which session is tied to which actor id

pub mod listener;
pub mod session;

/// A trait which implements [prost::Message], [Default], and has a static lifetime
/// denoting protobuf-encoded messages which can be transmitted over the wire
pub trait NetworkMessage: prost::Message + Default + 'static {}
impl<T: prost::Message + Default + 'static> NetworkMessage for T {}

/// A network port
pub type NetworkPort = u16;
Loading

0 comments on commit 48ddbb3

Please sign in to comment.