Skip to content

Commit

Permalink
Changed configuration fields name (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
dayeon5470 authored Apr 4, 2024
1 parent cd49d58 commit aa1621c
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 81 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ Versioning](https://semver.org/spec/v2.0.0.html).
`crusher.toml`, the temporary file is named as `crusher.toml.temp.toml`.
- If the reload trigger succeeds, the new configuration is applied from the
temporary file; otherwise, the temporary file is deleted.
- Changed configuration fields name.
- `roots` to `root`.
- `giganto_ingest_address` to `giganto_ingest_srv_addr`.
- `giganto_publish_address` to `giganto_publish_srv_addr`.
- `review_address` to `review_rpc_srv_addr`.

## [0.3.2] - 2024-01-25

Expand Down
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ The following is key values in the TOML configuration file.

* `key`: Crusher's private key file.
* `cert`: Crusher's certificate file.
* `roots`: RootCA file. (for Giganto, Review)
* `root`: RootCA file. (for Giganto, Review)
* `giganto_name`: the name of the Giganto. This must match with the DNS name in
the certificate.
* `giganto_ingest_address`: IP address and port number of `Giganto ingest`.
* `giganto_publish_address`: IP address and port number of `Giganto publish`.
* `giganto_ingest_srv_addr`: IP address and port number of `Giganto ingest`.
* `giganto_publish_srv_addr`: IP address and port number of `Giganto publish`.
* `review_name`: the name of the review. This must match with the DNS name in
the certificate.
* `review_address`: IP address and port number of `review`.
* `review_rpc_srv_addr`: IP address and port number of `review`.
* `last_timestamp_data`: File that stores the timestamp of the last time series
per `sampling policy`.
* `log_dir`: Path to the log file.
Expand All @@ -40,12 +40,12 @@ Example
```toml
key = "key.pem"
cert = "cert.pem"
roots = ["ca1.pem", "ca2.pem", "ca3.pem"]
root = "root.pem"
giganto_name = "localhost"
giganto_ingest_address = "127.0.0.1:38370"
giganto_publish_address = "127.0.0.1:38371"
giganto_ingest_srv_addr = "127.0.0.1:38370"
giganto_publish_srv_addr = "127.0.0.1:38371"
review_name = "localhost"
review_address ="127.0.0.1:38390"
review_rpc_srv_addr ="127.0.0.1:38390"
last_timestamp_data = "tests/time_data.json"
log_dir = "/data/logs/apps"
```
Expand Down
18 changes: 8 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ use tokio::time::Duration;
pub const KEEP_ALIVE_INTERVAL: Duration = Duration::from_millis(5_000);
pub const SERVER_RETRY_INTERVAL: u64 = 3;

pub fn config(certs: Vec<Certificate>, key: PrivateKey, files: Vec<Vec<u8>>) -> Result<Endpoint> {
pub fn config(certs: Vec<Certificate>, key: PrivateKey, root_pem: &[u8]) -> Result<Endpoint> {
let mut root_store = rustls::RootCertStore::empty();
for file in files {
let root_cert: Vec<rustls::Certificate> = rustls_pemfile::certs(&mut &*file)
.context("invalid PEM-encoded certificate")?
.into_iter()
.map(rustls::Certificate)
.collect();
if let Some(cert) = root_cert.first() {
root_store.add(cert)?;
}
let root_certs: Vec<rustls::Certificate> = rustls_pemfile::certs(&mut &*root_pem)
.context("invalid PEM-encoded certificate")?
.into_iter()
.map(rustls::Certificate)
.collect();
if let Some(cert) = root_certs.first() {
root_store.add(cert).context("failed to add root cert")?;
}

let tls_config = rustls::ClientConfig::builder()
Expand Down
22 changes: 11 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ async fn main() -> Result<()> {
)
})?;
let key = to_private_key(&key_pem).context("cannot read private key")?;

let mut files: Vec<Vec<u8>> = Vec::new();
for root in &settings.roots {
let file = fs::read(root).expect("Failed to read file");
files.push(file);
}
let root_pem = fs::read(&settings.root).with_context(|| {
format!(
"failed to read root certificate file: {}",
settings.root.display()
)
})?;

read_last_timestamp(&settings.last_timestamp_data).await?;

let (request_send, request_recv) =
async_channel::bounded::<RequestedPolicy>(REQUESTED_POLICY_CHANNEL_SIZE);

let request_client = request::Client::new(
settings.review_address,
settings.review_rpc_srv_addr,
settings.review_name,
cert.clone(),
key.clone(),
files.clone(),
&root_pem,
request_send,
);
let runtime_policy_list = Arc::new(RwLock::new(HashMap::new())); // current sampling_policy value
Expand All @@ -98,13 +98,13 @@ async fn main() -> Result<()> {
));

let subscribe_client = subscribe::Client::new(
settings.giganto_ingest_address,
settings.giganto_publish_address,
settings.giganto_ingest_srv_addr,
settings.giganto_publish_srv_addr,
settings.giganto_name,
settings.last_timestamp_data,
cert,
key,
files,
&root_pem,
request_recv,
);
task::spawn(subscribe_client.run(
Expand Down
4 changes: 2 additions & 2 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl Client {
server_name: String,
certs: Vec<Certificate>,
key: PrivateKey,
files: Vec<Vec<u8>>,
root_pem: &[u8],
request_send: Sender<RequestedPolicy>,
) -> Self {
let endpoint = client::config(certs, key, files)
let endpoint = client::config(certs, key, root_pem)
.expect("server configuration error with cert, key or root");
Client {
server_address,
Expand Down
54 changes: 27 additions & 27 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@ use std::{
use toml_edit::{value, Document};

const DEFAULT_GIGANTO_NAME: &str = "localhost";
const DEFAULT_GIGANTO_INGEST_ADDRESS: &str = "[::]:38370";
const DEFAULT_GIGANTO_PUBLISH_ADDRESS: &str = "[::]:38371";
const DEFAULT_GIGANTO_INGEST_SRV_ADDR: &str = "[::]:38370";
const DEFAULT_GIGANTO_PUBLISH_SRV_ADDR: &str = "[::]:38371";
const DEFAULT_REVIEW_NAME: &str = "localhost";
const DEFAULT_REVIEW_ADDRESS: &str = "[::]:38390";
const DEFAULT_REVIEW_RPC_SRV_ADDR: &str = "[::]:38390";
pub const TEMP_TOML_POST_FIX: &str = ".temp.toml";

/// The application settings.
#[derive(Clone, Debug, Deserialize)]
pub struct Settings {
pub cert: PathBuf, // Path to the certificate file
pub key: PathBuf, // Path to the private key file
pub roots: Vec<PathBuf>, // Path to the rootCA file
pub root: PathBuf, // Path to the rootCA file
pub giganto_name: String, // host name to giganto
#[serde(deserialize_with = "deserialize_socket_addr")]
pub giganto_ingest_address: SocketAddr, // IP address & port to giganto
pub giganto_ingest_srv_addr: SocketAddr, // IP address & port to giganto
#[serde(deserialize_with = "deserialize_socket_addr")]
pub giganto_publish_address: SocketAddr, // IP address & port to giganto
pub giganto_publish_srv_addr: SocketAddr, // IP address & port to giganto
pub review_name: String, // host name to review
#[serde(deserialize_with = "deserialize_socket_addr")]
pub review_address: SocketAddr, // IP address & port to review
pub review_rpc_srv_addr: SocketAddr, // IP address & port to review
pub last_timestamp_data: PathBuf, // Path to the last series timestamp data file
pub log_dir: PathBuf,
}
Expand Down Expand Up @@ -82,13 +82,13 @@ fn default_config_builder() -> ConfigBuilder<DefaultState> {
.expect("default key dir")
.set_default("giganto_name", DEFAULT_GIGANTO_NAME)
.expect("valid name")
.set_default("giganto_ingest_address", DEFAULT_GIGANTO_INGEST_ADDRESS)
.set_default("giganto_ingest_srv_addr", DEFAULT_GIGANTO_INGEST_SRV_ADDR)
.expect("valid address")
.set_default("giganto_publish_address", DEFAULT_GIGANTO_PUBLISH_ADDRESS)
.set_default("giganto_publish_srv_addr", DEFAULT_GIGANTO_PUBLISH_SRV_ADDR)
.expect("valid address")
.set_default("review_name", DEFAULT_REVIEW_NAME)
.expect("valid name")
.set_default("review_address", DEFAULT_REVIEW_ADDRESS)
.set_default("review_rpc_srv_addr", DEFAULT_REVIEW_RPC_SRV_ADDR)
.expect("valid address")
.set_default(
"last_timestamp_data",
Expand All @@ -115,29 +115,29 @@ pub fn get_config(config_path: &str) -> Result<Config> {
let toml = fs::read_to_string(config_path).context("toml not found")?;
let doc = toml.parse::<Document>()?;

let review_address = doc
.get("review_address")
.context("\"review_address\" not found")?
let review_rpc_srv_addr = doc
.get("review_rpc_srv_addr")
.context("\"review_rpc_srv_addr\" not found")?
.to_string()
.trim_matches('\"')
.parse::<SocketAddr>()?;
let giganto_publish_address = doc
.get("giganto_publish_address")
.context("\"giganto_publish_address\" not found")?
let giganto_publish_srv_addr = doc
.get("giganto_publish_srv_addr")
.context("\"giganto_publish_srv_addr\" not found")?
.to_string()
.trim_matches('\"')
.parse::<SocketAddr>()?;
let giganto_ingest_address = doc
.get("giganto_ingest_address")
.context("\"giganto_ingest_address\" not found")?
let giganto_ingest_srv_addr = doc
.get("giganto_ingest_srv_addr")
.context("\"giganto_ingest_srv_addr\" not found")?
.to_string()
.trim_matches('\"')
.parse::<SocketAddr>()?;

Ok(Config::Crusher(CrusherConfig {
review_address,
giganto_publish_address: Some(giganto_publish_address),
giganto_ingest_address: Some(giganto_ingest_address),
review_address: review_rpc_srv_addr,
giganto_publish_address: Some(giganto_publish_srv_addr),
giganto_ingest_address: Some(giganto_ingest_srv_addr),
}))
}

Expand All @@ -148,12 +148,12 @@ pub fn set_config(config: &Config, config_path: &str) -> Result<()> {
let mut doc = config_toml.parse::<Document>()?;

if let Config::Crusher(conf) = config {
doc["review_address"] = value(conf.review_address.to_string());
if let Some(giganto_ingest_address) = conf.giganto_ingest_address {
doc["giganto_ingest_address"] = value(giganto_ingest_address.to_string());
doc["review_rpc_srv_addr"] = value(conf.review_address.to_string());
if let Some(giganto_ingest_srv_addr) = conf.giganto_ingest_address {
doc["giganto_ingest_srv_addr"] = value(giganto_ingest_srv_addr.to_string());
}
if let Some(giganto_publish_address) = conf.giganto_publish_address {
doc["giganto_publish_address"] = value(giganto_publish_address.to_string());
if let Some(giganto_publish_srv_addr) = conf.giganto_publish_address {
doc["giganto_publish_srv_addr"] = value(giganto_publish_srv_addr.to_string());
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ impl Client {
last_series_time_path: PathBuf,
certs: Vec<Certificate>,
key: PrivateKey,
files: Vec<Vec<u8>>,
root_pem: &[u8],
request_recv: Receiver<RequestedPolicy>,
) -> Self {
let endpoint = client::config(certs, key, files)
let endpoint = client::config(certs, key, root_pem)
.expect("Server configuration error with cert, key or root");
Client {
ingest_addr,
Expand Down
32 changes: 15 additions & 17 deletions src/subscribe/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,19 @@ async fn handle_connection(conn: quinn::Connecting) {
}

fn config_server() -> ServerConfig {
let (cert, key, ca_certs) = cert_key();

let mut client_auth_roots = rustls::RootCertStore::empty();
for ca_cert in ca_certs {
let root_cert: Vec<rustls::Certificate> = rustls_pemfile::certs(&mut &*ca_cert)
.unwrap()
.into_iter()
.map(rustls::Certificate)
.collect();
if let Some(cert) = root_cert.get(0) {
client_auth_roots.add(cert).unwrap();
}
let (cert, key, root_pem) = cert_key();

let mut client_auth_root = rustls::RootCertStore::empty();
let root_certs: Vec<rustls::Certificate> = rustls_pemfile::certs(&mut &*root_pem)
.unwrap()
.into_iter()
.map(rustls::Certificate)
.collect();
if let Some(cert) = root_certs.get(0) {
client_auth_root.add(cert).unwrap();
}

let client_auth = rustls::server::AllowAnyAuthenticatedClient::new(client_auth_roots).boxed();
let client_auth = rustls::server::AllowAnyAuthenticatedClient::new(client_auth_root).boxed();
let server_crypto = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_client_cert_verifier(client_auth)
Expand All @@ -103,7 +101,7 @@ fn config_server() -> ServerConfig {
}

fn client() -> Client {
let (cert, key, ca_certs) = cert_key();
let (cert, key, root_pem) = cert_key();
let (_, rx) = async_channel::unbounded();

Client::new(
Expand All @@ -113,17 +111,17 @@ fn client() -> Client {
PathBuf::from(LAST_TIME_SERIES_PATH),
cert,
key,
ca_certs,
&root_pem,
rx,
)
}

fn cert_key() -> (Vec<Certificate>, PrivateKey, Vec<Vec<u8>>) {
fn cert_key() -> (Vec<Certificate>, PrivateKey, Vec<u8>) {
let cert_pem = fs::read(CERT_PATH).unwrap();
let cert = to_cert_chain(&cert_pem).unwrap();
let key_pem = fs::read(KEY_PATH).unwrap();
let key = to_private_key(&key_pem).unwrap();
let ca_certs = vec![fs::read(CA_CERT_PATH).unwrap()];
let ca_certs = fs::read(CA_CERT_PATH).unwrap();

(cert, key, ca_certs)
}
Expand Down
8 changes: 4 additions & 4 deletions tests/config.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
key = "tests/key.pem"
cert = "tests/cert.pem"
roots = ["tests/root.pem"]
root = "tests/root.pem"
giganto_name = "localhost"
giganto_ingest_address = "127.0.0.1:38370"
giganto_publish_address = "127.0.0.1:38371"
giganto_ingest_srv_addr = "127.0.0.1:38370"
giganto_publish_srv_addr = "127.0.0.1:38371"
review_name = "localhost"
review_address ="127.0.0.1:38390"
review_rpc_srv_addr ="127.0.0.1:38390"
last_timestamp_data = "tests/time_data.json"
log_dir = "tests/logs/apps"

0 comments on commit aa1621c

Please sign in to comment.