Skip to content

Commit

Permalink
[cluster] Support encrypted connections
Browse files Browse the repository at this point in the history
Encryption support is provided by

1. `tokio_rustls`
2. `rustls`

which allow for both the NodeServer received connections and outgoing client cluster connections to utilize TLS configuration.

Integration tests added, but code-coverage will likely be low as integration tests don't count in `codecov`

```bash
$ docker compose --env-file ./ractor_cluster_integration_tests/envs/encryption.env up --build --exit-code-from node-b
...
node-a  | [2023-02-21T16:58:39.401Z WARN  ractor_cluster_integration_tests::tests::encryption] CA Cert SUB=10U
                                                                                                              ponytown RSA CA
node-a  | [2023-02-21T16:58:39.402Z INFO  ractor_cluster_integration_tests::tests::encryption] Starting NodeServer on port 8199
node-a  | [2023-02-21T16:58:39.402Z INFO  ractor_cluster_integration_tests::tests::encryption] Waiting for NodeSession status updates
node-b  | [2023-02-21T16:58:39.621Z WARN  ractor_cluster_integration_tests::tests::encryption] CA Cert SUB=10U
                                                                                                              ponytown RSA CA
node-b  | [2023-02-21T16:58:39.621Z INFO  ractor_cluster_integration_tests::tests::encryption] Starting NodeServer on port 8198
node-b  | [2023-02-21T16:58:39.621Z INFO  ractor_cluster_integration_tests::tests::encryption] Connecting to remote NodeServer at node-a:8199
node-b  | [2023-02-21T16:58:39.623Z DEBUG rustls::client::hs] No cached session for DnsName(DnsName(DnsName("testserver.com")))
node-a  | [2023-02-21T16:58:39.623Z DEBUG rustls::server::hs] decided upon suite TLS13_AES_256_GCM_SHA384
node-b  | [2023-02-21T16:58:39.623Z DEBUG rustls::client::hs] Not resuming any session
[cluster] Support encrypted connections
node-b  | [2023-02-21T16:58:39.624Z DEBUG rustls::client::hs] Using ciphersuite TLS13_AES_256_GCM_SHA384
node-b  | [2023-02-21T16:58:39.624Z DEBUG rustls::client::tls13] Not resuming
node-b  | [2023-02-21T16:58:39.624Z DEBUG rustls::client::tls13] TLS1.3 encrypted extensions: [ServerNameAck]
node-b  | [2023-02-21T16:58:39.624Z DEBUG rustls::client::hs] ALPN protocol is None
node-b  | [2023-02-21T16:58:39.624Z INFO  ractor_cluster::node::client] TCP Session opened for 172.18.0.2:8199
node-b  | [2023-02-21T16:58:39.624Z INFO  ractor_cluster_integration_tests::tests::encryption] Client connected NodeServer b to NodeServer a
node-b  | [2023-02-21T16:58:39.624Z INFO  ractor_cluster_integration_tests::tests::encryption] Waiting for NodeSession status updates
node-a  | [2023-02-21T16:58:39.624Z INFO  ractor_cluster::net::listener] TCP Session opened for 172.18.0.3:34662
...
node-a exited with code 0
```
  • Loading branch information
slawlor committed Feb 21, 2023
1 parent bcbda19 commit 428b789
Show file tree
Hide file tree
Showing 76 changed files with 1,797 additions and 57 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ jobs:
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/pg-groups.env up --exit-code-from node-b
- name: Encrypted communications
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/encryption.env up --exit-code-from node-b
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Cargo.lock
debug/
coverage/
**/*.profraw
**/.DS_Store
11 changes: 9 additions & 2 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ pub trait Actor: Sized + Sync + Send + 'static {
}

/// Handle the incoming supervision event. Unhandled panicks will captured and
/// sent the the supervisor(s). The default supervision behavior is to ignore all
/// child events. To override this behavior, implement this method.
/// sent the the supervisor(s). The default supervision behavior is to exit the
/// supervisor on any child exit. To override this behavior, implement this function.
///
/// * `myself` - A handle to the [ActorCell] representing this actor
/// * `message` - The message to process
Expand All @@ -175,6 +175,13 @@ pub trait Actor: Sized + Sync + Send + 'static {
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(who, _, _)
| SupervisionEvent::ActorPanicked(who, _) => {
myself.stop(None);
}
_ => {}
}
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ prost-types = { version = "0.11" }
ractor = { version = "0.7.2", features = ["cluster"], path = "../ractor" }
ractor_cluster_derive = { version = "0.7.2", path = "../ractor_cluster_derive" }
rand = "0.8"
rustls = { version = "0.20" }
sha2 = "0.10"
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util"]}
tokio-rustls = { version = "0.23" }

