Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests, Docker integration tests, additional serialization coverage, and proc_macro support for reply port type args #39

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Cargo
# will have compiled files and executables
/target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk

# Remove all target compilation folders from all sub-folders as well
**/target/

# Remove code-coverage generated files from git
debug/
coverage/
**/*.profraw

.github/
5 changes: 5 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ jobs:
command: test
args: --package ${{matrix.package}} ${{matrix.flags}}

- name: Test everything
uses: actions-rs/cargo@v1
with:
command: test

clippy:
name: Clippy
runs-on: ubuntu-latest
Expand Down
40 changes: 40 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Ractor Cluster integration tests
on:
push:
branches:
- main
paths:
- 'ractor_cluster*/**'
pull_request:
types: [opened, reopened, synchronize]
paths:
- 'ractor_cluster*/**'

jobs:
test:
name: Test ractor_cluster with Docker based networked images
runs-on: ${{matrix.os}}-latest
strategy:
fail-fast: false
matrix:
toolchain: [stable]
os: [ubuntu]

steps:
- uses: actions/checkout@main

- name: Build the docker image
working-directory: .
run: |
docker compose build

- name: Authentication Handshake
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/auth-handshake.env up --exit-code-from node-b

- name: Process Groups
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/pg-groups.env up --exit-code-from node-b

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ members = [
"ractor_cluster",
"ractor_cluster_derive",
"ractor_playground",
"ractor_cluster_integration_tests",
"xtask"
]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ A pure-Rust actor framework. Inspired from [Erlang's `gen_server`](https://www.e
[<img alt="github" src="https://img.shields.io/badge/github-slawlor/ractor-8da0cb?style=for-the-badge&labelColor=555555&logo=github" height="20">](https://github.com/slawlor/ractor)
[<img alt="crates.io" src="https://img.shields.io/crates/v/ractor.svg?style=for-the-badge&color=fc8d62&logo=rust" height="20">](https://crates.io/crates/ractor)
[<img alt="docs.rs" src="https://img.shields.io/badge/docs.rs-ractor-66c2a5?style=for-the-badge&labelColor=555555&logo=docs.rs" height="20">](https://docs.rs/ractor)
[![CI/main](https://github.com/slawlor/repl/actions/workflows/ci.yaml/badge.svg?branch=main)](https://github.com/slawlor/repl/actions/workflows/ci.yaml)
[![CI/main](https://github.com/slawlor/ractor/actions/workflows/ci.yaml/badge.svg?branch=main)](https://github.com/slawlor/ractor/actions/workflows/ci.yaml)
[![codecov](https://codecov.io/gh/slawlor/ractor/branch/main/graph/badge.svg?token=61AGYYPWBA)](https://codecov.io/gh/slawlor/ractor)
![Downloads](https://img.shields.io/crates/d/ractor.svg)

Expand Down
12 changes: 9 additions & 3 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ codecov:
require_ci_to_pass: false

ignore:
- "ractor_playground"
# This is the code coverage tool.
- "xtask"
# Tests are not covered
- "ractor_playground"
# Tests are not covered
- "ractor/examples"
# Tests are not covered
- "ractor/benches"
- "ractor_cluster" # for now
- "ractor_cluster_derive"
# Tests are not covered
- "ractor_cluster_integration_tests"
# Protocol is all generated code and should be excluded from code coverage
- "ractor_cluster/src/protocol"
31 changes: 31 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
version: '3.3'
services:
node-a:
container_name: "node-a"
build:
context: .
dockerfile: ractor_cluster_integration_tests/Dockerfile
image: ractor_cluster_tests:latest
networks:
- test-net
entrypoint: ''
command: ractor_cluster_integration_tests test ${A_TEST}
environment:
RUST_LOG: debug
node-b:
depends_on:
- node-a
container_name: "node-b"
build:
context: .
dockerfile: ractor_cluster_integration_tests/Dockerfile
image: ractor_cluster_tests:latest
networks:
- test-net
entrypoint: ''
command: ractor_cluster_integration_tests test ${B_TEST}
environment:
RUST_LOG: debug
networks:
test-net:
external: false
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.6.0"
version = "0.6.1"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
4 changes: 2 additions & 2 deletions ractor/src/actor/actor_cell/actor_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl ActorProperties {
supervision: tx_supervision,
message: tx_message,
tree: SupervisionTree::default(),
type_id: std::any::TypeId::of::<TActor>(),
type_id: std::any::TypeId::of::<TActor::Msg>(),
#[cfg(feature = "cluster")]
supports_remoting: TActor::Msg::serializable(),
},
Expand Down Expand Up @@ -117,7 +117,7 @@ impl ActorProperties {
{
// Only type-check messages of local actors, remote actors send serialized
// payloads
if self.id.is_local() && self.type_id != std::any::TypeId::of::<TActor>() {
if self.id.is_local() && self.type_id != std::any::TypeId::of::<TActor::Msg>() {
return Err(MessagingErr::InvalidActorType);
}

Expand Down
6 changes: 4 additions & 2 deletions ractor/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ pub trait Message: Any + Send + Sized + 'static {

/// Convert this message to a [BoxedMessage]
#[cfg(not(feature = "cluster"))]
fn box_message(self, _pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
#[allow(unused_variables)]
fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
Ok(BoxedMessage {
msg: Some(Box::new(self)),
})
Expand All @@ -154,7 +155,8 @@ pub trait Message: Any + Send + Sized + 'static {

/// Deserialize binary data to this message type
#[cfg(feature = "cluster")]
fn deserialize(_bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
#[allow(unused_variables)]
fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
Err(BoxedDowncastErr)
}
}
Expand Down
1 change: 1 addition & 0 deletions ractor/src/port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use output::*;

/// A remote procedure call's reply port. Wrapper of [concurrency::OneshotSender] with a
/// consistent error type
#[derive(Debug)]
pub struct RpcReplyPort<TMsg> {
port: concurrency::OneshotSender<TMsg>,
timeout: Option<concurrency::Duration>,
Expand Down
3 changes: 2 additions & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.6.0"
version = "0.6.1"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand Down Expand Up @@ -34,4 +34,5 @@ tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-
# tokio-rustls = { version = "0.23", optional = true }

[dev-dependencies]
paste = "1"
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util", "rt-multi-thread"] }
22 changes: 22 additions & 0 deletions ractor_cluster/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,25 @@ pub(crate) fn challenge_digest(secret: &'_ str, challenge: u32) -> Digest {

hash.into()
}

#[cfg(test)]
mod tests {

use super::challenge_digest;
use super::DIGEST_BYTES;

#[test]
fn test_challenge_digest_generation() {
let secret = "cookie";
let challenge: u32 = 42;
let digest = challenge_digest(secret, challenge);
assert_eq!(DIGEST_BYTES, digest.len());
assert_eq!(
digest,
[
20, 62, 0, 217, 211, 179, 29, 157, 36, 69, 47, 133, 172, 4, 68, 137, 83, 8, 26, 2,
237, 2, 39, 46, 89, 44, 91, 19, 205, 66, 46, 247
]
);
}
}
3 changes: 1 addition & 2 deletions ractor_cluster/src/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use ractor::{Actor, ActorRef};
use tokio::net::TcpListener;

use crate::node::NodeServerMessage;
use crate::RactorMessage;

/// A Tcp Socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s
/// which handle the message sending and receiving over the socket.
Expand Down Expand Up @@ -40,7 +39,7 @@ pub struct ListenerState {
listener: Option<TcpListener>,
}

#[derive(RactorMessage)]
#[derive(crate::RactorMessage)]
pub struct ListenerMessage;

#[async_trait::async_trait]
Expand Down
9 changes: 0 additions & 9 deletions ractor_cluster/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,8 @@

//! TCP server and session actors which transmit [prost::Message] encoded messages

// TODO: we need a way to identify which session messages are coming from + going to. Therefore
// we should actually have a notification when a new session is launched, which can be used
// to match which session is tied to which actor id

pub mod listener;
pub mod session;

/// A trait which implements [prost::Message], [Default], and has a static lifetime
/// denoting protobuf-encoded messages which can be transmitted over the wire
pub trait NetworkMessage: prost::Message + Default + 'static {}
impl<T: prost::Message + Default + 'static> NetworkMessage for T {}

/// A network port
pub type NetworkPort = u16;
36 changes: 8 additions & 28 deletions ractor_cluster/src/net/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs

use std::convert::TryInto;
use std::marker::PhantomData;
use std::net::SocketAddr;

use bytes::Bytes;
Expand All @@ -21,7 +20,6 @@ use tokio::io::ErrorKind;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;

use super::NetworkMessage;
use crate::RactorMessage;

/// Helper method to read exactly `len` bytes from the stream into a pre-allocated buffer
Expand Down Expand Up @@ -104,7 +102,7 @@ pub enum SessionMessage {

/// The node session's state
pub struct SessionState {
writer: ActorRef<SessionWriter<crate::protocol::NetworkMessage>>,
writer: ActorRef<SessionWriter>,
reader: ActorRef<SessionReader>,
}

Expand All @@ -115,14 +113,7 @@ impl Actor for Session {

async fn pre_start(&self, myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
// spawn writer + reader child actors
let (writer, _) = Actor::spawn_linked(
None,
SessionWriter::<crate::protocol::NetworkMessage> {
_phantom: PhantomData,
},
myself.get_cell(),
)
.await?;
let (writer, _) = Actor::spawn_linked(None, SessionWriter, myself.get_cell()).await?;
let (reader, _) = Actor::spawn_linked(
None,
SessionReader {
Expand Down Expand Up @@ -220,36 +211,25 @@ impl Actor for Session {

// ========================= Node Session writer ========================= //

struct SessionWriter<TMsg>
where
TMsg: NetworkMessage,
{
_phantom: PhantomData<TMsg>,
}
struct SessionWriter;

struct SessionWriterState {
writer: Option<OwnedWriteHalf>,
}

enum SessionWriterMessage<TMsg>
where
TMsg: NetworkMessage,
{
#[derive(crate::RactorMessage)]
enum SessionWriterMessage {
/// Set the stream, providing a [TcpStream], which
/// to utilize for this node's connection
SetStream(OwnedWriteHalf),

/// Write an object over the wire
WriteObject(TMsg),
WriteObject(crate::protocol::NetworkMessage),
}
impl<TMsg> ractor::Message for SessionWriterMessage<TMsg> where TMsg: NetworkMessage {}

#[async_trait::async_trait]
impl<TMsg> Actor for SessionWriter<TMsg>
where
TMsg: NetworkMessage,
{
type Msg = SessionWriterMessage<TMsg>;
impl Actor for SessionWriter {
type Msg = SessionWriterMessage;

type State = SessionWriterState;

Expand Down
Loading