Skip to content

Commit

Permalink
refactor(con): change retrieval of Astarte config
Browse files Browse the repository at this point in the history
Now it is only possible to specify the type of connection (mqtt or grpc)
through the ASTARTE_CONNECTION env variable. The application will first
try to retrieve the config from env; if an error occurs, it tries to
retrieve them from a config.toml (if a path has beem specified). The
merge operation for the 2 sources of config has been removed.

Signed-off-by: Riccardo Gallo <riccardo.gallo@secomind.com>
  • Loading branch information
rgallor committed Nov 29, 2024
1 parent 45e2c83 commit 18394e0
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 190 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ clap = { version = "=4.4.18", features = ["derive", "env", "string"] }
color-eyre = "0.6.3"
toml = "0.8.12"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] }
tokio-stream = "0.1.15"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
rand = "0.8.5"
Expand Down
248 changes: 75 additions & 173 deletions src/astarte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use astarte_device_sdk::transport::mqtt::{Credential, Mqtt, MqttConfig};
use astarte_device_sdk::{Client, DeviceClient, DeviceConnection};
use clap::ValueEnum;
use color_eyre::eyre;
use color_eyre::eyre::{OptionExt, WrapErr};
use color_eyre::eyre::{eyre, OptionExt, WrapErr};
use serde::Deserialize;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
Expand Down Expand Up @@ -60,78 +60,67 @@ pub struct ConnectionConfigBuilder {
store_directory: Option<PathBuf>,
/// Astarte Device SDK config options
#[serde(rename = "mqtt", default)]
mqtt_config: MqttConfigBuilder,
/// Astarte Message Hub config options
mqtt_config: Option<MqttConfigBuilder>,
/// Astarte Message Hub endpoint
#[serde(rename = "grpc", default)]
grpc_config: GrpcConfigBuilder,
grpc_config: Option<GrpcConfigBuilder>,
}

