Skip to content

Commit

Permalink
refactor(con): change retrieval of Astarte config
Browse files Browse the repository at this point in the history
Now it is only possible to specify the type of connection (mqtt or grpc)
through the ASTARTE_CONNECTION env variable. The application will first
try to retrieve the config from env; if an error occurs, it tries to
retrieve them from a config.toml (if a path has beem specified). The
merge operation for the 2 sources of config has been removed.

Signed-off-by: Riccardo Gallo <riccardo.gallo@secomind.com>
  • Loading branch information
rgallor committed Nov 25, 2024
1 parent ba6c64d commit e0eb289
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 212 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ clap = { version = "=4.4.18", features = ["derive", "env", "string"] }
color-eyre = "0.6.3"
toml = "0.8.12"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] }
tokio-stream = "0.1.15"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
rand = "0.8.5"
Expand Down
245 changes: 50 additions & 195 deletions src/astarte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use astarte_device_sdk::transport::mqtt::{Credential, Mqtt, MqttConfig};
use astarte_device_sdk::{Client, DeviceClient, DeviceConnection};
use clap::ValueEnum;
use color_eyre::eyre;
use color_eyre::eyre::{OptionExt, WrapErr};
use color_eyre::eyre::{eyre, OptionExt, WrapErr};
use serde::Deserialize;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
Expand Down Expand Up @@ -60,78 +60,64 @@ pub struct ConnectionConfigBuilder {
store_directory: Option<PathBuf>,
/// Astarte Device SDK config options
#[serde(rename = "mqtt", default)]
mqtt_config: MqttConfigBuilder,
/// Astarte Message Hub config options
mqtt_config: Option<MqttConfig>,
/// Astarte Message Hub endpoint
#[serde(rename = "grpc", default)]
grpc_config: GrpcConfigBuilder,
grpc_endpoint: Option<String>,
}

