From 18394e0dce8c73eab861a560a0b3048d1527e22b Mon Sep 17 00:00:00 2001 From: Riccardo Gallo Date: Wed, 2 Oct 2024 17:55:57 +0200 Subject: [PATCH] refactor(con): change retrieval of Astarte config Now it is only possible to specify the type of connection (mqtt or grpc) through the ASTARTE_CONNECTION env variable. The application will first try to retrieve the config from env; if an error occurs, it tries to retrieve them from a config.toml (if a path has beem specified). The merge operation for the 2 sources of config has been removed. Signed-off-by: Riccardo Gallo --- Cargo.lock | 1 - Cargo.toml | 1 - src/astarte.rs | 248 +++++++++++++++---------------------------------- src/cli.rs | 4 - src/main.rs | 25 ++--- 5 files changed, 89 insertions(+), 190 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 258fee1..f2d316d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2052,7 +2052,6 @@ dependencies = [ "rand", "serde", "tokio", - "tokio-stream", "toml", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index b879332..7c34bee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ clap = { version = "=4.4.18", features = ["derive", "env", "string"] } color-eyre = "0.6.3" toml = "0.8.12" tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] } -tokio-stream = "0.1.15" tracing = "0.1.37" tracing-subscriber = { version = "0.3.0", features = ["env-filter"]} rand = "0.8.5" diff --git a/src/astarte.rs b/src/astarte.rs index 15397fe..aa88f9b 100644 --- a/src/astarte.rs +++ b/src/astarte.rs @@ -15,7 +15,7 @@ use astarte_device_sdk::transport::mqtt::{Credential, Mqtt, MqttConfig}; use astarte_device_sdk::{Client, DeviceClient, DeviceConnection}; use clap::ValueEnum; use color_eyre::eyre; -use color_eyre::eyre::{OptionExt, WrapErr}; +use color_eyre::eyre::{eyre, OptionExt, WrapErr}; use serde::Deserialize; use std::path::{Path, PathBuf}; use std::time::SystemTime; @@ -60,78 +60,67 @@ pub struct ConnectionConfigBuilder { store_directory: Option, /// Astarte Device SDK config options #[serde(rename = "mqtt", default)] - mqtt_config: MqttConfigBuilder, - /// Astarte Message Hub config options + mqtt_config: Option, + /// Astarte Message Hub endpoint #[serde(rename = "grpc", default)] - grpc_config: GrpcConfigBuilder, + grpc_config: Option, } impl ConnectionConfigBuilder { - /// Builder constructor + /// Init astarte config from env var if they have been set /// - /// Specify if the builder should use the Astarte Device SDK or the Astarte Message Hub - pub fn with_connection(astarte_connection: Option) -> Self { - Self { - astarte_connection, - store_directory: None, - mqtt_config: MqttConfigBuilder::default(), - grpc_config: GrpcConfigBuilder::default(), - } - } + /// If an error is returned, it means that one or more environment variables have not been set + pub fn try_from_env(&mut self) -> eyre::Result<()> { + let con = env::var("ASTARTE_CONNECTION") + .map(|s| AstarteConnection::from_str(&s, true))? + .map_err(|err| eyre!(err))?; - /// Init astarte config from env var if they have been set - pub fn from_env(&mut self) { - // doesn't change it if it's been set from CLI - if self.astarte_connection.is_none() { - self.astarte_connection = env::var("ASTARTE_CONNECTION") - .ok() - .map(|s| AstarteConnection::from_str(&s, true)) - .transpose() - .ok() - .unwrap_or_default(); - } + self.store_directory = Some(env::var("ASTARTE_STORE_DIRECTORY").map(PathBuf::from)?); - self.store_directory = env::var("ASTARTE_STORE_DIRECTORY").ok().map(PathBuf::from); - - // update the mqtt config info - let device_id = env::var("ASTARTE_DEVICE_ID").ok(); - let realm = env::var("ASTARTE_REALM").ok(); - let pairing_url = env::var("ASTARTE_PAIRING_URL").ok(); - let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS") - .map(|s| s.parse().unwrap_or_default()) - .ok(); - let credential = env::var("ASTARTE_CREDENTIALS_SECRET") - .ok() - .map(Credential::secret) - .or_else(|| { - env::var("ASTARTE_PAIRING_TOKEN") - .ok() - .map(Credential::paring_token) - }); - - self.mqtt_config = MqttConfigBuilder { - device_id, - realm, - credential, - pairing_url, - astarte_ignore_ssl, - }; + match con { + AstarteConnection::Mqtt => { + self.astarte_connection = Some(con); + + // update the mqtt config info + let device_id = env::var("ASTARTE_DEVICE_ID")?; + let realm = env::var("ASTARTE_REALM")?; + let pairing_url = env::var("ASTARTE_PAIRING_URL")?; + let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS") + .map(|s| s.parse::().unwrap_or_default())?; + let credential = env::var("ASTARTE_CREDENTIALS_SECRET") + .map(Credential::secret) + .or_else(|_| env::var("ASTARTE_PAIRING_TOKEN").map(Credential::paring_token))?; + + self.mqtt_config = Some(MqttConfigBuilder { + device_id, + realm, + credential, + pairing_url, + ignore_ssl_errors: astarte_ignore_ssl, + }); + } + AstarteConnection::Grpc => { + self.astarte_connection = Some(con); - // update the mqtt config info - let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT").ok(); + let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT")?; - self.grpc_config = GrpcConfigBuilder { endpoint }; + // update the grpc config info + self.grpc_config = Some(GrpcConfigBuilder { endpoint }); + } + } + + Ok(()) } /// Update the missing config values taking them from a config.toml file - pub async fn update_with_toml(&mut self, path: impl AsRef) { + pub async fn from_toml(&mut self, path: impl AsRef) { match tokio::fs::read_to_string(&path).await { Ok(file) => { // retrieve the astarte config information from the config.toml file match toml::from_str::(&file) { Ok(toml_cfg) => { // update the configs - self.merge(toml_cfg.astarte); + *self = toml_cfg.astarte; } Err(err) => { error!("error deserializing astarte cfg from toml: {err}"); @@ -147,41 +136,6 @@ impl ConnectionConfigBuilder { } } - /// Merge two configs - /// - /// Prioritize the already existing fields - fn merge(&mut self, other: ConnectionConfigBuilder) { - // doesn't change it if it's been set from CLI or from ENV - if self.astarte_connection.is_none() { - self.astarte_connection = other.astarte_connection; - } - - self.store_directory = self.store_directory.take().or(other.store_directory); - - // update the mqtt config info - let mqtt_config = &mut self.mqtt_config; - - mqtt_config.device_id = mqtt_config.device_id.take().or(other.mqtt_config.device_id); - mqtt_config.realm = mqtt_config.realm.take().or(other.mqtt_config.realm); - mqtt_config.credential = mqtt_config - .credential - .take() - .or(other.mqtt_config.credential); - mqtt_config.pairing_url = mqtt_config - .pairing_url - .take() - .or(other.mqtt_config.pairing_url); - mqtt_config.astarte_ignore_ssl = mqtt_config - .astarte_ignore_ssl - .take() - .or(other.mqtt_config.astarte_ignore_ssl); - - // update the grpc config info - let grpc_config = &mut self.grpc_config; - - grpc_config.endpoint = grpc_config.endpoint.take().or(other.grpc_config.endpoint); - } - /// Build a complete Astarte configuration or return an error pub async fn build(self) -> eyre::Result<(DeviceClient, SdkConnection)> { let astarte_connection = self @@ -198,7 +152,8 @@ impl ConnectionConfigBuilder { match astarte_connection { AstarteConnection::Mqtt => { // define MQTT configuration options - let mqtt_cfg = self.mqtt_config.build()?; + let mqtt_cfg: MqttConfig = + self.mqtt_config.ok_or_eyre("invalid mqtt config")?.into(); debug!("parsed Astarte Device Sdk config: {:#?}", mqtt_cfg); // connect to Astarte @@ -207,7 +162,11 @@ impl ConnectionConfigBuilder { Ok((client, SdkConnection::Mqtt(connection))) } AstarteConnection::Grpc => { - let grpc_cfg = self.grpc_config.build()?; + let grpc_endpoint = self.grpc_config.ok_or_eyre("invalid grpc config")?.endpoint; + + let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, grpc_endpoint) + .wrap_err("failed to create a gRPC config")?; + debug!("parsed Astarte Message Hub config: {:#?}", grpc_cfg); let (client, connection) = builder.connect(grpc_cfg).await?.build().await; @@ -227,39 +186,38 @@ pub enum SdkConnection { } /// Config for an MQTT connection to Astarte -#[derive(Debug, Default, Deserialize)] +/// +/// The struct isn't really necessary, nevertheless we cannot deserialize the entire [MqttConfig] +/// struct without having the fields `keepalive`, `conn_timeout` and `bounded_channel_size`. +#[derive(Debug, Deserialize)] struct MqttConfigBuilder { /// Device ID - device_id: Option, + device_id: String, /// Astarte realm - realm: Option, + realm: String, /// Device credential #[serde(flatten)] - credential: Option, + credential: Credential, /// Astarte pairing url - pairing_url: Option, + pairing_url: String, /// Flag to ignore Astarte SSL errors - astarte_ignore_ssl: Option, + ignore_ssl_errors: bool, } -impl MqttConfigBuilder { - fn build(self) -> eyre::Result { - let device_id = self.device_id.ok_or_eyre("missing device id")?; - let realm = self.realm.ok_or_eyre("missing realm")?; - let credential = self - .credential - .ok_or_eyre("missing either a credential secret or a pairing token")?; - let pairing_url = self.pairing_url.ok_or_eyre("missing pairing url")?; - // if missing, set the ignore ssl error flat to false - let astarte_ignore_ssl = self.astarte_ignore_ssl.unwrap_or_default(); - - let mut mqtt_cfg = MqttConfig::new(realm, device_id, credential, pairing_url); - - if astarte_ignore_ssl { - mqtt_cfg.ignore_ssl_errors(); +impl From for MqttConfig { + fn from(value: MqttConfigBuilder) -> Self { + let mut cfg = MqttConfig::new( + value.realm, + value.device_id, + value.credential, + value.pairing_url, + ); + + if value.ignore_ssl_errors { + cfg.ignore_ssl_errors(); } - Ok(mqtt_cfg) + cfg } } @@ -267,18 +225,7 @@ impl MqttConfigBuilder { #[derive(Debug, Default, Deserialize)] struct GrpcConfigBuilder { /// The Endpoint of the Astarte Message Hub - endpoint: Option, -} - -impl GrpcConfigBuilder { - fn build(self) -> eyre::Result { - let endpoint = self.endpoint.ok_or_eyre("missing endpoint")?; - - let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, endpoint) - .wrap_err("failed to create a gRPC config")?; - - Ok(grpc_cfg) - } + endpoint: String, } /// Send data to Astarte @@ -319,39 +266,10 @@ mod test { #[tokio::test] async fn test_connection_config_builder_build_failures() { // empty config builder cannot build successfully - let cfg_builder = ConnectionConfigBuilder::with_connection(None).build().await; + let cfg_builder = ConnectionConfigBuilder::default().build().await; assert!(cfg_builder.is_err()); - // config builder with only the connection to astarte specified cannot build successfully - let con = AstarteConnection::Mqtt; - let cfg_builder = ConnectionConfigBuilder::with_connection(Some(con)) - .build() - .await; - assert!(cfg_builder.is_err()); - - // check that only the astarte connection is added to the configuration - let mut cfg_builder = ConnectionConfigBuilder::with_connection(None); - let toml_str = r#" - [astarte] - connection = "mqtt" - "#; - let toml = toml::from_str::(toml_str).unwrap(); - cfg_builder.merge(toml.astarte); - assert!(cfg_builder.astarte_connection.is_some()); - - // check that the astarte connection is not updated with toml info if it already contains a - // value - let con = AstarteConnection::Mqtt; - let mut cfg_builder = ConnectionConfigBuilder::with_connection(Some(con)); - let toml_str = r#" - [astarte] - connection = "grpc" - "#; - let toml = toml::from_str::(toml_str).unwrap(); - cfg_builder.merge(toml.astarte); - assert_eq!(cfg_builder.astarte_connection, Some(con)); - - // define store dire for the next tests + // define store dir for the next tests let mut tmp_dir = env::temp_dir(); tmp_dir.push("stream-rust-test-tests"); std::fs::create_dir_all(&tmp_dir).expect("failed to create store dir"); @@ -365,21 +283,5 @@ mod test { }; let res = cfg_builder.build().await; assert!(res.is_err()); - - // check that the store path is not updated with toml info if it already contains a value - let mut cfg_builder = ConnectionConfigBuilder { - astarte_connection: Some(AstarteConnection::Mqtt), - store_directory: Some(tmp_dir.clone()), - mqtt_config: Default::default(), - grpc_config: Default::default(), - }; - let toml_str = r#" - [astarte] - connection = "grpc" - store_directory = "/tmp/stream-rust-test/store/" - "#; - let toml = toml::from_str::(toml_str).unwrap(); - cfg_builder.merge(toml.astarte); - assert_eq!(cfg_builder.store_directory, Some(tmp_dir.clone())); } } diff --git a/src/cli.rs b/src/cli.rs index 14aa155..31ce577 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -6,7 +6,6 @@ //! CLI configuration options -use crate::astarte::AstarteConnection; use crate::math::MathFunction; use clap::Parser; use std::path::PathBuf; @@ -15,9 +14,6 @@ use std::path::PathBuf; #[derive(Debug, Clone, Parser)] #[clap(version, about)] pub struct Config { - /// Either use the Astarte Device SDK or the Astarte Message Hub - #[clap(short = 'c', long, env = "ASTARTE_CONNECTION")] - pub astarte_connection: Option, /// Path to the directory containing the Astarte configuration file config.toml /// /// First, the Astarte configuration is taken from ENV vars, then from the config.toml if the diff --git a/src/main.rs b/src/main.rs index d8de0a6..52d7ae5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::time::SystemTime; use stream_rust_test::astarte::{send_data, ConnectionConfigBuilder, SdkConnection}; use stream_rust_test::cli::Config; use tokio::task::JoinSet; -use tracing::{debug, error}; +use tracing::{debug, error, info, warn}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{fmt, EnvFilter}; @@ -40,21 +40,24 @@ async fn main() -> eyre::Result<()> { // initialize CLI configuration options let cli_cfg = Config::parse(); - debug!("Parsed config: {:#?}", cli_cfg); + debug!("parsed CLI config: {:#?}", cli_cfg); let mut tasks = JoinSet::>::new(); - // Load astarte configuration - let mut astarte_cfg_builder = - ConnectionConfigBuilder::with_connection(cli_cfg.astarte_connection); - // populate the builder using the environment variables (if set) - astarte_cfg_builder.from_env(); + let mut astarte_cfg_builder = ConnectionConfigBuilder::default(); - if let Some(path) = &cli_cfg.astarte_config_path { - let path = path.join("config.toml"); - astarte_cfg_builder.update_with_toml(path).await; - } + info!("retrieve Astarte connection config from ENV"); + if let Err(err) = astarte_cfg_builder.try_from_env() { + warn!("failed to retrieve Astarte connection config from ENV: {err}"); + + if let Some(path) = &cli_cfg.astarte_config_path { + let path = path.join("config.toml"); + info!("retrieve Astarte connection config from {}", path.display()); + + astarte_cfg_builder.from_toml(path).await; + } + }; let (client, connection) = astarte_cfg_builder.build().await?;