impl ConnectionConfigBuilder {
/// Builder constructor
/// Init astarte config from env var if they have been set
///
/// Specify if the builder should use the Astarte Device SDK or the Astarte Message Hub
pub fn with_connection(astarte_connection: Option<AstarteConnection>) -> Self {
Self {
astarte_connection,
store_directory: None,
mqtt_config: MqttConfigBuilder::default(),
grpc_config: GrpcConfigBuilder::default(),
}
}
/// If an error is returned, it means that one or more environment variables have not been set
pub fn try_from_env(&mut self) -> eyre::Result<()> {
let con = env::var("ASTARTE_CONNECTION")
.map(|s| AstarteConnection::from_str(&s, true))?
.map_err(|err| eyre!(err))?;

/// Init astarte config from env var if they have been set
pub fn from_env(&mut self) {
// doesn't change it if it's been set from CLI
if self.astarte_connection.is_none() {
self.astarte_connection = env::var("ASTARTE_CONNECTION")
.ok()
.map(|s| AstarteConnection::from_str(&s, true))
.transpose()
.ok()
.unwrap_or_default();
}
self.store_directory = Some(env::var("ASTARTE_STORE_DIRECTORY").map(PathBuf::from)?);

self.store_directory = env::var("ASTARTE_STORE_DIRECTORY").ok().map(PathBuf::from);

// update the mqtt config info
let device_id = env::var("ASTARTE_DEVICE_ID").ok();
let realm = env::var("ASTARTE_REALM").ok();
let pairing_url = env::var("ASTARTE_PAIRING_URL").ok();
let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS")
.map(|s| s.parse().unwrap_or_default())
.ok();
let credential = env::var("ASTARTE_CREDENTIALS_SECRET")
.ok()
.map(Credential::secret)
.or_else(|| {
env::var("ASTARTE_PAIRING_TOKEN")
.ok()
.map(Credential::paring_token)
});

self.mqtt_config = MqttConfigBuilder {
device_id,
realm,
credential,
pairing_url,
astarte_ignore_ssl,
};
match con {
AstarteConnection::Mqtt => {
self.astarte_connection = Some(con);

// update the mqtt config info
let device_id = env::var("ASTARTE_DEVICE_ID")?;
let realm = env::var("ASTARTE_REALM")?;
let pairing_url = env::var("ASTARTE_PAIRING_URL")?;
let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS")
.map(|s| s.parse::<bool>().unwrap_or_default())?;
let credential = env::var("ASTARTE_CREDENTIALS_SECRET")
.map(Credential::secret)
.or_else(|_| env::var("ASTARTE_PAIRING_TOKEN").map(Credential::paring_token))?;

self.mqtt_config = Some(MqttConfigBuilder {
device_id,
realm,
credential,
pairing_url,
ignore_ssl_errors: astarte_ignore_ssl,
});
}
AstarteConnection::Grpc => {
self.astarte_connection = Some(con);

// update the mqtt config info
let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT").ok();
let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT")?;

self.grpc_config = GrpcConfigBuilder { endpoint };
// update the grpc config info
self.grpc_config = Some(GrpcConfigBuilder { endpoint });
}
}

Ok(())
}

/// Update the missing config values taking them from a config.toml file
pub async fn update_with_toml(&mut self, path: impl AsRef<Path>) {
pub async fn from_toml(&mut self, path: impl AsRef<Path>) {
match tokio::fs::read_to_string(&path).await {
Ok(file) => {
// retrieve the astarte config information from the config.toml file
match toml::from_str::<ConfigToml>(&file) {
Ok(toml_cfg) => {
// update the configs
self.merge(toml_cfg.astarte);
*self = toml_cfg.astarte;
}
Err(err) => {
error!("error deserializing astarte cfg from toml: {err}");
Expand All @@ -147,41 +136,6 @@ impl ConnectionConfigBuilder {
}
}

/// Merge two configs
///
/// Prioritize the already existing fields
fn merge(&mut self, other: ConnectionConfigBuilder) {
// doesn't change it if it's been set from CLI or from ENV
if self.astarte_connection.is_none() {
self.astarte_connection = other.astarte_connection;
}

self.store_directory = self.store_directory.take().or(other.store_directory);

// update the mqtt config info
let mqtt_config = &mut self.mqtt_config;

mqtt_config.device_id = mqtt_config.device_id.take().or(other.mqtt_config.device_id);
mqtt_config.realm = mqtt_config.realm.take().or(other.mqtt_config.realm);
mqtt_config.credential = mqtt_config
.credential
.take()
.or(other.mqtt_config.credential);
mqtt_config.pairing_url = mqtt_config
.pairing_url
.take()
.or(other.mqtt_config.pairing_url);
mqtt_config.astarte_ignore_ssl = mqtt_config
.astarte_ignore_ssl
.take()
.or(other.mqtt_config.astarte_ignore_ssl);

// update the grpc config info
let grpc_config = &mut self.grpc_config;

grpc_config.endpoint = grpc_config.endpoint.take().or(other.grpc_config.endpoint);
}

/// Build a complete Astarte configuration or return an error
pub async fn build(self) -> eyre::Result<(DeviceClient<SqliteStore>, SdkConnection)> {
let astarte_connection = self
Expand All @@ -198,7 +152,8 @@ impl ConnectionConfigBuilder {
match astarte_connection {
AstarteConnection::Mqtt => {
// define MQTT configuration options
let mqtt_cfg = self.mqtt_config.build()?;
let mqtt_cfg: MqttConfig =
self.mqtt_config.ok_or_eyre("invalid mqtt config")?.into();
debug!("parsed Astarte Device Sdk config: {:#?}", mqtt_cfg);

// connect to Astarte
Expand All @@ -207,7 +162,11 @@ impl ConnectionConfigBuilder {
Ok((client, SdkConnection::Mqtt(connection)))
}
AstarteConnection::Grpc => {
let grpc_cfg = self.grpc_config.build()?;
let grpc_endpoint = self.grpc_config.ok_or_eyre("invalid grpc config")?.endpoint;

let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, grpc_endpoint)
.wrap_err("failed to create a gRPC config")?;

debug!("parsed Astarte Message Hub config: {:#?}", grpc_cfg);

let (client, connection) = builder.connect(grpc_cfg).await?.build().await;
Expand All @@ -227,58 +186,46 @@ pub enum SdkConnection {
}

/// Config for an MQTT connection to Astarte
#[derive(Debug, Default, Deserialize)]
///
/// The struct isn't really necessary, nevertheless we cannot deserialize the entire [MqttConfig]
/// struct without having the fields `keepalive`, `conn_timeout` and `bounded_channel_size`.
#[derive(Debug, Deserialize)]
struct MqttConfigBuilder {
/// Device ID
device_id: Option<String>,
device_id: String,
/// Astarte realm
realm: Option<String>,
realm: String,
/// Device credential
#[serde(flatten)]
credential: Option<Credential>,
credential: Credential,
/// Astarte pairing url
pairing_url: Option<String>,
pairing_url: String,
/// Flag to ignore Astarte SSL errors
astarte_ignore_ssl: Option<bool>,
ignore_ssl_errors: bool,
}

impl MqttConfigBuilder {
fn build(self) -> eyre::Result<MqttConfig> {
let device_id = self.device_id.ok_or_eyre("missing device id")?;
let realm = self.realm.ok_or_eyre("missing realm")?;
let credential = self
.credential
.ok_or_eyre("missing either a credential secret or a pairing token")?;
let pairing_url = self.pairing_url.ok_or_eyre("missing pairing url")?;
// if missing, set the ignore ssl error flat to false
let astarte_ignore_ssl = self.astarte_ignore_ssl.unwrap_or_default();

let mut mqtt_cfg = MqttConfig::new(realm, device_id, credential, pairing_url);

if astarte_ignore_ssl {
mqtt_cfg.ignore_ssl_errors();
impl From<MqttConfigBuilder> for MqttConfig {
fn from(value: MqttConfigBuilder) -> Self {
let mut cfg = MqttConfig::new(
value.realm,
value.device_id,
value.credential,
value.pairing_url,
);

if value.ignore_ssl_errors {
cfg.ignore_ssl_errors();
}

Ok(mqtt_cfg)
cfg
}
}

/// Config for a gRPC connection to an Astarte Message Hub instance
#[derive(Debug, Default, Deserialize)]
struct GrpcConfigBuilder {
/// The Endpoint of the Astarte Message Hub
endpoint: Option<String>,
}

impl GrpcConfigBuilder {
fn build(self) -> eyre::Result<GrpcConfig> {
let endpoint = self.endpoint.ok_or_eyre("missing endpoint")?;

let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, endpoint)
.wrap_err("failed to create a gRPC config")?;

Ok(grpc_cfg)
}
endpoint: String,
}

/// Send data to Astarte
Expand Down Expand Up @@ -319,39 +266,10 @@ mod test {
#[tokio::test]
async fn test_connection_config_builder_build_failures() {
// empty config builder cannot build successfully
let cfg_builder = ConnectionConfigBuilder::with_connection(None).build().await;
let cfg_builder = ConnectionConfigBuilder::default().build().await;
assert!(cfg_builder.is_err());

// config builder with only the connection to astarte specified cannot build successfully
let con = AstarteConnection::Mqtt;
let cfg_builder = ConnectionConfigBuilder::with_connection(Some(con))
.build()
.await;
assert!(cfg_builder.is_err());

// check that only the astarte connection is added to the configuration
let mut cfg_builder = ConnectionConfigBuilder::with_connection(None);
let toml_str = r#"
[astarte]
connection = "mqtt"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert!(cfg_builder.astarte_connection.is_some());

// check that the astarte connection is not updated with toml info if it already contains a
// value
let con = AstarteConnection::Mqtt;
let mut cfg_builder = ConnectionConfigBuilder::with_connection(Some(con));
let toml_str = r#"
[astarte]
connection = "grpc"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert_eq!(cfg_builder.astarte_connection, Some(con));

// define store dire for the next tests
// define store dir for the next tests
let mut tmp_dir = env::temp_dir();
tmp_dir.push("stream-rust-test-tests");
std::fs::create_dir_all(&tmp_dir).expect("failed to create store dir");
Expand All @@ -365,21 +283,5 @@ mod test {
};
let res = cfg_builder.build().await;
assert!(res.is_err());

// check that the store path is not updated with toml info if it already contains a value
let mut cfg_builder = ConnectionConfigBuilder {
astarte_connection: Some(AstarteConnection::Mqtt),
store_directory: Some(tmp_dir.clone()),
mqtt_config: Default::default(),
grpc_config: Default::default(),
};
let toml_str = r#"
[astarte]
connection = "grpc"
store_directory = "/tmp/stream-rust-test/store/"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert_eq!(cfg_builder.store_directory, Some(tmp_dir.clone()));
}
}
4 changes: 0 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

//! CLI configuration options
use crate::astarte::AstarteConnection;
use crate::math::MathFunction;
use clap::Parser;
use std::path::PathBuf;
Expand All @@ -15,9 +14,6 @@ use std::path::PathBuf;
#[derive(Debug, Clone, Parser)]
#[clap(version, about)]
pub struct Config {
/// Either use the Astarte Device SDK or the Astarte Message Hub
#[clap(short = 'c', long, env = "ASTARTE_CONNECTION")]
pub astarte_connection: Option<AstarteConnection>,
/// Path to the directory containing the Astarte configuration file config.toml
///
/// First, the Astarte configuration is taken from ENV vars, then from the config.toml if the
Expand Down
Loading

0 comments on commit 18394e0

Please sign in to comment.