impl ConnectionConfigBuilder {
/// Builder constructor
/// Init astarte config from env var if they have been set
///
/// Specify if the builder should use the Astarte Device SDK or the Astarte Message Hub
pub fn with_connection(astarte_connection: Option<AstarteConnection>) -> Self {
Self {
astarte_connection,
store_directory: None,
mqtt_config: MqttConfigBuilder::default(),
grpc_config: GrpcConfigBuilder::default(),
}
}
/// If an error is returned, it means that one or more environment variables have not been set
pub fn try_from_env(&mut self) -> eyre::Result<()> {
let con = env::var("ASTARTE_CONNECTION")
.map(|s| AstarteConnection::from_str(&s, true))?
.map_err(|err| eyre!(err))?;

Check warning on line 76 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L73-L76

Added lines #L73 - L76 were not covered by tests

/// Init astarte config from env var if they have been set
pub fn from_env(&mut self) {
// doesn't change it if it's been set from CLI
if self.astarte_connection.is_none() {
self.astarte_connection = env::var("ASTARTE_CONNECTION")
.ok()
.map(|s| AstarteConnection::from_str(&s, true))
.transpose()
.ok()
.unwrap_or_default();
}
self.store_directory = Some(env::var("ASTARTE_STORE_DIRECTORY").map(PathBuf::from)?);

Check warning on line 78 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L78

Added line #L78 was not covered by tests

self.store_directory = env::var("ASTARTE_STORE_DIRECTORY").ok().map(PathBuf::from);

// update the mqtt config info
let device_id = env::var("ASTARTE_DEVICE_ID").ok();
let realm = env::var("ASTARTE_REALM").ok();
let pairing_url = env::var("ASTARTE_PAIRING_URL").ok();
let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS")
.map(|s| s.parse().unwrap_or_default())
.ok();
let credential = env::var("ASTARTE_CREDENTIALS_SECRET")
.ok()
.map(Credential::secret)
.or_else(|| {
env::var("ASTARTE_PAIRING_TOKEN")
.ok()
.map(Credential::paring_token)
});

self.mqtt_config = MqttConfigBuilder {
device_id,
realm,
credential,
pairing_url,
astarte_ignore_ssl,
};
match con {
AstarteConnection::Mqtt => {
self.astarte_connection = Some(con);

Check warning on line 82 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L82

Added line #L82 was not covered by tests

// update the mqtt config info
let device_id = env::var("ASTARTE_DEVICE_ID")?;
let realm = env::var("ASTARTE_REALM")?;
let pairing_url = env::var("ASTARTE_PAIRING_URL")?;
let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS")
.map(|s| s.parse::<bool>().unwrap_or_default())?;
let credential = env::var("ASTARTE_CREDENTIALS_SECRET")

Check warning on line 90 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L85-L90

Added lines #L85 - L90 were not covered by tests
.map(Credential::secret)
.or_else(|_| env::var("ASTARTE_PAIRING_TOKEN").map(Credential::paring_token))?;

Check warning on line 92 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L92

Added line #L92 was not covered by tests

let mut mqtt_cfg = MqttConfig::new(device_id, realm, credential, pairing_url);
if astarte_ignore_ssl {
mqtt_cfg.ignore_ssl_errors();

Check warning on line 96 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L95-L96

Added lines #L95 - L96 were not covered by tests
}

self.mqtt_config = Some(mqtt_cfg);

Check warning on line 99 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L99

Added line #L99 was not covered by tests
}
AstarteConnection::Grpc => {
self.astarte_connection = Some(con);

Check warning on line 102 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L102

Added line #L102 was not covered by tests

// update the mqtt config info
let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT").ok();
// update the grpc config info
self.grpc_endpoint = Some(env::var("ASTARTE_MSGHUB_ENDPOINT")?);

Check warning on line 105 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L105

Added line #L105 was not covered by tests
}
}

self.grpc_config = GrpcConfigBuilder { endpoint };
Ok(())

Check warning on line 109 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L109

Added line #L109 was not covered by tests
}

/// Update the missing config values taking them from a config.toml file
pub async fn update_with_toml(&mut self, path: impl AsRef<Path>) {
pub async fn from_toml(&mut self, path: impl AsRef<Path>) {
match tokio::fs::read_to_string(&path).await {
Ok(file) => {

Check warning on line 115 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L113-L115

Added lines #L113 - L115 were not covered by tests
// retrieve the astarte config information from the config.toml file
match toml::from_str::<ConfigToml>(&file) {
Ok(toml_cfg) => {

Check warning on line 118 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L117-L118

Added lines #L117 - L118 were not covered by tests
// update the configs
self.merge(toml_cfg.astarte);
*self = toml_cfg.astarte;

Check warning on line 120 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L120

Added line #L120 was not covered by tests
}
Err(err) => {
error!("error deserializing astarte cfg from toml: {err}");

Check warning on line 123 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L122-L123

Added lines #L122 - L123 were not covered by tests
Expand All @@ -147,41 +133,6 @@ impl ConnectionConfigBuilder {
}
}

/// Merge two configs
///
/// Prioritize the already existing fields
fn merge(&mut self, other: ConnectionConfigBuilder) {
// doesn't change it if it's been set from CLI or from ENV
if self.astarte_connection.is_none() {
self.astarte_connection = other.astarte_connection;
}

self.store_directory = self.store_directory.take().or(other.store_directory);

// update the mqtt config info
let mqtt_config = &mut self.mqtt_config;

mqtt_config.device_id = mqtt_config.device_id.take().or(other.mqtt_config.device_id);
mqtt_config.realm = mqtt_config.realm.take().or(other.mqtt_config.realm);
mqtt_config.credential = mqtt_config
.credential
.take()
.or(other.mqtt_config.credential);
mqtt_config.pairing_url = mqtt_config
.pairing_url
.take()
.or(other.mqtt_config.pairing_url);
mqtt_config.astarte_ignore_ssl = mqtt_config
.astarte_ignore_ssl
.take()
.or(other.mqtt_config.astarte_ignore_ssl);

// update the grpc config info
let grpc_config = &mut self.grpc_config;

grpc_config.endpoint = grpc_config.endpoint.take().or(other.grpc_config.endpoint);
}

/// Build a complete Astarte configuration or return an error
pub async fn build(self) -> eyre::Result<(DeviceClient<SqliteStore>, SdkConnection)> {
let astarte_connection = self
Expand All @@ -198,7 +149,7 @@ impl ConnectionConfigBuilder {
match astarte_connection {
AstarteConnection::Mqtt => {
// define MQTT configuration options
let mqtt_cfg = self.mqtt_config.build()?;
let mqtt_cfg = self.mqtt_config.ok_or_eyre("invalid mqtt config")?;
debug!("parsed Astarte Device Sdk config: {:#?}", mqtt_cfg);

Check warning on line 153 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L153

Added line #L153 was not covered by tests

// connect to Astarte
Expand All @@ -207,7 +158,11 @@ impl ConnectionConfigBuilder {
Ok((client, SdkConnection::Mqtt(connection)))

Check warning on line 158 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L158

Added line #L158 was not covered by tests
}
AstarteConnection::Grpc => {
let grpc_cfg = self.grpc_config.build()?;
let grpc_endpoint = self.grpc_endpoint.ok_or_eyre("invalid grpc config")?;

Check warning on line 161 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L161

Added line #L161 was not covered by tests

let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, grpc_endpoint)

Check warning on line 163 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L163

Added line #L163 was not covered by tests
.wrap_err("failed to create a gRPC config")?;

debug!("parsed Astarte Message Hub config: {:#?}", grpc_cfg);

Check warning on line 166 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L166

Added line #L166 was not covered by tests

let (client, connection) = builder.connect(grpc_cfg).await?.build().await;

Check warning on line 168 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L168

Added line #L168 was not covered by tests
Expand All @@ -226,61 +181,6 @@ pub enum SdkConnection {
Grpc(DeviceConnection<SqliteStore, Grpc<SqliteStore>>),
}

/// Config for an MQTT connection to Astarte
#[derive(Debug, Default, Deserialize)]
struct MqttConfigBuilder {
/// Device ID
device_id: Option<String>,
/// Astarte realm
realm: Option<String>,
/// Device credential
#[serde(flatten)]
credential: Option<Credential>,
/// Astarte pairing url
pairing_url: Option<String>,
/// Flag to ignore Astarte SSL errors
astarte_ignore_ssl: Option<bool>,
}

impl MqttConfigBuilder {
fn build(self) -> eyre::Result<MqttConfig> {
let device_id = self.device_id.ok_or_eyre("missing device id")?;
let realm = self.realm.ok_or_eyre("missing realm")?;
let credential = self
.credential
.ok_or_eyre("missing either a credential secret or a pairing token")?;
let pairing_url = self.pairing_url.ok_or_eyre("missing pairing url")?;
// if missing, set the ignore ssl error flat to false
let astarte_ignore_ssl = self.astarte_ignore_ssl.unwrap_or_default();

let mut mqtt_cfg = MqttConfig::new(realm, device_id, credential, pairing_url);

if astarte_ignore_ssl {
mqtt_cfg.ignore_ssl_errors();
}

Ok(mqtt_cfg)
}
}

/// Config for a gRPC connection to an Astarte Message Hub instance
#[derive(Debug, Default, Deserialize)]
struct GrpcConfigBuilder {
/// The Endpoint of the Astarte Message Hub
endpoint: Option<String>,
}

impl GrpcConfigBuilder {
fn build(self) -> eyre::Result<GrpcConfig> {
let endpoint = self.endpoint.ok_or_eyre("missing endpoint")?;

let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, endpoint)
.wrap_err("failed to create a gRPC config")?;

Ok(grpc_cfg)
}
}

/// Send data to Astarte
pub async fn send_data(

Check warning on line 185 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L185

Added line #L185 was not covered by tests
client: DeviceClient<SqliteStore>,
Expand Down Expand Up @@ -319,39 +219,10 @@ mod test {
#[tokio::test]
async fn test_connection_config_builder_build_failures() {
// empty config builder cannot build successfully
let cfg_builder = ConnectionConfigBuilder::with_connection(None).build().await;
assert!(cfg_builder.is_err());

// config builder with only the connection to astarte specified cannot build successfully
let con = AstarteConnection::Mqtt;
let cfg_builder = ConnectionConfigBuilder::with_connection(Some(con))
.build()
.await;
let cfg_builder = ConnectionConfigBuilder::default().build().await;
assert!(cfg_builder.is_err());

// check that only the astarte connection is added to the configuration
let mut cfg_builder = ConnectionConfigBuilder::with_connection(None);
let toml_str = r#"
[astarte]
connection = "mqtt"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert!(cfg_builder.astarte_connection.is_some());

// check that the astarte connection is not updated with toml info if it already contains a
// value
let con = AstarteConnection::Mqtt;
let mut cfg_builder = ConnectionConfigBuilder::with_connection(Some(con));
let toml_str = r#"
[astarte]
connection = "grpc"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert_eq!(cfg_builder.astarte_connection, Some(con));

// define store dire for the next tests
// define store dir for the next tests
let mut tmp_dir = env::temp_dir();
tmp_dir.push("stream-rust-test-tests");
std::fs::create_dir_all(&tmp_dir).expect("failed to create store dir");
Expand All @@ -361,25 +232,9 @@ mod test {
astarte_connection: Some(AstarteConnection::Mqtt),
store_directory: Some(tmp_dir.clone()),
mqtt_config: Default::default(),
grpc_config: Default::default(),
grpc_endpoint: Default::default(),
};
let res = cfg_builder.build().await;
assert!(res.is_err());

// check that the store path is not updated with toml info if it already contains a value
let mut cfg_builder = ConnectionConfigBuilder {
astarte_connection: Some(AstarteConnection::Mqtt),
store_directory: Some(tmp_dir.clone()),
mqtt_config: Default::default(),
grpc_config: Default::default(),
};
let toml_str = r#"
[astarte]
connection = "grpc"
store_directory = "/tmp/stream-rust-test/store/"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert_eq!(cfg_builder.store_directory, Some(tmp_dir.clone()));
}
}
4 changes: 0 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

//! CLI configuration options
use crate::astarte::AstarteConnection;
use crate::math::MathFunction;
use clap::Parser;
use std::path::PathBuf;
Expand All @@ -15,9 +14,6 @@ use std::path::PathBuf;
#[derive(Debug, Clone, Parser)]
#[clap(version, about)]
pub struct Config {
/// Either use the Astarte Device SDK or the Astarte Message Hub
#[clap(short = 'c', long, env = "ASTARTE_CONNECTION")]
pub astarte_connection: Option<AstarteConnection>,
/// Path to the directory containing the Astarte configuration file config.toml
///
/// First, the Astarte configuration is taken from ENV vars, then from the config.toml if the
Expand Down
Loading

0 comments on commit e0eb289

Please sign in to comment.