From a6f84a657a0a45c3c05c48ee8642c25bc6933692 Mon Sep 17 00:00:00 2001 From: Riccardo Gallo Date: Tue, 3 Sep 2024 15:25:09 +0200 Subject: [PATCH] refactor(config): update astarte device config Now the configuration information to connect a device to Astarte are provided inside a `astarte-device-DEVICE_ID_HERE-conf` directory, in a `config.toml` file. Instead, all the information necessary to generate samples to be sent to Astarte are provided via CLI or environment variables. Also the README have been update to explain how to configure the application and build/run it. Signed-off-by: Riccardo Gallo --- .reuse/dep5 | 4 + Cargo.lock | 55 +++++++ Cargo.toml | 3 + README.md | 55 ++++++- .../config.toml | 7 + scripts/run.sh | 7 - src/main.rs | 153 +++++++++++------- 7 files changed, 221 insertions(+), 63 deletions(-) create mode 100644 astarte-device-DEVICE_ID_HERE-conf/config.toml diff --git a/.reuse/dep5 b/.reuse/dep5 index 82de5d4..aa2b6ca 100644 --- a/.reuse/dep5 +++ b/.reuse/dep5 @@ -9,3 +9,7 @@ License: Apache-2.0 Files: interfaces/* Copyright: 2024 SECO Mind Srl License: Apache-2.0 + +Files: astarte-device-*-conf/* +Copyright: 2024 SECO Mind Srl +License: Apache-2.0 diff --git a/Cargo.lock b/Cargo.lock index 2a8f3cc..b46abc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2066,6 +2066,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb5b1b31579f3811bf615c144393417496f152e12ac8b7663bf664f4a815306d" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2398,7 +2407,10 @@ dependencies = [ "clap", "color-eyre", "rand", + "serde", "tokio", + "tokio-stream", + "toml", "tracing", "tracing-subscriber", ] @@ -2676,6 +2688,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -3181,6 +3227,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index a282de0..f91c4a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,10 @@ description = "Astarte Rust SDK based data stream test." astarte-device-sdk = "0.8.2" clap = { version = "=4.4.18", features = ["derive", "env", "string"] } color-eyre = "0.6.3" +toml = "0.8.12" tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] } +tokio-stream = "0.1.15" tracing = "0.1.37" tracing-subscriber = { version = "0.3.0", features = ["env-filter"]} rand = "0.8.5" +serde = { version = "1.0.207", features = ["derive"] } diff --git a/README.md b/README.md index 2edef1f..e29e8f2 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,60 @@ Copyright 2024 SECO Mind Srl SPDX-License-Identifier: Apache-2.0 --> -# stream-rust-test +# Astarte Stream Rust Test [![ci](https://github.com/astarte-platform/stream-rust-test/actions/workflows/ci.yaml/badge.svg)](https://github.com/astarte-platform/stream-rust-test/actions/workflows/ci.yaml) [![codecov](https://codecov.io/gh/astarte-platform/stream-rust-test/graph/badge.svg?token=wW2Hsm5edX)](https://codecov.io/gh/astarte-platform/stream-rust-test) + +Astarte Rust SDK based data stream test. + +Requirements +============ + +* Astarte Device Rust SDK + +Getting started +=============== + +## Choosing a Device ID + +A Base64 url-encoded uuid should be used, you can use [astartectl](https://github.com/astarte-platform/astartectl#installation) to generate one: + +```bash +astartectl utils device-id generate-random +``` + +## Configuring the application + +Create a configuration directory `astarte-device-DEVICE_ID_HERE-conf` with a `config.toml` file inside it containing: +- `realm`: the name of the Astarte realm. +- `device_id`: the id of the device you want to connect to Astarte. +- `pairing_url`: the URL of the Astarte Pairing endpoint. It should be something like `https:///pairing`. +- `credentials_secret` or `pairing_token`: the identifiers used to authenticate the device through Astarte. If both are + present, the credential secret will be used. +- `store_directory`: the directory specifying where persistent data will be saved. +- `interfaces_directory`: the directory where the astarte interfaces used by the device are saved. + +## Build and run + +Build the application using following commands: +```sh +cargo build --release +``` + +Then run the application either by running the `run.sh` script inside the `scripts` folder or with the CLI: +```sh +./target/release/stream-rust-test [OPTIONS] +``` + +The full list of options can be shown with the command: +```sh +cargo run -- -h +``` + +Several options are available: + +- `--device` allows to set the device ID; +- `--function` allows to choose the data generation function (one between `sin`, `noisesin`, `randomspikessin`, `saw`, `rect`, `sinc`, `random`, `x` and a default one); +- `--interval` allows to set the sending interval; +- `--scale` allows to scale the generated result; diff --git a/astarte-device-DEVICE_ID_HERE-conf/config.toml b/astarte-device-DEVICE_ID_HERE-conf/config.toml new file mode 100644 index 0000000..f2d942b --- /dev/null +++ b/astarte-device-DEVICE_ID_HERE-conf/config.toml @@ -0,0 +1,7 @@ +realm = "REALM_HERE" +device_id = "DEVICE_ID_HERE" +pairing_url = "PAIRING_URL_HERE" +credentials_secret = "CREDENTIALS_SECRET_HERE" +astarte_ignore_ssl = false +interfaces_directory = "INTERFACES_DIRECTORY_HERE" +store_directory = "STORE_DIRECTORY_HERE" diff --git a/scripts/run.sh b/scripts/run.sh index 929c92f..6f2b65f 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -12,13 +12,6 @@ set -eEuo pipefail export RUST_LOG="debug" # Set application environment variables -export REALM="" -export DEVICE_ID="" -export CREDENTIALS_SECRET="" -#export PAIRING_TOKEN="" -export PAIRING_URL="http://api.astarte.localhost/pairing" -export STORE_DIR="/tmp/stream-rust-test/store" -export IGNORE_SSL_ERRORS="true" export MATH_FUNCTION="sin" export INTERVAL_BTW_SAMPLES=500 export SCALE=3 diff --git a/src/main.rs b/src/main.rs index c5fe462..df500c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,61 +10,62 @@ use astarte_device_sdk::transport::mqtt::{Credential, MqttConfig}; use astarte_device_sdk::{Client, DeviceClient, EventLoop}; use clap::Parser; use color_eyre::eyre; -use color_eyre::eyre::bail; +use color_eyre::eyre::{bail, eyre}; +use serde::Deserialize; use std::path::PathBuf; use std::time::SystemTime; use stream_rust_test::math::{BaseValue, MathFunction}; use tokio::task::JoinSet; +use tokio_stream::wrappers::ReadDirStream; +use tokio_stream::StreamExt; use tracing::error; use tracing::log::debug; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -/// Astarte device configuration. -#[derive(Debug, Clone, Parser)] -#[clap(version, about)] -pub struct Config { +#[derive(Debug, Clone, Deserialize)] +struct AstarteConfig { /// Astarte realm - #[clap(long, env = "REALM")] - pub realm: String, + realm: String, /// Device ID - #[clap(long, env = "DEVICE_ID")] - pub device_id: String, + device_id: String, /// Device credential secret - #[clap(long, env = "CREDENTIALS_SECRET")] - pub credentials_secret: Option, + #[serde(default)] + credentials_secret: Option, /// Device pairing token - #[clap(long, env = "PAIRING_TOKEN")] - pub pairing_token: Option, + #[serde(default)] + pairing_token: Option, /// Astarte pairing url - #[clap(long, env = "PAIRING_URL")] - pub pairing_url: String, + pairing_url: String, /// Astarte store directory - #[clap(long, env = "STORE_DIR")] - pub store_dir: String, + store_directory: PathBuf, /// Flag to ignore Astarte SSL errors - #[clap(long, default_value = "true", env = "IGNORE_SSL_ERRORS")] - pub ignore_ssl_errors: bool, + astarte_ignore_ssl: bool, /// Path to folder containing the Astarte Device interfaces - #[clap(long, default_value = PathBuf::from("interfaces").into_os_string())] - pub interfaces_folder: PathBuf, + interfaces_directory: PathBuf, +} + +/// Configuration for the values to be sent to Astarte +#[derive(Debug, Clone, Parser)] +#[clap(version, about)] +struct Config { /// Math function the device will use to send data to Astarte - #[clap(long, default_value = "default", env = "MATH_FUNCTION")] - pub math_function: MathFunction, + #[clap(short, long, default_value = "default", env = "MATH_FUNCTION")] + math_function: MathFunction, /// Interface name to send data to #[clap( long, default_value = "org.astarte-platform.genericsensors.Values", env = "INTERFACE_NAME" )] - pub interface_datastream_do: String, + interface_datastream_do: String, /// Milliseconds the device must wait before sending data to Astarte - #[clap(long, default_value = "1000", env = "INTERVAL_BTW_SAMPLES")] - pub interval_btw_samples: u64, + #[clap(short, long, default_value = "1000", env = "INTERVAL_BTW_SAMPLES")] + interval_btw_samples: u64, /// Scale for the generation of the data to send - #[clap(long, default_value = "1.0", env = "SCALE")] - pub scale: f64, + #[clap(short, long, default_value = "1.0", env = "SCALE")] + scale: f64, } #[tokio::main] @@ -79,33 +80,43 @@ async fn main() -> eyre::Result<()> { // time instant when the program starts its execution let now = SystemTime::now(); - // initialize configuration options - let cfg = Config::parse(); + // initialize CLI configuration options + let cli_cfg = Config::parse(); + + debug!("Parsed config: {:#?}", cli_cfg); - debug!("Parsed config: {:#?}", cfg); + // Load astarte configuration + let astarte_cfg: AstarteConfig = load_astarte_cfg().await?; + + debug!("Parsed Astarte config: {:#?}", astarte_cfg); // define type of credential (pairing token or credential secret) to use to establish an MQTT // connection with Astarte - let cred = if let Some(pairing) = cfg.pairing_token.as_deref() { + let cred = if let Some(pairing) = astarte_cfg.pairing_token.as_deref() { Credential::paring_token(pairing) - } else if let Some(secret) = cfg.credentials_secret.as_deref() { + } else if let Some(secret) = astarte_cfg.credentials_secret.as_deref() { Credential::secret(secret) } else { bail!("missing credential secret or pairing token"); }; // define MQTT configuration options - let mut mqtt_config = MqttConfig::new(cfg.realm, cfg.device_id, cred, cfg.pairing_url); - - if cfg.ignore_ssl_errors { + let mut mqtt_config = MqttConfig::new( + astarte_cfg.realm, + astarte_cfg.device_id, + cred, + astarte_cfg.pairing_url, + ); + + if astarte_cfg.astarte_ignore_ssl { mqtt_config.ignore_ssl_errors(); } // connect to Astarte let (client, mut connection) = DeviceBuilder::new() - .store_dir(cfg.store_dir) + .store_dir(astarte_cfg.store_directory.as_path()) .await? - .interface_directory(cfg.interfaces_folder.as_path())? + .interface_directory(astarte_cfg.interfaces_directory.as_path())? .connect(mqtt_config) .await? .build(); @@ -116,14 +127,7 @@ async fn main() -> eyre::Result<()> { tasks.spawn(async move { connection.handle_events().await.map_err(Into::into) }); // spawn task to send data to Astarte - tasks.spawn(send_data( - client, - now, - cfg.math_function, - cfg.interface_datastream_do, - cfg.interval_btw_samples, - cfg.scale, - )); + tasks.spawn(send_data(client, now, cli_cfg)); // handle tasks termination while let Some(res) = tasks.join_next().await { @@ -144,25 +148,64 @@ async fn main() -> eyre::Result<()> { Ok(()) } +async fn load_astarte_cfg() -> eyre::Result { + // search the astarte-device-DEVICE_ID_HERE-conf with the DEVICE_ID specified by the user + // starting from the root of the project + let dirs = tokio::fs::read_dir(".").await?; + let dirs_stream = ReadDirStream::new(dirs); + + let mut dirs = dirs_stream + .filter_map(|res| res.ok().map(|e| e.path())) + .filter(|path| { + if !path.is_dir() { + return false; + } + + let name = path + .file_name() + .expect("failed to retrieve the folder name") + .to_string_lossy(); + + // true if the folder name starts and ends with the predefined values + name.starts_with("astarte-device-") && name.ends_with("-conf") + }) + .collect::>() + .await; + + // if more folders are present, take only the first one + let Some(dir) = dirs.first_mut() else { + return Err(eyre!("No astarte devices config folder found")); + }; + + dir.push("config.toml"); + + let file = tokio::fs::read_to_string(dir).await?; + + // retrieve the astarte config information + let astarte_cfg: AstarteConfig = toml::from_str(&file)?; + + Ok(astarte_cfg) +} + /// Send data to Astarte async fn send_data( client: DeviceClient, now: SystemTime, - math_function: MathFunction, - interface_datastream_do: String, - interval_btw_samples: u64, - scale: f64, + cfg: Config, ) -> eyre::Result<()> { - let mut base_value = BaseValue::try_from_system_time(now, scale)?; + let mut base_value = BaseValue::try_from_system_time(now, cfg.scale)?; - debug!("sending data to Astarte with {math_function} math function"); + debug!( + "sending data to Astarte with {} math function", + cfg.math_function + ); loop { // Send data to Astarte - let value = math_function.compute(base_value.value()); + let value = cfg.math_function.compute(base_value.value()); client - .send(&interface_datastream_do, "/test/value", value) + .send(&cfg.interface_datastream_do, "/test/value", value) .await?; debug!("data sent on endpoint /test/value, content: {value}"); @@ -171,6 +214,6 @@ async fn send_data( base_value.update(); // Sleep interval secs - tokio::time::sleep(std::time::Duration::from_millis(interval_btw_samples)).await; + tokio::time::sleep(std::time::Duration::from_millis(cfg.interval_btw_samples)).await; } }