From ffc81cdf51ccd0e7f1c5c56ebbb3e09700770957 Mon Sep 17 00:00:00 2001 From: Pradeep Kumar K J Date: Wed, 2 Aug 2023 20:04:34 +0530 Subject: [PATCH] tedge/# topic to te/# converter Signed-off-by: Pradeep Kumar K J --- Cargo.lock | 1 + crates/core/tedge_agent/Cargo.toml | 1 + crates/core/tedge_agent/src/agent.rs | 36 +++ crates/core/tedge_agent/src/main.rs | 1 + .../src/tedge_to_te_converter/converter.rs | 128 ++++++++ .../src/tedge_to_te_converter/mod.rs | 4 + .../src/tedge_to_te_converter/tests.rs | 295 ++++++++++++++++++ .../convert_tedge_topics_to_te_topics.robot | 49 +++ 8 files changed, 515 insertions(+) create mode 100644 crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs create mode 100644 crates/core/tedge_agent/src/tedge_to_te_converter/mod.rs create mode 100644 crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs create mode 100644 tests/RobotFramework/tests/tedge_to_te_converter/convert_tedge_topics_to_te_topics.robot diff --git a/Cargo.lock b/Cargo.lock index 405633b0b09..72a54cf8d89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3487,6 +3487,7 @@ dependencies = [ "flockfile", "hyper", "lazy_static", + "log", "path-clean", "plugin_sm", "routerify", diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml index 9245ae0f920..b7c29b113d2 100644 --- a/crates/core/tedge_agent/Cargo.toml +++ b/crates/core/tedge_agent/Cargo.toml @@ -40,6 +40,7 @@ clap = { version = "3.2", features = ["cargo", "derive"] } flockfile = { path = "../../common/flockfile" } hyper = { version = "0.14", features = ["full"] } lazy_static = "1.4" +log = "0.4" path-clean = "0.1" plugin_sm = { path = "../plugin_sm" } routerify = "3.0" diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 9274e1faf1c..90f37257da7 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -5,15 +5,21 @@ use crate::restart_manager::config::RestartManagerConfig; use crate::software_manager::builder::SoftwareManagerBuilder; use crate::software_manager::config::SoftwareManagerConfig; use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder; +use crate::tedge_to_te_converter::converter::TedgetoTeConverter; use camino::Utf8PathBuf; use flockfile::check_another_instance_is_not_running; use flockfile::Flockfile; use flockfile::FlockfileError; use std::fmt::Debug; +use tedge_actors::ConvertingActor; +use tedge_actors::ConvertingActorBuilder; +use tedge_actors::MessageSink; +use tedge_actors::MessageSource; use tedge_actors::Runtime; use tedge_health_ext::HealthMonitorBuilder; use tedge_mqtt_ext::MqttActorBuilder; use tedge_mqtt_ext::MqttConfig; +use tedge_mqtt_ext::TopicFilter; use tedge_signal_ext::SignalActor; use tedge_utils::file::create_directory_with_defaults; use tracing::info; @@ -163,6 +169,9 @@ impl Agent { // Health actor let health_actor = HealthMonitorBuilder::new(TEDGE_AGENT, &mut mqtt_actor_builder); + // Tedge to Te topic converter + let tedge_to_te_converter = create_tedge_to_te_converter(&mut mqtt_actor_builder)?; + // Spawn all runtime.spawn(signal_actor_builder).await?; runtime.spawn(file_transfer_server_builder).await?; @@ -171,9 +180,36 @@ impl Agent { runtime.spawn(software_update_builder).await?; runtime.spawn(converter_actor_builder).await?; runtime.spawn(health_actor).await?; + runtime.spawn(tedge_to_te_converter).await?; runtime.run_to_completion().await?; Ok(()) } } + +pub fn create_tedge_to_te_converter( + mqtt_actor_builder: &mut MqttActorBuilder, +) -> Result, anyhow::Error> { + let tedge_to_te_converter = TedgetoTeConverter::new(); + let subscriptions: TopicFilter = vec![ + "tedge/measurements", + "tedge/measurements/+", + "tedge/events/+", + "tedge/events/+/+", + "tedge/alarms/+/+", + "tedge/alarms/+/+/+", + "tedge/health/+", + "tedge/health/+/+", + ] + .try_into()?; + + // Tedge to Te converter + let mut tedge_converter_actor = + ConvertingActor::builder("TedgetoTeConverter", tedge_to_te_converter, subscriptions); + + tedge_converter_actor.add_input(mqtt_actor_builder); + tedge_converter_actor.add_sink(mqtt_actor_builder); + + Ok(tedge_converter_actor) +} diff --git a/crates/core/tedge_agent/src/main.rs b/crates/core/tedge_agent/src/main.rs index a6040a3490b..cb59c5f603c 100644 --- a/crates/core/tedge_agent/src/main.rs +++ b/crates/core/tedge_agent/src/main.rs @@ -12,6 +12,7 @@ mod restart_manager; mod software_manager; mod state_repository; mod tedge_operation_converter; +mod tedge_to_te_converter; #[derive(Debug, clap::Parser)] #[clap( diff --git a/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs b/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs new file mode 100644 index 00000000000..f468f4f15be --- /dev/null +++ b/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs @@ -0,0 +1,128 @@ +use log::error; +use serde_json::Value; +use std::collections::HashMap; +use std::convert::Infallible; +use tedge_actors::Converter; +use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::Topic; + +pub struct TedgetoTeConverter {} + +impl Converter for TedgetoTeConverter { + type Input = MqttMessage; + type Output = MqttMessage; + type Error = Infallible; + + fn convert(&mut self, input: &Self::Input) -> Result, Self::Error> { + let messages_or_err = self.try_convert(input.clone()); + Ok(self.wrap_errors(messages_or_err)) + } +} + +impl TedgetoTeConverter { + pub fn new() -> Self { + TedgetoTeConverter {} + } + + fn try_convert( + &mut self, + message: MqttMessage, + ) -> Result, serde_json::Error> { + match message.topic.clone() { + topic if topic.name.starts_with("tedge/measurements") => { + Ok(self.convert_measurement(message)) + } + topic if topic.name.starts_with("tedge/events") => Ok(self.convert_event(message)), + topic if topic.name.starts_with("tedge/alarms") => self.convert_alarm(message), + topic if topic.name.starts_with("tedge/health") => { + Ok(self.convert_health_status_message(message)) + } + _ => Ok(vec![]), + } + } + + // tedge/measurements -> te/device/main///m/ + // tedge/measurements/child -> te/device/child///m/ + fn convert_measurement(&mut self, mut message: MqttMessage) -> Vec { + let te_topic = match message.topic.name.split('/').collect::>()[..] { + ["tedge", "measurements"] => Topic::new_unchecked("te/device/main///m/"), + ["tedge", "measurements", cid] => { + Topic::new_unchecked(format!("te/device/{cid}///m/").as_str()) + } + _ => return vec![], + }; + + message.topic = te_topic; + vec![(message)] + } + + // tedge/alarms/severity/alarm_type -> te/device/main///a/alarm_type, put severity in payload + // tedge/alarms/severity/child/alarm_type -> te/device/child///a/alarm_type, put severity in payload + fn convert_alarm( + &mut self, + mut message: MqttMessage, + ) -> Result, serde_json::Error> { + let (te_topic, severity) = match message.topic.name.split('/').collect::>()[..] { + ["tedge", "alarms", severity, alarm_type] => ( + Topic::new_unchecked(format!("te/device/main///a/{alarm_type}").as_str()), + severity, + ), + ["tedge", "alarms", severity, cid, alarm_type] => ( + Topic::new_unchecked(format!("te/device/{cid}///a/{alarm_type}").as_str()), + severity, + ), + _ => return Ok(vec![]), + }; + + let mut alarm: HashMap = serde_json::from_slice(message.payload.as_bytes())?; + alarm.insert("severity".to_string(), severity.into()); + message.topic = te_topic; + message.payload = serde_json::to_string(&alarm)?.into(); + Ok(vec![message]) + } + + // tedge/events/event_type -> te/device/main///e/event_type + // tedge/events/child/event_type -> te/device/child///e/event_type + fn convert_event(&mut self, mut message: MqttMessage) -> Vec { + let topic = match message.topic.name.split('/').collect::>()[..] { + ["tedge", "events", event_type] => { + Topic::new_unchecked(format!("te/device/main///e/{event_type}").as_str()) + } + ["tedge", "events", cid, event_type] => { + Topic::new_unchecked(format!("te/device/{cid}///e/{event_type}").as_str()) + } + _ => return vec![], + }; + + message.topic = topic; + vec![message] + } + + // tedge/health/service-name -> te/device/main/service//status/health + // tedge/health/child/service-name -> te/device/child/service//status/health + fn convert_health_status_message(&mut self, mut message: MqttMessage) -> Vec { + let topic = match message.topic.name.split('/').collect::>()[..] { + ["tedge", "health", service_name] => Topic::new_unchecked( + format!("te/device/main/service/{service_name}/status/health").as_str(), + ), + ["tedge", "health", cid, service_name] => Topic::new_unchecked( + format!("te/device/{cid}/service/{service_name}/status/health").as_str(), + ), + _ => return vec![], + }; + message.topic = topic; + vec![message] + } + + fn wrap_errors( + &self, + messages_or_err: Result, serde_json::Error>, + ) -> Vec { + messages_or_err.unwrap_or_else(|error| vec![self.new_error_message(error)]) + } + + fn new_error_message(&self, error: serde_json::Error) -> MqttMessage { + error!("Mapping error: {}", error); + MqttMessage::new(&Topic::new_unchecked("tedge/errors"), error.to_string()) + } +} diff --git a/crates/core/tedge_agent/src/tedge_to_te_converter/mod.rs b/crates/core/tedge_agent/src/tedge_to_te_converter/mod.rs new file mode 100644 index 00000000000..44191091cf8 --- /dev/null +++ b/crates/core/tedge_agent/src/tedge_to_te_converter/mod.rs @@ -0,0 +1,4 @@ +pub mod converter; + +#[cfg(test)] +mod tests; diff --git a/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs b/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs new file mode 100644 index 00000000000..76a4cf1cbf8 --- /dev/null +++ b/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs @@ -0,0 +1,295 @@ +use super::converter::TedgetoTeConverter; +use std::time::Duration; +use tedge_actors::test_helpers::MessageReceiverExt; +use tedge_actors::test_helpers::TimedMessageBox; +use tedge_actors::Actor; +use tedge_actors::Builder; +use tedge_actors::ConvertingActor; +use tedge_actors::DynError; +use tedge_actors::Sender; +use tedge_actors::ServiceConsumer; +use tedge_actors::SimpleMessageBox; +use tedge_actors::SimpleMessageBoxBuilder; +use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::Topic; +use tedge_mqtt_ext::TopicFilter; + +#[tokio::test] +async fn convert_incoming_main_device_measurement_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/measurements"), + r#"{"temperature": 2500 }"#, + ); + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/main///m/"), + r#"{"temperature": 2500 }"#, + ); + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +#[tokio::test] +async fn convert_incoming_child_device_measurement_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/measurements/child1"), + r#"{"temperature": 2500 }"#, + ); + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/child1///m/"), + r#"{"temperature": 2500 }"#, + ); + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +#[tokio::test] +async fn convert_incoming_main_device_alarm_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm"), + r#"{ + "text": "I raised it", + "time": "2021-04-23T19:00:00+05:00" + }"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/main///a/MyCustomAlarm"), + r#"{"text":"I raised it","time":"2021-04-23T19:00:00+05:00","severity":"critical"}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box + .assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message]) + .await; + Ok(()) +} + +fn same_json_over_mqtt_msg(left: &MqttMessage, right: &MqttMessage) -> bool { + let left_msg: serde_json::Value = serde_json::from_slice(left.payload.as_bytes()).unwrap(); + let right_msg: serde_json::Value = serde_json::from_slice(right.payload.as_bytes()).unwrap(); + + (left.topic == right.topic) && (left_msg == right_msg) +} + +#[tokio::test] +async fn convert_incoming_custom_main_device_alarm_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm"), + r#"{ + "text": "I raised it", + "someOtherCustomFragment": {"nested":{"value": "extra info"}} + }"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/main///a/MyCustomAlarm"), + r#"{"text":"I raised it","severity":"critical","someOtherCustomFragment":{"nested":{"value":"extra info"}}}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box + .assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message]) + .await; + Ok(()) +} + +#[tokio::test] +async fn convert_incoming_child_device_alarm_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/alarms/critical/child/MyCustomAlarm"), + r#"{ + "text": "I raised it", + "time": "2021-04-23T19:00:00+05:00" + }"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/child///a/MyCustomAlarm"), + r#"{"text":"I raised it","time":"2021-04-23T19:00:00+05:00","severity":"critical"}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box + .assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message]) + .await; + Ok(()) +} + +#[tokio::test] +async fn convert_incoming_main_device_event_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/events/MyEvent"), + r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00"}"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/main///e/MyEvent"), + r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00"}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +#[tokio::test] +async fn convert_custom_incoming_main_device_event_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/events/MyEvent"), + r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00","someOtherCustomFragment":{"nested":{"value":"extra info"}}}"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/main///e/MyEvent"), + r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00","someOtherCustomFragment":{"nested":{"value":"extra info"}}}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +#[tokio::test] +async fn convert_incoming_child_device_event_topic() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/events/child/MyEvent"), + r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00"}"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/child///e/MyEvent"), + r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00"}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +// tedge/health/service-name -> te/device/main/service//status/health +// tedge/health/child/service-name -> te/device/child/service//status/health +#[tokio::test] +async fn convert_incoming_main_device_service_health_status() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/health/myservice"), + r#"{""pid":1234,"status":"up","time":1674739912}"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/main/service/myservice/status/health"), + r#"{""pid":1234,"status":"up","time":1674739912}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +#[tokio::test] +async fn convert_incoming_child_device_service_health_status() -> Result<(), DynError> { + // Spawn incoming mqtt message converter + let mut mqtt_box = spawn_tedge_to_te_converter().await?; + + // Simulate SoftwareList MQTT message received. + let mqtt_message = MqttMessage::new( + &Topic::new_unchecked("tedge/health/child/myservice"), + r#"{""pid":1234,"status":"up","time":1674739912}"#, + ); + + let expected_mqtt_message = MqttMessage::new( + &Topic::new_unchecked("te/device/child/service/myservice/status/health"), + r#"{""pid":1234,"status":"up","time":1674739912}"#, + ); + + mqtt_box.send(mqtt_message).await?; + + // Assert SoftwareListRequest + mqtt_box.assert_received([expected_mqtt_message]).await; + Ok(()) +} + +async fn spawn_tedge_to_te_converter( +) -> Result>, DynError> { + // Tedge to Te topic converter + let tedge_to_te_converter = TedgetoTeConverter::new(); + let subscriptions: TopicFilter = vec![ + "tedge/measurements", + "tedge/measurements/+", + "tedge/events/+", + "tedge/events/+/+", + "tedge/alarms/+/+", + "tedge/alarms/+/+/+", + "tedge/health/+", + "tedge/health/+/+", + ] + .try_into()?; + + // Tedge to Te converter + let mut tedge_converter_actor = + ConvertingActor::builder("TedgetoTeConverter", tedge_to_te_converter, subscriptions); + + let mqtt_box = SimpleMessageBoxBuilder::new("MQTT", 5) + .with_connection(&mut tedge_converter_actor) + .build() + .with_timeout(Duration::from_millis(100)); + + tokio::spawn(async move { tedge_converter_actor.build().run().await }); + + Ok(mqtt_box) +} diff --git a/tests/RobotFramework/tests/tedge_to_te_converter/convert_tedge_topics_to_te_topics.robot b/tests/RobotFramework/tests/tedge_to_te_converter/convert_tedge_topics_to_te_topics.robot new file mode 100644 index 00000000000..ed1391a5dd7 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_to_te_converter/convert_tedge_topics_to_te_topics.robot @@ -0,0 +1,49 @@ +*** Settings *** +Documentation Purpose of this test is to verify that tedge-agent converts the tedge/# topics to te/# topics + +Resource ../../resources/common.resource +Library ThinEdgeIO + +Suite Setup Custom Setup +Suite Teardown Custom Teardown + +Test Tags theme:mqtt theme:tedge to te + + +*** Test Cases *** +Convert main device measurement topic + Execute Command tedge mqtt pub tedge/measurements '{"temperature":25}' + Should Have MQTT Messages te/device/main///m/ message_pattern={"temperature":25} + + +Convert child device measurement topic + Execute Command tedge mqtt pub tedge/measurements/child '{"temperature":25}' + Should Have MQTT Messages te/device/child///m/ message_pattern={"temperature":25} + +Convert main device event topic + Execute Command tedge mqtt pub tedge/events/login_event '{"text":"someone logedin"}' + Should Have MQTT Messages te/device/main///e/login_event message_pattern={"text":"someone logedin"} + +Convert child device event topic + Execute Command tedge mqtt pub tedge/events/child/login_event '{"text":"someone logedin"}' + Should Have MQTT Messages te/device/child///e/login_event message_pattern={"text":"someone logedin"} + +Convert main device alarm topic + Execute Command tedge mqtt pub tedge/alarms/minor/test_alarm '{"text":"test alarm"}' + Should Have MQTT Messages te/device/main///a/test_alarm message_pattern={"text":"test alarm","severity":"minor"} + +Convert child device alarm topic + Execute Command tedge mqtt pub tedge/alarms/minor/child/test_alarm '{"text":"test alarm"}' + Should Have MQTT Messages te/device/child///a/test_alarm message_pattern={"text":"test alarm","severity":"minor"} + + +*** Keywords *** +Custom Setup + Setup + # Execute Command sudo tedge config set c8y.topics tedge/measurements/+ + # Execute Command sudo systemctl restart tedge-mapper-c8y.service + ThinEdgeIO.Service Health Status Should Be Up tedge-agent + +Custom Teardown + Get Logs + # Execute Command sudo tedge config unset c8y.topics