## Optional dependencies
# tokio-rustls = { version = "0.23", optional = true }
Expand Down
2 changes: 2 additions & 0 deletions ractor_cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ pub mod node;
pub type NodeId = u64;

// ============== Re-exports ============== //
pub use net::{IncomingEncryptionMode, NetworkStream};
pub use node::client::connect as client_connect;
pub use node::client::connect_enc as client_connect_enc;
pub use node::{
client::ClientConnectErr, NodeServer, NodeServerMessage, NodeSession, NodeSessionMessage,
};
Expand Down
41 changes: 35 additions & 6 deletions ractor_cluster/src/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ractor::{cast, ActorProcessingErr};
use ractor::{Actor, ActorRef};
use tokio::net::TcpListener;

use super::IncomingEncryptionMode;
use crate::node::NodeServerMessage;

/// A Tcp Socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s
Expand All @@ -19,17 +20,20 @@ use crate::node::NodeServerMessage;
pub struct Listener {
port: super::NetworkPort,
session_manager: ActorRef<crate::node::NodeServer>,
encryption: IncomingEncryptionMode,
}

impl Listener {
/// Create a new `Listener`
pub fn new(
port: super::NetworkPort,
session_manager: ActorRef<crate::node::NodeServer>,
encryption: IncomingEncryptionMode,
) -> Self {
Self {
port,
session_manager,
encryption,
}
}
}
Expand Down Expand Up @@ -90,14 +94,39 @@ impl Actor for Listener {
if let Some(listener) = &mut state.listener {
match listener.accept().await {
Ok((stream, addr)) => {
let _ = cast!(
self.session_manager,
NodeServerMessage::ConnectionOpened {
let local = stream.local_addr()?;

let session = match &self.encryption {
IncomingEncryptionMode::Raw => Some(super::NetworkStream::Raw {
peer_addr: addr,
local_addr: local,
stream,
is_server: true
}),
IncomingEncryptionMode::Tls(acceptor) => {
match acceptor.accept(stream).await {
Ok(enc_stream) => Some(super::NetworkStream::TlsServer {
peer_addr: addr,
local_addr: local,
stream: enc_stream,
}),
Err(some_err) => {
log::warn!("Error establishing secure socket: {}", some_err);
None
}
}
}
);
log::info!("TCP Session opened for {}", addr);
};

if let Some(stream) = session {
let _ = cast!(
self.session_manager,
NodeServerMessage::ConnectionOpened {
stream,
is_server: true
}
);
log::info!("TCP Session opened for {}", addr);
}
}
Err(socket_accept_error) => {
log::warn!(
Expand Down
65 changes: 65 additions & 0 deletions ractor_cluster/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,73 @@

//! TCP server and session actors which transmit [prost::Message] encoded messages
use std::net::SocketAddr;

use tokio::net::TcpStream;

pub mod listener;
pub mod session;

/// A network port
pub type NetworkPort = u16;

/// A network data stream which can either be
/// 1. unencrypted
/// 2. encrypted and the server-side of the session
/// 3. encrypted and the client-side of the session
pub enum NetworkStream {
/// Unencrypted session
Raw {
/// The peer's address
peer_addr: SocketAddr,
/// The local address
local_addr: SocketAddr,
/// The stream
stream: TcpStream,
},
/// Encrypted as the server-side of the session
TlsServer {
/// The peer's address
peer_addr: SocketAddr,
/// The local address
local_addr: SocketAddr,
/// The stream
stream: tokio_rustls::server::TlsStream<TcpStream>,
},
/// Encrypted as the client-side of the session
TlsClient {
/// The peer's address
peer_addr: SocketAddr,
/// The local address
local_addr: SocketAddr,
/// The stream
stream: tokio_rustls::client::TlsStream<TcpStream>,
},
}

impl NetworkStream {
pub(crate) fn peer_addr(&self) -> SocketAddr {
match self {
Self::Raw { peer_addr, .. } => *peer_addr,
Self::TlsServer { peer_addr, .. } => *peer_addr,
Self::TlsClient { peer_addr, .. } => *peer_addr,
}
}

pub(crate) fn local_addr(&self) -> SocketAddr {
match self {
Self::Raw { local_addr, .. } => *local_addr,
Self::TlsServer { local_addr, .. } => *local_addr,
Self::TlsClient { local_addr, .. } => *local_addr,
}
}
}

/// Incoming encryption mode
#[derive(Clone)]
pub enum IncomingEncryptionMode {
/// Accept sockets raw, with no encryption
Raw,
/// Accept sockets and establish a secure connection
Tls(tokio_rustls::TlsAcceptor),
}
Loading

0 comments on commit 428b789

Please sign in to comment.